分布式發布訂閱消息系統Kafka JAVA客戶端代碼示例

jopen 10年前發布 | 57K 次閱讀 消息系統 Kafka

介紹

     http://kafka.apache.org
    kafka是一種高吞吐量的分布式發布訂閱消息系統
    kafka是linkedin用于日志處理的分布式消息隊列,linkedin的日志數據容量大,但對可靠性要求不高,其日志數據主要包括用戶行為(登錄、瀏覽、點擊、分享、喜歡)以及系統運行日志(CPU、內存、磁盤、網絡、系統及進程狀態)

    當前很多的消息隊列服務提供可靠交付保證,并默認是即時消費(不適合離線)。

高可靠交付對linkedin的日志不是必須的,故可通過降低可靠性來提高性能,同時通過構建分布式的集群,允許消息在系統中累積,使得kafka同時支持離線和在線日志處理

測試環境

    kafka_2.10-0.8.1.1 3個節點做的集群

    zookeeper-3.4.5 一個實例節點

代碼示例

消息生產者代碼示例

import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;

/**

  • 詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
  • @author Fung / public class ProducerDemo { public static void main(String[] args) {

     Random rnd = new Random();
     int events=100;
    
     // 設置配置屬性
     Properties props = new Properties();
     props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
     props.put("serializer.class", "kafka.serializer.StringEncoder");
     // key.serializer.class默認為serializer.class
     props.put("key.serializer.class", "kafka.serializer.StringEncoder");
     // 可選配置,如果不配置,則使用默認的partitioner
     props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
     // 觸發acknowledgement機制,否則是fire and forget,可能會引起數據丟失
     // 值為0,1,-1,可以參考
     // http://kafka.apache.org/08/configuration.html
     props.put("request.required.acks", "1");
     ProducerConfig config = new ProducerConfig(props);
    
     // 創建producer
     Producer<String, String> producer = new Producer<String, String>(config);
     // 產生并發送消息
     long start=System.currentTimeMillis();
     for (long i = 0; i < events; i++) {
         long runtime = new Date().getTime();
         String ip = "192.168.2." + i;//rnd.nextInt(255);
         String msg = runtime + ",www.example.com," + ip;
         //如果topic不存在,則會自動創建,默認replication-factor為1,partitions為0
         KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                 "page_visits", ip, msg);
         producer.send(data);
     }
     System.out.println("耗時:" + (System.currentTimeMillis() - start));
     // 關閉producer
     producer.close();
    

    } }</pre>

    消息消費者代碼示例

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;

/**

  • 詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
  • @author Fung / public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {

     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
     this.topic = a_topic;
    

    }

    public void shutdown() {

     if (consumer != null)
         consumer.shutdown();
     if (executor != null)
         executor.shutdown();
    

    }

    public void run(int numThreads) {

     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(numThreads));
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
             .createMessageStreams(topicCountMap);
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    
     // now launch all the threads
     executor = Executors.newFixedThreadPool(numThreads);
    
     // now create an object to consume the messages
     //
     int threadNumber = 0;
     for (final KafkaStream stream : streams) {
         executor.submit(new ConsumerMsgTask(stream, threadNumber));
         threadNumber++;
     }
    

    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,

         String a_groupId) {
     Properties props = new Properties();
     props.put("zookeeper.connect", a_zookeeper);
     props.put("group.id", a_groupId);
     props.put("zookeeper.session.timeout.ms", "400");
     props.put("zookeeper.sync.time.ms", "200");
     props.put("auto.commit.interval.ms", "1000");
    
     return new ConsumerConfig(props);
    

    }

    public static void main(String[] arg) {

     String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
     String zooKeeper = args[0];
     String groupId = args[1];
     String topic = args[2];
     int threads = Integer.parseInt(args[3]);
    
     ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
     demo.run(threads);
    
     try {
         Thread.sleep(10000);
     } catch (InterruptedException ie) {
    
     }
     demo.shutdown();
    

    } }</pre>

    消息處理類

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable { private KafkaStream m_stream; private int m_threadNumber;

public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
    m_threadNumber = threadNumber;
    m_stream = stream;
}

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
        System.out.println("Thread " + m_threadNumber + ": "
                + new String(it.next().message()));
    System.out.println("Shutting down Thread: " + m_threadNumber);
}

}</pre>

Partitioner類示例

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerDemo implements Partitioner { public PartitionerDemo(VerifiableProperties props) {

}

@Override
public int partition(Object obj, int numPartitions) {
    int partition = 0;
    if (obj instanceof String) {
        String key=(String)obj;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
    }else{
        partition = obj.toString().length() % numPartitions;
    }

    return partition;
}

}</pre>

參考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

來自:http://my.oschina.net/cloudcoder/blog/299215

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!