MapReduce 圖解流程超詳細解答(2)-【map階段】

jopen 8年前發布 | 10K 次閱讀 分布式/云計算/大數據

接上一篇講解:http://blog.csdn.net/mrcharles/article/details/50465626

map任務:溢寫階段

Spilling phase


正如我們在執行階段看到的一樣,map會使用Mapper.Context.write()將map函數的輸出溢寫到內存中的環形緩沖區 (MapTask.MapOutputBuffer)。緩沖區的大小是固定的,通過mapreduce.task.io.sort.mb (default: 100MB)指定。
任何時候當這個緩沖區將要充滿的時候(mapreduce.map. sort.spill.percent: 默認80% ),溢寫將會被執行(這是一個并行過程,使用的是單獨的線程,緩沖池還可以繼續被寫入)。如果溢寫線程太慢,而緩沖區又忙了的話,map()就會暫停執行而等待。
溢寫線程執行下面的動作:
  1. 創建一個溢寫記錄SpillRecord 和一個FSOutputStream 文件輸出流(本地文件系統)
  2. 內存內排序緩沖中的塊:輸出的數據會使用快排算法按照partitionIdx, key排序
  3. 排序之后的輸出會分割成為分區:每一個分區對應一個reduce
  4. 分區序列化寫到本地文件

有多少個reduce任務呢?

一個job的ReduceTasks 的數量是通過配置mapreduce.job.reduces參數設置的

一個輸出元組的分割指數是多少?

輸出元組的分割指數指的是分區的指數。在 Mapper.Context.write()內部被指定:

partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers
隨著輸出元組以元數據的形式保存在環形緩沖區。用戶可以通過配置mapreduce.job.partitioner.class參數自己定制partitioner 

我們什么時候運行combiner

如果用戶制定了一個特定的combiner ,那么,在溢寫線程寫出到文件之前,會在每一個分區含有元組數據的地方執行combiner 通常,我們做了如下事情:

  1. 創建一個用戶指定的Reducer.class實例(用戶指定的combiner 
  2. 創建一個Reducer.Context:輸出將會保存在本地文件系統
  3. 運行Reduce.run():請看reduce 任務的描述

Map任務:執行結束

在執行階段結束的時候,溢寫線程最后被觸發,細節上,我們作如下事情:

  1. 排序,溢寫最后沒有被溢寫的元組數據
  2. 開始SHUFFLE 相
注意,每一次環形緩沖區將要慢時,達到溢寫的百分比的時候,我們就會得到一個溢寫文件(溢寫記錄+輸出文件)。每一個溢寫文件包含若干分區(分段)

map執行后時期:洗牌shuffle

Hadoop (MapReduce): MapTask - Shuffle

在一個map任務完成的最后時刻,所有的溢寫文件會被合并一個分區文件,與相應的reducer對應。mapreduce.io.sort.merge參數控制合并流一次的數量,默認是100。如果至少有3個溢寫文件,combiner將會再次執行。如果只有一個或者兩個溢寫文件,再次執行combiner從而減少map輸出的數據量的大小已經沒有什么效果,沒有必要再次執行combiner了。分區輸出文件通過http的方式提供給reducer。



Hadoop (MapReduce): MapTask - Shuffle (2)

Merger合并

Hadoop (MapReduce): Merger

略粗糙,望幫助大家

Charles 于2016-01-06  Phnom Penh



版權說明:
本文由Charles Dong原創,本人支持開源以及免費有益的傳播,反對商業化謀利。
CSDN博客:http://blog.csdn.net/mrcharles
個人站:http://blog.xingbod.cn
EMAIL:charles@xingbod.cn


來自: http://blog.csdn.net//mrcharles/article/details/50471603

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!