[Apache Kafka]Kafka運維

zrae6871 8年前發布 | 22K 次閱讀 Kafka ZooKeeper 消息系統

來自: http://www.cnblogs.com/w1991/p/5161625.html

Kafka管理工具

Kafka集群管理工具

Kafka集群管理內容包括服務器啟停、leader均衡、復制、集群鏡像、集群擴展等。

添加服務器

向Kafka集群中添加服務器時,需要分配一個唯一的broker ID給新服務器。這時添加新服務器不會自動分配數據分區。重分配工具kafka-reassign-partitions.sh用于在broker之間移動partition。Kafka將新服務器當成待遷移的partition的follower,這樣新服務器可以完全復制該partition內的所有數據。一旦復制完成,新服務器成為in-sync replica之一,之前的副本中的某一個將會刪除該partition數據。

kafka-reassign-partitions.sh有三種模式:

  • --generate:生成候選的重分配方案
  • --execute:執行重分配計劃
  • --verify:驗證最后一次--execute的結果

partition重分配工具可用于將topic從當前broker集中移到新添加的broker中。使用時指定需要移動的topic清單和目標broker的ID清單。該工具會平均地將toipc分配到新的broker中,同時將topic的副本也帶過去。

#topics-for-new-server.json
{"partitions":
    [{"topic": "kafkatopic",
     {"topic": "kafkatopic1"}],
 "version":1
}

生成topic重分配計劃文件:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-for-new-server.json --broker-list "4,5" -–generate new-topic-reassignment.json

執行topic重分配:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --execute

也可以可選地移動分區:

#partitions-reassignment.json
{"partitions":
    [{"topic": "kafkatopic",
      "partition": 1,
      "replicas": [1,2,4] }],   
      }],
  "version":1
}
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-reassignment.json --execute

重分配完成后,可以驗證一下操作:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --verify

output like this:

Status of partition reassignment: Reassignment of partition [kafkatopic,0] completed successfully Reassignment of partition [kafkatopic,1] is in progress Reassignment of partition [kafkatopic,2] completed successfully Reassignment of partition [kafkatopic1,0] completed successfully Reassignment of partition [kafkatopic1,1] completed successfully Reassignment of partition [kafkatopic1,2] is in progress</pre>

從集群中移除服務器時,需要先將該服務器(broker)上所有分區的副本移除,均勻地分配到剩下的broker中。

增加復制因子:

#increase-replication-factor.json
{"partitions":[{"topic":"kafkatopic","partition":0,"replicas":[2,3]}],
 "version":1
}
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

管理topic

創建topic,指定復制因子和分區數量:

bin/kafka-topics.sh --create --zookeeper localhost:2181/chroot --replication-factor 3 --partitions 10 --topic kafkatopic

復制因子指定每個消息保存副本的數量,復制因子為3表示有兩臺服務器失效時仍不會丟失消息。分區數量指定topic分片的數量。一個分區只能在一個單獨的服務器上。分區數量為10表示所有的數據會被除了副本服務器外的不超過10臺服務器處理。

kafka-topics.sh還可以修改topic。目前不支持減少分區數量。例如,將分區數量增加到20:

bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --partitions 20 --topic kafkatopic

刪除topic:

bin/kafka-topics.sh --delete --zookeeper localhost:2181/chroot --topic kafkatopic

添加topic配置:

bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic kafkatopic --config <key>=<value>

刪除topic配置:

bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic kafkatopic --deleteconfig <key>=<value>

列出所有topic信息。可選參數有under-replicated-partitions和unavailable-partitions:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Kafka集群鏡像

Kafka提供了將現有集群鏡像到一個新集群的工具。

如圖中所示,鏡像工具的任務是從源集群消費消息,使用內嵌的producer重新發布到目標集群。

鏡像源集群時,需要構建好目標集群,并啟動MirrorMaker:

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

啟動MirrorMaker的最少參數有:一個或多個consumer配置文件、一個producer配置文件、基于標準Java正則表達式的白名單或黑名單。例如,鏡像名為A和B的topic使用 --whitelist 'A|B' ,鏡像所有topic使用 --whitelist '*' 。同時還需要MirrorMaker的consumer連接到源集群的ZooKeeper,producer連接到目標集群的ZooKeeper或broker.list參數。

出于高吞吐量的考慮,內嵌的異步producer使用阻塞模式,這確保了消息不會丟失producer在隊列滿了的情況下會等待所有的消息寫入目標集群。如果producer的隊列始終是滿的則說明MirrorMaker是集群鏡像的瓶頸。 --num.producers 選項可用于指定MirrorMaker中producer pool中consumer的數量來增加吞吐量, --num.streams 指定consumer線程的數量。

通常,MirrorMaker的consumer配置中的socket.buffersize和目標集群broker配置中的socket.send.buffer配置為很大的值,而且MirrorMaker的consumer配置中的fetch.size大于socket.buffersize。如果producer配置中指定了broker.list且使用了硬件負載均衡,可以配置一個producer失效時的重試次數。

Kafka還提供了檢查consumer位置的工具。該工具可查詢一個consumer group中所有consumer的位置。參數 --zkconnect 指向目標集群的ZooKeeper,參數 --topic 是可選的,不指定時將輸出指定consumer group下所有topic信息。

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group MirrorGroup --zkconnect localhost:2181 --topic kafkatopic

輸出結果

Group Topic Pid Offset logSize Lag Owner MirrorGroup kafkatopic 0 5 5 0 none MirrorGroup kafkatopic 1 3 4 1 none MirrorGroup kafkatopic 2 6 9 3 none</pre>

與其他工具集成

Kafka ecosystem tools

參考資料

Learing Apache Kafka-Second Edition

</div>

 本文由用戶 zrae6871 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!