hadoop作業調優參數整理及原理(主要為shuffle過程)
1 Map side tuning參數
1.1 MapTask運行內部原理
當map task開始運算,并產生中間數據時,其產生的中間結果并非直接就簡單的寫入磁盤。這中間的過程比較復雜,并且利用到了內存buffer來進行已經產生的 部分結果的緩存,并在內存buffer中進行一些預排序來優化整個map的性能。如上圖所示,每一個map都會對應存在一個內存 buffer(MapOutputBuffer,即上圖的buffer in memory),map會將已經產生的部分結果先寫入到該buffer中,這個buffer默認是100MB大小,但是這個大小是可以根據job提交時的 參數設定來調整的,該參數即為:io.sort.mb。當map的產生數據非常大時,并且把io.sort.mb調 大,那么map在整個計算過程中spill的次數就勢必會降低,map task對磁盤的操作就會變少,如果map tasks的瓶頸在磁盤上,這樣調整就會大大提高map的計算性能。map做sort和spill的內存結構如下如所示:
map在運行過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer并不一定能將全部的map輸出緩存下來,當map輸出超出一 定閾值(比如100M),那么map就必須將該buffer中的數據寫入到磁盤中去,這個過程在mapreduce中叫做spill。map并不是要等到 將該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的情況。所 以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個閾值也是由一個job的配置參數來控制,即io.sort.spill.percent,默認為0.80或80%。這個參數同樣也是影響spill頻繁程度,進而影響map task運行周期對磁盤的讀寫頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對用戶來說更加方便。
當map task的計算部分全部完成后,如果map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結果。map在正常退出之前,需要將這 些spill合并(merge)成一個,所以map在結束之前還有一個merge的過程。merge的過程中,有一個參數可以調整這個過程的行為,該參數 為:io.sort.factor。該參數默認為10。它表示當merge spill文件時,最多能有多少并行的stream向merge文件中寫入。比如如果map產生的數據非常的大,產生的spill文件大于10,而 io.sort.factor使用的是默認的10,那么當map計算完成做merge時,就沒有辦法一次將所有的spill文件merge成一個,而是會 分多次,每次最多10個stream。這也就是說,當map的中間結果非常大,調大io.sort.factor,有利于減少merge次數,進而減少 map對磁盤的讀寫頻率,有可能達到優化作業的目的。
當job指定了combiner的時候,我們都知道map介紹后會在map端根據combiner定義的函數將map結果進行合并。運行combiner函數的時機有可能會是merge完成之前,或者之后,這個時機可以由一個參數控制,即min.num.spill.for.combine(default 3),當job中設定了combiner,并且spill數最少有3個的時候,那么combiner函數就會在merge產生結果文件之前運行。通過這樣 的方式,就可以在spill非常多需要merge,并且很多數據需要做conbine的時候,減少寫入到磁盤文件的數據數量,同樣是為了減少對磁盤的讀寫 頻率,有可能達到優化作業的目的。
減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最后merge產生的結果文件,都是 可以壓縮的。壓縮的好處在于,通過壓縮減少寫入讀出磁盤的數據量。對中間結果非常大,磁盤速度成為map執行瓶頸的job,尤其有用。控制map中間結果 是否使用壓縮的參數為:mapred.compress.map.output(true/false)。將這個參數 設置為true時,那么map在寫中間結果時,就會將數據壓縮后再寫入磁盤,讀結果時也會采用先解壓后讀取數據。這樣做的后果就是:寫入磁盤的中間結果數 據量會變少,但是cpu會消耗一些用來壓縮和解壓。所以這種方式通常適合job中間結果非常大,瓶頸不在cpu,而是在磁盤的讀寫的情況。說的直白一些就 是用cpu換IO。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常復雜。所以對中間結果采用壓縮通常來說是有收益的。以下是一個 wordcount中間結果采用壓縮和不采用壓縮產生的map中間結果本地磁盤讀寫的數據量對比:
map中間結果不壓縮:
map中間結果壓縮:
可以看出,同樣的job,同樣的數據,在采用壓縮的情況下,map中間結果能縮小將近10倍,如果map的瓶頸在磁盤,那么job的性能提升將會非常可觀。
當采用map中間結果壓縮的情況下,用戶還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現在hadoop支持的壓縮格式 有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁盤壓縮 比,LzoCodec比較適合。但也要取決于job的具體情況。用戶若想要自行選擇中間結果的壓縮算法,可以設置配置參數:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用戶自行選擇的壓縮方式。
1.2 Map side相關參數調優
選項 | 類型 | 默認值 | 描述 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
io.sort.mb | int | 100 | 緩存map中間結果的buffer大小(in MB) | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
io.sort.record.percent | float | 0.05 | io.sort.mb中用來保存map output記錄邊界的百分比,其他緩存用來保存數據 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
io.sort.spill.percent | float | 0.80 | map開始做spill操作的閾值 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
io.sort.factor | int | 10 | 做merge操作時同時操作的stream數上限。 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
min.num.spill.for.combine | int | 3 | combiner函數運行的最小spill數 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
mapred.compress.map.output | boolean | false | map中間結果是否采用壓縮 | </tr>|||||||||||||||||||||||||||||||||||||||||||||||||||||
mapred.map.output.compression.codec | class name | org.apache.hadoop.io. compress.DefaultCodec |
map中間結果的壓縮格式 | </tr> </tbody> </table>
選項 | 類型 | 默認值 | 描述 | </tr>|||||||||||||||||||||||||
mapred.reduce.parallel.copies | int | 5 | 每個reduce并行下載map結果的最大線程數 | </tr>|||||||||||||||||||||||||
mapred.reduce.copy.backoff | int | 300 | reduce下載線程最大等待時間(in sec) | </tr>|||||||||||||||||||||||||
io.sort.factor | int | 10 | 同上 | </tr>|||||||||||||||||||||||||
mapred.job.shuffle.input.buffer.percent | float | 0.7 | 用來緩存shuffle數據的reduce task heap百分比 | </tr>|||||||||||||||||||||||||
mapred.job.shuffle.merge.percent | float | 0.66 | 緩存的內存中多少百分比后開始做merge操作 | </tr>|||||||||||||||||||||||||
mapred.job.reduce.input.buffer.percent | float | 0.0 | sort完成后reduce計算階段用來緩存數據的百分比 | </tr> </tbody> </table>