Kafka+Log4j實現日志集中管理
引言
前段時間寫的《Spring+Log4j+ActiveMQ實現遠程記錄日志——實戰+分析》得到了許多同學的認可,在認可的同時,也有同學提出可以使用Kafka來集中管理日志,于是今天就來學習一下。
特別說明,由于網絡上關于Kafka+Log4j的完整例子并不多,我也是一邊學習一邊使用,因此如果有解釋得不好或者錯誤的地方,歡迎批評指正,如果你有好的想法,也歡迎留言探討。
第一部分 搭建Kafka環境
安裝Kafka
下載:http://kafka.apache.org/downloads.html
tar zxf kafka-<VERSION>.tgz cd kafka-<VERSION>
?啟動Zookeeper
啟動Zookeeper前需要配置一下config/zookeeper.properties:?
接下來啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動Kafka Server
啟動Kafka Server前需要配置一下config/server.properties。主要配置以下幾項,內容就不說了,注釋里都很詳細:
然后啟動Kafka Server:
bin/kafka-server-start.sh config/server.properties
創建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看創建的Topic
>bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動控制臺Producer,向Kafka發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message ^C
啟動控制臺Consumer,消費剛剛發送的消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message This is another message
刪除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
注:只有當delete.topic.enable=true時,該操作才有效
配置Kafka集群(單臺機器上)
首先拷貝server.properties文件為多份(這里演示4個節點的Kafka集群,因此還需要拷貝3份配置文件):
cp config/server.properties config/server1.properties cp config/server.properties config/server2.properties cp config/server.properties config/server3.properties
修改server1.properties的以下內容:
broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
同理修改server2.properties和server3.properties的這些內容,并保持所有配置文件的zookeeper.connect屬性都指向運行在本機的zookeeper地址localhost:2181。注意,由于這幾個Kafka節點都將運行在同一臺機器上,因此需要保證這幾個值不同,這里以累加的方式處理。例如在server2.properties上:
broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2
把server3.properties也配置好以后,依次啟動這些節點:
bin/kafka-server-start.sh config/server1.properties & bin/kafka-server-start.sh config/server2.properties & bin/kafka-server-start.sh config/server3.properties &
Topic & Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。
現在在Kafka集群上創建備份因子為3,分區數為4的Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka
說明:備份因子replication-factor越大,則說明集群容錯性越強,就是當集群down掉后,數據恢復的可能性越大。所有的分區數里的內容共同組成了一份數據,分區數partions越大,則該topic的消息就越分散,集群中的消息分布就越均勻。
然后使用kafka-topics.sh的--describe參數查看一下Topic為kafka的詳情:
輸出的第一行是所有分區的概要,接下來的每一行是一個分區的描述。可以看到Topic為kafka的消息,PartionCount=4,ReplicationFactor=3正是我們創建時指定的分區數和備份因子。
另外:Leader是指負責這個分區所有讀寫的節點;Replicas是指這個分區所在的所有節點(不論它是否活著);ISR是Replicas的子集,代表存有這個分區信息而且當前活著的節點。
拿partition:0這個分區來說,該分區的Leader是server0,分布在id為0,1,2這三個節點上,而且這三個節點都活著。
再來看下Kafka集群的日志:
其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此類推。
從上面的配置可知,id為?0,1,2,3?的節點分別對應server0, server1, server2, server3。而上例中的partition:0分布在id為0, 1, 2這三個節點上,因此可以在server0, server1, server2這三個節點上看到有kafka-0這個文件夾。這個kafka-0就代表Topic為kafka的partion0。
第二部分 Kafka+Log4j項目整合
先來看下Maven項目結構圖:
作為Demo,文件不多。先看看pom.xml引入了哪些jar包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
重要的內容是log4j.properties:
log4j.rootLogger=INFO,console # for package com.demo.kafka, log would be sent to kafka appender. log4j.logger.com.demo.kafka=DEBUG,kafka # appender kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=kafka # multiple brokers are separated by comma ",". log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=true log4j.appender.kafka.layout=org.apache.log4j.PatternLayout log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n # appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
App.java里面就很簡單啦,主要是通過log4j輸出日志:
package com.demo.kafka;
import org.apache.log4j.Logger;
public class App {
private static final Logger LOGGER = Logger.getLogger(App.class);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
LOGGER.info("Info [" + i + "]");
Thread.sleep(1000);
}
}
} MyConsumer.java用于消費kafka中的信息:
package com.demo.kafka;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class MyConsumer {
private static final String ZOOKEEPER = "localhost:2181";
//groupName可以隨意給,因為對于kafka里的每條消息,每個group都會完整的處理一遍
private static final String GROUP_NAME = "test_group";
private static final String TOPIC_NAME = "kafka";
private static final int CONSUMER_NUM = 4;
private static final int PARTITION_NUM = 4;
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zookeeper.connect", ZOOKEEPER);
props.put("zookeeper.connectiontimeout.ms", "1000000");
props.put("group.id", GROUP_NAME);
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4
// threads to consume
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
consumerConnector.createMessageStreams(
ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);
// consume the messages in the threads
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println(new String(msgAndMetadata.message()));
}
}
});
}
}
}
MyProducer.java用于向Kafka發送消息,但不通過log4j的appender發送。此案例中可以不要。但是我還是放在這里:
package com.demo.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class MyProducer {
private static final String TOPIC = "kafka";
private static final String CONTENT = "This is a single message";
private static final String BROKER_LIST = "localhost:9092";
private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
public static void main(String[] args) {
Properties props = new Properties();
props.put("serializer.class", SERIALIZER_CLASS);
props.put("metadata.broker.list", BROKER_LIST);
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
//Send one message.
KeyedMessage<String, String> message =
new KeyedMessage<String, String>(TOPIC, CONTENT);
producer.send(message);
//Send multiple messages.
List<KeyedMessage<String,String>> messages =
new ArrayList<KeyedMessage<String, String>>();
for (int i = 0; i < 5; i++) {
messages.add(new KeyedMessage<String, String>
(TOPIC, "Multiple message at a time. " + i));
}
producer.send(messages);
}
} 到這里,代碼就結束了。
第三部分 運行與驗證
先運行MyConsumer,使其處于監聽狀態。同時,還可以啟動Kafka自帶的ConsoleConsumer來驗證是否跟MyConsumer的結果一致。最后運行App.java。
先來看看MyConsumer的輸出:
再來看看ConsoleConsumer的輸出:
可以看到,盡管發往Kafka的消息去往了不同的地方,但是內容是一樣的,而且一條也不少。最后再來看看Kafka的日志。
我們知道,Topic為kafka的消息有4個partion,從之前的截圖可知這4個partion均勻分布在4個kafka節點上,于是我對每一個partion隨機選取一個節點查看了日志內容。
上圖中黃色選中部分依次代表在server0上查看partion0,在server1上查看partion1,以此類推。
而紅色部分是日志內容,由于在創建Topic時準備將20條日志分成4個區存儲,可以很清楚的看到,這20條日志確實是很均勻的存儲在了幾個partion上。
摘一點Infoq上的話:每個日志文件都是一個log entrie序列,每個log entrie包含一個4字節整型數值(值為N+5),1個字節的"magic value",4個字節的CRC校驗碼,其后跟N個字節的消息體。每條消息都有一個當前Partition下唯一的64字節的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:
message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes
這里我們看到的日志文件的每一行,就是一個log entrie,每一行前面無法顯示的字符(藍色選中部分),就是(message length + magic value + crc)了。而log entrie的后部分,則是消息體的內容了。
問題:
1. 如果要使用此種方式,有一種場景是提取某天或者某小時的日志,那么如何設計Topic呢?是不是要在Topic上帶入日期或者小時數?還有更好的設計方案嗎?
2. 假設按每小時設計Topic,那么如何在使用諸如logger.info()這樣的方法時,自動根據時間去改變Topic呢?有類似的例子嗎?
----歡迎交流,共同進步。
樣例下載:百度網盤
鏈接: http://pan.baidu.com/s/1i400DZv 密碼: f25c
參考頁面:
http://kafka.apache.org/07/quickstart.html
http://kafka.apache.org/documentation.html#quickstart
http://www.infoq.com/cn/articles/kafka-analysis-part-1
來自:http://my.oschina.net/itblog/blog/540918