Apache Spark:大數據處理統一引擎
工業和研究中數據的大幅增長為計算機科學帶來了巨大的機會與挑戰。由于數據大小超過了單臺機器的能力,用戶需要新的系統將計算擴展到多個節點。因此,針對不同計算工作負載的新集群編程模型已呈爆炸式增長。
這些模型相對專業化。例如支持批處理的MapReduce,支持迭代圖算法的Dreme。在開源Apache Hadoop堆棧中,類似Storm和Impala的系統也是特有的。即使在關系數據庫世界中,“一刀切”系統已越來越少。然而,很多大數據應用需要整合許多不同的處理類型。大數據,顧名思義,代表了數據的多樣性和復雜性。一個典型的管道需要類似MapReduce的系統進行數據載入,使用類似SQL的語言進行查詢。用戶不得不將不同的系統整合在一起,并且有時候引擎也不能對應用的需求都滿足。
有鑒于此,2009年加州大學伯克利分校團隊開始了Apache Spark項目,旨在為分布式數據處理設計一個統一的引擎。 Spark具有類似于MapReduce的編程模型,但是使用稱為“彈性分布式數據集”或RDDs的數據共享抽象擴展。通過這個簡單的擴展,Spark可以輕松應對之前需要單獨引擎處理的高強度工作,包括SQL、流式傳輸、機器學習和圖形處理。Spark使用與專用引擎相同的優化(例如面向列的處理和增量更新),并實現相同的性能,但是編寫更為高效。
Spark的通用性有幾個重要的好處。
首先,應用程序更容易開發,因為它們使用統一的API。
第二,結合處理任務更有效;而先前的系統需要將數據寫入存儲以將其傳遞給另一個引擎,Spark可以在相同的數據(通常在存儲器中)上運行不同的功能。
最后,Spark啟用了以前系統無法實現的新應用程序(如圖形上的交互式查詢和流式計算機學習)。自2010年發布以來,Spark已經發展成為最活躍的開源項目或大數據處理,擁有超過1,000名貢獻者。該項目已在超過1,000個組織中使用,從技術公司到銀行、零售、生物技術和天文學。
隨著并行數據處理變得普遍,處理功能的可組合性將是對可用性和性能的最重要關注之一。許多數據分析是探索性的,用戶希望將庫函數快速組合成一個工作管道。然而,對于“大數據”,特別是在不同系統之間復制數據是對性能不利的。因此,用戶需要共性和可組合的抽象。在本文中,我們將介紹Spark編程模型并解釋為什么它是高度通用的。我們還討論了如何利用這種通用性來構建其它處理任務。最后,我們總結了Spark中常見的應用程序。
編程模型
Spark中的關鍵編程抽象是RDD,它是容錯集合,可以并行處理集群中的對象。用戶通過“轉換”(例如map、filter和groupBy)操作來創建RDD。
Spark通過Scala、Java、Python和R中的函數式編程API來表達RDD,用戶可以簡單地在集群上運行本地函數。 例如,以下Scala代碼通過搜索以ERROR開頭的行來創建日志文件中錯誤消息的RDD,然后打印總錯誤數:
lines = spark.textFile("hdfs://...")
errors = lines.filter(s => s.startsWith("ERROR"))
println("Total errors: "+errors.count())
第一行定義了一個在HDFS上的文本行集合RDD。第二行調用過濾器轉換以從行中導出新的RDD。它的參數是一個Scala函數文字或閉包。最后一行調用count函數。另一種類型的RDD操作稱為“動作”,返回一個結果給程序(這里,RDD中的元素數量),而不是定義一個新的RDD。
Spark評估RDDs延遲,嘗試為用戶運算找到一個有效的計劃。特別的是,變換返回表示計算結果的新RDD對象,但不立即計算它。當一個動作被調用時,Spark查看整個用于創建執行計劃的轉換的圖。例如,如果一行中有多個過濾器或映射操作,Spark可以將它們融合到一個傳遞中,或者如果知道數據是被分區的,它可以避免通過網絡為groupBy進行數據傳遞。因此用戶可以實現程序模塊化,而不會造成性能低下。
最后,RDDs為計算之間的數據共享提供了明確的支持。默認情況下,RDD是“短暫的”,因為它們每次在動作(例如count)使用時被重新計算。但是,用戶還可以將所選的RDD保留在內存中或快速重用。(如果數據不適合內存,Spark還會將其溢出到磁盤。)例如,用戶在HDFS中搜索大量日志數據集來進行錯誤調試時,可以通過調用以下函數來載入不同集群的錯誤信息到內存中:
errors.persist()
隨后,用戶可以在該內存中數據上運行不同的查詢:
// Count errors mentioning MySQL
errors.filter(s => s.contains("MySQL")).count()
// Fetch back the time fields of errors that
// mention PHP, assuming time is field #3:
errors.filter(s => s.contains("PHP")).map(line => line.split('t')(3)).collect()
這種數據共享是Spark和以前的計算模型(如MapReduce)之間的主要區別。
容錯
除了提供數據共享和各種并行操作,RDDs還可以自動從故障中恢復。 傳統上,分布式計算系統通過數據復制或檢查點提供了容錯。 Spark使用一種稱為“lineage”的新方法。每個RDD跟蹤用于構建它的轉換圖,并對基本數據重新運行這些操作,以重建任何丟失的分區。
例如,圖2顯示了我們以前的查詢中的RDD,其中我們通過應用兩個過濾器和一個映射來獲取錯誤的時間字段。 如果RDD的任何分區丟失(例如保存內存分區的錯誤的節點失敗),Spark將通過在HDFS文件的相應塊上的應用過濾器來重建它。 對于將數據從所有節點發送到所有其他節點(例如reduceByKey)的“shuffle”操作,發送方在本地保留其輸出數據,以防接收器出現錯誤。
基于沿襲的恢復比數據密集型工作負載中的復制效率高得多。 它節省了時間,因為寫入RAM要比通過網絡寫入數據快。 恢復通常比簡單地重新運行程序快得多,因為故障節點通常包含多個RDD分區,這些分區可以在其他節點上并行重建。
另外一個復雜些的例子如圖3:
圖3
Spark中邏輯回歸的實現。 它使用批量梯度下降,一個簡單的迭代算法,重復計算數據上的梯度函數作為并行求和。 Spark可以方便地將數據加載到RAM中,并運行多個求和。 因此,它運行速度比傳統的MapReduce快。 例如,在100GB作業中(如圖4),MapReduce每次迭代需要110秒,因為每次迭代需從磁盤加載數據,而Spark在第一次加載后每次迭代只需要一秒。
圖4
與存儲系統的整合
與Google的MapReduce非常相似,Spark旨在與多個外部系統一起使用持久存儲。Spark最常用于集群文件系統,如HDFS和鍵值存儲,如S3和Cassandra。 它還可以作為數據目錄與Apache Hive連接。 RDD通常僅在應用程序中存儲臨時數據,但某些應用程序(例如Spark SQL JDBC服務器)也在多個用戶之間共享RDD。Spark作為存儲系統無關引擎的設計,使用戶可以輕松地對現有數據進行運算和連接各種數據源。
高級庫
RDD編程模型僅提供對象的分布式集合和在其上運行的函數。除此之外,我們在Spark上構建了各種針對專用計算引擎更高級的庫。其關鍵思想是,如果我們控制存儲在RDD中的數據結構,跨節點的數據分區以及在其上運行的函數,我們可以在其他引擎中實現許多執行技術。事實上,正如我們在本節中所展示的,這些庫通常在每個任務上實現最先進的性能,同時在用戶組合使用它們時提供顯著的優勢。我們現在討論Apache Spark包含的四個主要庫。
SQL和DataFrames。最常見的數據處理范例之一是關系查詢。 Spark SQL及其前身Shark使用類似于分析數據庫的技術在Spark上實現這樣的查詢。例如,這些系統支持列式存儲,基于成本的優化和用于查詢執行的代碼生成。這些系統的主要思想是使用與分析數據庫相同的數據布局 – 壓縮的柱狀存儲 – 內部RDD。在Spark SQL中,RDD中的每個記錄都保存為以二進制格式存儲的一系列行,并且系統生成直接針對此布局運行的代碼。
除了運行SQL查詢之外,我們還使用Spark SQL引擎為稱為DataFrames的基本數據變換提供了更高級的抽象,這些變換是具有已知模式的記錄的RDD。 DataFrames是R和Python中的表格數據的常見抽象,具有用于過濾,計算新列和聚合的編程方法。在Spark中,這些操作映射到Spark SQL引擎并接收其所有優化。我們稍后討論DataFrames。
Spark SQL中尚未實現的一種技術是索引,盡管Spark上的其他庫(如IndexedRDDs)確實使用它。
Spark Streaming(流)。 Spark Streaming使用稱為“離散流”的模型實現增量流處理。為了通過Spark實現流式傳輸,我們將輸入數據分成小批量(例如每200毫秒),我們定期與RDD中存儲的狀態組合以產生新結果。以這種方式運行流計算比傳統的分布式流系統有幾個好處。例如,由于使用沿襲,故障恢復更便宜,并且可以將流與批處理和交互式查詢組合。
GraphX。 GraphX提供了類似于Pregel和GraphLab的圖形計算接口,1通過為其構建的RDD選擇分區函數來實現與這些系統相同的布局優化(例如頂點分區方案)。
MLlib。 MLlib,Spark的機器學習庫,實現了50多種常見的分布式模型訓練算法。例如,它包括決策樹(PLANET),Latent Dirichlet分布和交替最小二乘矩陣分解的常見分布式算法。
組合處理任務。 Spark的庫都對RDD進行操作,作為數據抽象,使得它們在應用程序中易于組合。例如,圖5顯示了一個程序,它使用Spark SQL讀取一些歷史推ter數據,使用MLlib訓練一個K-means聚類模型,然后將該模型應用于一個新的tweet流。每個庫返回的數據任務(這里是歷史性的tweet RDD和K-means模型)很容易傳遞給其他庫。
除了API級別的兼容性,Spark中的組合在執行級別也是高效的,因為Spark可以跨處理庫進行優化。例如,如果一個庫運行映射函數,并且下一個庫對其結果運行映射,則Spark將這些操作融合到單個映射中。同樣,Spark的故障恢復在這些庫中無縫地工作,重新計算丟失的數據,無論哪個庫產生它。
圖5
性能
假設這些庫運行在同一引擎上,它們是否會失去性能?我們發現,通過實現我們剛剛在RDD中概述的優化,我們通常可以匹配專用引擎的性能。例如,圖6比較了Spark對三個簡單任務(SQL查詢,流字計數和交替最小二乘矩陣分解)與其他引擎的性能。雖然結果隨著工作負載的不同而不同,但Spark通常與Storm,GraphLab和Impala等專用系統相當。對于流處理,雖然我們顯示了Storm上分布式實現的結果,但是每個節點的吞吐量也可以與商業流引擎如Oracle CEP相媲美。
圖6
即使在高度競爭的基準測試中,我們也使用Apache Spark實現了最先進的性能。在2014年,我們進入了Daytona Gray-Sort基準(http://sortbenchmark.org/),涉及在磁盤上排序100TB的數據,并綁定一個專門的系統構建的新記錄,僅用于在類似數量的機器上排序。與其他示例一樣,這是可能的,因為我們可以實現RDD模型中大規模排序所需的通信和CPU優化。
應用
Apache Spark用于廣泛的應用程序。我們對Spark用戶的調查發現了超過1,000家使用Spark的公司,從Web服務,生物技術到金融等領域。在學術界,我們也看到了幾個科學領域的應用。在這些工作負載中,我們發現用戶利用Spark的通用性,并且通常組合其多個庫。在這里,我們介紹幾個頂級用例。許多用例的演示文稿也可在Spark Summit會議網站( http://www.spark-summit.org )上獲取。
批量處理
Spark最常用的應用程序是對大型數據集進行批處理,包括Extract-Transform-Load工作負載,將數據從原始格式(如日志文件)轉換為更加結構化的格式,并離線訓練機器學習模型。這些工作負載的已發布示例包括Yahoo!的頁面個性化和推薦;管理高盛的數據湖;阿里巴巴圖表挖掘;金融價值風險計算;和豐田的客戶反饋的文本挖掘。我們知道的最大的已發布的用例是在中國社交網絡騰訊的8000節點集群,每天攝取1PB的數據。
雖然Spark可以在內存中處理數據,但是此類別中的許多應用程序只能在磁盤上運行。在這種情況下,Spark相對于MapReduce仍然可以提高性能,因為它支持更復雜的運算符圖。
交互式查詢
互動使用Spark分為三個主要類別。首先,組織通常通過商業智能工具(如Tableau)使用Spark SQL進行關系查詢。例子包括eBay和百度。第二,開發人員和數據科學家可以通過shell或可視化筆記本環境以交互方式使用Spark的Scala,Python和R接口。這種交互式使用對于提出更高級的問題和設計最終導致生產應用程序的模型至關重要,并且在所有部署中都很常見。第三,一些供應商已經開發了在Spark上運行的特定領域的交互式應用程序。示例包括Tresata(反洗錢),Trifacta(數據清理)和PanTera(大規模可視化,如圖7所示)。
圖7
流處理
實時處理也是一種流行的用例,無論是在分析和實時決策應用程序中。 Spark Streaming的已發布使用案例包括思科的網絡安全監控,三星SDS的規范分析以及Netflix的日志挖掘。許多這些應用程序還將流式處理與批處理和交互式查詢相結合。例如,視頻公司Conviva使用Spark持續維護內容分發服務器性能的模型,在跨服務器移動客戶端時自動查詢,在需要對模型維護和查詢進行大量并行工作的應用程序中。
科學應用
Spark還被用于幾個科學領域,包括大規模垃圾郵件檢測,圖像處理,和基因組數據處理。結合批量,交互和流處理的一個例子是Howard Hughes醫學院的Thunder平臺神經科學,Janelia Farm。它被設計成處理來自實驗的腦成像數據,實時地,從生物體(例如斑馬魚和小鼠)擴大到1TB /小時的全腦成像數據。使用Thunder,研究人員可以應用機器學習算法(例如聚類和主成分分析)來識別涉及特定行為的神經元。相同的代碼可以在批處理作業中對來自先前運行的數據或在活動實驗期間的交互式查詢中運行。圖8顯示了使用Spark生成的示例圖像。
圖8
使用的Spark組件
因為Spark是一個統一的數據處理引擎,自然的問題是它的圖書館組織實際使用了多少。我們對Spark用戶的調查表明,組織確實使用多個組件,超過60%的組織使用至少三個Spark的API。圖9概述了Databricks 2015年7月Spark調查中每個組件的使用情況,達到1400名受訪者。我們將Spark Core API(只是RDD)列為一個組件,將更高級別的庫列為其他組件。
我們看到許多組件被廣泛使用,Spark Core和SQL最受歡迎。 Streaming在46%的組織中使用,機器學習在54%中使用。雖然在圖9中未直接示出,但大多數組織使用多個組件; 88%使用其中至少兩個,60%使用至少三個(如Spark Core和兩個庫),27%使用至少四個組件。
圖9
部署環境
我們也看到Apache Spark應用程序運行的地方和它們連接到的數據源的多樣性。雖然第一個Spark部署通常在Hadoop環境中,在2015年7月Spark調查中,僅有40%的部署在Hadoop YARN集群管理器上。此外,52%的受訪者在公共云上運行Spark。
Spark模型的魅力
雖然Apache Spark演示了統一的集群編程模型是可行和有用的,但是了解集群編程模型的廣泛性成因以及Spark的局限性是很有好處的。在這里,我們總結了一個關于Zaharia RDDs的一般性的討論。我們從兩個角度研究RDDs。首先,從能力的角度,我們認為RDDs可以模擬任何分布式計算,并且在多數情況下表現優異,除非計算對網絡延遲敏感。第二,從系統的角度來看,RDD能幫助應用程序對集群中最常見瓶頸的資源進行控制 – 網絡和存儲I/O,從而使得這些資源得到優化。
表達性角度。為了研究RDDs的表達性,我們首先比較RDDs和MapReduce模型。第一個問題是MapReduce本身表達性的計算是什么?雖然有關于MapReduce的限制的許多討論,這里令人驚訝的是MapReduce可以模擬任何分布式計算。
要看到這一點,請注意任何分布式計算由執行本地計算和偶爾交換消息的節點組成。 MapReduce提供了映射操作,允許本地計算和reduce,這允許全部通信。因此,可以通過將其工作分解為時間步長,運行Map以在每個時間步長中執行本地計算,以及在每個步驟結束時使用reduce來批處理和交換消息,來模擬任何分布式計算,盡管效率并不高。一系列MapReduce步驟將捕獲整個結果,如圖10所示。
圖10
雖然這一行的工作表明MapReduce可以模擬任意計算,但又兩個問題會使這種模擬背后的“常數因子”高。首先,MapReduce在跨時間段共享數據方面效率低下,因為它依賴于復制的外部存儲系統來實現此目的。由于需在每個步驟之后寫出其狀態,系統運行將較慢。其次,MapReduce步驟的延遲決定了我們的仿真與真實網絡的匹配程度,大多數Map-Reduce實現是針對具有幾分鐘到幾小時延遲的批處理環境設計的。
RDDs和Spark解決了這兩個問題。在數據共享方面,RDD通過避免中間數據的復制來快速進行數據共享,并且可以緊密模擬在由長時間運行的進程組成的系統中發生的內存中“數據共享”。在延遲方面,Spark可以在大型集群上以100ms延遲運行MapReduce類似的步驟。雖然一些應用程序需要更細粒度的時間步長和通信,但是這100ms的延遲足以處理許多數據密集型工作負載,在通信步驟之前可以大批量進行計算。
總之,RDDs建立在Map-Reduce模擬任何分布式計算的能力之上,但更有效率。它們的主要限制是由于每個通信步驟中的同步而增加的等待時間,但是該等待時間的損失與所得相比是可以忽略的。
系統觀點。獨立于表征Spark的通用性的仿真方法,我們可以采用系統方法。集群計算中的瓶頸資源是什么? RDD可以有效地使用它們嗎?雖然集群應用程序是多樣的,但它們都受底層硬件的相同屬性的約束。當前數據中心具有陡峭的存儲層次結構,以相似的方式限制大多數應用。例如,典型的Hadoop集群可能具有以下特性:
本地存儲。每個節點具有本地存儲器,大約50GB/s的帶寬,以及10到20個本地磁盤,大約1GB/s到2GB/ s的磁盤帶寬。
鏈接。每個節點具有10Gbps(1.3GB/s)鏈路,或者比其存儲器帶寬小約40x,并且比其總的磁盤帶寬小2倍。
機架。節點被組織成20到40臺機器的機架,每個機架的帶寬為40Gbps-80Gbps,或者機架內網絡性能的2-5倍。
給定這些屬性,在許多應用中最重要的性能問題是在網絡中放置數據和計算。幸運的是,RDD提供了控制這種放置的設施;該接口允許應用程序在輸入數據附近放置計算(通過用于輸入源25的“優選位置”的API),并且RDD提供對數據分區和共置(例如指定數據被給定密鑰散列)的控制。因此,庫(例如GraphX)可以實現在專門系統中使用的相同的布置策略。
除了網絡和I / O帶寬,最常見的瓶頸往往是CPU時間,特別是如果數據在內存中。在這種情況下,Spark可以運行在每個節點上的專用系統中使用的相同的算法和庫。例如,它使用Spark SQL中的列存儲和處理,MLlib中的本機BLAS庫等。正如我們之前討論的,RDD明顯增加成本的唯一區域是網絡延遲。
從系統角度來看的最后一個觀點是,由于容錯,Spark可能會對當今某些專用系統產生額外的成本。例如,在Spark中,每個shuffle操作中的map任務將它們的輸出保存到它們運行的機器上的本地文件,因此reduce任務可以稍后重新獲取。此外,Spark在shuffle階段實現了一個障礙,所以reduce任務不會開始,直到所有的Map已經完成。這避免了故障恢復所需的一些復雜性,如果一個“推”直接從映射記錄以流水線方式減少。雖然刪除一些這些功能將加快系統。但默認情況下,我們在Spark中會保持開啟容錯,以便于對應用程序進行容錯處理。
不斷探索
Apache Spark仍然是一個快速發展的項目。自2013年6月以來,代碼庫規模增長了6倍。擁有超過200個第三方可用軟件包。在研究社區,Berkeley,MIT和Stanford的多個項目基于Spark,許多新的庫(如GraphX和Spark Streaming)來自研究小組。在這里,我們簡述四個主要的成果。
DataFrames和更多的聲明性API。核心Spark API基于對包含任意類型的Scala,Java或Python對象的分布式集合的函數式編程。雖然這種方法非常具有表現力,但也使程序更難以自動分析和優化。存儲在RDD中的Scala/ Java/Python對象可能具有復雜的結構,運行它們的函數可能包括任意代碼。在許多應用程序中,如果開發人員沒有使用正確的運算符,他們可能會得到不理想的性能;例如,系統本身不能在Map之前推送過濾器功能。
為了解決這個問題,我們在2015年擴展了Spark,以便根據關系代數添加一個名為DataFrames的更具聲明性的API。數據幀是Python和R中表格數據的通用API。數據幀是一組具有已知模式的記錄,基本上等同于數據庫表,支持使用受限“表達式”API進行過濾和聚合等操作。然而,與在SQL語言中工作不同,數據幀操作被調用作為更通用的編程語言(例如Python和R)中的函數調用,允許開發人員使用主語言中的抽象(例如函數和類)。圖11和圖12顯示了API的示例。
圖12
Spark的DataFrames提供了類似于單節點程序包的API,但是使用Spark SQL的查詢計劃程序自動并行化和優化計算。用戶代碼因此接收在Spark的功能API下不可用的優化(例如謂詞下推,運算符重新排序和連接算法選擇)。據我們所知,Spark DataFrames是第一個在數據框架API.d下執行這種關系優化的庫。
雖然DataFrames仍然是新的,但是這不妨礙它的流行。在我們2015年7月的調查中,60%的受訪者報告使用它們。由于DataFrames的成功,我們還開發了一個名為Datasetse的類型安全接口,讓Java和Scala程序員將DataFrames視為Java對象的靜態類型集合,類似于RDD API,并仍然接收關系優化。我們期望這些API逐漸成為在Spark庫之間傳遞數據的標準抽象。
性能優化。Spark最近的許多工作都是在性能上。在2014年,Databricks團隊花費了大量的精力來優化Spark的網絡和I/O元操作,在 Daytona GraySort挑戰中成功打破賽事記錄。挑戰是項目是對100TB數據進行排序,Spark的成績較前冠軍快了3倍,但是僅需1/10的設備。這個基準測試不是在內存中執行,而是在(固態)磁盤上執行的。
R語言支持。 SparkR project在2015年被合并到Spark中,在R中提供了一個編程接口。R接口基于DataFrames,使用與R的內置數據框架幾乎完全相同的語法。其他Spark庫(如MLlib)也很容易從R中調用,因為它們接受DataFrames的輸入。
庫的研究。 Apache Spark繼續努力于構建更高級別的數據處理庫。最近的項目包括Thunder神經科學,ADAM基因組學以及Kira天文學圖像處理。其他研究庫(如GraphX)已被合并到主要代碼庫。
小結
可擴展數據處理對于下一代計算機應用是必不可少的,但通常涉及不同的計算系統。為了簡化這個任務,Spark項目為大數據應用程序引入了統一的編程模型和引擎。實踐證明,這樣的模型可以有效地支持當前的工作負荷,并為用戶帶來實質性的好處。希望Apache Spark能增強在大數據編程庫中的可組合性,并開發更易于用戶使用的庫。
本文中描述的所有Apache Spark庫都是開源的,可通過 http://spark.apache.org/ 查看。 Databricks還制作了所有Spark峰會的視頻,可在 https://spark-summit.org/ 中免費獲得。
來自:http://blog.jobbole.com/108564/