[Apache Kafka]Kafka集成

JoyceFKJ 8年前發布 | 26K 次閱讀 Kafka Hadoop 分布式/云計算/大數據

來自: http://www.cnblogs.com/w1991/p/5155202.html

Storm集成Kafka

Storm簡介

少量數據的實時處理可以使用JMS(Java Messaging Service)這類技術,但是數據量很大時便會出現性能瓶頸。而且這些方案不適合橫向擴展。

Storm 是開源的分布式實時數據處理系統。它可用于很多場景,如實時分析(real-time analytics)、在線機器學習(online machine learning)、連續計算(continuous computation)、數據抽取轉換加載(ETL:Extract Transformation Load)。

Storm中流數據處理相關的組件:

  • Spout:源源不斷的數據流。
  • Bolt:spout將數據傳遞給bolt。所有的數據處理都是由bolt完成,如數據的過濾、聚合、計算、存儲等。

我們可以將Storm看成很多bolt組成的鏈,每個bolt對spout提供的數據流進行一些處理。

  • Tuple:Storm所使用的數據結構。
  • Stream:代表一個tuple序列。
  • Workers:代表Storm process。
  • Executers:worker發起的Storm線程。worker可以運行一個或多個executer,每個executer又可以運行一個或多個job。

Storm集成Kafka

集成Storm與Kafka集群需要使用 storm-kafka spout 。它提供了一些特性,如動態發現Kafka broker、“exactly once” tuple processing。除了常規的針對Kafka的Storm spout,它還提供了針對kafka的Trident spout實現。

Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying.

兩種spout的實現都使用 BrokerHost 接口來跟蹤Kafka broker host-to-partition 映射和 KafkaConfig 參數。 BrokerHost 接口有兩個實現: ZkHosts 和 StaticHosts 。

ZkHosts 用于動態跟蹤Kafka broker host-to-partition 映射:

  • public ZkHosts(String brokerZkStr, String brokerZkPath)
  • public ZkHosts(String brokerZkStr)
    參數 brokerZkStr 可以是 localhost:9092 ;參數 brokerZkPath 是topic和partition信息存儲的根目錄,默認值是 /brokers 。

StaticHosts 用于靜態分區信息:

//localhost:9092. Uses default port as 9092.
Broker brokerPartition0 = new Broker("localhost");
//localhost:9092. Takes the port explicitly
Broker brokerPartition1 = new Broker("localhost", 9092);
//localhost:9092 specified as one string.
Broker brokerPartition2 = new Broker("localhost:9092");
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
//mapping form partition 0 to brokerPartition0
partitionInfo.addPartition(0, brokerPartition0);
//mapping form partition 1 to brokerPartition1
partitionInfo.addPartition(1, brokerPartition1);
//mapping form partition 2 to brokerPartition2
partitionInfo.addPartition(2, brokerPartition2);
StaticHosts hosts = new StaticHosts(partitionInfo);

創建StaticHosts實例時,首先要創建GlobalPartitionInformation實例,其次是KafkaConfig實例用來構造Kafka spout:

  • public KafkaConfig(BrokerHosts hosts, String topic)
  • public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
    參數 BrokerHosts 為Kafka broker列表;參數 topic 為topic名稱;參數 clientId 被用做ZooKeeper路徑的一部分,spout作為consumer在ZooKeeper中存儲當前消費的offset。

KafkaConfig 類還有一些public類型的變量,用于控制應用的行為和spout從Kafka集群獲取消息的方式:

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

Spoutconfig 類擴展了 KafkaConfig 類:

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);

參數 zkRoot 為ZooKeeper的根路徑;參數 id 為spout的唯一標識。

初始化 KafkaSpout 實例的代碼如下:

// Creating instance for BrokerHosts interface implementation
BrokerHosts hosts = new ZkHosts(brokerZkConnString);
/ Creating instance of SpoutConfig
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
// Defines how the byte[] consumed from kafka gets transformed into a storm tuple
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Creating instance of KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Kafka spout與Storm使用同一個ZooKeeper實例,來存儲offset的狀態和記錄已經消費的segment。這些offset被存儲在ZooKeeper指定的根路徑下。kafka spout在下游故障或超時時使用這些offset來重新處理tuple。spout可以倒回到之前的offset而不是從最后保存的offset開始,Kafka根據指定的時間戳來選擇offset:

spoutConfig.forceStartOffsetTime(TIMESTAMP);

這里,值 -1 強制spout從最新的offset重啟;值 -2 強制spout從最早的offset重啟。

Hadoop集成Kafka

資源共享、穩定性、可用性、可伸縮性是分布式計算的挑戰。現如今有多了一個:TB或PB級數據的處理。

Hadoop簡介

Hadoop是個大規模分布式批處理框架,通過很多節點并行處理數據。

Hadoop基于MapReduce框架,MapReduce提供了并行分布式大規模計算借口。Hadoop有它自己的分布式文件系統HDFS(Hadoop Distributed File System)。在典型的Hadoop集群中,HDFS將數據分成很小的塊(稱為block)分布到所有的節點中,同時也會為每個block建立副本以確保有節點失效時仍能從其他節點讀取到數據。

Hadoop有以下組件:

  • Name Node:這是一個與HDFS交互的單點。name node中存儲數據block在節點中的分布信息。
  • Second Name Node:該節點存儲日志,在name node故障時使用這些日志將HDFS恢復到最后更新的狀態。
  • Data Node:這些節點存儲由name node分配的數據block,以及其他節點中數據的副本。
  • Job Tracker:負責將MapReduce job分割成更小的task。
  • Task Tracker:負責執行job tracker分割的task。

data node和task tracker共享同一臺機器,task的執行需要name node提供數據存儲位置信息。

Hadoop集群有三種:

  • 本地模式(local mode)
  • 偽分布式模式(pseudo distributed mode)
  • 完全分布式模式(fully distributed mode)

本地模式和偽分布式模式工作于單節點集群。本地模式中所有的Hadoop主要組件運行在同一個JVM實例中;而偽分布式模式中每個組件運行在一個單獨的JVM實例中。偽分布式模式主要用于開發環境。完全分布式模式中則是每個組件運行在單獨的節點中。

偽分布式集群搭建步驟如下:

  1. 安裝和配置JDK
  2. 下載 Hadoop Common
  3. 解壓縮后,bin文件夾添加到 PATH 中

    #Assuming your installation directory is /opt/Hadoop
    export HADOOP_HOME=/opt/hadoop
    export PATH=$PATH:$HADOOP HOME/bin
  4. 配置文件 etc/hadoop/core-site.xml :

    <configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    </configuration>

    配置文件 etc/hadoop/hdfs-site.xml :

    <configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    </configuration>
  5. ssh無密碼連接到localhost:

    ssh localhost

    如果ssh不能無密碼連接localhost,執行以下命令:

    ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa 
    cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
  6. 格式化文件系統

    bin/hdfs namenode -format
  7. 啟動守護進程NameNode和DataNode:

    sbin/start-dfs.sh
  8. Hadoop集群設置完畢,所在Web瀏覽器中通過 http://localhost:50070/ 訪問NameNode。

Hadoop集成Kafka

Kafka的源碼的contrib文件夾中包含Hadoop producer和consumer的示例。

Hadoop producer

Hadoop producer提供了從Hadoop集群向Kafka發布數據的橋梁:

Kafka中topic可看作URI,連接到指定Kafka broker的URI格式為:

kafka://<kafka-broker>/<kafka-topic>

Hadoop consumer有兩種從Hadoop獲取數據的方式:

  • 使用Pig腳本和Avro格式的消息:這種方式中,Kafka producer使用Pig腳本將數據寫成二進制Avro格式,每一行表示一個消息。類 AvroKafkaStorage (Pig類 StoreFunc 的擴展)接受參數Avro Schema并連接到Kafka URI,將數據推送到Kafka集群。使用AvroKafkaStorage producer時可以很容易地在同一個Pig腳本的job中寫多個topic和broker。Pig腳本示例如下:

    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
    REGISTER avro-1.4.0.jar;
    REGISTER piggybank.jar;
    REGISTER kafka-0.8.0.jar;
    REGISTER jackson-core-asl-1.5.5.jar;
    REGISTER jackson-mapper-asl-1.5.5.jar;
    REGISTER scala-library.jar;
    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
    names = FOREACH member_info GENERATE name;
    STORE member_info INTO 'kafka://localhost:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
  • 使用Kafka OutputFormat類:Kafka OutputFormat類是Hadoop的OutputFormat類的擴展。這種方式消息以字節形式發布。Kafka OutputFormat類使用KafkaRecordWriter類(Hadoop類RecordWriter的擴展)將記錄(也就是消息)寫入Hadoop集群。

要想在job中配置producer的參數,在參數前添加前綴 kafka.output 即可。例如配置壓縮格式則使用 kafka.output.compression.codec 。除此之外,Kafka broker信息(kafka.metadata.broker.list)、topic(kafka.output.topic)、schema(kafka.output.schema)被注入到job的配置中。

Hadoop consumer

Hadoop consumer是一個從Kafka broker拉取數據并推送到HDFS中的Hadoop job。

一個Hadoop job并行地將Kafka數據寫到HDFS中,加載數據的mapper數量取決于輸入文件夾中文件的數量。輸出文件夾中包括來自Kafka的數據和更新的topic offset。每個mapper在map task結束時將最后消費的消息offset寫入HDFS。如果job是被或者被重啟,每個mapper只是從HDFS中存儲的offset處開始讀取。

Kafka的源碼的contrib文件夾中包含hadoop-consumer示例。運行示例需要配置一下文件test/test.properties中的參數:

  • kafka.etl.topic:要獲取的topic。
  • kafka.server.uri:Kafka服務器地址。
  • input:輸入文件夾,文件夾內有使用DataGenerator生成的topic offset。文件夾內文件的數量決定了Hadoop mapper的數量。
  • output:輸出文件夾,文件夾內為來自Kafka的數據和更新的topic offset。
  • kafka.request.limit:用于限制取數據事件的數量。

在consumer中,實例KafkaETLRecordReader是與KafkaETLInputFormat相關的record reader。它從Kafka服務器中讀取數據,從input指定的offset開始到最大可用offset或者指定的上限(kafka.request.limit)。KafkaETLJob包含一些輔助函數用于初始化job配置,SimpleKafkaETLJob設置job屬性并提交Hadoop job。一旦job啟動了,SimpleKafkaETLMapper就從Kafka數據寫入ouptut指定的HDFS。

參考資料

Learing Apache Kafka-Second Edition

</div>

 本文由用戶 JoyceFKJ 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!