集群計算平臺:Spark

jopen 10年前發布 | 89K 次閱讀 Spark 集群/負載均衡

Spark是發源于美國加州大學伯克利分校AMPLab的集群計算平臺。它立足于內存計算,從多迭代批量處理出發,兼收并蓄數據倉庫、流處理和圖計算等多種計算范式,是罕見的全能選手。

Spark已正式申請加入Apache孵化器,從靈機一閃的實驗室“電火花”成長為大數據技術平臺中異軍突起的新銳。本文主要講述Spark的設計思想。Spark如其名,展現了大數據不常見的“電光石火”。具體特點概括為“輕、快、靈和巧”。集群計算平臺:Spark

  • :Spark 0.6核心代碼有2萬行,Hadoop 1.0為9萬行,2.0為22萬行。一方面,感謝Scala語言的簡潔和豐富表達力;另一方面,Spark很好地利用了Hadoop和Mesos(伯克利 另一個進入孵化器的項目,主攻集群的動態資源管理)的基礎設施。雖然很輕,但在容錯設計上不打折扣。主創人Matei聲稱:“不把錯誤當特例處理。”言下 之意,容錯是基礎設施的一部分。

  • :Spark對小數據集能達到亞秒級的延遲,這對于Hadoop MapReduce(以下簡稱MapReduce)是無法想象的(由于“心跳”間隔機制,僅任務啟動就有數秒的延遲)。就大數據集而言,對典型的迭代機器 學習、即席查詢(ad-hoc query)、圖計算等應用,Spark版本比基于MapReduce、Hive和Pregel的實現快上十倍到百倍。其中內存計算、數據本地性 (locality)和傳輸優化、調度優化等該居首功,也與設計伊始即秉持的輕量理念不無關系。

  • :Spark提供了不同層面的靈活性。在實現層,它完美演繹了Scala trait動態混入(mixin)策略(如可更換的集群調度器、序列化庫);在原語(Primitive)層,它允許擴展新的數據算子 (operator)、新的數據源(如HDFS之外支持DynamoDB)、新的language bindings(Java和Python);在范式(Paradigm)層,Spark支持內存計算、多迭代批量處理、即席查詢、流處理和圖計算等多種 范式。

  • :巧在借勢和借力。Spark借Hadoop之勢,與Hadoop無縫結合;接著 Shark(Spark上的數據倉庫實現)借了Hive的勢;圖計算借 用Pregel和PowerGraph的API以及PowerGraph的點分割思想。一切的一切,都借助了Scala(被廣泛譽為Java的未來取代 者)之勢:Spark編程的Look'n'Feel就是原汁原味的Scala,無論是語法還是API。在實現上,又能靈巧借力。為支持交互式編 程,Spark只需對Scala的Shell小做修改(相比之下,微軟為支持JavaScript Console對MapReduce交互式編程,不僅要跨越Java和JavaScript的思維屏障,在實現上還要大動干戈)。

說了一大堆好處,還是要指出Spark未臻完美。它有先天的限制,不能很好地支持細粒度、異步的數據處理;也有后天的原因,即使有很棒的基因,畢竟還剛剛起步,在性能、穩定性和范式的可擴展性上還有很大的空間。

計算范式和抽象

Spark首先是一種粗粒度數據并行(data parallel)的計算范式。

數據并行跟任務并行(task parallel)的區別體現在以下兩方面。

  • 計算的主體是數據集合,而非個別數據。集合的長度視實現而定,如SIMD(單指令多數據)向量指令一般是4到64,GPU的SIMT(單 指令多線程)一般 是32,SPMD(單程序多數據)可以更寬。Spark處理的是大數據,因此采用了粒度很粗的集合,叫做Resilient Distributed Datasets(RDD)。

  • 集合內的所有數據都經過同樣的算子序列。數據并行可編程性好,易于獲得高并行性(與數據規模相關,而非與程序邏輯的并行性相關),也易于 高效地映射到底層 的并行或分布式硬件上。傳統的array/vector編程語言、SSE/AVX intrinsics、CUDA/OpenCL、Ct(C++ for throughput),都屬于此類。不同點在于,Spark的視野是整個集群,而非單個節點或并行處理器。

數據并行的范式決定了 Spark無法完美支持細粒度、異步更新的操作。圖計算就有此類操作,所以此時Spark不如GraphLab(一個大規模圖計算框架);還有一些應用, 需要細粒度的日志更新和數據檢查點,它也不如RAMCloud(斯坦福的內存存儲和計算研究項目)和Percolator(Google增量計算技術)。 反過來,這也使Spark能夠精心耕耘它擅長的應用領域,試圖粗細通吃的Dryad(微軟早期的大數據平臺)反而不甚成功。

Spark的RDD,采用了Scala集合類型的編程風格。它同樣采用了函數式語義(functional semantics):一是閉包,二是RDD的不可修改性。邏輯上,每一個RDD算子都生成新的RDD,沒有副作用,所以算子又被稱為是確定性的;由于所 有算子都是冪等的,出現錯誤時只需把算子序列重新執行即可。

Spark的計算抽象是數據流,而且是帶有工作集(working set)的數據流。流處理是一種數據流模型,MapReduce也是,區別在于MapReduce需要在多次迭代中維護工作集。工作集的抽象很普遍,如多 迭代機器學習、交互式數據挖掘和圖計算。為保證容錯,MapReduce采用了穩定存儲(如HDFS)來承載工作集,代價是速度慢。HaLoop采用循環 敏感的調度器,保證前次迭代的Reduce輸出和本次迭代的Map輸入數據集在同一臺物理機上,這樣可以減少網絡開銷,但無法避免磁盤I/O的瓶頸。

Spark的突破在于,在保證容錯的前提下,用內存來承載工作集。內存的存取速度快于磁盤多個數量級,從而可以極大提升性能。關鍵是實現容錯,傳統 上有兩種方法:日 志和檢查點。考慮到檢查點有數據冗余和網絡通信的開銷,Spark采用日志數據更新。細粒度的日志更新并不便宜,而且前面講過,Spark也不擅長。 Spark記錄的是粗粒度的RDD更新,這樣開銷可以忽略不計。鑒于Spark的函數式語義和冪等特性,通過重放日志更新來容錯,也不會有副作用。

編程模型

來看一段代碼:textFile算子從HDFS讀取日志文件,返回“file”(RDD);filter算子篩出帶“ERROR”的行,賦給 “errors”(新RDD);cache算子把它緩存下來以備未來使用;count算子返回“errors”的行數。RDD看起來與Scala集合類型 沒有太大差別,但它們的數據和運行模型大相迥異。

集群計算平臺:Spark

圖1給出了RDD數據模型,并將上例中用到的四個算子映射到四種算子類型。Spark程序工作在兩個空間中:Spark RDD空間和Scala原生數據空間。在原生數據空間里,數據表現為標量(scalar,即Scala基本類型,用橘色小方塊表示)、集合類型(藍色虛線 框)和持久存儲(紅色圓柱)。

集群計算平臺:Spark

圖1 兩個空間的切換,四類不同的RDD算子

輸入算子(橘色箭頭)將Scala集合類型或存儲中的數據吸入RDD空間,轉為RDD(藍色實線框)。輸入算子的輸入大致有兩類:一類針對 Scala集合類型,如parallelize;另一類針對存儲數據,如上例中的textFile。輸入算子的輸出就是Spark空間的RDD。

因為函數語義,RDD經過變換(transformation)算子(藍色箭頭)生成新的RDD。變換算子的輸入和輸出都是RDD。RDD會被劃分 成很多的分區 (partition)分布到集群的多個節點中,圖1用藍色小方塊代表分區。注意,分區是個邏輯概念,變換前后的新舊分區在物理上可能是同一塊內存或存 儲。這是很重要的優化,以防止函數式不變性導致的內存需求無限擴張。有些RDD是計算的中間結果,其分區并不一定有相應的內存或存儲與之對應,如果需要 (如以備未來使用),可以調用緩存算子(例子中的cache算子,灰色箭頭表示)將分區物化(materialize)存下來(灰色方塊)。

一部分變換算子視RDD的元素為簡單元素,分為如下幾類:

  • 輸入輸出一對一(element-wise)的算子,且結果RDD的分區結構不變,主要是map、flatMap(map后展平為一維RDD);

  • 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union(兩個RDD合為一個)、coalesce(分區減少);

  • 從輸入中選擇部分元素的算子,如filter、distinct(去除冗余元素)、subtract(本RDD有、它RDD無的元素留下來)和sample(采樣)。

另一部分變換算子針對Key-Value集合,又分為:

  • 對單個RDD做element-wise運算,如mapValues(保持源RDD的分區方式,這與map不同);

  • 對單個RDD重排,如sort、partitionBy(實現一致性的分區劃分,這個對數據本地性優化很重要,后面會講);

  • 對單個RDD基于key進行重組和reduce,如groupByKey、reduceByKey;

  • 對兩個RDD基于key進行join和重組,如join、cogroup。

后三類操作都涉及重排,稱為shuffle類操作。

從RDD到RDD的變換算子序列,一直在RDD空間發生。這里很重要的設計是lazy evaluation:計算并不實際發生,只是不斷地記錄到元數據。元數據的結構是DAG(有向無環圖),其中每一個“頂點”是RDD(包括生產該RDD 的算子),從父RDD到子RDD有“邊”,表示RDD間的依賴性。Spark給元數據DAG取了個很酷的名字,Lineage(世系)。這個 Lineage也是前面容錯設計中所說的日志更新。

Lineage一直增長,直到遇上行動(action)算子(圖1中的綠色箭頭),這時 就要evaluate了,把剛才累積的所有算子一次性執行。行動算子的輸入是RDD(以及該RDD在Lineage上依賴的所有RDD),輸出是執行后生 成的原生數據,可能是Scala標量、集合類型的數據或存儲。當一個算子的輸出是上述類型時,該算子必然是行動算子,其效果則是從RDD空間返回原生數據 空間。

行動算子有如下幾類:生成標量,如count(返回RDD中元素的個數)、reduce、fold/aggregate(見 Scala同名算子文檔);返回幾個標量,如take(返回前幾個元素);生成Scala集合類型,如collect(把RDD中的所有元素倒入 Scala集合類型)、lookup(查找對應key的所有值);寫入存儲,如與前文textFile對應的saveAsText-File。還有一個檢 查點算子checkpoint。當Lineage特別長時(這在圖計算中時常發生),出錯時重新執行整個序列要很長時間,可以主動調用 checkpoint把當前數據寫入穩定存儲,作為檢查點。

這里有兩個設計要點。首先是lazy evaluation。熟悉編譯的都知道,編譯器能看到的scope越大,優化的機會就越多。Spark雖然沒有編譯,但調度器實際上對DAG做了線性復 雜度的優化。尤其是當Spark上面有多種計算范式混合時,調度器可以打破不同范式代碼的邊界進行全局調度和優化。下面的例子中把Shark的SQL代碼 和Spark的機器學習代碼混在了一起。各部分代碼翻譯到底層RDD后,融合成一個大的DAG,這樣可以獲得更多的全局優化機會。

集群計算平臺:Spark

另一個要點是一旦行動算子產生原生數據,就必須退出RDD空間。因為目前Spark只能夠跟蹤RDD的計算,原生數據的計算對它來說是不可見的(除 非以后 Spark會提供原生數據類型操作的重載、wrapper或implicit conversion)。這部分不可見的代碼可能引入前后RDD之間的依賴,如下面的代碼:

集群計算平臺:Spark

第三行filter對errors.count()的依賴是由(cnt-1)這個原生數據運算產生的,但調度器看不到這個運算,那就會出問題了。

由于Spark并不提供控制流,在計算邏輯需要條件分支時,也必須回退到Scala的空間。由于Scala語言對自定義控制流的支持很強,不排除未來Spark也會支持。

Spark 還有兩個很實用的功能。一個是廣播(broadcast)變量。有些數據,如lookup表,可能會在多個作業間反復用到;這些數據比RDD要小得多,不 宜像RDD那樣在節點之間劃分。解決之道是提供一個新的語言結構——廣播變量,來修飾此類數據。Spark運行時把廣播變量修飾的內容發到各個節點,并保 存下來,未來再用時無需再送。相比Hadoop的distributed cache,廣播內容可以跨作業共享。Spark提交者Mosharaf師從P2P的老法師Ion Stoica,采用了BitTorrent(沒錯,就是下載電影的那個BT)的簡化實現。有興趣的讀者可以參考SIGCOMM'11的論文 Orchestra。另一個功能是Accumulator(源于MapReduce的counter):允許Spark代碼中加入一些全局變量做 bookkeeping,如記錄當前的運行指標。

運行和調度

圖2顯示了Spark程序的運行場景。它由客戶端啟動,分兩個階段:第一階段記錄變換算子序列、增量構建DAG圖;第二階段由行動算子觸 發,DAGScheduler把DAG圖轉化為作業及其任務集。Spark支持本地單節點運行(開發調試有用)或集群運行。對于后者,客戶端運行于 master節點上,通過Cluster manager把劃分好分區的任務集發送到集群的worker/slave節點上執行。

集群計算平臺:Spark

圖2 Spark程序運行過程

Spark 傳統上與Mesos“焦不離孟”,也可支持Amazon EC2和YARN。底層任務調度器的基類是個trait,它的不同實現可以混入實際的執行。例如,在Mesos上有兩種調度器實現,一種把每個節點的所有 資源分給Spark,另一種允許Spark作業與其他作業一起調度、共享集群資源。worker節點上有任務線程(task thread)真正運行DAGScheduler生成的任務;還有塊管理器(block manager)負責與master上的block manager master通信(完美使用了Scala的Actor模式),為任務線程提供數據塊。

最有趣的部分是DAGScheduler。下面詳解它的工作過程。RDD的數據結構里很重要的一個域是對父RDD的依賴。如圖3所示,有兩類依賴:窄(Narrow)依賴和寬(Wide)依賴。

集群計算平臺:Spark

圖3 窄依賴和寬依賴

窄依賴指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區,和兩個父RDD的分區對應于一個 子RDD 的分區。圖3中,map/filter和union屬于第一類,對輸入進行協同劃分(co-partitioned)的join屬于第二類。

寬依賴指子RDD的分區依賴于父RDD的所有分區,這是因為shuffle類操作,如圖3中的groupByKey和未經協同劃分的join。

窄依賴對優化很有利。邏輯上,每個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個并行任務的 barrier): 把計算fork到每個分區,算完后join,然后fork/join下一個RDD的算子。如果直接翻譯到物理實現,是很不經濟的:一是每一個RDD(即使 是中間結果)都需要物化到內存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分區到 父RDD的分區是窄依賴,就可以實施經典的fusion優化,把兩個fork/join合為一個;如果連續的變換算子序列都是窄依賴,就可以把很多個 fork/join并為一個,不但減少了大量的全局barrier,而且無需物化很多中間結果RDD,這將極大地提升性能。Spark把這個叫做流水線 (pipeline)優化。

變換算子序列一碰上shuffle類操作,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前算子往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的算子序列。在這個stage里,可以安全地實 施流水線優化。然后,又從那個寬依賴開始繼續回溯,生成下一個stage。

要深究兩個問題:一,分區如何劃分;二,分區該放到集群內哪個節點。這正好對應于RDD結構中另外兩個域:分區劃分器(partitioner)和首選位置(preferred locations)。

分區劃分對于shuffle類操作很關鍵,它決定了該操作的父RDD和子RDD之間的依賴類型。上文提到,同一個join算子,如果協同劃分的話, 兩個父 RDD之間、父RDD與子RDD之間能形成一致的分區安排,即同一個key保證被映射到同一個分區,這樣就能形成窄依賴。反之,如果沒有協同劃分,導致寬 依賴。

所謂協同劃分,就是指定分區劃分器以產生前后一致的分區安排。Pregel和HaLoop把這個作為系統內置的一部分;而Spark 默認提供兩種劃分器:HashPartitioner和RangePartitioner,允許程序通過partitionBy算子指定。注 意,HashPartitioner能夠發揮作用,要求key的hashCode是有效的,即同樣內容的key產生同樣的hashCode。這對 String是成立的,但對數組就不成立(因為數組的hashCode是由它的標識,而非內容,生成)。這種情況下,Spark允許用戶自定義 ArrayHashPartitioner。

第二個問題是分區放置的節點,這關乎數據本地性:本地性好,網絡通信就少。有些RDD產生時就 有首選位置,如HadoopRDD分區的首選位置就是HDFS塊所在的節點。有些RDD或分區被緩存了,那計算就應該送到緩存分區所在的節點進行。再不 然,就回溯RDD的lineage一直找到具有首選位置屬性的父RDD,并據此決定子RDD的放置。

寬/窄依賴的概念不止用在調度中,對容錯也很有用。如果一個節點宕機了,而且運算是窄依賴,那只要把丟失的父RDD分區重算即可,跟其他節點沒有依 賴。而寬依賴需要父RDD的所有分區都存在, 重算就很昂貴了。所以如果使用checkpoint算子來做檢查點,不僅要考慮lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加檢查點是最物 有所值的。

結語

因為篇幅所限,本文只能介紹Spark的基本概念和設計思想,內容來自Spark的多篇論文(以NSDI'12 “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”為主),也有我和同事研究Spark的心得,以及多年來從事并行/分布式系統研究的感悟。Spark核心成員/Shark主創者辛湜 對本文作了審閱和修改,特此致謝!

Spark站在一個很高的起點上,有著高尚的目標,但它的征程還剛剛開始。Spark致力于構建開放的生態系統( http://spark-project.org/ https://wiki.apache.org/incubator/SparkProposal),愿與大家一起為之努力!

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!