Spark 內核研究
1、Spark介紹
Spark是起源于美國加州大學伯克利分校AMPLab的大數據計算平臺,在2010年開源,目前是Apache軟件基金會的頂級項目。隨著Spark在大數據計算領域的暫露頭角,越來越多的企業開始關注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節點數,把100TB數據的排序時間從72分鐘提高到了23分鐘。
Spark在架構上包括內核部分和4個官方子模塊--Spark SQL、Spark Streaming、機器學習庫MLlib和圖計算庫GraphX。圖1所示為Spark在伯克利的數據分析軟件棧BDAS(Berkeley Data Analytics Stack)中的位置。可見Spark專注于數據的計算,而數據的存儲在生產環境中往往還是由Hadoop分布式文件系統HDFS承擔。
圖1 Spark在BDAS中的位置
Spark被設計成支持多場景的通用大數據計算平臺,它可以解決大數據計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數據源的讀取數據,并且擁有不斷發展的機器學習庫和圖計算庫供開發者使用。數據和計算在Spark內核及Spark的子模塊中是打通的,這就意味著Spark內核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內核為基礎,進一步支持更多的計算場景,例如使用Spark SQL讀入的數據可以作為機器學習庫MLlib的輸入。表1列舉了一些在Spark平臺上的計算場景。
表1 Spark的應用場景舉例
在本文寫作是,Spark的最新版本為1.2.0,文中的示例代碼也來自于這個版本。
2、Spark內核介紹
相信大數據工程師都非常了解Hadoop MapReduce一個最大的問題是在很多應用場景中速度非常慢,只適合離線的計算任務。這是由于MapReduce需要將任務劃分成map和reduce兩個階段,map階段產生的中間結果要寫回磁盤,而在這兩個階段之間需要進行shuffle操作。Shuffle操作需要從網絡中的各個節點進行數據拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時間耗費在網絡磁盤IO中而不是用于計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。
那Spark是如果設計分布式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數據集(Resilient Distributed Dataset),也就是RDD。
2.1 彈性分布數據集RDD
RDD是Spark中對數據和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的并能夠被并行操作的數據集合。對RDD的操作分為兩種transformation和action。Transformation操作是通過轉換從一個或多個RDD生成新的RDD。Action操作是從RDD生成最后的計算結果。在Spark最新的版本中,提供豐富的transformation和action操作,比起MapReduce計算模型中僅有的兩種操作,會大大簡化程序開發的難度。
RDD的生成方式只有兩種,一是從數據源讀入,另一種就是從其它RDD通過transformation操作轉換。一個典型的Spark程序就是通過Spark上下文環境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,最后通過調用最終RDD的action方法輸出結果。
每個RDD都可以用下面5個特性來表示,其中后兩個為可選的:
- 分片列表(數據塊列表)
- 計算每個分片的函數
- 對父RDD的依賴列表
- 對key-value類型的RDD的分片器(Partitioner)(可選)
- 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)(可選)
雖然Spark是基于內存的計算,但RDD不光可以存儲在內存中,根據useDisk、useMemory、useOffHeap, deserialized、replication五個參數的組合Spark提供了12種存儲級別,在后面介紹RDD的容錯機制時,我們會進一步理解。值得注意的是當StorageLevel設置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基于內存的分布式文件系統,目前正在快速發展,本文不做詳細介紹,可以通過其官方網站進一步了解。
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean private var _replication: Int = 1) extends Externalizable { //… } val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false)
2.2 DAG、Stage與任務的生成
Spark的計算發生在RDD的action操作,而對action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。
Spark內核會在需要計算發生的時刻繪制一張關于計算路徑的有向無環圖,也就是DAG。舉個例子,在圖2中,從輸入中邏輯上生成A和C兩個RDD,經過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因為這時候計算沒有發生,Spark內核做的事情只是記錄了RDD的生成和依賴關系。當F要進行輸出時,也就是F進行了action操作,Spark會根據RDD的依賴生成DAG,并從起點開始真正的計算。
圖2 邏輯上的計算過程:DAG
有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分成任務集,也就是Stage,這樣可以將任務提交到計算節點進行真正的計算。Spark計算的中間結果默認是保存在內存中的,Spark在劃分Stage的時候會充分考慮在分布式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據就是RDD的依賴類型。根據不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴于父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴于父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有groupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對于窄依賴,Spark會將其盡量劃分在同一個stage中,因為它們可以進行流水線計算。
圖3 RDD的寬依賴和窄依賴
我們再通過圖4詳細解釋一下Spark中的Stage劃分。我們從HDFS中讀入數據生成3個不同的RDD,通過一系列transformation操作后再將計算結果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內核會以此為邊界將其前后劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。
圖4 Spark中的Stage劃分
Spark在運行時會把Stage包裝成任務提交,有父Stage的Spark會先提交父Stage。弄清楚了Spark劃分計算的原理,我們再結合源碼看一看這其中的過程。下面的代碼是DAGScheduler中的得到一個RDD父Stage的函數,可以看到寬依賴為劃分Stage的邊界。
/** * Get or create the list of parent stages for a given RDD. The stages will be assigned the * provided jobId if they haven't already been created with a lower jobId. */ private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of partitions is unknown for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, jobId) case _ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while (!waitingForVisit.isEmpty) { visit(waitingForVisit.pop()) } parents.toList }
上面提到Spark的計算是從RDD調用action操作時候觸發的,我們來看一個action的代碼
RDD的collect方法是一個action操作,作用是將RDD中的數據返回到一個數組中。可以看到,在此action中,會觸發Spark上下文環境SparkContext中的runJob方法,這是一系列計算的起點。
abstract class RDD[T: ClassTag]( @transient private var sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { //…. /** * Return an array that contains all of the elements in this RDD. */ def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } }
SparkContext擁有DAGScheduler的實例,在runJob方法中會進一步調用DAGScheduler的runJob方法。在此時,DAGScheduler會生成DAG和Stage,將Stage提交給TaskScheduler。TaskSchduler將Stage包裝成TaskSet,發送到Worker節點進行真正的計算,同時還要監測任務狀態,重試失敗和長時間無返回的任務。整個過程如圖5所示。
圖5 Spark中任務的生成
2.3 RDD的緩存與容錯
上文提到,Spark的計算是從action開始觸發的,如果在action操作之前邏輯上很多transformation操作,一旦中間發生計算失敗,Spark會重新提交任務,這在很多場景中代價過大。還有一些場景,如有些迭代算法,計算的中間結果會被重復使用,重復計算同樣增加計算時間和造成資源浪費。因此,在提高計算效率和更好支持容錯,Spark提供了基于RDDcache機制和checkpoint機制。
我們可以通過RDD的toDebugString來查看其遞歸的依賴信息,圖6展示了在spark shell中通過調用這個函數來查看wordCount RDD的依賴關系,也就是它的Lineage.
圖6 RDD wordCount的lineage
如果發現Lineage過長或者里面有被多次重復使用的RDD,我們就可以考慮使用cache機制或checkpoint機制了。
我們可以通過在程序中直接調用RDD的cache方法將其保存在內存中,這樣這個RDD就可以被多個任務共享,避免重復計算。另外,RDD還提供了更為靈活的persist方法,可以指定存儲級別。從源碼中可以看到RDD.cache就是簡單的調用了RDD.persist(StorageLevel.MEMORY_ONLY)。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) def cache(): this.type = persist()
同樣,我們可以調用RDD的checkpoint方法將其保存到磁盤。我們需要在SparkContext中設置checkpoint的目錄,否則調用會拋出異常。值得注意的是,在調用checkpoint之前建議先調用cache方法將RDD放入內存,否則將RDD保存到文件的時候需要重新計算。
/** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with SparkContext.setCheckpointDir() and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() { if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new RDDCheckpointData(this)) checkpointData.get.markForCheckpoint() } }
Cache機制和checkpoint機制的差別在于cache將RDD保存到內存,并保留Lineage,如果緩存失效RDD還可以通過Lineage重建。而checkpoint將RDD落地到磁盤并切斷Lineage,由文件系統保證其重建。
2.4 Spark任務的部署
Spark的集群部署分為Standalone、Mesos和Yarn三種模式,我們以Standalone模式為例,簡單介紹Spark程序的部署。如圖7示,集群中的Spark程序運行時分為3種角色,driver, master和worker(slave)。在集群啟動前,首先要配置master和worker節點。啟動集群后,worker節點會向master節點注冊自己,master節點會維護worker節點的心跳。Spark程序都需要先創建Spark上下文環境,也就是SparkContext。創建SparkContext的進程就成為了driver角色,上一節提到的DAGScheduler和TaskScheduler都在driver中運行。Spark程序在提交時要指定master的地址,這樣可以在程序啟動時向master申請worker的計算資源。Driver,master和worker之間的通信由Akka支持。Akka 也使用 Scala 編寫,用于構建可容錯的、高可伸縮性的Actor 模型應用。關于Akka,可以訪問其官方網站進行進一步了解,本文不做詳細介紹。
圖7 Spark任務部署
3、更深一步了解Spark內核
了解了Spark內核的基本概念和實現后,更深一步理解其工作原理的最好方法就是閱讀源碼。最新的Spark源碼可以從Spark官方網站下載。源碼推薦使用IntelliJ IDEA閱讀,會自動安裝Scala插件。讀者可以從core工程,也就是Spark內核工程開始閱讀,更可以設置斷點嘗試跟蹤一個任務的執行。另外,讀者還可以通過分析Spark的日志來進一步理解Spark的運行機制,Spark使用log4j記錄日志,可以在啟動集群前修改log4j的配置文件來配置日志輸出和格式。
來源:明略數據 原文:http://www.mininglamp.com 作者: 明略數據科學家 孟嘉