[Apache Kafka]構建Kafka集群
現在我們構建Kafka消息訂閱發布系統。Kafka支持多種集群方式,例如:
- 單節點單broker集群
- 單節點多broker集群
- 多節點多broker集群
一個Kafka集群主要包含以下五個組件:
- topic:topic是producer發布的消息的類別。在Kafka中,topic是分區的,每個partition內是順序不可變的消息序列。Kafka集群維護每個topic的partition日志。partition內每個消息分配了一個被稱為offset的唯一序列ID。
- broker:一個Kafka集群包含一個或多個服務器,每個服務器有一個或多個運行的被稱為broker的服務進程。topic是在broker進程的上下文中創建的。
- zookeeper:Kafka使用ZooKeeper來協調broker和cunsumers。ZooKeeper允許分布式進程通過一個共享的層級數據寄存器命名空間相互協調(這些數據寄存器被稱為znode),就像文件系統一樣。它跟標準文件系統的區別是,每個znode可以擁有關聯的數據,且數據量有限制。ZooKeeper是被設計用來存儲協調數據的,如狀態信息、配置信息、位置信息等。關于ZooKeeper詳見 Hadoop Wiki 。
- producers:producer選擇topic內的合適的partition并向其發布數據。為了負載均衡,可以基于循環的方式或者自定義方法來關聯信息與topic partition。
- consumer:consumer是訂閱topics并處理發布的消息的應用或進程。
單節點單broker集群
在上篇中,我們在單臺機器上部署了Kafka,現在將其設置為單節點單broker集群。架構如圖所示:
啟動ZooKeeper服務:
Kafka提供了一個默認的簡單的ZooKeeper配置文件,可啟動本地的單ZooKeeper實例。
bin/zookeeper-server-start.sh config/zookeeper.properties &
zookeeper.properties配置文件中定義了以下的關鍵屬性:
# Data directory where the zookeeper snapshot is stored. dataDir=/tmp/zookeeper # The port listening for client request clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0
在默認的配置中,ZooKeeper監聽2181端口。關于構建多ZooKeeper服務,詳見 http://zookeeper.apache.org/ .
啟動Kafka broker
bin/kafka-server-start.sh config/server.properties &
server.properties配置文件中定義了以下的關鍵屬性:
# The id of the broker. This must be set to a unique integer for each broker. Broker.id=0 # The port the socket server listens on port=9092 # The directory under which to store log files log.dir=/tmp/kafka8-logs # The default number of log partitions per topic. num.partitions=2 # Zookeeper connection string zookeeper.connect=localhost:2181
創建一個Kafka topic
創建一個名為 kafkatopic 的topic,單partitoin且只有一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
查看topic列表:
bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動producer發送信息
Kafka提供了一個命令行producer,可接收命令行輸入并將其作為消息發布到Kafka集群。默認每一行是一個消息。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
其中,broker-list和topic這兩個參數是必須的,broker-list指定要連接的broker,格式為 node_address:port 。topic是必須的,因為需要發送消息給訂閱了該topic的consumer group。
現在可以在命令行里輸入一些信息,每一行會被作為一個消息。
默認的屬性配置在/config/producer.properties文件中:
# list of brokers used for bootstrapping knowledge about the rest of the format: host1:port1,host2:port2... metadata.broker.list=localhost:9092 # specify the compression codec for all data generated: none , gzip, snappy. compression.codec=none
啟動consumer接收消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
consumer加載完成后,會輸出剛才在producer里輸入消息。
默認的屬性配置在/config/consumer.properties文件中:
# consumer group id (A string that uniquely identifies a set of consumers within the same consumer group) group.id=test-consumer-group
在不同的終端里分別啟動zookeeper,broker,producer,consumer后,在producer終端里輸入消息,消息就會在consumer終端中顯示了。
單節點多broker集群
接下來構建一個單節點多broker集群:
啟動ZooKeeper
同上。
bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動broker
在單節點上配置多個bbroker時,需要為每個broker指定單獨的屬性配置文件,其中 broker.id 、 port 、 log.dir 這三個屬性必須時不同的。
在broker1的配置文件server-1.properties中,定義參數為:
broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
在broker2的配置文件server-2.properties中,定義參數為:
broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2
我們為每個broker指定了不同的端口,是因為我們在同一臺機器上搭建多個broker,實際生產環境中,broker可能分布在多臺機器上。
啟動每個broker:
bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &
創建topic
創建一個名為 replicated-kafkatopic 的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicated-kafkatopic
啟動producer
如果一個producer連接多個broker,需要傳遞參數 broker-list :
bin/kafka-console-producer.sh --broker-list localhost:9093, localhost:9094 --topic replicated-kafkatopic
多個producer時,為每個指定 broker-list 。
啟動consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic replicated-kafkatopic
多節點多broker集群
在多節點多broker集群中,每個節點都需要安裝Kafka,且所有的broker都連接到同一個ZooKeeper。
參考資料
Learing Apache Kafka-Second Edition