MapReduce 圖解流程超詳細解答(1)-【map階段】
MRAppMaster
</span></span> MapReduce Job的時間線
- Map Phase:若干 Map Tasks 被執行
- Reduce Phase: 若干Reduce Tasks 被執行
reduce可能會在map階段結束之前開始執行,因此上面顯示的有重疊的地方。
Map Phase
現在我們集中考察map相,一個關鍵的問題是一個應用需要多少map任務去運行現在的這個job
用戶給了我們什么?
我們退回到之前的一步,當一個用戶提交一個應用的時候,若干信息被提供給了YARN ,分別是:
- 一個配置:這可以是一部分的,因為一些參數不需要用戶特別指定,可以有自己的默認值。
- 一個jar文件,含有一個map,一個combiner,一個reduce
- 一個輸入和輸出信息 輸入目錄 是不是在hdfs上,有多少文件呢?輸出的時候,我們存儲在哪里
The number of files inside the input directory is used for deciding the number of Map Tasks of a job.
那么,輸入的目錄中文件的數量決定多少個map會被運行起來
多少個map任務?
應用針對每一個分片運行一個map,一般而言,對于每一個輸入的文件會有一個map split。如果輸入文件太大,超過了hdfs塊的大小(64M)那么對于同一個輸入文件我們會有多余2個的map運行起來。下面是FileInputFormat class 的getSplits()的偽代碼:
num_splits = 0 for each input file f: remaining = f.length while remaining / split_size > split_slope: num_splits += 1 remaining -= split_size
where:
split_slope = 1.1 分割斜率 split_size =~ dfs.blocksize 分割大小約等于hdfs塊大小
在mapreduce2.0以上版本mapreduce.job.maps
屬性會被忽略
MapTask Launch
啟動MapTask
mapreduce應用會向資源管理器請求這個job需要的容器,一個maptask容器請求每一個maptask。一個容器對每一個maptask的請求會嘗試利用map分片的本地性,應用會請求一下數據:
- 請求map split 和container在同一個節點管理器的container
- 如果沒有,請求一個map split 和container在同一個機架上的節點管理器上的container
- 否則請求任意節點管理器上的container
這只是一小部分資源任務。資源任務器在資源任務器既定目標和指定目標沖突的時候,可以忽略本地性。當一個容器被分配一個任務,map就馬上啟動了。
Map階段:一個執行階段的例子
map 相的一個簡要圖:
- 有兩個節點管理器:每一個2GB的內存,每一個map需要1GB我們可以并行運行兩個容器。這是最好的情況,而資源任務器的決策可能會有所不同
- 集群沒有其他的YARN任務運行
- 我們的job有8個map分片,也就是在輸入文件夾中有7個文件,只有一個是大于hdfs塊大小的,需要被拆分為兩個文件。
map任務的執行時間線
現在我們可以聚焦單個的map task:這是單個map的執行時間線:
- 初始相:我們設置map任務
- 執行相:map分片里面的每一個鍵值對進行map()函數運算
- 溢寫相:map的輸出保存在環形內存緩沖區,當緩沖區滿80%(一般80%),啟動溢寫相,將緩沖的數據寫出到磁盤。
- 洗牌相:在溢寫相的結尾,我們合并多有的輸出,并且打包他們以便進行reduce相處理。
map任務:初始化
在初始化階段,我們:
- 創建一個上下文對象(context )(TaskAttemptContext)
- 創建用戶map.class實例
- 設置輸入
- 設置輸出
- 創建mapper的上下文(
MapContext.class
,Mapper.Context.class)
- 初始化輸入也就是:
- 創建
SplitLineReader.class 分片行閱讀器
- 創建HdfsDataInputStream.class hdfs數據輸入流
Map任務:執行階段
執行階段通過 Mapper
class.的run()方法:
用戶可以重寫這個方法,但是默認的時候通常會調用setup而啟動這個程序。這個函數默認并不做什么有用的 事情,但是可以被用戶覆蓋重寫以便于設置任務(例如初始化類的變量),當設置完成之后,分片的每一個鍵值對會激發map()方法。因此map()接收到一個鍵,一個值,以及一個上下文context。使用這個上下文對象,一個map就會存儲其輸出到緩存中。
請注意,map分片是一個快一個塊截取的(例如64kb),每一個快分割成為若干鍵值對的數據( SplitLineReader.class干的好事
),這是在Mapper.Context.nextKeyValue內部完成的。當map分片被全部處理之后,run()會調用clean()方法。默認的,沒有什么會被執行,除非用戶重寫覆蓋他。
map任務:溢寫階段
MapTask.MapOutputBuffer
)。緩沖區的大小是固定的,通過mapreduce.task.io.sort.mb
(default: 100MB)指定。 mapreduce.map. sort.spill.percent
: 默認80% ),溢寫將會被執行(這是一個并行過程,使用的是單獨的線程,緩沖池還可以繼續被寫入)。如果溢寫線程太慢,而緩沖區又忙了的話,map()就會暫停執行而等待。 - 創建一個溢寫記錄
SpillRecord
和一個FSOutputStream
文件輸出流(本地文件系統) - 內存內排序緩沖中的塊:輸出的數據會使用快排算法按照partitionIdx, key排序
- 排序之后的輸出會分割成為分區:每一個分區對應一個reduce
- 分區序列化寫到本地文件
來自: http://blog.csdn.net//mrcharles/article/details/50465626