Kafka實戰-Flume到Kafka

jopen 9年前發布 | 511K 次閱讀 Kafka 消息系統

1.概述

前面給大家介紹了整個Kafka項目的開發流程,今天給大家分享Kafka如何獲取數據源,即Kafka生產數據。下面是今天要分享的目錄:

  • 數據來源
  • Flume到Kafka
  • 數據源加載
  • 預覽
  • </ul>

    下面開始今天的分享內容。

    2.數據來源

    Kafka生產的數據,是由Flume的Sink提供的,這里我們需要用到Flume集群,通過Flume集群將Agent的日志收集分發到 Kafka(供實時計算處理)和HDFS(離線計算處理)。關于Flume集群的Agent部署,這里就不多做贅述了,不清楚的同學可以參考《高可用Hadoop平臺-Flume NG實戰圖解篇》一文中的介紹,下面給大家介紹數據來源的流程圖,如下圖所示:

     Kafka實戰-Flume到Kafka

    這里,我們使用Flume作為日志收集系統,將收集到的數據輸送到Kafka中間件,以供Storm去實時消費計算,整個流程從各個Web節點 上,通過Flume的Agent代理收集日志,然后匯總到Flume集群,在由Flume的Sink將日志輸送到Kafka集群,完成數據的生產流程。

    3.Flume到Kafka

    從圖,我們已經清楚了數據生產的流程,下面我們來看看如何實現Flume到Kafka的輸送過程,下面我用一個簡要的圖來說明,如下圖所示:

     Kafka實戰-Flume到Kafka

    這個表達了從Flume到Kafka的輸送工程,下面我們來看看如何實現這部分。

    首先,在我們完成這部分流程時,需要我們將Flume集群和Kafka集群都部署完成,在完成部署相關集群后,我們來配置Flume的Sink數據流向,配置信息如下所示:

    • 首先是配置spooldir方式,內容如下所示:
    • </ul>

      producer.sources.s.type = spooldir
      producer.sources.s.spoolDir = /home/hadoop/dir/logdfs

      • 當然,Flume的數據發送方類型也是多種類型的,有:Console、Text、HDFS、RPC等,這里我們系統所使用的是Kafka中間件來接收,配置內容如下所示:
      • </ul>

        producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
        producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
        producer.sinks.r.partition.key=0
        producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
        producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
        producer.sinks.r.request.required.acks=0
        producer.sinks.r.max.message.size=1000000
        producer.sinks.r.producer.type=sync
        producer.sinks.r.custom.encoding=UTF-8
        producer.sinks.r.custom.topic.name=test

        這樣,我們就在Flume的Sink端配置好了數據流向接受方。

        4.數據加載

        在完成配置后,接下來我們開始加載數據,首先我們在Flume的spooldir端生產日志,以供Flume去收集這些日志。然后,我們通過Kafka的KafkaOffsetMonitor監控工具,去監控數據生產的情況,下面我們開始加載。

        • 啟動ZK集群,內容如下所示:
        • </ul>

          zkServer.sh start

          注意:分別在ZK的節點上啟動。

          • 啟動Kafka集群
          • </ul>

            kafka-server-start.sh config/server.properties &

            在其他的Kafka節點輸入同樣的命令,完成啟動。

            • 啟動Kafka監控工具
            • </ul>

              java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
               com.quantifind.kafka.offsetapp.OffsetGetterWeb \
               --zk dn1:2181,dn2:2181,dn3:2181 \
               --port 8089 \
               --refresh 10.seconds \
               --retain 1.days

              • 啟動Flume集群
              • </ul>

                flume-ng agent -n producer -c conf -f flume-kafka-sink.properties -Dflume.root.logger=ERROR,console

                然后,我在/home/hadoop/dir/logdfs目錄下上傳log日志,這里我只抽取了一少部分日志進行上傳,如下圖所示,表示日志上傳成功。

                 Kafka實戰-Flume到Kafka

                5.預覽

                下面,我們通過Kafka的監控工具,來預覽我們上傳的日志記錄,有沒有在Kafka中產生消息數據,如下所示:

                • 啟動Kafka集群,為生產消息截圖預覽
                • </ul>

                   Kafka實戰-Flume到Kafka

                  • 通過Flume上傳日志,在Kafka中產生消息數據
                  • </ul>

                     Kafka實戰-Flume到Kafka

                    6.總結

                    本篇文章給大家講述了Kafka的消息產生流程,后續會在Kafka實戰系列中為大家講述Kafka的消息消費流程等一整套流程,這里只是為后續的Kafka實戰編碼打下一個基礎,讓大家先對Kafka的消息生產有個整體的認識。

                    來自:http://www.cnblogs.com/smartloli/p/4615908.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!