MapReduce V1:MapTask執行流程分析
來自: http://shiyanjun.cn/archives/1457.html
我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。
在文章《 MapReduce V1:TaskTracker設計要點概要分析 》中我們已經了解了org.apache.hadoop.mapred.Child啟動的基本流程,在Child VM啟動的過程中會運行MapTask,實際是運行用戶編寫的MapReduce程序中的map方法中的處理邏輯,我們首先看一下,在Child類中,Child基于TaskUmbilicalProtocol協議與TaskTracker通信,獲取到該Child VM需要加載的Task相關數據,包括Task本身,代碼如下所示:
</div>
final TaskUmbilicalProtocol umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() { @Override public TaskUmbilicalProtocol run() throws Exception { // 建立Child到TaskTracker的RPC連接 return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID, address, defaultConf); } }); ... ... JvmContext context = new JvmContext(jvmId, pid); // 根據啟動Child VM命令行傳遞的參數構造一個JvmContext對象 ... ... JvmTask myTask = umbilical.getTask(context); // 基于umbilical獲取到一個JvmTask ... ... task = myTask.getTask(); // 通過JvmTask獲取到MapTask或ReduceTask
上面代碼中,JvmTask中就包含了一個Task,也就是task,它可能是MapTask或ReduceTask。看一下在org.apache.hadoop.mapred.Child中運行Task的基本代碼,如下所示:
// Create a final reference to the task for the doAs block final Task taskFinal = task; childUGI.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { try { // use job-specified working directory FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); taskFinal.run(job, umbilical); // 這行是核心代碼,運行實際的MapTask或ReduceTask } finally { TaskLog.syncLogs (logLocation, taskid, isCleanup, logIsSegmented(job)); TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf); trunc.truncateLogs(new JVMInfo( TaskLog.getAttemptDir(taskFinal.getTaskID(), taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal))); }return null; } });</pre>
我們關注執行MapTask,上面,通過調用MapTask的run方法,來實際啟動MapTask的運行。
MapTask整體執行流程
MapTask運行的整體流程,如下圖所示:
![]()
上面流程比較直觀,我們結合MapTask的run方法的代碼,來進行分析,代碼如下所示:
@Override public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical;// start thread that will handle communication with parent TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); // 創建TaskReporter對象 reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // 初始化MapTask // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); // 運行JobCleanupTask return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); // 運行JobSetupTask return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); // 運行TaskCleanupTask return; } if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); // 運行MapTask } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); // Task運行完成
}</pre>
上面代碼中,run方法的參數TaskUmbilicalProtocol umbilical表示一個RPC代理對象,通過該對象可以與TaskTracker進行通信,從而在Task運行過程中,能夠將Task的運行進度、狀態信息匯報給TaskTracker。
通過上面看出,啟動一個MapTask,可能運行的是JobCleanupTask、JobSetupTask、TaskCleanupTask、Mapper四種Task之中的一種,運行每種Task都會向TaskTracker進行匯報。關于一個MapTask如何被劃分的,可以參考JobTracker端在JobInProgress中initTasks()方法。
下面,我們根據MapTask中run方法中的處理流程,分為如下幾個子流程,進行詳細分析:
</div>
初始化Task分析
這里,主要分析在MapTask的run方法中,調用initialize方法的初始化邏輯。先看在調用initialize方法之前,首先創建了一個TaskReporter線程對象,該對象又是基于TaskUmbilicalProtocol umbilical來實現將Task運行狀態匯報給TaskTracker。在initialize方法中的初始化流程如下所示:
- 根據JobConf job與JobID id,以及TaskReporter,創建一個JobContext對象
- 創建一個TaskAttemptContext對象
- 如果Task狀態為TaskStatus.State.UNASSIGNED,修改為TaskStatus.State.RUNNING
- 根據JobConf創建OutputFormat,以及OutputCommitter
- 為該Task創建Job的輸出目錄等內容
- 初始化ResourceCalculatorPlugin,用來計算Task運行過程中對節點資源的使用情況
運行JobCleanupTask
執行JobCleanupTask,主要是清理Job運行過程中產生的數據,因為該Task可能上次運行過一次,但是失敗,為了下一次重新運行,需要將之前失敗Task的數據清理掉。即使Job運行成功,也需要清理MapTask執行后輸出的中間結果數據,具體流程,如下圖所示:
![]()
上面的序列圖比較詳細,將Task運行過程中與TaskTracker之間進行通信都描述出來,我們總結如下幾個要點:
- 運行JobCleanupTask首先將當前Task運行階段設置為CLEANUP,并向TaskTracker匯報狀態變更
- 如果Job的狀態是JobStatus.State.FAILED,則刪除在該節點上運行Task所產生的臨時數據
- 如果Job的狀態是JobStatus.State.KILLED,同樣刪除臨時數據,并在該Job對應的目錄下創建_SUCCESS文件,標識Job成功
- 最后,向TaskTracker匯報狀態,更新相關TIP數據結構狀態,并釋放鎖占用的資源,以供其他Task運行所使用
運行JobSetupTask
運行JobSetupTask,主要是初始化Job對應的臨時目錄,為后續運行Task做準備,具體處理流程,如下圖所示:
![]()
在創建運行Job的基本臨時目錄以后,也需要與TaskTracker通信,匯報該Task的運行狀態。
運行TaskCleanupTask
TaskCleanupTask與JobCleanupTask類似,主要是清理Task運行過程中產生的一些臨時目錄和文件,具體流程如下圖所示:
![]()
運行MapTask
MapTask執行的核心邏輯在runNewMapper方法中,該方法中對應的處理流程,如下圖所示:
![]()
方法runNewMapper的聲明,如下所示:
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException其中,JobConf job包含了該Job的配置信息,TaskSplitIndex splitIndex包含了該MapTask所處理的InputSplit的信息(包括splitLocation和startOffset),TaskUmbilicalProtocol umbilical是與TaskTracker通信的代理對象,TaskReporter reporter是一個與TaskTracker通信的線程。上面的序列圖,所描述的具體處理流程,如下所示:
- 創建TaskAttemptContext對象
- 通過反射,創建Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>對象
- 通過反射,創建InputFormat<INKEY,INVALUE>對象
- 根據TaskSplitIndex splitIndex ,創建InputSplit對象
- 創建RecordReader<INKEY,INVALUE>對象,用來讀取輸入的InputSplit對應的文件
- 創建RecordWriter<K, V>對象,用來將處理后的數據寫入文件系統
- 創建Mapper.Context對象,可以在編寫MapReduce程序中,實現Mapper時使用
- 初始化RecordReader<INKEY,INVALUE>對象
- 執行Mapper的run方法,調用用戶編寫的MapReduce程序的Mapper中的處理邏輯,內部循環調用map方法
- 回收資源,關閉相關的流對象
下面,我們詳細看一下,Mapper處理過程中相關的要點:
- Mapper.Context結構
Mapper.Context是Mapper類的一個內部類,它包含了運行一個MapTask過程中所需要的所有上下文信息,該類的繼承層次結構,如下圖所示:
![]()
可以看出,一個Mapper對應的執行上下文信息,繼承了該Mapper對應TaskAttempt的上下文信息,再向上繼承了Job的上下文信息,一個Job包含的配置信息都可以被一個Mapper讀取到。
MapperContext中包含如下信息:
private RecordReader<KEYIN,VALUEIN> reader; private InputSplit split;通過使用一個RecordReader,能夠讀取InputSplit對應的HDFS上的Block文件數據。TaskInputOutputContext中包含如下信息:
private RecordWriter<KEYOUT,VALUEOUT> output; private StatusReporter reporter; private OutputCommitter committer;可見,在該層能夠實現將Task運行的統計,通過StatusReporter以Counter的形式收集,并提供進度(Progress)獲取接口。同時,使用RecordWriter將Mapper的輸出寫入到文件系統。OutputCommitter是一個比較重要的對象,該抽象類代碼,如下所示:
public abstract class OutputCommitter {public abstract void setupJob(JobContext jobContext) throws IOException;
public void commitJob(JobContext jobContext) throws IOException { cleanupJob(jobContext); }
@Deprecated public void cleanupJob(JobContext context) throws IOException { }
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { cleanupJob(jobContext); }
public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;
public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;
public abstract void abortTask(TaskAttemptContext taskContext) throws IOException; }</pre>
可以參考實現類FileOutputCommitter,它主要負責,在Job運行過程中,管理Task執行過程中對應的文件或目錄的信息,如開始運行之前創建目錄,運行完成后將臨時有用的文件移動到供Job共享的目錄下,也會執行無用臨時文件的清理。
TaskAttemptContext中主要是有關一個TaskAttempt的相關信息,包括TaskAttemptID及其狀態信息。
JobContext主要包括一個Job的相關配置信息,如MapReduce Job的輸入輸出規格(輸入輸出KV格式)、Mapper類信息、輸入文件格式信息,還有JobID信息。
</div>
- Mapper處理流程
通過調用mapper.run(mapperContext);進入Mapper的執行流程,我們首先看一下,Mapper提供的接口的聲明,代碼如下所示:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { // Mapper.Context是Mapper運行時上下文對象,包含了運行MapTask所需要的基本信息 public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN,VALUEIN> reader, RecordWriter<KEYOUT,VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } }
/**
Called once at the beginning of the task. */ protected void setup(Context context) throws IOException, InterruptedException { // 執行map方法之前,進行初始化工作 // NOTHING }
/**
- Called once for each key/value pair in the input split. Most applications
should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // 用戶實際實現的map處理邏輯 context.write((KEYOUT) key, (VALUEOUT) value); }
/**
Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // 執行map完成之后,進行清理 // NOTHING }
/**
- Expert users can override this method for more complete control over the
- execution of the Mapper.
- @param context
@throws IOException */ public void run(Context context) throws IOException, InterruptedException { // 該方法是程序的驅動入庫,循環調用map方法處理每一個鍵值對數據 setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }</pre>
上面run方法中,輸入的InputSplit對應的每一個記錄,循環調用map方法,map方法中是用戶實現的MapReduce的Mapper中的代碼。
Mapper輸出流程分析
在runNewMapper方法中可以看到,如果Job有0個ReduceTask,則Mapper將結果直接寫入到HDFS文件;而如果Job有大于0個ReduceTask,則Mapper將結果輸出到本地,是中間結果。代碼如下所示:
if (job.getNumReduceTasks() == 0) {output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); }</pre>
上面,NewDirectOutputCollector實際上將Mapper的輸出直接寫入到HDFS文件;而NewOutputCollector將Mapper輸出寫入到本地文件,我們看一下NewOutputCollector是如何實現Mapper輸出落地的,它的構造方法,代碼如下所示:
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); // partitions = jobContext.getNumReduceTasks(); // 根據ReduceTask個數計算Mapper輸出分區個數 if (partitions > 0) { // 分區個數大于0,創建一個Partitioner對象 partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override public int getPartition(K key, V value, int numPartitions) { return -1; }
}; } }</pre>
可以看出,上面的collector里面的邏輯非常核心,它是MapOutputBuffer類實例,該類實現也是相對復雜的。接著,我們從如下幾個方面詳細分析:
- 使用Buffer存儲鍵值對數據
Mapper輸出后,會首先將鍵值對數據寫入到一個Buffer中,通過其它的幾個Buffer來跟蹤鍵值對的在Buffer中偏移位置信息,以及鍵值對所屬的分區(Partition)。我們看一下NewOutputCollector調用write方法輸出鍵值對數據的邏輯:
@Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); }通過partitioner計算一個輸出鍵值對所屬的分區,然后調用MapOutputBuffer的collect方法實現Mapper中間結果數據輸出。下面,我們看MapOutputBuffer涉及到的一些存儲結構,其中包含3個buffer,如下代碼所示:
private final int[] kvoffsets; // indices into kvindices private final int[] kvindices; // partition, k/v offsets into kvbuffer private byte[] kvbuffer;上面的數組結構,kvbuffer用來存儲Map輸出的鍵值對數據,kvoffsets用來存儲Map輸出的鍵值對數據在kvbuffer中的偏移位置,kvindices用來記錄一個鍵值對對應的分區、鍵開始、值開始位置的數據結構,具體我們會在后面詳細說明。首先,看一下用來定義和限制這些buffer的一些基本默認配置參數:
io.sort.mb = 100MB io.sort.record.percent = 0.05 io.sort.spill.percent = 0.8sortmb = 100MB rEcper = 0.05 spillper = 0.8
RECSIZE = 16 ACCTSIZE = 3</pre>
基于這些配置的參數,計算得到一些buffer的大小及其限制配置,為了直觀我們通過使用MB來表示大小,如下所示:
maxMemUsage = sortmb <<20 = 104857600 = 100MB recordCapacity = maxMemUsage recper = 104857600 0.05 = 5242880 = 5MB recordCapacity -= recordCapacity % RECSIZE = 5242880 - 5242880 % 16 = 5242880 = 5MB maxMemUsage – recordCapacity = 104857600 - 5242880 = 99614720 = 95MBsoftBufferLimit = ((maxMemUsage – recordCapacity) spillper) = 99614720 0.8 = 79691776 = 76MB softRecordLimit = (recordCapacity spillper) = 5242880 0.8 = 4194304 = 4MB</pre>
現在,我們看一下這些buffer的結構即相關配置參數,如下圖所示:
![]()
Mapper分別讀取輸入Split的每一行數據,然后輸出鍵值對數據,這些鍵值對數據在上面講述到的Buffer中的存儲細節,如下圖所示:
![]()
上圖中,輸出了2個鍵值對數據,首先kvbuffer中會順序存儲每一個鍵值對數據,同時,kvoffsets會記錄每一個鍵值對的順序編號,或者說,在kvbuffer中的偏移位置,在kvindices中包含了每一個鍵值對的三個屬性信息:所屬分區(kvi partition)、鍵起始偏移量(ki start)、值起始偏移量(vi start)。
</div>
- 將Buffer中鍵值對數據寫入本地磁盤文件
當內存中的鍵值對數據的存儲量在上述Buffer中達到一定限制(通過配置io.sort.spill.percent決定,默認是大約95MB的Buffer存儲容量達到80%,即大約76MB時會觸發將Buffer中數據寫入磁盤文件)時,就會通過SpillThread線程,將Buffer中的數據寫入本地磁盤文件中,可以在SpillThread類中的sortAndSpill方法中看到具體實現。在將Buffer中的鍵值對數據寫入磁盤之前,先進行一次內存排序,排序的規則是:MapOutputBuffer內部有3個Buffer,排序是對鍵值對偏移位置的Buffer kvoffsets進行排序,保證每一個鍵值對所屬的分區(Partition)按照升序排序,然后再保證每個分區(Partition)中的鍵值對按照鍵進行排序,這樣最后得到的kvoffsets中的鍵值對數據就是按照分區進行分組,并且每個分組中是按照鍵排過序的。
如果用戶編寫的MapReduce程序,指定了Combiner,則再排序之后,寫入磁盤文件之前,調用Combiner對數據進行合并,具體可以參考CombinerRunner實現類。
經過幾次填充Buffer,達到配置寫入容量值會寫入文件,最終得到多個分區的、按照鍵(Key)排序的文件,在所有數據都輸出結束后,會對上述生成的多個文件進行合并,合并成一個大文件,該大文件也是經過分區的、排序的文件。在最后的合并階段,也會檢查用戶是否配置了Combiner,如果是則會對每個分區的數據做一個局部化簡合并,輸出到最終的一個文件中,并將多個小的分區文件刪除。
![]()
本文基于 署名-非商業性使用-相同方式共享 4.0 許可協議發布,歡迎轉載、使用、重新發布,但務必保留文章署名時延軍(包含鏈接:http://shiyanjun.cn),不得用于商業目的,基于本文修改后的作品務必以相同的許可發布。如有任何疑問,請與我聯系。
本文由用戶 jdbz8397 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!相關經驗