Kafka分區機制介紹與示例
來自: http://lxw1234.com/archives/2015/10/538.htm
Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾, 以”topicName_partitionIndex”的命名方式命名,該文件夾下存儲這個分區的所有消息(.log)和索引文件(.index),這 使得Kafka的吞吐率可以水平擴展。
生產者在生產數據的時候,可以為每條消息指定Key,這樣消息被發送到broker時,會根據分區規則選擇被存儲到哪一個分區中,如果分區規則設置 的合理,那么所有的消息將會被均勻的分布到不同的分區中,這樣就實現了負載均衡和水平擴展。另外,在消費者端,同一個消費組可以多線程并發的從多個分區中 同時消費數據(后續將介紹這塊)。
上面所說的分區規則,是實現了kafka.producer.Partitioner接口的類,可以自定義。比如,下面的代碼 SimplePartitioner中,將消息的key做了hashcode,然后和分區數(numPartitions)做模運算,使得每一個key都 可以分布到一個分區中
package com.lxw1234.kafka; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } @Override public int partition(Object key, int numPartitions) { int partition = 0; String k = (String)key; partition = Math.abs(k.hashCode()) % numPartitions; return partition; } }
在創建Topic時候可以使用–partitions <numPartitions>指定分區數。也可以在server.properties配置文件中配置參數num.partitions來指定默認的分區數。
但有一點需要注意,為Topic創建分區時,分區數最好是broker數量的整數倍,這樣才能是一個Topic的分區均勻的分布在整個Kafka集群中,假設我的Kafka集群由4個broker組成,以下圖為例:
現在創建一個topic “lxw1234”,為該topic指定4個分區,那么這4個分區將會在每個broker上各分布一個:
./kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 1 --partitions 4 --topic lxw1234
這樣所有的分區就均勻分布在集群中,如果創建topic時候指定了3個分區,那么就有一個broker上沒有該topic的分區。
帶分區規則的生產者
現在用一個生產者示例(PartitionerProducer),向Topic lxw1234中發送消息。該生產者使用的分區規則,就是上面的SimplePartitioner。從0-10一共11條消息,每條消息的key 為”key”+index,消息內容為”key”+index+”–value”+index。比如:key0–value0、key1– value1、、、key10–value10。
package com.lxw1234.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class PartitionerProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "127.0.0.17:9091,127.0.0.17:9092,127.0.0.102:9091,127.0.0.102:9092"); props.put("partitioner.class", "com.lxw1234.kafka.SimplePartitioner"); Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); String topic = "lxw1234"; for(int i=0; i<=10; i++) { String k = "key" + i; String v = k + "--value" + i; producer.send(new KeyedMessage<String, String>(topic,k,v)); } producer.close(); } }
理論上來說,生產者在發送消息的時候,會按照SimplePartitioner的規則,將key0做hashcode,然后和分區數(4)做模運算,得到分區索引:
hashcode(”key0”) % 4 = 1
hashcode(”key1”) % 4 = 2
hashcode(”key2”) % 4 = 3
hashcode(”key3”) % 4 = 0
……
對應的消息將會被發送至相應的分區中。
統計各分區消息的消費者
下面的消費者代碼用來驗證,在消費數據時,打印出消息所在的分區及消息內容:
package com.lxw1234.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class MyConsumer { public static void main(String[] args) { String topic = "lxw1234"; ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) { MessageAndMetadata<byte[], byte[]> mam = it.next(); System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] .."); private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("group.id","group1"); props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183"); props.put("zookeeper.session.timeout.ms", "400") props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); return new ConsumerConfig(props); } } Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) { MessageAndMetadata<byte[], byte[]> mam = it.next(); System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] .."); } } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("group.id","group1"); props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); return new ConsumerConfig(props); } }