MapReduce 圖解流程超詳細解答(1)-【map階段】

jopen 8年前發布 | 39K 次閱讀 分布式/云計算/大數據

在MapReduce中,一個YARN  應用被稱作一個job, MapReduce 框架提供的應用,master的一個實現被稱作MRAppMaster</span></span>

MapReduce Job的時間線

Timeline MapReduce Job


MapReduce Job  運行的時間線:

  • Map Phase:若干 Map Tasks 被執行
  • Reduce Phase: 若干Reduce Tasks 被執行

reduce可能會在map階段結束之前開始執行,因此上面顯示的有重疊的地方。

Map Phase

現在我們集中考察map相,一個關鍵的問題是一個應用需要多少map任務去運行現在的這個job

用戶給了我們什么?

我們退回到之前的一步,當一個用戶提交一個應用的時候,若干信息被提供給了YARN ,分別是:


  • 一個配置:這可以是一部分的,因為一些參數不需要用戶特別指定,可以有自己的默認值。
  • 一個jar文件,含有一個map,一個combiner,一個reduce
  • 一個輸入和輸出信息 輸入目錄 是不是在hdfs上,有多少文件呢?輸出的時候,我們存儲在哪里

The number of files inside the input directory is used for deciding the number of Map Tasks of a job.
那么,輸入的目錄中文件的數量決定多少個map會被運行起來

多少個map任務?

應用針對每一個分片運行一個map,一般而言,對于每一個輸入的文件會有一個map split。如果輸入文件太大,超過了hdfs塊的大小(64M)那么對于同一個輸入文件我們會有多余2個的map運行起來。下面是FileInputFormat class 的getSplits()的偽代碼: 

num_splits = 0
for each input file f:
   remaining = f.length
   while remaining / split_size > split_slope:
      num_splits += 1
      remaining -= split_size

where:

split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小約等于hdfs塊大小

在mapreduce2.0以上版本mapreduce.job.maps 屬性會被忽略

MapTask Launch
啟動MapTask

mapreduce應用會向資源管理器請求這個job需要的容器,一個maptask容器請求每一個maptask。一個容器對每一個maptask的請求會嘗試利用map分片的本地性,應用會請求一下數據:

  • 請求map split 和container在同一個節點管理器的container 
  • 如果沒有,請求一個map split 和container在同一個機架上的節點管理器上的container
  • 否則請求任意節點管理器上的container

這只是一小部分資源任務。資源任務器在資源任務器既定目標和指定目標沖突的時候,可以忽略本地性。當一個容器被分配一個任務,map就馬上啟動了。

Map階段:一個執行階段的例子

Map Phase execution

map 相的一個簡要圖:


  • 有兩個節點管理器:每一個2GB的內存,每一個map需要1GB我們可以并行運行兩個容器。這是最好的情況,而資源任務器的決策可能會有所不同
  • 集群沒有其他的YARN任務運行
  • 我們的job有8個map分片,也就是在輸入文件夾中有7個文件,只有一個是大于hdfs塊大小的,需要被拆分為兩個文件。


map任務的執行時間線

Map Task Execution Timeline

現在我們可以聚焦單個的map task:這是單個map的執行時間線:

  • 初始相:我們設置map任務
  • 執行相:map分片里面的每一個鍵值對進行map()函數運算
  • 溢寫相:map的輸出保存在環形內存緩沖區,當緩沖區滿80%(一般80%),啟動溢寫相,將緩沖的數據寫出到磁盤。
  • 洗牌相:在溢寫相的結尾,我們合并多有的輸出,并且打包他們以便進行reduce相處理。


map任務:初始化

在初始化階段,我們:


  1. 創建一個上下文對象(context )(TaskAttemptContext
  2. 創建用戶map.class實例
  3. 設置輸入
  4. 設置輸出
  5. 創建mapper的上下文(MapContext.classMapper.Context.class)
  6. 初始化輸入也就是:
  7. 創建 SplitLineReader.class 分片行閱讀器
  8. 創建HdfsDataInputStream.class hdfs數據輸入流


Map任務:執行階段

MapTask execution

執行階段通過 Mapper class.的run()方法:

用戶可以重寫這個方法,但是默認的時候通常會調用setup而啟動這個程序。這個函數默認并不做什么有用的 事情,但是可以被用戶覆蓋重寫以便于設置任務(例如初始化類的變量),當設置完成之后,分片的每一個鍵值對會激發map()方法。因此map()接收到一個鍵,一個值,以及一個上下文context。使用這個上下文對象,一個map就會存儲其輸出到緩存中。

請注意,map分片是一個快一個塊截取的(例如64kb),每一個快分割成為若干鍵值對的數據( SplitLineReader.class干的好事),這是在Mapper.Context.nextKeyValue內部完成的。當map分片被全部處理之后,run()會調用clean()方法。默認的,沒有什么會被執行,除非用戶重寫覆蓋他。


map任務:溢寫階段

Spilling phase


正如我們在執行階段看到的一樣,map會使用Mapper.Context.write()將map函數的輸出溢寫到內存中的環形緩沖區 (MapTask.MapOutputBuffer)。緩沖區的大小是固定的,通過mapreduce.task.io.sort.mb (default: 100MB)指定。
任何時候當這個緩沖區將要充滿的時候(mapreduce.map. sort.spill.percent: 默認80% ),溢寫將會被執行(這是一個并行過程,使用的是單獨的線程,緩沖池還可以繼續被寫入)。如果溢寫線程太慢,而緩沖區又忙了的話,map()就會暫停執行而等待。
溢寫線程執行下面的動作:
  1. 創建一個溢寫記錄SpillRecord 和一個FSOutputStream 文件輸出流(本地文件系統)
  2. 內存內排序緩沖中的塊:輸出的數據會使用快排算法按照partitionIdx, key排序
  3. 排序之后的輸出會分割成為分區:每一個分區對應一個reduce
  4. 分區序列化寫到本地文件

來自: http://blog.csdn.net//mrcharles/article/details/50465626

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!