Spark Streaming 1.6 流式狀態管理分析
來自: http://www.jianshu.com/p/1463bc1d81b5
Spark 1.6發布后,官方聲稱流式狀態管理有10倍性能提升。這篇文章會詳細介紹Spark Streaming里新的流式狀態管理。
</div>
關于狀態管理
在流式計算中,數據是持續不斷來的,有時候我們要對一些數據做跨周期(Duration)的統計,這個時候就不得不維護狀態了。而狀態管理對Spark 的 RDD模型是個挑戰,因為在spark里,任何數據集都需要通過RDD來呈現,而RDD 的定義是一個不變的分布式集合。在狀態管理中,比如Spark Streaming中的word-count 就涉及到更新原有的記錄,比如在batch 1 中 A 出現1次,batch 2中出現3次,則總共出現了4次。這里就有兩種實現:
- 獲取batch 1 中的 狀態RDD 和當前的batch RDD 做co-group 得到一個新的狀態RDD。這種方式完美的契合了RDD的不變性,但是對性能卻會有比較大的影響,因為需要對所有數據做處理,計算量和數據集大小是成線性相關的。這個我們后續會詳細討論。
- 第二種是一種變通的實現。因為沒法變更RDD/Partition等核心概念,所以Spark Streaming在集合元素上做了文章,添加了 MapWithStateRDDRecord 這個東西。其實對應的就是模仿了partition的概念,然后里面維護了一個stateMap,存儲最后的結果。
這兩個方案分別對應了 updateStateByKey/mapWithState 的實現。
前言
在這篇文章中, Apache Spark 1.6發布 ,提到了spark1.6 三塊性能改進:
- Parquet性能
- 自動內存管理模型
- 流式狀態管理10倍性能提升
之前就想系統的對這三塊仔細闡述下。現在總算有了第二篇。
本文會從三個方面展開:
- updateStateByKey的實現;
- mapWithState(1.6新引入的流式狀態管理)的實現
- mapWithState額外內容
updateStateByKey的實現
在關于狀態管理中,我們已經描述了一個大概。該方法可以在 org.apache.spark.streaming.dstream.PairDStreamFunctions 中找到。調用該方法后會構建出一個 org.apache.spark.streaming.dstream.StateDStream 對象。計算的方式也較為簡單,核心邏輯是下面兩行代碼:
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) Some(stateRDD)
首先將 prevStateRDD 和 parentRDD (新batch 的數據) 做一次cogroup,形成了 (K, Seq[V], Seq[W]) 這樣的結果集。你會發現和updateStateByKey 要求的 (Seq[V], Option[S]) 簽名還是有些類似的。事實上這里的Seq[V] 就是parentRDD的對應K 的新的值。為了適配他兩,Spark 內部會對你傳進來的 updateFunc 做兩次轉換,從而使得你的函數能夠接受 (K, Seq[V], Seq[W]) 這樣的參數。看到這,想必你也就知道為啥updateStateByKey 接受的函數簽名是那樣的了。
前文我們提到,這樣做很漂亮,代碼也少,契合RDD的概念,然而你會發現無論parentRDD里有多少key,哪怕是只有一個,也需要對原有所有的數據做cogroup 并且全部做一遍處理(也就是應用你的update函數)。顯然這是很低效的。很多場景下,新的batch 里只有一小部分數據,但是我們卻不得不對所有的數據都進行計算。
正因為上面的問題,所以Spark Streaming 提出了一個新的API mapWithState ,對應的jira為: Improved state management for Spark Streaming 。除了我前面提到的性能問題,新的API 還提供兩個新的功能:
- 可以為Key 設置TTL(Timeout)
- 用戶可以對返回值進行控制
mapWithState(1.6新引入的流式狀態管理)的實現
前面我們提到,在新的mapWithState API 中,核心思路是創建一個新的MapWithStateRDD,該RDD的元素是 MapWithStateRDDRecord ,每個MapWithStateRDDRecord 記錄某個Partiton下所有key的State。
依然的,你在 org.apache.spark.streaming.dstream.PairDStreamFunctions 可以看到mapWithState 簽名。
@Experimental def mapWithState[StateType: ClassTag, MappedType: ClassTag]( spec: StateSpec[K, V, StateType, MappedType] ): MapWithStateDStream[K, V, StateType, MappedType] = { new MapWithStateDStreamImpl[K, V, StateType, MappedType]( self, spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]] ) }
這一段代碼有三點值得注意:
- 該接口在1.6 中還是 Experimental 狀態
- 接受的不是一函數,而是一個StateSpec 的對象。
- 返回了一個新的DStream
其實StateSpec 只是一個包裹,你在實際操作上依然是定義一個函數,然后通過StateSpec進行包裹一下。以 wordcount 為例:
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output }
接著StateSpec.function(mappingFunc) 包裹一下就可以傳遞給mapWithState。我們看到該函數更加清晰,word 是K,one新值,state 是原始值(本batch之前的狀態值)。這里你需要把state 更新為新值,該實現是做了一個內部狀態維護的,不像updateStateByKey一樣,一切都是現算的。
MapWithStateDStreamImpl 的compute邏輯都委托給了 InternalMapWithStateDStream ,最終要得到 MapWithStateRDD ,基本是通過下面的邏輯來計算的:
val prevStateRDD = getOrCompute(validTime - slideDuration) match ... val dataRDD = parent.getOrCompute(validTime).getOrElse { context.sparkContext.emptyRDD[(K, V)]} ..... val partitionedDataRDD = dataRDD.partitionBy(partitioner) Some(new MapWithStateRDD( prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
這里有個很重要的操作是對dataRDD進行了partition操作,保證和prevStateRDD 按相同的分區規則進行分區。這個在后面做計算時有用。
獲取到prevStateRDD,接著獲取當前batch的數據的RDD,最后組裝成一個新的MapWithStateRDD。MapWithStateRDD 還接受你定義的函數 mappingFunction 以及key的超時時間。
其中MapWithStateRDD 和別的RDD 不同之處在于RDD里的元素是 MapWithStateRDDRecord 對象。其實prevStateRDD 也是個MapWithStateRDD 。
整個實際計算邏輯都在 MapWithStateRDDRecord.updateRecordWithData 方法里。
前面我們提到,MapWithStateRDDRecord 是prevStateRDD 里的元素。有多少個分區,就有多少個MapWithStateRDDRecord 。一個Record 對應一個分區下所有數據的狀態。在 MapWithStateRDDRecord.updateRecordWithData 方法中,第一步是copy 當前record 的狀態。這個copy是非常快的。我們會在 mapWithSate額外內容 那個章節有更詳細的分析。
val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() } //prevRecord=MapWithStateRDDRecord[K, S, E]
接著定義了兩個變量,其中mappedData 會作為最后的計算結果返回,wrappedState 類似Hadoop里的 Text,你可以不斷給它賦值,然后獲得一些新的功能,避免返回創建對象。它主要是給state添加了一些方法,比如update,define狀態等。
val mappedData = new ArrayBuffer[E] val wrappedState = new StateImpl[S]()
接著遍歷當前batch 所有的數據,并且應用用戶定義的函數。這里我們看到,我們只對當前batch的數據進行函數計算,而不是針對歷史全集數據進行計算,這是一個很大的性能提升點。接著根據wrappedState的狀態對newStateMap做更新,主要是刪除或者數據的更新。最后將新的結果返回并且放到mappedData 。
dataIterator.foreach { case (key, value) => wrappedState.wrap(newStateMap.get(key)) val returned = mappingFunction(batchTime, key, Some(value), wrappedState) if (wrappedState.isRemoved) { newStateMap.remove(key) } else if (wrappedState.isUpdated || (wrappedState.exists && timeoutThresholdTime.isDefined)) { newStateMap.put(key, wrappedState.get(), batchTime.milliseconds) } mappedData ++= returned }
上面這段邏輯,你會發現一個問題,如果dataIterator 里有重復的數據比如某個K 出現多次,則mappedData也會有多次。以wordcount 為例:
輸入數據 | mapWithState后的結果 | 調用stateSnapshots后的結果 |
---|---|---|
(hello, 1) | (hello, 1) | (hello, 3) |
(hello, 1) | (hello, 2) | (world, 2) |
(world, 1) | (world, 1) | |
(world, 1) | (world, 2) | |
(hello, 1) | (hello, 3) |
hello 出現了三次,所以會加入到mappedData中三次。其實我沒發現這么做的意義,并且我認為會對內存占用造成一定的壓力。
如果你想要最后的結果,需要調用完mapWithState 之后需要再調用一次stateSnapshots,就可以拿到第三欄的計算結果了。
經過上面的計算,我們對parentRDD里的每個分區進行計算,得到了mappedData以及newStateMap,這兩個對象一起構建出MapWithStateRDDRecord,而該Record 則形成一個Partition,最后構成新的MapWithStateRDD。
mapWithState額外內容
MapWithStateRDDRecord 透過stateMap 維護了某個分區下所有key的當前狀態。 在前面的分析中,我們第一步便是clone old stateMap。如果集合非常大,拷貝也是很費時才對,而且還耗費內存。
所以如何實現好stateMap 變得非常重要:
-
實現過程采用的是 增量copy 。也叫deltaMap。 新創建的stateMap 會引用舊的stateMap。新增數據會放到新的stateMap中,而更新,刪除,查找等操作則有可能發生在老得stateMap上。
<p>缺點也是有的,如果stateMap 鏈路太長,則可能會對性能造成一定的影響。我們只要在特定條件下做合并即可。目前是超過DELTA_CHAIN_LENGTH_THRESHOLD=20 時會做合并。</p>
</div> </li>
使用 org.apache.spark.util.collection.OpenHashMap ,該實現比 java.util.HashMap 快5倍,并且占用更少的內存空間。不過該HashMap 無法進行刪除操作。
</ol> </div>
本文由用戶 lh880331 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!