Spark性能優化——和shuffle搏斗

Spark的性能分析和調優很有意思,今天再寫一篇。主要話題是shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了 幾種不同場景的性能優化 ,包括portal的性能優化,web service的性能優化,還有Spark job的性能優化。Spark的性能優化有一些特殊的地方,比如實時性一般不在考慮范圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這么一個大家伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大boss”,通常抓住和解決大boss以后,能解決其中一大部分問題。比如對于portal來說,是頁面靜態化,對于web service來說,是高并發(當然,這兩種可以說并不確切,這只是針對我參與的項目總結的經驗而已),而對于Spark來說,這個大boss就是shuffle。

首先要明確什么是shuffle。Shuffle指的是從map階段到reduce階段轉換的時候,即map的output向著reduce的input映射的時候,并非節點一一對應的,即干map工作的slave A,它的輸出可能要分散跑到reduce節點A、B、C、D …… X、Y、Z去,就好像shuffle的字面意思“洗牌”一樣,這些map的輸出數據要打散然后根據新的路由算法(比如對key進行某種hash算法),發送到不同的reduce節點上去。(下面這幅圖來自 《Spark Architecture: Shuffle》

為什么說shuffle是Spark job的大boss,就是因為Spark本身的計算通常都是在內存中完成的,比如這樣一個map結構的RDD:(String, Seq),key是字符串,value是一個Seq,如果只是對value進行一一映射的map操作,比如(1)先計算Seq的長度,(2)再把這個長度作為元素添加到Seq里面去。這兩步計算,都可以在local完成,而事實上也是在內存中操作完成的,換言之,不需要跑到別的node上去拿數據,因此執行的速度是非常快的。但是,如果對于一個大的rdd,shuffle發生的時候,就會因為網絡傳輸、數據序列化/反序列化產生大量的磁盤IO和CPU開銷。這個性能上的損失是非常巨大的。

要減少shuffle的開銷,主要有兩個思路:

  1. 減少shuffle次數,盡量不改變key,把數據處理在local完成;
  2. 減少shuffle的數據規模。

先去重,再合并

比如有A、B這樣兩個規模比較大的RDD,如果各自內部有大量重復,那么二者一合并,再去重:

A.union(B).distinct()

這樣的操作固然正確,但是如果可以先各自去重,再合并,再去重,可以大幅度減小shuffle的開銷(注意Spark的默認union和Oracle里面的“union all”很像——不去重):

A.distinct().union(B.distinct()).distinct()

看起來變復雜了對不對,但是當時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。

如果中間結果rdd如果被調用多次,可以顯式調用cache()和persist(),以告知Spark,保留當前rdd。當然,即便不這么做,Spark依然存放不久前計算過的結果(以下來自 官方指南 ):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

數據量大,并不一定慢。通常情況下,由于Spark的job是放到內存里面進行運算的,因此一個復雜的map操作不一定執行起來很慢。但是如果牽涉到shuffle,這里面有網絡傳輸和序列化的問題,就有可能非常慢。

類似地,還有filter等等操作,目的也是要先對大的RDD進行“瘦身”操作,然后在做其他操作。

mapValues比map好

明確key不會變的map,就用mapValues來替代,因為這樣可以保證Spark不會shuffle你的數據:

A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}

改成:

A.map{case ((B, C), (D, E)) => (B, C, E)}

用broadcast + filter來代替join

這種優化是一種特定場景的神器,就是拿大的RDD A去join一個小的RDD B,比如有這樣兩個RDD:

  • A的結構為(name, age, sex),表示全國人民的RDD,超大
  • B的結果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個RDD顯然很小,因為人的年齡范圍在0~200歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

A.map{case (name, age, sex) => (age, (name, sex))}
 .join(B)
 .map{case (age, ((name, sex), title)) => (name, age, sex)}

你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

val b = sc.broadcast(B.collectAsMap)
A.filter{case (name, age, sex) => b.values.contains(age)}

一次shuffle都沒有,A老老實實待著不動,等著全量的B被分發過來。

另外,在Spark SQL里面直接有BroadcastHashJoin,也是把小的rdd廣播出去。

不均勻的shuffle

在工作中遇到這樣一個問題,需要轉換成這樣一個非常巨大的RDD A,結構是(countryId, product),key是國家id,value是商品的具體信息。當時在shuffle的時候,這個hash算法是根據key來選擇節點的,但是事實上這個countryId的分布是極其不均勻的,大部分商品都在美國(countryId=1),于是我們通過Ganglia看到,其中一臺slave的CPU特別高,計算全部聚集到那一臺去了。

找到原因以后,問題解決就容易了,要么避免這個shuffle,要么改進一下key,讓它的shuffle能夠均勻分布(比如可以拿countryId+商品名稱的tuple作key,甚至生成一個隨機串)。

明確哪些操作必須在master完成

如果想打印一些東西到stdout里去:

A.foreach(println)

想把RDD的內容逐條打印出來,但是結果卻沒有出現在stdout里面,因為這一步操作被放到slave上面去執行了。其實只需要collect一下,這些內容就被加載到master的內存中打印了:

A.collect.foreach(println)

再比如,如果遇到RDD操作嵌套的情況,通常考慮優化掉,因為只有master才能去理解和執行RDD的操作,slave只能處理被分配的task而已。比如:

A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

就可以用join來代替:

A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

用reduceByKey代替groupByKey

這一條應該是比較經典的了。reduceByKey會在當前節點(local)中做reduce操作,也就是說,會在shuffle前,盡可能地減小數據量。而groupByKey則不是,它會不做任何處理而直接去shuffle。當然,有一些場景下,功能上二者并不能互相替換。因為reduceByKey要求參與運算的value,并且和輸出的value類型要一樣,但是groupByKey則沒有這個要求。

有一些類似的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。

另外,還有一條類似的是用treeReduce來代替reduce,主要是用于單個reduce操作開銷比較大,可以條件treeReduce的深度來控制每次reduce的規模。

 

來自: http://www.raychase.net/3788

 

 本文由用戶 xc43fgt66 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!