Kafka分區機制介紹與示例

zhongfangjie 8年前發布 | 128K 次閱讀 Kafka 消息系統

來自: http://lxw1234.com/archives/2015/10/538.htm


Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾, 以”topicName_partitionIndex”的命名方式命名,該文件夾下存儲這個分區的所有消息(.log)和索引文件(.index),這 使得Kafka的吞吐率可以水平擴展。

生產者在生產數據的時候,可以為每條消息指定Key,這樣消息被發送到broker時,會根據分區規則選擇被存儲到哪一個分區中,如果分區規則設置 的合理,那么所有的消息將會被均勻的分布到不同的分區中,這樣就實現了負載均衡和水平擴展。另外,在消費者端,同一個消費組可以多線程并發的從多個分區中 同時消費數據(后續將介紹這塊)。

上面所說的分區規則,是實現了kafka.producer.Partitioner接口的類,可以自定義。比如,下面的代碼 SimplePartitioner中,將消息的key做了hashcode,然后和分區數(numPartitions)做模運算,使得每一個key都 可以分布到一個分區中

 

package com.lxw1234.kafka; 
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties; 
public class SimplePartitioner implements Partitioner {        
public SimplePartitioner (VerifiableProperties props) { }       
@Override   public int partition(Object key, int numPartitions) {     
int partition = 0;       
String k = (String)key;      
partition = Math.abs(k.hashCode()) % numPartitions;     
return partition;  
}   
}

在創建Topic時候可以使用–partitions <numPartitions>指定分區數。也可以在server.properties配置文件中配置參數num.partitions來指定默認的分區數。

但有一點需要注意,為Topic創建分區時,分區數最好是broker數量的整數倍,這樣才能是一個Topic的分區均勻的分布在整個Kafka集群中,假設我的Kafka集群由4個broker組成,以下圖為例:

 

現在創建一個topic “lxw1234”,為該topic指定4個分區,那么這4個分區將會在每個broker上各分布一個:

./kafka-topics.sh 
--create 
--zookeeper zk1:2181,zk2:2181,zk3:2181 
--replication-factor 1
--partitions 4 
--topic lxw1234

這樣所有的分區就均勻分布在集群中,如果創建topic時候指定了3個分區,那么就有一個broker上沒有該topic的分區。

帶分區規則的生產者

現在用一個生產者示例(PartitionerProducer),向Topic lxw1234中發送消息。該生產者使用的分區規則,就是上面的SimplePartitioner。從0-10一共11條消息,每條消息的key 為”key”+index,消息內容為”key”+index+”–value”+index。比如:key0–value0、key1– value1、、、key10–value10。

package com.lxw1234.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; 
public class PartitionerProducer {   
public static void main(String[] args) 
{   Properties props = new Properties();    
    props.put("serializer.class", "kafka.serializer.StringEncoder");   
    props.put("metadata.broker.list", "127.0.0.17:9091,127.0.0.17:9092,127.0.0.102:9091,127.0.0.102:9092");        
    props.put("partitioner.class", "com.lxw1234.kafka.SimplePartitioner");     Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); 
        String topic = "lxw1234";       
         for(int i=0; i<=10; i++) {         
         String k = "key" + i;      
              String v = k + "--value" + i;            
              producer.send(new KeyedMessage<String, String>(topic,k,v)); 
                  }     
                    producer.close();   
                    }
                    }

理論上來說,生產者在發送消息的時候,會按照SimplePartitioner的規則,將key0做hashcode,然后和分區數(4)做模運算,得到分區索引:

hashcode(”key0”) % 4 = 1

hashcode(”key1”) % 4 = 2

hashcode(”key2”) % 4 = 3

hashcode(”key3”) % 4 = 0

         ……

對應的消息將會被發送至相應的分區中。

統計各分區消息的消費者

下面的消費者代碼用來驗證,在消費數據時,打印出消息所在的分區及消息內容:

package com.lxw1234.kafka; 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties; 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata; 
public class MyConsumer {    
public static void main(String[] args)
 { String topic = "lxw1234";    
   ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    
    topicCountMap.put(topic, new Integer(1));   
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);       
    ConsumerIterator<byte[], byte[]> it = stream.iterator();    
   while(it.hasNext()) {          
   MessageAndMetadata<byte[], byte[]> mam = it.next();
   System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");
   private static ConsumerConfig createConsumerConfig() {  
    Properties props = new Properties();  
      props.put("group.id","group1"); 
      props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");
      props.put("zookeeper.session.timeout.ms", "400")       
      props.put("zookeeper.sync.time.ms", "200");        
      props.put("auto.commit.interval.ms", "1000");      
      props.put("auto.offset.reset", "smallest");        
      return new ConsumerConfig(props);   
      }
      }      
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();      
      topicCountMap.put(topic, new Integer(1));     
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);       
      KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);     
      ConsumerIterator<byte[], byte[]> it = stream.iterator();      
      while(it.hasNext()) 
          {           
          MessageAndMetadata<byte[], byte[]> mam = it.next();           
          System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");       }             }       
          private static ConsumerConfig createConsumerConfig() {  
          Properties props = new Properties();    
          props.put("group.id","group1");
          props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");    
          props.put("zookeeper.session.timeout.ms", "400");  
          props.put("zookeeper.sync.time.ms", "200");       
          props.put("auto.commit.interval.ms", "1000");  
          props.put("auto.offset.reset", "smallest");      
           return new ConsumerConfig(props);   
             }
             }

 

 本文由用戶 zhongfangjie 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!