Kafka Consumer開發的一些關鍵點
Kafka的consumer是以pull的形式獲取消息數據的。不同于隊列和發布-訂閱模式,kafka采用了consumer group的模式。通常的,一般采用一個consumer中的一個group對應一個業務,配合多個producer提供數據。
一. 消費過的數據無法再次消費
在user level上,一旦消費過topic里的數據,那么就無法再次用同一個groupid消費同一組數據。如果想要再次消費數據,要么換另一個groupid,要么使用鏡像:
此外,low level的api提供了一些機制去設置partion和offset。
二. offset管理
kafka會記錄offset到zk中。但是,zk client api對zk的頻繁寫入是一個低效的操作。0.8.2 kafka引入了native offset storage,將offset管理從zk移出,并且可以做到水平擴展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中。同時Kafka又在內存中維護了的三元組來維護最新的offset信息,consumer來取最新offset信息的時候直接內存里拿即可。當然,kafka允許你快速的checkpoint最新的offset信息到磁盤上。
三. stream
This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
根據官方文檔所說,stream即指的是來自一個或多個服務器上的一個或者多個partition的消息。每一個stream都對應一個單線程處理。因此,client能夠設置滿足自己需求的stream數目。總之,一個stream也許代表了多個服務器partion的消息的聚合,但是每一個 partition都只能到一個stream。
四. consumer和partition
- 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發的,所以consumer數不要大于partition數
- 如果consumer比partition少,一個consumer會對應于多個partitions,這里主要合理分配consumer數和partition數,否則會導致partition里面的數據被取的不均勻
- 如果consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不同
- 增減consumer,broker,partition會導致rebalance,所以rebalance后consumer對應的partition會發生變化
- High-level接口中獲取不到數據的時候是會block的
負載低的情況下可以每個線程消費多個partition。但負載高的情況下,Consumer 線程數最好和Partition數量保持一致。如果還是消費不過來,應該再開 Consumer 進程,進程內線程數同樣和分區數一致。(多謝 @shadyxu 指出)
五. high-level的consumer工具
-
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
可以看到當前group offset的狀況。
-
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3個參數, [earliest | latest],表示將offset置到哪里 consumer.properties ,這里是配置文件的路徑 topic,topic名,這里是page_visits
六. SimpleConsumer
kafka的low-level接口,使用場景:
- Read a message multiple times
- Consume only a subset of the partitions in a topic in a process
- Manage transactions to make sure a message is processed once and only once
用這個接口需要注意:
- You must keep track of the offsets in your application to know where you left off consuming.
- You must figure out which Broker is the lead Broker for a topic and partition
- You must handle Broker leader changes
使用步驟:
- Find an active Broker and find out which Broker is the leader for your topic and partition:你必須知道讀哪個topic的哪個partition
- Determine who the replica Brokers are for your topic and partition: 找到負責該partition的broker leader,從而找到存有該partition副本的那個broker
- Build the request defining what data you are interested in:自己去寫request并fetch數據
- Fetch the data
- Identify and recover from leader changes:還要注意需要識別和處理broker leader的改變
來自:https://github.com/superhj1987/kafka-sudy/blob/master/doc/kafka-consumer.md