Spark Shuffle之Hash Shuffle

jopen 8年前發布 | 12K 次閱讀 哈希表 Spark 分布式/云計算/大數據

源文件放在github,如有謬誤之處,歡迎指正。 原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/hash-shuffle.md

正如你所知,spark實現了多種shuffle方法,通過 spark.shuffle.manager來確定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認為sort shuffle。本節主要介紹hash shuffle。

spark在1.2前默認為hash shuffle(spark.shuffle.manager = hash),但hash shuffle也經歷了兩個發展階段。

第一階段

上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數為 2,可以同時運行兩個 task。每個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每個 task 包含 R 個緩沖區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩沖區被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,默認是 32KB(Spark 1.1 版本以前是 100KB)。

第二階段

這樣的實現很簡單,但有幾個問題:

1 產生的 FileSegment 過多。每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數據文件。

2 緩沖區占用內存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 M * R 個 bucket。雖然一個 ShuffleMapTask 結束后,對應的緩沖區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內存空間也就達到了 cores * R * 32 KB 。對于 8 核 1000 個 reducer 來說,占用內存就是 256MB。

spark.shuffle.consolidateFiles默認為false,如果為true,shuffleMapTask輸出文件可以被合并。如圖

可以明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數降為 cores * R 。 但是緩存空間占用大還沒有解決

總結

優點

  1. 快-不需要排序,也不需要維持hash表
  2. 不需要額外空間用作排序
  3. 不需要額外IO-數據寫入磁盤只需一次,讀取也只需一次

缺點

  1. 當partitions大時,輸出大量的文件(cores * R),性能開始降低
  2. 大量的文件寫入,使文件系統開始變為隨機寫,性能比順序寫要降低100倍
  3. 緩存空間占用比較大

當然,數據經過序列化、壓縮寫入文件,讀取的時候,需要反序列化、解壓縮。reduce fetch的時候有一個非常重要的參數 spark.reducer.maxSizeInFlight ,這里用 softBuffer 表示,默認大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。如果增大,reduce請求的chunk就會變大,可以提高性能,但是增加了reduce的內存使用量。

如果排序在reduce不強制執行,那么reduce只返回一個依賴于map的迭代器。如果需要排序, 那么在reduce端,調用 ExternalSorter

參考文獻

spark Architecture:Shuffle

shuffle 過程

sort shuffle

來自: http://www.cnblogs.com/jacksu-tencent/p/5118917.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!