深入理解 Spark RDD 抽象模型和編寫 RDD 函數
深入理解Spark RDD抽象模型和編寫RDD函數
Spark revolves around the concept of a resilient distributed dataset (RDD), which is an immutable , fault-tolerant , partitioned collection of elements that can be operated on in parallel.
第二篇筆記介紹RDD,整個Spark項目的精髓所在,也是理解Spark源碼的金鑰匙。RDD是一個很棒的分布式計算抽象模型,它提供了通用的數據處理方法和高效的分布式容錯機制,Spark是它的一種實現。
Spark基礎知識
開始正題之前,先簡單介紹一些Spark的基本概念,以便深入介紹RDD。Spark的api運算函數分為兩大類, Transformation 和 Action :Transformations是 lazy evaluation 的,調用他們只會被記錄而不會被真正執行,只有遇到Actions,之前的Transformations才會被依次執行,這樣的 Delay Scheduling ,Spark可以看到完整的計算流程圖(以DAG有向無環圖的形式表示),從而做更多的優化,Actions會返回結果給Driver或者保存結果到external storages。Spark的基本工作流程是,用戶提交程序給cluster,用戶的main函數會在 Driver 上面運行,根據用戶的程序Spark會產生很多的 Jobs ,原則是遇到一個 Action 就產生一個 Job ,以DAG圖的方式記錄RDD之間的依賴關系,每一個Job又會根據這些依賴關系被DAGScheduler分成不同的 Stages ,每一個Stage是一個 TaskSet ,以TaskSet為單位,TaskScheduler通過Cluster Manager一批一批地調度到不同node上運行,同一個TaskSet里面的Task都做同樣的運算,一個Partition對應一個Task。這個工作流程其實就是Spark的Scheduling Process,附上超經典的示意圖,不要糾結這圖上的細節,后面會有專門的筆記詳細介紹這張 藏寶圖 :
RDD抽象模型
設計動機
當初設計RDD主要是為了解決三個問題:
-
Fast: Spark之前的Hadoop用的是MapReduce的編程模型,沒有很好的利用分布式內存系統,中間結果都需要保存到external disk,運行效率很低。RDD模型是in-memory computing的,中間結果不需要被物化(materialized),它的 persistence 機制,可以保存中間結果重復使用,對需要迭代運算的機器學習應用和交互式數據挖掘應用,加速顯著。Spark快還有一個原因是開頭提到過的 Delay Scheduling 機制,它得益于RDD的Dependency設計。
-
General: MapReduce編程模型只能提供有限的運算種類(Map和Reduce),RDD希望支持更廣泛更多樣的operators(map,flatMap,filter等等),然后用戶可以任意地組合他們。
The ability of RDDs to accommodate computing needs that were previously met only by introducing new frameworks is, we believe, the most credible evidence of the power of the RDD abstraction.
-
Fault tolerance: 其他的in-memory storage on clusters,基本單元是可變的,用細粒度更新( fine-grained updates )方式改變狀態,如改變table/cell里面的值,這種模型的容錯只能通過復制多個數據copy,需要傳輸大量的數據,容錯效率低下。而RDD是 不可變的(immutable) ,通過粗粒度變換( coarse-grained transformations ),比如map,filter和join,可以把相同的運算同時作用在許多數據單元上,這樣的變換只會產生新的RDD而不改變舊的RDD。這種模型可以讓Spark用 Lineage 很高效地容錯(后面會有介紹)。
定義和特性
在開頭的引用中,Spark官網給RDD的定義是:RDD is an immutable , fault-tolerant , partitioned collection of elements that can be operated on in
parallel 。可見RDD有以下幾個特點:
- immutable:任何操作都不會改變RDD本身,只會創造新的RDD
- fault-tolerant:通過Lineage可以高效容錯
- partitioned:RDD以partition作為最小存儲和計算單元,分布在cluster的不同nodes上,一個node可以有多個partitions,一個partition只能在一個node上
- in parallel:一個Task對應一個partition,Tasks之間相互獨立可以并行計算
另外還有兩點
- persistence:用戶可以把會被重復使用的RDDs保存到storage上(內存或者磁盤)
- partitioning:用戶可以選擇RDD元素被partitioned的方式來優化計算,比如兩個需要被join的數據集可以用相同的方式做hash-partitioned,這樣可以減少shuffle提高性能
RDD可以通過兩種方式產生,讀取外部數據集(比如HadoopRDD)和通過Transformations從其他RDDs轉變而來(比如FilteredRDD)。
Lineage和Dependencies
Spark會記錄用于產生某個RDD的所有transformations,稱為 Lineage ,更加直白地理解就是一個RDD知道它是如果從它的父輩RDD們轉換而來的,下圖是一個RDD Lineage Graph,它是一種DAG Graph:
Lineage是Spark高效容錯的法寶,因為有了Lineage信息,RDD可以重新單獨計算某個丟失的partition,而不用計算整個RDD。
Lineage基于RDD的Dependencies,所以Lineage Graph也可以說成Dependency Graph。RDD的Dependencies可以分為兩類:窄依賴( Narrow Dependencies )和寬依賴( Wide Dependencies )。下圖就是Dependency Graphs, 每個大方框是一個RDD,藍色的小矩陣是對應的Partitions:
如果輸入RDD的每個Partition只指向(用于計算)輸出RDD的一個Partition時,是窄依賴,否則的話就是寬依賴
Spark的DAGScheduler就是基于Dependencies來劃分stages的(之后會有筆記介紹),因為窄依賴里面的運算符可以被pipeline在一起(不需要shuffle),用一個Task直接計算。
Spark的RDD源碼閱讀
有了前面的理論準備,現在可以開始閱讀Spark的RDD模型源碼實現了。用Intellij IDEA打開了Spark項目,并且編譯通過,然后
command + o, 輸入RDD,打開org.apache.spark.rdd.RDD (RDD.scala文件)
RDD編程接口
Spark的RDD編程接口是一個抽象類,實例化需要輸入SparkContext和RDD的Dependencies,解釋兩點:
- RDD[T: ClassTag] : T代表了RDD每個元素的type,比如RDD[Int]代表每個元素是Int型。
- Seq[Dependency[_]] :代表RDD的父輩RDD們的類型是可以任意的,而不是一定要和這個RDD一樣,比如 RDD[Int] 通過 map(f: Int => String) 運算可以變成 RDD[String] 。
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
...
}
為了實現前一節介紹的那些RDD特性,Spark實現的RDD模型包含五個主要的屬性:
- A list of partitions
- A function for computing each partition
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each partition on (e.g. block locations for an HDFS file)
每一個RDD子類都必須實現這些屬性(在源碼里面是函數):
比如
-
HadoopRDD
- partitions = one per HDFS block
- dependencies = none
- compute(partition) = read corresponding block
- preferredLocations(part) = HDFS block location
- partitioner = none
-
FilteredRDD
- partitions = same as parent RDD
- dependencies = “one-to-one” on parent
- compute(partition) = compute parent and filter it
- preferredLocations(part) = none (ask parent)
- partitioner = none </ul> </li>
-
JoinedRDD
- partitions = one per reduce task
- dependencies = “shuffle” on each parent
- compute(partition) = read and join shuffled data
- preferredLocations(part) = none
- partitioner = HashPartitioner(numTasks) </ul> </li> </ul>
- 必須被子類實現的方法
- 所有RDD都擁有的方法和屬性
- persistence相關的函數
- 用于表示RDD的接口函數
- Transformations(返回新的RDD)
- Actions (啟動一個job來計算結果)
- 其他內部使用的方法和屬性
現在可以先閱讀Transformations函數源碼,其他的會在后面的筆記里面涉及,因為Actions都會調用 sc.runJon() 來運行job,深入理解Spark Scheduling之后再閱讀會比較容易理解。 這里以 flatMap 為例:
/**
- Return a new RDD by first applying a function to all elements of this
- RDD, and then flattening the results.
*/
def flatMapU: ClassTag: RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDDU, T => iter.flatMap(cleanF))
}</code></pre>
flatMap的功能是對RDD[T]類型的RDD上的每一個元素運行函數f,返回類型為RDD[U]的RDD。 withScope{body} 是為了確保運行body代碼塊產生的所有RDDs都在同一個scope里面。 val cleanF = sc.clean(f) 是為了確保這個f函數(專業稱為closure)是可以被序列化的,因為它需要被序列化后發送到不同的node上。想要了解他們的實現細節可以用IDE代碼跳轉功能:
在Intellij里面你可以選擇一個函數或者變量,然后Command + b,跳轉到它的實現細節,command + [ 可以返回原來的地方
這一行代碼是關鍵所在,實例化了一個MapPartitionsRDD,返回的是RDD[U]類型:
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
選擇MapPartitionsRDD,command + b, 跳轉到它的實現
MapPartitionsRDD[U, T]是RDD[U]的子類,它的源碼很好的詮釋了如何使用RDD編程接口:
/**
An RDD that applies the provided function to every partition of the parent RDD. */ private[spark] class MapPartitionsRDDU: ClassTag, T: ClassTag => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDDU {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() { super.clearDependencies() prev = null } }</code></pre>
MapPartitionsRDD[U, T]的partitioner和它的第一個parent RDD的partitioner保持一致(如果需要保留partitioner的話),它的partitions就是它的firstParent的partitions。它的compute函數只是調用了flatMap實例化它時輸入的函數。 再次聚焦需要傳遞的函數的函數類型, 實際傳遞的函數和調用它的代碼:
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
(context, pid, iter) => iter.flatMap(cleanF)
f(context, split.index, firstParent[T].iterator(split, context))</code></pre>
iter:Iterator[T]是一個Partition上的元素迭代器,用來遍歷RDD[T]的第pid個partition上的所有元素。 firstParent[T].iterator(split, context) 就是返回parentRDD的對應partition的迭代器iter:Iterator[T]: 如果已經保存了就直接讀取,否則重新計算(可以跳轉看它的實現)。有了這個迭代器iter之后,然后用 iter.flatMap(cleanF) 來產生新的迭代器,返回類型是Iterator[U],這個就是最終返回的RDD: RDD[U]的partition的迭代器啦。 iter.flatMap(cleanF) 只是純粹的scala flatMap計算,在不同的node上運行。 這部分內容,可能比較繞,只有看著源碼不斷的跳轉,才能比較好的理解。高度總結一下,Transformations函數的實現,就是對RDD上每一個partition的iterator進行相應的操作。
RDD隱式轉換
在RDD.scala文件的最后有一個object RDD,這個很有意思,也體現了scala implicit的強大之處:
它的作用就是在編譯代碼的時候,scala編譯器會自動識別特定類型的RDD,然后調用對應的implicit function來擴展這種特定類型的RDD的api函數,比如scala編譯器看到RDD[K, V],就會調用這個函數
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = { new PairRDDFunctions(rdd) }
然后這個RDD就擁有PairRDDFunctions類里面的專屬函數啦:
編寫RDD API函數
只說不練假把式,終于到了動手環節了,打起精神啦!任務是編寫一個和scala的flatten函數類似的RDD API函數:
scala> Seq(Seq(1,2), Seq(3)).flatten res0: Seq[Int] = List(1, 2, 3)
雖然它可以用 flatMap(x => x) 來代替(這也是為什么spark RDD api函數里面沒有它),但是自己動手實現一下這個簡單的api函數,可以加深對RDD模型的理解,也能熟悉Spark開發流程,甚至貢獻代碼給社區。
難點和突破口
看到這個任務最直觀的想法,就是修改一下spark的flatMap,比如:
def flatten[U: ClassTag](): RDD[U] = withScope { new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatten) }
但是問題沒有那么簡單,在Intellij里面你會看到iter: Iterator[T]沒有flatten這個函數,因為可以flatten的數據集的元素必定是可以遍歷的類型,比如Seq[Int],所以Iterator[T]的變量沒有flatten,只有類似Iterator[Seq[T]]]的變量才有。 然后第二想法,是用flatMap(x => x):
def flatten[U: ClassTag](): RDD[U] = withScope { new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap((x => x)) }
再一次,失敗了,同樣因為x是T類型的,編譯器認為它是不可遍歷的,而flatMap必須接受可遍歷類型作為返回。 到這里我們基本認清了問題的難點所在和關鍵突破口,是不是激起了你的挑戰欲望,你可以自己試試。下一節開始講可行的解決方案。
開發和測試
我們可以參考Scala flatten的實現源碼和Spark flatMap的實現源碼:
def flattenB: CC[B] = { val b = genericBuilder[B] for (xs <- sequential) b ++= asTraversable(xs).seq b.result() }
def flatMapU: ClassTag: RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDDU, T => iter.flatMap(cleanF)) }</code></pre>
然后編寫spark flatten api函數如下:
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope { val f = (x: T) => asTraversable(x) val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }
其中asTraversable是隱式函數,是為了把type T隱式地轉換為可以遍歷的TraversableOnce[U]類型(是Seq[Int]的父類,深入可以去了解scala的type system)。為什么要這樣轉換,就如第一節講的,因為Spark代碼里面,RDD[T]的元素(也是 iter 迭代器的元素)類型是T,編譯器并不知道T是否是可遍歷類型,也就不能進一步flatten啦。
開始測試代碼了,打開你的Terminal,macOS上我用 iTerm2
git checkout -b SN-add-rdd-flatten build/sbt clean package // 復制flatten的實現到RDD.scala里面,比如flatMap函數下面 export SPARK_PREPEND_CLASSES=true ./build/sbt compile
SPARK_PREPEND_CLASSES 是用于迭代式開發,我們已經用 sbt package 打包好了所有的依賴,只要讓Spark編譯你修改的部分,而不用編譯全部的依賴,加速開發。參考 Useful Developer Tools
然后在spark-shell里面就能看到你的flatten函數了,結果如下,如果你也成功了,就給我點star吧,哈哈:
完整的開發流程,還需要有測試:
command + o, 跳轉到RDDSuite.scala
在 test("basic operations") {} 下面添加:
test("flatten") { val nestedNums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2) assert(nestedNums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6)) }
然后確認測試通過:
到了這一步,算是大功告成了,你甚至已經可以給spark社區提交pr啦,哈哈。 因為有了test,我們可以隨意重構我們的代碼,比如不想用flatMap:
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope { new MapPartitionsRDD[U, T](this, (context, pid, iter) => { var newIter: Iterator[U] = Iterator.empty for (x <- iter) newIter ++= asTraversable(x) newIter }) }
或者用更簡潔的foldLeft:
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope { new MapPartitionsRDD[U, T](this, (context, pid, iter) => { val emptyIter: Iterator[U] = Iterator.empty iter.foldLeft[Iterator[U]](emptyIter)((newIter, x) => newIter ++ asTraversable(x)) }) }
使用RDD隱式轉換
上面的實現,個人覺得已經OK,但是有一個不是”問題”的問題,如果你把flatten作用到一個不適用的RDD上面:
scala> val rdd = sc.parallelize(Seq(1,2,3)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.flatten <console>:27: error: No implicit view available from Int => scala.collection.TraversableOnce[U]. rdd.flatten ^</code></pre>
我說它不是“問題“,因為scala也是這樣的:
scala> val l = List(1,2,3) l: List[Int] = List(1, 2, 3)
scala> l.flatten <console>:13: error: No implicit view available from Int => scala.collection.GenTraversableOnce[B]. l.flatten ^</code></pre>
那么真要解決它,也就是在不適用的RDD上面,不要顯示這個函數,應該怎么做?用RDD隱式轉換!這種方法也有一些limitation,因為RDD是invariant的(一個很復雜的topic,這里不講了),不能做到像上面的方法那樣generic,但是它的思想是很值得學習的,也是很”spark“的解決方案:
第一步:參考PairRDDFunctions.scala和DoubleRDDFunctions.scala,在和RDD.scala相同的目錄下創建TraversableRDDFunctions.scala:
package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
/**
Extra functions available on RDDs of TraversableOnce through an implicit conversion. */ class TraversableRDDFunctionsU(implicit et: ClassTag[U]) extends Logging with Serializable {
def flattenX: RDD[U] = self.withScope { self.mapPartitions(iter => iter.flatten) // iter.foldLeft(Iterator.empty)((a,b) => a ++ b.toIterator) ) } }</code></pre>
第二步,添加隱式轉換函數到object RDD里面:
implicit def rddToTraversableRDDFunctions[U](rdd: RDD[Seq[U]]) (implicit et: ClassTag[U]): TraversableRDDFunctions[U] = { new TraversableRDDFunctions(rdd) }
第三步,編譯:
./build/sbt compile
第四步,運行spark-shell,可以看到flattenX函數只出現在RDD[Seq[Int]]上:
現在可以簡單示意一下,為什么這種方法,有局限性,對RDD[List[Int]],flatten照常工作,flattenX卻不能了,因為RDD is invariant:
List[Int]是Seq[?]的子類,但是RDD[List[Int]]不是RDD[Set[?]]的子類:
來自:https://github.com/linbojin/spark-notes/blob/master/rdd-abstraction.md
RDD函數
RDD抽象類的源碼可以分成幾大部分: