kafka入門例子 for java

jopen 10年前發布 | 121K 次閱讀 消息系統 Kafka

1,生產者

    import java.util.Properties;  

    import kafka.javaapi.producer.Producer;  
    import kafka.producer.KeyedMessage;  
    import kafka.producer.ProducerConfig;  

    public class TestProducer {    

            public static void main(String[] args) {    
                Properties props = new Properties();    
                props.setProperty("metadata.broker.list","10.XX.XX.XX:9092");    
                props.setProperty("serializer.class","kafka.serializer.StringEncoder");    
                props.put("request.required.acks","1");    
                ProducerConfig config = new ProducerConfig(props);    
                Producer<String, String> producer = new Producer<String, String>(config);    
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");    
                try {    
                    int i =1;  
                    while(i < 1000){  

                        producer.send(data);    
                    }  
                } catch (Exception e) {    
                    e.printStackTrace();    
                }    
                producer.close();    
            }    
    }  
2,消費者
    import java.util.HashMap;  
    import java.util.List;    
    import java.util.Map;    
    import java.util.Properties;    

    import kafka.consumer.ConsumerConfig;    
    import kafka.consumer.ConsumerIterator;    
    import kafka.consumer.KafkaStream;    
    import kafka.javaapi.consumer.ConsumerConnector;   

    public class TestConsumer extends Thread{    
            private final ConsumerConnector consumer;    
            private final String topic;    

            public static void main(String[] args) {    
                TestConsumer consumerThread = new TestConsumer("mykafka");    
                consumerThread.start();    
            }    
            public TestConsumer(String topic) {    
                consumer =kafka.consumer.Consumer    
                        .createJavaConsumerConnector(createConsumerConfig());    
                this.topic =topic;    
            }    

        private static ConsumerConfig createConsumerConfig() {    
            Properties props = new Properties();    
            props.put("zookeeper.connect","10.XX.XX.XX:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");    
            props.put("group.id", "0");    
            props.put("zookeeper.session.timeout.ms","10000");    
            return new ConsumerConfig(props);    
        }    

        public void run(){    
            Map<String,Integer> topickMap = new HashMap<String, Integer>();    
            topickMap.put(topic, 1);    
            Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);    
            KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);    
            ConsumerIterator<byte[],byte[]> it =stream.iterator();    
            System.out.println("*********Results********");    
            while(true){    
                if(it.hasNext()){  

                    System.err.println("get data:" +new String(it.next().message()));    
                }  
                try {    
                    Thread.sleep(1000);    
                } catch (InterruptedException e) {    
                    e.printStackTrace();    
                }    
            }    
        }    
    }    

3,分別啟動生產者和消費者,在消費者輸出中看到下日志即成功
 
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).  
log4j:WARN Please initialize the log4j system properly.  
*********Results********  
get data:test-kafka


4,啟動生產者如果報錯如下:
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).  
    log4j:WARN Please initialize the log4j system properly.  
    kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.  
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)  
        at kafka.producer.Producer.send(Producer.scala:76)  
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)  
        at ProducerTest.main(TestProducer.java:21)  

需要改動config文件夾下的server.properties中的以下兩個屬性
zookeeper.connect=localhost:2181改成zookeeper.connect=10.0.30.221:2181   
以及默認注釋掉的  
#host.name=localhost改成host.name=10.0.30.221  
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!