Spark:比Hadoop更強大的分布式數據計算項目

jopen 10年前發布 | 46K 次閱讀 Spark 分布式/云計算/大數據

Spark是一個由加州大學伯克利分校(UC Berkeley AMP)開發的一個分布式數據快速分析項目。它的核心技術是彈性分布式數據集(Resilient distributed datasets),提供了比Hadoop更加豐富的MapReduce模型,可以快速在內存中對數據集進行多次迭代,來支持復雜的數據挖掘算法和圖計算 算法。

Spark使用Scala開發,使用Mesos作為底層的調度框架,可以和hadoop和Ec2緊密集成,直接讀取hdfs或S3的文件進行計算并把結果寫回hdfs或S3,是Hadoop和Amazon云計算生態圈的一部分。Spark是一個小巧玲瓏的項目,項目的core部分的代碼只有63個Scala文件,充分體現了精簡之美。

Spark:比Hadoop更強大的分布式數據計算項目

Spark之依賴

  • Map Reduce模型:作為一個分布式計算框架,Spark采用了MapReduce模型。在它身上,Google的Map Reduce和Hadoop的痕跡很重,很明顯,它并非一個大的創新,而是微創新。在基礎理念不變的前提下,它借鑒,模仿并依賴了先輩,加入了一點改進, 極大的提升了MapReduce的效率。
  • 函數式編程:Spark由Scala寫就,而支持的語言亦是Scala。其原因之一就是Scala支持函數式編程。這一來造就了Spark的代碼 簡潔,二來使得基于Spark開發的程序,也特別的簡潔。一次完整的MapReduce,Hadoop中需要創建一個Mapper類和Reduce類,而 Spark只需要創建相應的一個map函數和reduce函數即可,代碼量大大降低。
  • Mesos:Spark將分布式運行的需要考慮的事情,都交給了Mesos,自己不Care,這也是它代碼能夠精簡的原因之一。
  • HDFS和S3:Spark支持2種分布式存儲系統:HDFS和S3。應該算是目前最主流的兩種了。對文件系統的讀取和寫入功能是Spark自己提供的,借助Mesos分布式實現。

Spark與Hadoop的對比

  • Spark的中間數據放到內存中,對于迭代運算效率更高。Spark更適合于迭代運算比較多的ML和DM運算。因為在Spark里面,有RDD的抽象概念。
  • Spark比Hadoop更通用。
    • Spark提供的數據集操作類型有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如 map,filter,flatMap,sample,groupByKey,reduceByKey,union,join,cogroup,mapValues,sort,partionBy 等多種操作類型,Spark把這些操作稱為Transformations。同時還提供 Count,collect,reduce,lookup,save等多種actions操作。
    • 這些多種多樣的數據集操作類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結果的存儲、分區等。可以說編程模型比Hadoop更靈活。
    • 不過由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
  • 容錯性。在分布式數據集計算時通過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現容錯。
  • 可用性。Spark通過提供豐富的Scala, Java,Python API及交互式Shell來提高可用性。

Spark與Hadoop的結合

Spark可以直接對HDFS進行數據的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運行于同集群中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive完全兼容。

Spark的核心概念

1、Resilient Distributed Dataset (RDD)彈性分布數據集

RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心 的東西,它表示已被分區,不可變的并能夠被并行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到 內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對 于迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升比較大。

RDD的特點:

  • 它是在集群節點上的不可變的、已分區的集合對象。
  • 通過并行轉換的方式來創建如(map, filter, join, etc)。
  • 失敗自動重建。
  • 可以控制存儲級別(內存、磁盤等)來進行重用。
  • 必須是可序列化的。
  • 是靜態類型的。

RDD的好處:

  • RDD只能從持久存儲或通過Transformations操作產生,相比于分布式共享內存(DSM)可以更高效實現容錯,對于丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
  • RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。
  • RDD的數據分區特性,可以通過數據的本地性來提高性能,這與Hadoop MapReduce是一樣的。
  • RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現在的MapReduce。

RDD的存儲與分區:

  • 用戶可以選擇不同的存儲級別存儲RDD以便重用。
  • 當前RDD默認是存儲于內存,但當內存不足時,RDD會spill到disk。
  • RDD在需要進行分區把數據分布于集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。

RDD的內部表示:

  • 分區列表(數據塊列表)
  • 計算每個分片的函數(根據父RDD計算出此RDD)
  • 對父RDD的依賴列表
  • 對key-value RDD的Partitioner【可選】
  • 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)【可選】

RDD的存儲級別:RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別。RDD定義了各種操作,不同類型的數據由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現。

RDD有兩種創建方式:

  • 從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)創建。
  • 從父RDD轉換得到新RDD。

2、Spark On Mesos

Spark支持Local調用和Mesos集群兩種模式,在Spark上開發算法程序,可以在本地模式調試成功后,直接改用Mesos集群運行,除 了文件的保存位置需要考慮以外,算法理論上不需要做任何修改。Spark的本地模式支持多線程,有一定的單機并發處理能力。但是不算很強勁。本地模式可以 保存結果在本地或者分布式文件系統,而Mesos模式一定需要保存在分布式或者共享文件系統。

為了在Mesos框架上運行,安裝Mesos的規范和設計,Spark實現兩個類,一個是SparkScheduler,在Spark中類名是 MesosScheduler;一個是SparkExecutor,在Spark中類名是Executor。有了這兩個類,Spark就可以通過 Mesos進行分布式的計算。Spark會將RDD和MapReduce函數,進行一次轉換,變成標準的Job和一系列的Task。提交給 SparkScheduler,SparkScheduler會把Task提交給Mesos Master,由Master分配給不同的Slave,最終由Slave中的Spark Executor,將分配到的Task一一執行,并且返回,組成新的RDD,或者直接寫入到分布式文件系統。

Spark:比Hadoop更強大的分布式數據計算項目

3、Transformations & Actions

對于RDD可以有兩種計算方式:轉換(返回值還是一個RDD)與操作(返回值不是一個RDD)。

  • 轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到 Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
  • 操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。

它們本質區別是:Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個 RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個 值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。關于這兩個動作,在Spark開發指南中會有就進一步的詳細介紹,它們是基于 Spark開發的核心。這里將Spark的官方ppt中的一張圖略作改造,闡明一下兩種動作的區別。

Spark:比Hadoop更強大的分布式數據計算項目

4、Lineage(血統)

利用內存加快數據加載,在眾多的其它的In-Memory類數據庫或Cache類系統中也有實現,Spark的主要區別在于它處理分布式運算環境下 的數據容錯性(節點實效/數據丟失)問題時采用的方案。為了保證RDD中數據的魯棒性,RDD數據集通過所謂的血統關系(Lineage)記住了它是如何 從其它RDD中演變過來的。相比其它系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換 (Transformation)操作(filter, map, join etc.)行為。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限 制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提升。

RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決數據容錯的高效性。

  • Narrow Dependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。
  • Wide Dependencies是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與 Wide Dependencies,這種計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點宕機時,通過重新計算,這種情況下,這 種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對于數據的重算開銷要遠小于Wide Dependencies的數據重算開銷。

在RDD計算,通過checkpint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現容錯,默認是logging the updates方式,通過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每個RDD的lineage(血統)來重新計算生成 丟失的分區數據。

Spark的Shuffle過程介紹

1.Shuffle Writer

Spark豐富了任務類型,有些任務之間數據流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞數據,比如wide dependency的group by key。

Spark中需要Shuffle輸出的Map任務會為每個Reduce創建對應的bucket,Map產生的結果會根據設置的 partitioner得到對應的bucketId,然后填充到相應的bucket中去。每個Map的輸出結果可能包含所有的Reduce所需要的數據, 所以每個Map會創建R個bucket(R是reduce的個數),M個Map總共會創建M*R個bucket。

Map創建的bucket其實對應磁盤上的一個文件,Map的結果寫到每個bucket中其實就是寫到那個磁盤文件中,這個文件也被稱為 blockFile,是Disk Block Manager管理器通過文件名的Hash值對應到本地目錄的子目錄中創建的。每個Map要在節點上創建R個磁盤文件用于結果輸出,Map的結果是直接輸 出到磁盤文件上的,100KB的內存緩沖是用來創建Fast Buffered OutputStream輸出流。這種方式一個問題就是Shuffle文件過多。

Spark:比Hadoop更強大的分布式數據計算項目

針對上述Shuffle過程產生的文件過多問題,Spark有另外一種改進的Shuffle過程:consolidation Shuffle,以期顯著減少Shuffle文件的數量。在consolidation Shuffle中每個bucket并非對應一個文件,而是對應文件中的一個segment部分。Job的map在某個節點上第一次執行,為每個 reduce創建bucket對應的輸出文件,把這些文件組織成ShuffleFileGroup,當這次map執行完之后,這個 ShuffleFileGroup可以釋放為下次循環利用;當又有map在這個節點上執行時,不需要創建新的bucket文件,而是在上次的 ShuffleFileGroup中取得已經創建的文件繼續追加寫一個segment;當前次map還沒執行完,ShuffleFileGroup還沒有 釋放,這時如果有新的map在這個節點上執行,無法循環利用這個ShuffleFileGroup,而是只能創建新的bucket文件組成新的 ShuffleFileGroup來寫輸出。

Spark:比Hadoop更強大的分布式數據計算項目

比如一個Job有3個Map和2個reduce:(1) 如果此時集群有3個節點有空槽,每個節點空閑了一個core,則3個Map會調度到這3個節點上執行,每個Map都會創建2個Shuffle文件,總共創 建6個Shuffle文件;(2) 如果此時集群有2個節點有空槽,每個節點空閑了一個core,則2個Map先調度到這2個節點上執行,每個Map都會創建2個Shuffle文件,然后其 中一個節點執行完Map之后又調度執行另一個Map,則這個Map不會創建新的Shuffle文件,而是把結果輸出追加到之前Map創建的Shuffle 文件中;總共創建4個Shuffle文件;(3) 如果此時集群有2個節點有空槽,一個節點有2個空core一個節點有1個空core,則一個節點調度2個Map一個節點調度1個Map,調度2個Map的 節點上,一個Map創建了Shuffle文件,后面的Map還是會創建新的Shuffle文件,因為上一個Map還正在寫,它創建的 ShuffleFileGroup還沒有釋放;總共創建6個Shuffle文件。

2.Shuffle Fetcher

Reduce去拖Map的輸出數據,Spark提供了兩套不同的拉取數據框架:通過socket連接去取數據;使用netty框架去取數據。

每個節點的Executor會創建一個BlockManager,其中會創建一個BlockManagerWorker用于響應請求。當 Reduce的GET_BLOCK的請求過來時,讀取本地文件將這個blockId的數據返回給Reduce。如果使用的是Netty框 架,BlockManager會創建ShuffleSender用于發送Shuffle數據。并不是所有的數據都是通過網絡讀取,對于在本節點的Map數 據,Reduce直接去磁盤上讀取而不再通過網絡框架。

Reduce拖過來數據之后以什么方式存儲呢?Spark Map輸出的數據沒有經過排序,Spark Shuffle過來的數據也不會進行排序,Spark認為Shuffle過程中的排序不是必須的,并不是所有類型的Reduce需要的數據都需要排序,強 制地進行排序只會增加Shuffle的負擔。Reduce拖過來的數據會放在一個HashMap中,HashMap中存儲的也是對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark將 Shuffle取過來的每一個對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在內存中。

Shuffle取過來的數據全部存放在內存中,對于數據量比較小或者已經在Map端做過合并處理的Shuffle數據,占用內存空間不會太大,但是 對于比如group by key這樣的操作,Reduce需要得到key對應的所有value,并將這些value組一個數組放在內存中,這樣當數據量較大時,就需要較多內存。

當內存不夠時,要不就失敗,要不就用老辦法把內存中的數據移到磁盤上放著。Spark意識到在處理數據規模遠遠大于內存空間時所帶來的不足,引入了 一個具有外部排序的方案。Shuffle過來的數據先放在內存中,當內存中存儲的對超過1000并且內存使用超過70%時,判斷節點上可用內存如果還足夠,則把內存緩沖區大小翻倍,如果可用內存不再夠了,則把內存中 的對排序然后寫到磁盤文件中。最后把內存緩沖區中的數據排序之后和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數據,這個和 MapReduce中的merge過程類似。

3.MapReduce和Spark的Shuffle過程對比

Spark:比Hadoop更強大的分布式數據計算項目

Spark的資源管理與作業調度

Spark對于資源管理與作業調度可以使用本地模式,Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。Spark on Yarn在Spark0.6時引用,但真正可用是在現在的branch-0.8版本。Spark on Yarn遵循YARN的官方規范實現,得益于Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就非常容 易,Spark on Yarn的大致框架圖。

Spark:比Hadoop更強大的分布式數據計算項目

讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。

編程接口

Spark通過與編程語言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個數據集都表示為RDD對象,對數據集的 操作就表示成對RDD對象的操作。Spark主要的編程語言是Scala,選擇Scala是因為它的簡潔性(Scala可以很方便在交互式下使用)和性能 (JVM上的靜態強類型語言)。

Spark和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節點)組成。 用戶編寫的Spark程序被稱為Driver程序,Dirver程序會連接master并定義了對各RDD的轉換與操作,而對RDD的轉換與操作通過 Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操作發送到各Workers節點。 Workers存儲著數據分塊和享有集群內存,是運行在工作節點上的守護進程,當它收到對RDD的操作時,根據數據分片信息進行本地化數據操作,生成新的 數據分片、返回結果或把RDD寫入存儲系統。

Spark:比Hadoop更強大的分布式數據計算項目

  • Scala:Spark使用Scala開發,默認使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創 建或使用(SparkContext)實例,使用SparkContext創建RDD,然后就是對RDD進行操作。
  • Java:Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,Scala與Java可以互操作,Java編程接口其實就是對Scala的封裝。如:
  • Python:現在Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操作,從而實現使用 python編寫Spark程序。Spark也同樣提供了pyspark,一個Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。

Spark生態系統

  • Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive一樣的H iveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過配置Shark參數,Shark可以自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark 通過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一起,最大化RDD的重復使用。
  • Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部 分數據。Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用于實時計算,另一方面相比基于Record的其它 處理框架(如Storm),RDD數據集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數據處理的邏輯和算法。方便了一些需 要歷史數據和實時數據聯合分析的特定應用場合。
  • Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank算法。

Spark的適用場景

  • Spark是基于內存的迭代計算框架,適用于需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小
  • 由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
  • 總的來說Spark的適用面比較廣泛且比較通用。

在業界的使用

Spark項目在2009年啟動,2010年開源, 現在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。

參考資料:http://spark.apache.org/

引用地址:http://www.biaodianfu.com/spark.html
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!