Spark Streaming 結合 Kafka 兩種不同的數據接收方式比較

DirectKafkaInputDStream 只在 driver 端接收數據,所以繼承了 InputDStream,是沒有 receivers 的

在結合 Spark Streaming 及 Kafka 的實時應用中,我們通常使用以下兩個 API 來獲取最初的 DStream(這里不關心這兩個 API 的重載):

KafkaUtils#createDirectStream

KafkaUtils#createStream

這兩個 API 除了要傳入的參數不同外,接收 kafka 數據的節點、拉取數據的時機也完全不同。本文將分別就兩者進行詳細分析。

KafkaUtils#createStream

先來分析 createStream ,在該函數中,會新建一個 KafkaInputDStream 對象, KafkaInputDStream 繼承于 ReceiverInputDStream 。

  1. 繼承ReceiverInputDStream的類需要重載 getReceiver 函數以提供用于接收數據的 receiver
  2. recever 會調度到某個 executor 上并啟動,不間斷的接收數據并將收到的數據交由 ReceiverSupervisor 存成 block 作為 RDD 輸入數據

KafkaInputDStream當然也實現了getReceiver方法,如下:

def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      //< 不啟用 WAL
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      //< 啟用 WAL
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }

根據是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver。 

  1. receiver 是如何被分發啟動的
  2. receiver 接受數據后數據的流轉過程
    并在 揭開Spark Streaming神秘面紗③ - 動態生成 job 一文中詳細介紹了
  3. receiver 接受的數據存儲為 block 后,如何將 blocks 作為 RDD 的輸入數據
  4. 動態生成 job

以上兩篇文章并沒有具體介紹 receiver 是如何接收數據的,當然每個重載了 ReceiverInputDStream 的類的 receiver 接收數據方式都不相同。下圖描述了 KafkaReceiver 接收數據的具體流程:

KafkaUtils#createDirectStream

在 揭開Spark Streaming神秘面紗③ - 動態生成 job 中,介紹了在生成每個 batch 的過程中,會去取這個 batch 對應的 RDD,若未生成該 RDD,則會取該 RDD 對應的 blocks 數據來生成 RDD,最終會調用到 DStream#compute(validTime: Time) 函數,在 KafkaUtils#createDirectStream 調用中,會新建 DirectKafkaInputDStream , DirectKafkaInputDStream#compute(validTime: Time) 會從 kafka 拉取數據并生成 RDD,流程如下:

如上圖所示,該函數主要做了以下三個事情:

  1. 確定要接收的 partitions 的 offsetRange,以作為第2步創建的 RDD 的數據來源
  2. 創建 RDD 并執行 count 操作,使 RDD 真實具有數據
  3. 以 streamId、數據條數,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中

進一步看 KafkaRDD 的 getPartitions 實現:

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數據與 Kafka topic 的某個 partition 的 o.fromOffset 至 o.untilOffset 數據是相對應的,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對應的

通過以上分析,我們可以對這兩種方式的區別做一個總結:

  1. createStream會使用 Receiver;而createDirectStream不會
  2. createStream使用的 Receiver 會分發到某個 executor 上去啟動并接受數據;而createDirectStream直接在 driver 上接收數據
  3. createStream使用 Receiver 源源不斷的接收數據并把數據交給 ReceiverSupervisor 處理最終存儲為 blocks 作為 RDD 的輸入,從 kafka 拉取數據與計算消費數據相互獨立;而createDirectStream會在每個 batch 拉取數據并就地消費,到下個 batch 再次拉取消費,周而復始,從 kafka 拉取數據與計算消費數據是連續的,沒有獨立開
  4. createStream中創建的KafkaInputDStream 每個 batch 所對應的 RDD 的 partition 不與 Kafka partition 一一對應;而createDirectStream中創建的 DirectKafkaInputDStream 每個 batch 所對應的 RDD 的 partition 與 Kafka partition 一一對應

 

來自:http://www.jianshu.com/p/60344796f8a5

 

 本文由用戶 MilBrinkley 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!