四兩撥千斤:借助Spark GraphX將QQ千億關系鏈計算提速20倍

watsons 9年前發布 | 12K 次閱讀 Spark 分布式/云計算/大數據 GraphX

騰訊QQ有著國內最大的關系鏈,而共同好友數,屬于社交網絡分析的基本指標之一,是其它復雜指標的基礎。借助Spark GraphX,我們用寥寥100行核心代碼,在高配置的TDW-Spark集群上,只花了2個半小時,便完成了原來需要2天的全量共同好友計算。這標志著QQ千億級別的關系鏈計算進入了小時級別時代,并具備復雜圖模型的快速計算能力。

問題描述

共同好友數可以用于刻畫用戶與用戶間的關系緊密程度,包括 陌生人/熟人分析,好友親密度,好友推薦,社團劃分等各個方面,是社交網絡分析的最基礎指標。其計算邏輯非常簡單明了。為了簡化模型和降低計算量,這里加了幾個約束:

只有好友之間才進行計算

好友關系是有向的

不關注具體的好友

顯而易見,用戶5和6的共同好友數為4。這個計算貌似非常簡單,但是當圖的規模擴展到騰訊的級別:用戶數(點)為十億級別,關系數(邊)為千億級別時,那這個問題就一點都不簡單了。

大致估算一下,假設每個節點平均的好友數是100,每個點id為Long型,占用8個字節,如果用普通Join計算的話,那么中間的數據量大概是1 billion* 800*800B=640TB,需要通過網絡傳輸640TB的數據,這個量非常的恐怖了,想在一個SQL中完成,幾乎是不可能的。原來舊的計算方式,只能通過分而治之的方法實現。通過把關系鏈表,按號段拆分60份,分別連接用戶好友全量表,分成多個SQL任務運行。這樣做每個SQL任務都需要載入一次全量關系鏈,磁盤 I/O 時間嚴重拖慢計算進度,整個過程需要耗費超過兩天的計算時間。

其實共同好友數是典型的圖問題之一,因此很自然的,我們想到了引入新的圖計算框架和模型,來優化計算過程,讓這個過程更加高效和科學。

框架選擇

目前的分布式圖計算框架,并沒有太多好的選擇,在過去一年半中,業界的分布式圖計算框架,進展基本停滯。經過反復選擇,我們還是選擇了GraphX,主要原因有如下3個:

進展

雖然GraphX本身沒什么進展,但是Spark本身的發展很快,從1.4到1.6版本,Spark Core在性能和穩定性上有了不少的提升。GraphX某種程度上,多多少少還是得到了好處

語義

GraphX的語義和運算符相對豐富,可以進行比較好的圖算法描述,適合變化多樣的圖需求

門檻

GraphX的最大消耗是內存,某種程度上,這是個比較低門檻的投入,可以在預期內得到解決

基于這樣的考慮,我們選擇了GraphX作為共同好友算法的底層框架,并從軟件和硬件兩方面入手,進行了一系列的優化。

模型簡化

基于GraphX的圖模型思想,我們進行數好友模型的簡化。整個過程分為兩個階段:

Phrase1——找鄰居

這個階段,其實就是一放一收,和Map-Reduce模型有異曲同工之妙,分3步

1. 每個頂點,將自己的id,發送給自己所有的鄰居

2. 每個頂點,將收到的所有鄰居id,合并為一個List

3. 對新List進行排序,并和原來的圖進行關聯,附到頂點之上

這個階段,使用GraphX的aggregateMessages,定義好sendMsg和mergeMsg的方法,就可輕松實現。

Phrase2——數好友

這個階段,只要一步,但是非常的關鍵,而且計算量也非常大,需要充分的利用了Triplet的特性

1. 遍歷所有的Triplet,對2個好友的有序好友List進行掃描匹配,數出共同好友數,并將其更新到edge之上

這個階段,需要充分利用GraphX的Triplet特性。需要為了實現Triplet功能,GraphX在設計上消耗了很多的內存,無論我們是否使用,它都在那里,靜靜的占用著內存,所以我們要充分利用好它,將數好友的過程,簡化為一次Triplet的遍歷。

如果沒有這個特性,為了數好友之間的共同好友,就要把一個好友的所有一度好友,發送到它所有的鄰居之上,這樣的消息廣播量,是非常巨大的,會形成消息風暴,相當于計算二跳鄰居。根據我們在騰訊數據量上的測試,這個在現有的硬件下,是無法實現的。

這2個階段經過最終優化之后,代碼非常的簡潔,只要聊聊的數十行代碼,便完成了20億級別的點,千億級別的邊的共同好友計算,可謂于無聲處聽驚雷。將眾多的技術難點,都通過GraphX的優化,化解消弭于無形之中。由于產品的需求,這個計算需要是精確數,而不能是近似值,在數好友的過程中,很多的優化方法,不能被用上,否則的話,可以進一步的提升速度。

硬件選擇

在分布式系統中,往往會傾向于用大量的小低配機器,來完成巨大的計算任務。其思想是即便再復雜的計算,只要將大數據,分解為足夠小的數據片,總能在足夠多的機器上,通過性能的降低和時間的拖延,來完成計算任務。但是很遺憾,在圖計算這樣的場景下,尤其是GraphX的設計框架,這個是行不通的。

要發揮GraphX的最佳性能,最少要有128G以上的內存

主要原因有兩個是:

1、節點復制——越小越浪費

GraphX使用了點切割的方式,這是一種用空間換時間的方法,通過將浪費一定的內存,將點和它的鄰居放到一起,減少Executor之間的通信。

如果用小內存的Executor來運行圖算法,假設1個節點,需要10個Executor才能放下它的鄰居,那么它就需要被復制10份,才能進行計算。如果用大內存的Executor,1個Executor就能放下它的所有鄰居,理論上它就只需要被復制一次,大大減少空間占用。

2、節點膨脹——越小越慢

圖計算中,常常會進行消息的擴散和收集,并把最終的結果,匯總到單個節點之上。

以共同好友數模型為例,第一步需要將節點的一跳好友都收集到該節點上。即便根據鄧巴的“150定律”,將一跳好友的個數,限制在150之內。那么圖的占用空間,還是很可能會膨脹150倍。

這個時候,如果內存空間不夠的話,GraphX為了容下所有的數據,會需要在節點之間,進行大量的Shuffle和Spill操作,使得后續的計算,變得非常慢。

其實這兩個問題,在Spark的其它機器學習算法中,或多或少都會有,也是分布式計算系統中,經常面臨的問題。但是在圖計算中,它們是無法被忽略的問題,而且非常的嚴重。所以,這決定了GraphX需要大的內存,才能有良好的性能。

在正常情況下,128G內存,減掉8G的系統占用,剩下120G。這時配置每個Executor 60G內存,2個Core,每個Core分到30G的內存。這時不需要申請太多的Executor,經過合理的性能優化,全量關系鏈計算,可以運行成功。

性能優化

即便有了良好的模型和硬件保障,在面對QQ如此巨型的關系鏈時,依然需要熟練運用GraphX的技巧,并避開各種雷區,才能最終到達終點。簡單總結兩點:

1、圖緩存:To Cache or not to Cache? That is a question

Spark和GraphX原本設計的精妙之處,亮點之一,便在于Cache,也就Persist(MEMORY_ONLY),或者Persist(MEMORY_AND_DISK)。可以把RDD和Graph,Cache到內存中,方便多次調用而無需重新計算。那么是否對所有的RDD,或者圖,都Cache一下,是最佳的選擇呢?答案是未必。

判斷是否要Cache一個Graph或RDD,最簡單和重要的標準,就是

該Graph,是否會在后續的過程中,被直接使用多次,包括迭代。

如果會,那么這個Graph就要被Persist,然后通過action觸發

如果不會,那么反過來,最好把這個Graph直接unpersist掉

一個Graph被Cache的話,一般最終體現為2個RDD的Cache,一個是Edge,一個是Vertex,其占用量是非常巨大的。在整體空間有限的情況下,cache會導致內存的使用量大大加劇,引發多次GC和重算,反而會拖慢速度。在QQ全量的關系鏈計算,一個全量圖是非常大的,因此如果在一個圖沒被多次使用,那么先unpersist,再返回給下一個計算步驟,反而成了最佳實踐。

示例代碼如下:

val oneNbrGraph = computeOneNbr(originalGraph)

oneNbrGraph.unpersist()

val resultRdd = oneNbrGraph.triplets.map { 

…………

}

當然,既然unpersist了,切記 它只能被再用一次了。

2、分區策略:EdgePartition2D

對GraphX有所了解的人,應該都知道,有4種分區的策略,而其中性能最好的,莫過于EdgePartition2D這種邊分區策略。但是由于QQ全量的關系鏈非常的大,所以,如果先用默認策略,構造了圖,再調用partitionBy的方法來改變分區策略,那么會多一步代價非常高的計算。

因此,為了減少不必要的計算步驟,我們建議在構造圖之前,先對Edge使用該策略進行劃分,再用劃分好的Edge RDD,進行圖構建。 示例代碼如下:

``` val edgesRepartitionRdd = edgeRdd.map { case (src, dst) => val pid = PartitionStrategy.EdgePartition2D.getPartition(src, dst, partitionNum) (pid, (src, dst)) }.partitionBy(new HashPartitioner(partitionNum)).map { case (pid, (src, dst)) => Edge(src, dst, 1) }

val g = Graph.fromEdges(

edgesRepartitionRdd,

...

)

```

當然,有個非常重要的Hint別忘記:partitionNum必須是平方數,才能達到最佳的性能。

最終效果

經過反復多次優化之后,在高配置集群的資源充足情況下,使用如下配置項:

|配置項|配置值| |---|---| |executors | 360 | |executor-cores | 2 | |executor-memory | 50g | |并行度 | 10000 |

總共消耗18T的內存,可以在2個半小時左右,完成整個QQ關系鏈的計算。BTW:

這個配置其實只是可運行配置,并非最佳配置。如果內存有512G,每個Executor配到80G,每個機器6個Executor,每個Executor 4個Core,應該能有更好的表現。可以進一步減少Executor的數目。

集群硬盤是SATA盤,而非SSD硬盤。在整個計算過程中,由于QQ的關系鏈實在龐大,中間的過程也比較復雜,所以不推薦用MEMORYONLY的選項,一旦內存充不下,重算的代價非常高,所以建議用MEMORYAND_DISK。而但是用這種方式意味著和硬盤的通信是必不可少的,因此如果硬盤能換成SSD的高性能盤,對整體性能會有很大提升。

在集群處于正常負荷的情況下,資源充足時,GraphX的任務不發生重跑時,作業可以在2小時10分之內,完成全量計算。但這是在運氣最佳,沒有任何Task發生重跑的情況下的表現。一旦有任務Task失敗,Spark會自動重跑,但是整個計算過程會變得非常長,即便是很少的2-3個Task失敗,也會將計算過程,延長到3個多小時甚至更多,這是因為GraphX的Failover沒做好,而且在有多次迭代的時候,這個現象會更加嚴重。

總結和展望

整個的優化過程,貌似風輕云淡,但是中間經過了反復調優,多次在0.1的抽樣數據和1.0的全量數據之間切換,優化每一步的操作,將硬件和GraphX的性能壓榨到極致,才最終得到這個結果。

在這個過程中,我們發現無論應用層再怎么優化,核心層的軟肋,始終制約著上層算法的性能。包括 Graph中最大限度的預創建和 RDD Cache的激進使用等問題,都會導致性能和穩定性不足,使得很多算法在騰訊級別的圖數據下,顯得捉襟見肘。其實這也難怪,GraphX的代碼,從1.3版本開始,便已經一直沒有變動,基本是在吃Core優化的紅利,沾光提高性能,沒有任何實質性的改進,如果要繼續使用,在核心上必須有所提升才行。

騰訊作為擁有國內最大的關系鏈,在圖計算的領域,無論是處于框架,模型,還是存儲,都大有可為,可以做很多的事情。騰訊的數據平臺部,作為公司的大數據支撐平臺,歡迎在這方面有興趣的業界同仁,和我們進行更多的合作和交流,共同在騰訊關系鏈上,玩出更多社交智能的火花。

來自:https://mp.weixin.qq.com/s?__biz=MzA3MDQ4MzQzMg==&mid=2665690414&idx=1&sn=edec2958aa1fcdcf890ff9a109969702

 

 本文由用戶 watsons 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!