Spark與Hadoop的結合
Spark可以直接對HDFS進行數據的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運行于同集群中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive完全兼容。
Spark 的核心概念
1、Resilient Distributed Dataset (RDD)彈性分布數據集
RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核 心的東西,它表示已被分區,不可變的并能夠被并行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache 到內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這 對于迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升比較大。
RDD的特點:
- 它是在集群節點上的不可變的、已分區的集合對象。
- 通過并行轉換的方式來創建如(map, filter, join, etc)。
- 失敗自動重建。
- 可以控制存儲級別(內存、磁盤等)來進行重用。
- 必須是可序列化的。
- 是靜態類型的。
RDD的好處:
- RDD只能從持久存儲或通過Transformations操作產生,相比于分布式共享內存(DSM)可以更高效實現容錯,對于丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
- RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。
- RDD的數據分區特性,可以通過數據的本地性來提高性能,這與Hadoop MapReduce是一樣的。
- RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現在的MapReduce。
RDD的存儲與分區:
- 用戶可以選擇不同的存儲級別存儲RDD以便重用。
- 當前RDD默認是存儲于內存,但當內存不足時,RDD會spill到disk。
- RDD在需要進行分區把數據分布于集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
RDD的內部表示:
- 分區列表(數據塊列表)
- 計算每個分片的函數(根據父RDD計算出此RDD)
- 對父RDD的依賴列表
- 對key-value RDD的Partitioner【可選】
- 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)【可選】
RDD的存儲級別:RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別。RDD定義了各種操作,不同類型的數據由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現。
RDD有兩種創建方式:
- 從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)創建。
- 從父RDD轉換得到新RDD。
2、Spark On Mesos
Spark支持Local調用和Mesos集群兩種模式,在Spark上開發算法程序,可以在本地模式調試成功后,直接改用Mesos集群運行, 除了文件的保存位置需要考慮以外,算法理論上不需要做任何修改。Spark的本地模式支持多線程,有一定的單機并發處理能力。但是不算很強勁。本地模式可 以保存結果在本地或者分布式文件系統,而Mesos模式一定需要保存在分布式或者共享文件系統。
為了在Mesos框架上運行,安裝Mesos的規范和設計,Spark實現兩個類,一個是SparkScheduler,在Spark中類名是 MesosScheduler;一個是SparkExecutor,在Spark中類名是Executor。有了這兩個類,Spark就可以通過 Mesos進行分布式的計算。Spark會將RDD和MapReduce函數,進行一次轉換,變成標準的Job和一系列的Task。提交給 SparkScheduler,SparkScheduler會把Task提交給Mesos Master,由Master分配給不同的Slave,最終由Slave中的Spark Executor,將分配到的Task一一執行,并且返回,組成新的RDD,或者直接寫入到分布式文件系統。

3、Transformations & Actions
對于RDD可以有兩種計算方式:轉換(返回值還是一個RDD)與操作(返回值不是一個RDD)。
- 轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到 Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
- 操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
它們本質區別是:Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個 RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個 值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。關于這兩個動作,在Spark開發指南中會有就進一步的詳細介紹,它們是基于 Spark開發的核心。這里將Spark的官方ppt中的一張圖略作改造,闡明一下兩種動作的區別。

4、Lineage(血統)
利用內存加快數據加載,在眾多的其它的In-Memory類數據庫或Cache類系統中也有實現,Spark的主要區別在于它處理分布式運算環境 下的數據容錯性(節點實效/數據丟失)問題時采用的方案。為了保證RDD中數據的魯棒性,RDD數據集通過所謂的血統關系(Lineage)記住了它是如 何從其它RDD中演變過來的。相比其它系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換 (Transformation)操作(filter, map, join etc.)行為。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限 制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提升。
RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決數據容錯的高效性。
- Narrow Dependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。
- Wide Dependencies是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與 Wide Dependencies,這種計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點宕機時,通過重新計算,這種情況下,這 種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對于數據的重算開銷要遠小于Wide Dependencies的數據重算開銷。
在RDD計算,通過checkpint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現容錯,默認是logging the updates方式,通過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每個RDD的lineage(血統)來重新計算生成 丟失的分區數據。
Spark 的資源管理與作業調度
Spark對于資源管理與作業調度可以使用本地模式,Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。Spark on Yarn在Spark0.6時引用,但真正可用是在現在的branch-0.8版本。Spark on Yarn遵循YARN的官方規范實現,得益于Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就非常容 易,Spark on Yarn的大致框架圖。

讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。
編程接口
Spark通過與編程語言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個數據集都表示為RDD對象,對數據集 的操作就表示成對RDD對象的操作。Spark主要的編程語言是Scala,選擇Scala是因為它的簡潔性(Scala可以很方便在交互式下使用)和性 能(JVM上的靜態強類型語言)。
Spark和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節點)組成。 用戶編寫的Spark程序被稱為Driver程序,Dirver程序會連接master并定義了對各RDD的轉換與操作,而對RDD的轉換與操作通過 Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操作發送到各Workers節點。 Workers存儲著數據分塊和享有集群內存,是運行在工作節點上的守護進程,當它收到對RDD的操作時,根據數據分片信息進行本地化數據操作,生成新的 數據分片、返回結果或把RDD寫入存儲系統。

- Scala:Spark使用Scala開發,默認使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創 建或使用(SparkContext)實例,使用SparkContext創建RDD,然后就是對RDD進行操作。
- Java:Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,Scala與Java可以互操作,Java編程接口其實就是對Scala的封裝。如:
- Python:現在Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操作,從而實現使用 python編寫Spark程序。Spark也同樣提供了pyspark,一個Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。
Spark 生態系統
- Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive一樣的H iveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過配置Shark參數,Shark可以自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark 通過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一起,最大化RDD的重復使用。
- Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部 分數據。Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用于實時計算,另一方面相比基于Record的其它 處理框架(如Storm),RDD數據集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數據處理的邏輯和算法。方便了一些需 要歷史數據和實時數據聯合分析的特定應用場合。
- Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank算法。
Spark 的適用場景
- Spark是基于內存的迭代計算框架,適用于需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小
- 由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
- 總的來說Spark的適用面比較廣泛且比較通用。
在業界的使用
Spark項目在2009年啟動,2010年開源, 現在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。
參考資料: http://spark.apache.org/