Spark Shuffle之Sort Shuffle
源文件放在github,隨著理解的深入,不斷更新,如有謬誤之處,歡迎指正。 原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/sort-shuffle.md
正如你所知,spark實現了多種shuffle方法,通過 spark.shuffle.manager來確定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認為sort shuffle。本節主要介紹sort shuffle。
從1.2.0開始默認為sort shuffle( spark.shuffle.manager = sort),實現邏輯類似于Hadoop MapReduce,Hash Shuffle每一個reducers產生一個文件,但是Sort Shuffle只是產生一個按照reducer id排序可索引的文件,這樣,只需獲取有關文件中的相關數據塊的位置信息,并fseek就可以讀取指定reducer的數據。但對于rueducer數比較少的情況,Hash Shuffle明顯要比Sort Shuffle快,因此Sort Shuffle有個“fallback”計劃,對于reducers數少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),我們使用fallback計劃,hashing相關數據到分開的文件,然后合并這些文件為一個,具體實現為 BypassMergeSortShuffleWriter 。
在map進行排序,在reduce端應用Timsort[1]進行合并。map端是否容許spill,通過 spark.shuffle.spill 來設置,默認是true。設置為false,如果沒有足夠的內存來存儲map的輸出,那么就會導致OOM錯誤,因此要慎用。
用于存儲map輸出的內存為: “JVM Heap Size” \* spark.shuffle.memoryFraction \* spark.shuffle.safetyFraction ,默認為 “JVM Heap Size” \* 0.2 \* 0.8 = “JVM Heap Size” \* 0.16 。如果你在同一個執行程序中運行多個線程(設定 spark.executor.cores/ spark.task.cpus 超過1),每個map任務存儲的空間為 “JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus , 默認2個cores,那么為 0.08 * “JVM Heap Size” 。
spark使用AppendOnlyMap存儲map輸出的數據,利用開源hash函數 MurmurHash3 和平方探測法把key和value保存在相同的array中。這種保存方法可以是spark進行combine。如果spill為true,會在spill前sort。
Sort Shuffle內存的源碼級別更詳細說明可以參考[4],讀寫過程可以參考[5]
優點
- map創建文件量較少
- 少量的IO隨機操作,大部分是順序讀寫
缺點
- 要比Hash Shuffle要慢,需要自己通過 spark.shuffle.sort.bypassMergeThreshold 來設置合適的值。
- 如果使用SSD盤存儲shuffle數據,那么Hash Shuffle可能更合適。
參考
[1][Timsort原理介紹]( http://blog.csdn.net/yangzhongblog/article/details/8184707 )
[2][形式化方法的逆襲——如何找出Timsort算法和玉兔月球車中的Bug?]( http://bindog.github.io/blog/2015/03/30/use-formal-method-to-find-the-bug-in-timsort-and-lunar-rover/ )
[3][Spark Architecture: Shuffle]( http://0x0fff.com/spark-architecture-shuffle/ )
[4][Spark Sort Based Shuffle內存分析]( http://www.jianshu.com/p/c83bb237caa8 )
[5][Spark Shuffle Write階段磁盤文件分析]( http://www.jianshu.com/p/2d837bf2dab6 )