Spark在騰訊數據倉庫TDW的應用
為了滿足挖掘分析與交互式實時查詢的計算需求,騰訊大數據使用了Spark平臺來支持挖掘分析類計算、交互式實時查詢計算以及允許誤差范圍的快速查 詢計算,目前騰訊大數據擁有超過200臺的Spark集群,并獨立維護Spark和Shark分支。Spark集群已穩定運行2年,積累了大量的案例和運 營經驗能力,另外多個業務的大數據查詢與分析應用,已在陸續上線并穩定運行。在SQL查詢性能方面普遍比MapReduce高出2倍以上,利用內存計算和 內存表的特性,性能至少在10倍以上。在迭代計算與挖掘分析方面,精準推薦將小時和天級別的模型訓練轉變為Spark的分鐘級別的訓練,同時簡潔的編程接 口使得算法實現比MR在時間成本和代碼量上高出許多。
Spark VS MapReduce
盡管MapReduce適用大多數批處理工作,并且在大數據時代成為企業大數據處理的首選技術,但由于以下幾個限制,它對一些場景并不是最優選擇:
- 缺少對迭代計算以及DAG運算的支持
- Shuffle過程多次排序和落地,MR之間的數據需要落Hdfs文件系統 </ul>
- 提供了一套支持DAG圖的分布式并行計算的編程框架,減少多次計算之間中間結果寫到Hdfs的開銷
- 提供Cache機制來支持需要反復迭代計算或者多次數據共享,減少數據讀取的IO開銷
- 使用多線程池模型來減少task啟動開稍,shuffle過程中避免不必要的sort操作以及減少磁盤IO操作
- 廣泛的數據集操作類型 </ul>
- 騰訊。廣點通是最早使用Spark的應用之一。騰訊大數據精準推薦借助Spark快速迭代的優勢,圍繞“數 據+算法+系統”這套技術方案,實現了在“數據實時采集、算法實時訓練、系統實時預測”的全流程實時并行高維算法,最終成功應用于廣點通pCTR投放系統 上,支持每天上百億的請求量。基于日志數據的快速查詢系統業務構建于Spark之上的Shark,利用其快速查詢以及內存表等優勢,承擔了日志數據的即席 查詢工作。在性能方面,普遍比Hive高2-10倍,如果使用內存表的功能,性能將會比Hive快百倍。
- Yahoo。Yahoo將Spark用在Audience Expansion中的應用。Audience Expansion是廣告中尋找目標用戶的一種方法:首先廣告者提供一些觀看了廣告并且購買產品的樣本客戶,據此進行學習,尋找更多可能轉化的用戶,對他 們定向廣告。Yahoo采用的算法是logistic regression。同時由于有些SQL負載需要更高的服務質量,又加入了專門跑Shark的大內存集群,用于取代商業BI/OLAP工具,承擔報表 /儀表盤和交互式/即席查詢,同時與桌面BI工具對接。目前在Yahoo部署的Spark集群有112臺節點,2TB內存。
- 淘寶。阿里搜索和廣告業務,最初使用Mahout或者自己寫的MR來解決復雜的機器學習,導致效率低而且代碼 不易維護。淘寶技術團隊使用了Spark來解決多次迭代的機器學習算法、高計算復雜度的算法等。將Spark運用于淘寶的推薦相關算法上,同時還利用 Graphx解決了許多生產問題,包括以下計算場景:基于度分布的中樞節點發現、基于最大連通圖的社區發現、基于三角形計數的關系衡量、基于隨機游走的用 戶屬性傳播等。
- 優酷土豆。優酷土豆在使用Hadoop集群的突出問題主要包括:第一是商業智能BI方面,分析師提交任務之后 需要等待很久才得到結果;第二就是大數據量計算,比如進行一些模擬廣告投放之時,計算量非常大的同時對效率要求也比較高,最后就是機器學習和圖計算的迭代 運算也是需要耗費大量資源且速度很慢。最終發現這些應用場景并不適合在MapReduce里面去處理。通過對比,發現Spark性能比MapReduce 提升很多。首先,交互查詢響應快,性能比Hadoop提高若干倍;模擬廣告投放計算效率高、延遲小(同hadoop比延遲至少降低一個數量級);機器學 習、圖計算等迭代計算,大大減少了網絡傳輸、數據落地等,極大的提高的計算性能。目前Spark已經廣泛使用在優酷土豆的視頻推薦(圖計算)、廣告業務 等。 </ul>
- 經過改造和優化的Shark和Spark吸收了TDW平臺的功能,如Hive的特有功能:元數據重構,分區優化等,同時可以通過IDE或者洛子調度來直接執行HiveSql查詢和定時調度Spark的任務;
- 與Gaia和TDW的底層存儲直接兼容,可以直接安全且高效地使用TDW集群上的數據;
- 對Spark底層的使用門檻,資源管理與調度,任務監控以及容災等多個功能進行完善,并支持快速的遷移和擴容。 </ul>
- 節點關系表relation,字段有id, fid,表示兩個節點存在關系。
- 節點特征表features,字段有id, feature,表示每個節點具有的特征信息。 </ul>
- 通過兩次JOIN操作,生成一張臨時表,臨時表中的一個元組對應節點關系表中的一對節點和這兩個節點的特征向量。
- 遍歷臨時表,對每個元組中的兩個節點計算其相似度。 </ol>
- 采用二維圖劃分的思想,減少節點的復制數目
- 每個數據分區中,對于同一個節點,只保留一份該節點特征向量 </ol>
- 利用二維劃分方法將節點關系表劃分成多個數據分區,假設我們將分區數設為4,則Table 1所示的節點關系表將會劃分到4個分區,每個元組對應的分區如下Table 3所示:
- 根據每個分區中的節點列表,計算出每個節點所在的分區列表,稱為路由表,記錄了每個節點所在的分區信息,其結果如Table 4所示。
- 根據路由表將每個節點的特征向量發送至每個分區之中,保證每個分區中一個節點只保存一份特征向量,如Table 5所示。
- 對于每個分區,將該分區的關系集合與該分區中所有結點的特征向量進行關聯,遍歷每對節點關系,利用相似度函數和特征向量計算二者的相似度。 </ol>
- 優化分區參數設置。在相似度計算的應用中,分區個數越多,會導致節點的復制份數增加,從而增大網絡數據傳輸量。因此我們基于中間結果的統計信息來確定確定分區個數,使得在充分利用每個節點內存和CPU的前提下,最小化分區個數。
- 優化內存表示。由于數據量大,對象個數多,導致內存使用量較高,GC時間較長。我們使用列存儲格式來對內存數據進行壓縮,減少數據量的同時也減少了對象個數。
- 提高網絡穩定性。隨著集群中機器數目的增加,網絡連接數也會成倍增加。當網絡出現擁擠時,經常會伴隨著連接超時從而導致shuffle數據拉取失 敗。更糟糕的情況是,網絡超時會讓Master誤認為Executor已經丟失,故會使得整個Executor上已經完成的任務全部重做。因此在 shuffle時增加網絡超時重試機制,同時控制每次發送的請求連接數,避免shuffle拉數據超時,減少任務失敗次數,防止Executor丟失的情 況出現。
- 使用sort-based shuffle時將文件塊索引信息緩存一份在內存中,后續拉數據時直接讀內存獲取索引信息。預測執行時,當同一任務的一批運行實例有一個完成時,殺掉正在運行的其余實例,提早釋放計算資源。
- 參數調整。由于每個Executor進程還會使用到堆外內存,因此Executor進程占用的內存往往會大于JVM設定的最大值,為了保證 Gaia不會將超過JVM內存的Executor進程殺掉,配置參數yarn.executor.memoryOverhead以免被kill。由于 Executor在Full GC時需要較長時間,需要配置參數spark.storage.blockManagerSlaveTimeoutMs來延長blockManager的 超時時間。 </ul>
- User-based CF: 基于User的協同過濾,通過不同用戶對Item的評分來評測用戶之間的相似性,根據用戶之間的相似性做出推薦;
- Item-based CF: 基于Item的協同過濾,通過用戶對不同Item的評分來評測Item之間的相似性,根據Item之間的相似性做出推薦;
- Model-based CF: 以模型為基礎的協同過濾(Model-based Collaborative Filtering)是先用歷史資料得到一個模型,再用此模型進行預測推薦。 </ul>
- 計算用戶喜好:不同用戶對Item的評分數值可能相差較大,因此需要先對每個用戶的評分做二元化處理,例如對于某一用戶對某一Item的評分大于其給出的平均評分則標記為好評1,否則為差評0。
- 計算Item相似性:采用Jaccard系數作為計算兩個Item的相似性方法。狹義Jaccard相似度適合計算兩個集合之間的相似程度,計算方法為兩個集合的交集除以其并集,具體的分為以下三步。 </ul>
- Item好評數統計,統計每個Item的好評用戶數。
- Item好評鍵值對統計,統計任意兩個有關聯Item的相同好評用戶數。
- Item相似性計算,計算任意兩個有關聯Item的相似度。 </ol>
- 找出最相似的前N個Item。這一步中,Item的相似度還需要歸一化后整合,然后求出每個Item最相似的前N個Item,具體的分為以下三步。 </ul>
- Item相似性歸一化。
- Item相似性評分整合。
- 獲取每個Item相似性最高的前N個Item。 </ol>
- 為了實現一個業務邏輯需要使用七個MapReduce作業,七個作業間的數據交換通過HDFS完成,增加了網絡和磁盤的開銷。
- 七個作業都需要分別調度到集群中運行,增加了Gaia集群的資源調度開銷。
- MR2和MR3重復讀取相同的數據,造成冗余的HDFS讀寫開銷。 </ul>
- DAG編程模型。通過Spark的DAG編程模型可以把七個MapReduce簡化為一個Spark作業。Spark會把該作業自動切分為八個 Stage,每個Stage包含多個可并行執行的Tasks。Stage之間的數據通過Shuffle傳遞。最終只需要讀取和寫入HDFS一次。減少了六 次HDFS的讀寫,讀寫HDFS減少了70%。
- Spark作業啟動后會申請所需的Executor資源,所有Stage的Tasks以線程的方式運行,共用Executors,相對于MapReduce方式,Spark申請資源的次數減少了近90%。
- Spark引入了RDD(Resilient Distributed Dataset)模型,中間數據都以RDD的形式存儲,而RDD分布存儲于slave節點的內存中,這就減少了計算過程中讀寫磁盤的次數。RDD還提供了 Cache機制,例如對上圖的rdd3進行Cache后,rdd4和rdd7都可以訪問rdd3的數據。相對于MapReduce減少MR2和MR3重復 讀取相同數據的問題。 </ul>
Spark在很多方面都彌補了MapReduce的不足,比MapReduce的通用性更好,迭代運算效率更高,作業延遲更低,它的主要優勢包括:
MapReduce由于其設計上的約束只適合處理離線計算,在實時查詢和迭代計算上仍有較大的不足,而隨著業務的發展,業界對實時查詢和迭代分析有 更多的需求,單純依靠MapReduce框架已經不能滿足業務的需求了。Spark由于其可伸縮、基于內存計算等特點,且可以直接讀寫Hadoop上任何 格式的數據,成為滿足業務需求的最佳候選者。
應用Spark的成功案例
目前大數據在互聯網公司主要應用在廣告、報表、推薦系統等業務上。在廣告業務方面需要大數據做應用分析、效果分析、定向優化等,在推薦系統方面則需 要大數據優化相關排名、個性化推薦以及熱點點擊分析等。這些應用場景的普遍特點是計算量大、效率要求高。Spark恰恰滿足了這些要求,該項目一經推出便 受到開源社區的廣泛關注和好評。并在近兩年內發展成為大數據處理領域最炙手可熱的開源項目。本章將列舉國內外應用Spark的成功案例。
騰訊大數據Spark的概況
騰訊大數據綜合了多個業務線的各種需求和特性,目前正在進行以下工作:
Spark在相似度計算方面的應用
相似度是指兩個節點之間特定屬性的相似程度,相似度計算是數據挖掘、推薦引擎中的最基本問題。例如在推薦系統中通過計算推薦物品的相似度,從而給目 標用戶推薦與他喜歡的物品相似度較高的物品,或是計算用戶之間的相似度,給目標用戶推薦與其相似的用戶喜歡的物品。因此,相似度計算技術在很大程度上決定 著推薦系統的性能。
隨著大數據時代的來臨,日益增加的數據量使得單機的計算能力已經遠遠無法滿足需求。在對大規模的節點對進行相似度計算時,分布式處理往往是可行的解 決方案。MapReduce是目前流行的分布式編程框架。Hadoop與Spark是MapReduce編程模型的兩個開源實現。相比于 Hadoop,Spark提供了cache機制,增加了對迭代計算的支持;還提供了DAG調度來支持復雜的計算任務,減少了中間結果的磁盤讀寫,能夠獲得 更佳的性能。
問題描述
輸入數據可以表示成兩張表:
下列兩個表格表示了在一個擁有6個節點的關系網絡中,節點關系表和節點特征表的情況。
相似度計算即是對節點關系表中的所有節點對 (id,fid),其特征向量分別為 和,利用相似度計算函數similarity-Calculation,計算和之間的相似度。相似度計算函數similarity-Calculation依據具體的相似度衡量方法而定。
MapReduce 解決方案
Hive是建立在Hadoop之上提供SQL接口處理的海量數據處理工具,對于上述相似度計算問題,其計算流程可以用如下SQL來描述,并使用Hive來計算。
整個計算流程可以分為兩個步驟:
下圖展示了該SQL語句的執行過程:
使用Hive對千億節點關系記錄進行相似度計算,兩次JOIN操作成為性能的主要瓶頸瓶頸。在兩次JOIN的過程中,網絡數據傳輸和磁盤讀寫達到了 200TB,集群多數結點的硬盤無法支持,任務失敗經常發生,作業運行了時間超過了24小時。通過將節點關系表拆分成多個子表,每個子表獨立地進行相似度 計算,多個子表的任務并行執行,最后再將多個子作業的結果匯總,得到最終結果。采用這樣的方式,作業總時間仍然超過了24小時。
Spark解決方案
通過對Hive計算過程的分析,我們發現網絡數據開銷主要來自于節點特征向量的大量復制。對于節點關系表中的每對關系,計算時都需要得到兩個節點的特征向量,從而導致了大量的數據復制。因此,我們從兩個方面去減少數據復制:
二維圖劃分方法:
任何一張關系網絡,都可以用一個大矩陣M來表示,矩陣的兩個維度用來表示節點,矩陣的元素M[i, j]表示節點i和節點j是否存在關聯,如果存在,則M[i,j]值為1,否則,M[i, j]值為0。下圖展示了通過采用二維劃分的方法,將一個矩陣劃分成了16個分區。
使用二維劃分可以減少節點的復制數目。假設分區總數為,采用一維劃分的方法,最差情況下每個節點的復制份數是,即每個分區都會有該節點的復制;采用 二維劃分方法,最差情況下每個節點的復制份是 。對于大數據量,分區總數通常很大,所以采用二維劃分通常可以減少每個節點的復制份數。
計算步驟:
通過以上步驟,即可以計算出節點關系表中每對節點的相似度。與MapReduce的計算方法相比,如果一個用戶多次出現在同一個分區中,比如用戶1 在分區1中出現了兩次,上述計算步驟只會將用戶1的特征向量發送一份到分區1中,但是MapReduce的計算方法會發送兩次,產生冗余的網絡數據傳輸。 使用上述計算方法,我們將網絡傳輸量降到了50 T,遠小于MapReduce方法的網絡傳輸量。
系統層次優化:
除了在計算流程上進行改進,我們還對Spark進行了以下方面的優化:
實驗對比
實驗環境:分別在擁有200臺、600臺和1000臺TS5機器節點的集群上進行了對比,每臺機器擁有64GB內存,2*12T硬盤,24線程 CPU。在兩個數據集上進行了Hadoop、社區GraphX和TDW-Spark的性能對比,一個數據集擁有五百億節點對,而另一個擁有千億量級的節點 對。實驗結果如下表所示:
通過上述實驗對比,可以看出在MapReduce上的實現的性能遠遠低于在Spark上的性能,使用JOIN的方法使得網絡通信開銷非常大,五百億 數據集的任務執行時間超過12個小時,千億數據集任務執行時間超過24個小時;GraphX采用的同樣是二維圖劃分,但是由于其是一個面向通用的圖計算框 架,維護了復雜的數據結構和計算流程,造成性能下降。同時,GraphX在網絡穩定性方面存在許多問題,當集群規模達到600臺時便會有大量的任務失敗。
與前兩者相比,TDW-Spark在集群為200臺時在兩個數據集上都獲得了較大的性能增長,所消耗時間少于GraphX的一半。當集群規模從 200臺擴充至600臺,TDW-Spark在五百億節點對數據集上獲得加速比218%,在千億節點上的加速比為280%;當集群規模從200臺擴充至 1000臺時,加速比分別為279%和350%。因此,TDW-Spark不僅在性能上獲得了很大的提升,還可以在千臺規模的集群之上穩定運行,同時獲得 良好的水平擴展能力。
Spark在基于物品的協同過濾推薦算法的應用
互聯網的發展導致了信息爆炸。面對海量的信息,如何對信息進行刷選和過濾,將用戶最關注最感興趣的信息展現在用戶面前,已經成為了一個亟待解決的問 題。推薦系統可以通過用戶與信息之間的聯系,一方面幫助用戶獲取有用的信息,另一方面又能讓信息展現在對其感興趣的用戶面前,實現了信息提供商與用戶的雙 贏。
協同過濾推薦(Collaborative Filtering Recommendation)算法是最經典最常用的推薦算法,算法通過分析用戶興趣,在用戶群中找到指定用戶的相似用戶,綜合這些相似用戶對某一信息的 評價,形成系統對該指定用戶對此信息的喜好程度預測。協同過濾可細分為以下三種:
問題描述
輸入數據格式:Uid,ItemId,Rating (用戶Uid對ItemId的評分)。
輸出數據:每個ItemId相似性最高的前N個ItemId。
由于篇幅限制,這里我們只選擇基于Item的協同過濾算法解決這個例子。
算法邏輯:基于Item的協同過濾算法的基本假設為兩個相似的Item獲得同一個用戶的好評的可能性較高。因此,該算法首先計算用戶對物品的喜好程度,然后根據用戶的喜好計算Item之間的相似度,最后找出與每個Item最相似的前N個Item。該算法的詳細描述如下:
基于MapReduce的實現方案
使用MapReduce編程模型需要為每一步實現一個MapReduce作業,一共存在包含七個MapRduce作業。每個MapReduce作業 都包含Map和Reduce,其中Map從HDFS讀取數,輸出數據通過Shuffle把鍵值對發送到Reduce,Reduce階段 以
七個MapReduce作業意味著需要七次讀取和寫入HDFS,而它們的輸入輸出數據存在關聯,七個作業輸入輸出數據關系如圖所示。
基于MapReduce實現此算法存在以下問題:
這些問題導致作業運行時間大大增長,作業成本增加。
基于Spark的實現方案
相比與MapReduce編程模型,Spark提供了更加靈活的DAG(Directed Acyclic Graph) 編程模型,不僅包含傳統的map、reduce接口,還增加了filter、flatMap、union等操作接口,使得編寫Spark程序更加靈活方 便。使用Spark編程接口實現上述的業務邏輯如圖所示。
相對于MapReduce,Spark在以下方面優化了作業的執行時間和資源使用。
效果對比
測試使用相同規模的資源,其中MapReduce方式包含200個Map和100個Reduce,每個Map和Reduce配置4G的內存; 由于Spark不再需要Reduce資源,而MapReduce主要邏輯和資源消耗在Map端,因此使用200和400個Executor做測試,每個 Executor包含4G內存。測試結果如下表所示,其中輸入記錄約38億條。
對比結果表的第一行和第二行,Spark運行效率和成本相對于MapReduce方式減少非常明顯,其中,DAG模型減少了70%的HDFS讀寫、 cache減少重復數據的讀取,這兩個優化即能減少作業運行時間又能降低成本;而資源調度次數的減少能提高作業的運行效率。對比結果表的第二行和第三行, 增加一倍的Executor數目,作業運行時間減少約50%,成本增加約25%,從這個結果看到,增加Executor資源能有效的減少作業的運行時間, 但并沒有做到完全線性增加。這是因為每個Task的運行時間并不是完全相等的, 例如某些task處理的數據量比其他task多;這可能導致Stage的最后時刻某些Task未結束而無法啟動下一個Stage,另一方面作業是一直占有 Executor的,這時候會出現一些Executor空閑的狀況,于是導致了成本的增加。
參考鏈接:







