MapReduce V1:MapTask執行流程分析

jdbz8397 8年前發布 | 17K 次閱讀 MapReduce 分布式/云計算/大數據

來自: http://shiyanjun.cn/archives/1457.html

我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。

在文章《 MapReduce V1:TaskTracker設計要點概要分析 》中我們已經了解了org.apache.hadoop.mapred.Child啟動的基本流程,在Child VM啟動的過程中會運行MapTask,實際是運行用戶編寫的MapReduce程序中的map方法中的處理邏輯,我們首先看一下,在Child類中,Child基于TaskUmbilicalProtocol協議與TaskTracker通信,獲取到該Child VM需要加載的Task相關數據,包括Task本身,代碼如下所示:

</div>

final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
        @Override
        public TaskUmbilicalProtocol run() throws Exception { // 建立Child到TaskTracker的RPC連接
          return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
              TaskUmbilicalProtocol.versionID,
              address,
              defaultConf);
        }
    });
... ...
    JvmContext context = new JvmContext(jvmId, pid); // 根據啟動Child VM命令行傳遞的參數構造一個JvmContext對象
... ...
        JvmTask myTask = umbilical.getTask(context); // 基于umbilical獲取到一個JvmTask
... ...
        task = myTask.getTask(); // 通過JvmTask獲取到MapTask或ReduceTask

上面代碼中,JvmTask中就包含了一個Task,也就是task,它可能是MapTask或ReduceTask。看一下在org.apache.hadoop.mapred.Child中運行Task的基本代碼,如下所示:

// Create a final reference to the task for the doAs block
        final Task taskFinal = task;
        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            try {
              // use job-specified working directory
              FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
              taskFinal.run(job, umbilical);        // 這行是核心代碼,運行實際的MapTask或ReduceTask
            } finally {
              TaskLog.syncLogs
                (logLocation, taskid, isCleanup, logIsSegmented(job));
              TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
              trunc.truncateLogs(new JVMInfo(
                  TaskLog.getAttemptDir(taskFinal.getTaskID(),
                    taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
            }

        return null;
      }
    });</pre> 

我們關注執行MapTask,上面,通過調用MapTask的run方法,來實際啟動MapTask的運行。

MapTask整體執行流程

MapTask運行的整體流程,如下圖所示:

上面流程比較直觀,我們結合MapTask的run方法的代碼,來進行分析,代碼如下所示:

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); // 創建TaskReporter對象
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi); // 初始化MapTask

// check if it is a cleanupJobTask
if (jobCleanup) {
  runJobCleanupTask(umbilical, reporter); // 運行JobCleanupTask
  return;
}
if (jobSetup) {
  runJobSetupTask(umbilical, reporter); // 運行JobSetupTask
  return;
}
if (taskCleanup) {
  runTaskCleanupTask(umbilical, reporter); // 運行TaskCleanupTask
  return;
}

if (useNewApi) {
  runNewMapper(job, splitMetaInfo, umbilical, reporter); // 運行MapTask
} else {
  runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter); // Task運行完成

}</pre>

上面代碼中,run方法的參數TaskUmbilicalProtocol umbilical表示一個RPC代理對象,通過該對象可以與TaskTracker進行通信,從而在Task運行過程中,能夠將Task的運行進度、狀態信息匯報給TaskTracker。

通過上面看出,啟動一個MapTask,可能運行的是JobCleanupTask、JobSetupTask、TaskCleanupTask、Mapper四種Task之中的一種,運行每種Task都會向TaskTracker進行匯報。關于一個MapTask如何被劃分的,可以參考JobTracker端在JobInProgress中initTasks()方法。

下面,我們根據MapTask中run方法中的處理流程,分為如下幾個子流程,進行詳細分析:

</div>

初始化Task分析

這里,主要分析在MapTask的run方法中,調用initialize方法的初始化邏輯。先看在調用initialize方法之前,首先創建了一個TaskReporter線程對象,該對象又是基于TaskUmbilicalProtocol umbilical來實現將Task運行狀態匯報給TaskTracker。在initialize方法中的初始化流程如下所示:

  1. 根據JobConf job與JobID id,以及TaskReporter,創建一個JobContext對象
  2. 創建一個TaskAttemptContext對象
  3. 如果Task狀態為TaskStatus.State.UNASSIGNED,修改為TaskStatus.State.RUNNING
  4. 根據JobConf創建OutputFormat,以及OutputCommitter
  5. 為該Task創建Job的輸出目錄等內容
  6. 初始化ResourceCalculatorPlugin,用來計算Task運行過程中對節點資源的使用情況

運行JobCleanupTask

執行JobCleanupTask,主要是清理Job運行過程中產生的數據,因為該Task可能上次運行過一次,但是失敗,為了下一次重新運行,需要將之前失敗Task的數據清理掉。即使Job運行成功,也需要清理MapTask執行后輸出的中間結果數據,具體流程,如下圖所示:

上面的序列圖比較詳細,將Task運行過程中與TaskTracker之間進行通信都描述出來,我們總結如下幾個要點:

  • 運行JobCleanupTask首先將當前Task運行階段設置為CLEANUP,并向TaskTracker匯報狀態變更
  • 如果Job的狀態是JobStatus.State.FAILED,則刪除在該節點上運行Task所產生的臨時數據
  • 如果Job的狀態是JobStatus.State.KILLED,同樣刪除臨時數據,并在該Job對應的目錄下創建_SUCCESS文件,標識Job成功
  • 最后,向TaskTracker匯報狀態,更新相關TIP數據結構狀態,并釋放鎖占用的資源,以供其他Task運行所使用

運行JobSetupTask

運行JobSetupTask,主要是初始化Job對應的臨時目錄,為后續運行Task做準備,具體處理流程,如下圖所示:

在創建運行Job的基本臨時目錄以后,也需要與TaskTracker通信,匯報該Task的運行狀態。

運行TaskCleanupTask

TaskCleanupTask與JobCleanupTask類似,主要是清理Task運行過程中產生的一些臨時目錄和文件,具體流程如下圖所示:

運行MapTask

MapTask執行的核心邏輯在runNewMapper方法中,該方法中對應的處理流程,如下圖所示:

方法runNewMapper的聲明,如下所示:

private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException

其中,JobConf job包含了該Job的配置信息,TaskSplitIndex splitIndex包含了該MapTask所處理的InputSplit的信息(包括splitLocation和startOffset),TaskUmbilicalProtocol umbilical是與TaskTracker通信的代理對象,TaskReporter reporter是一個與TaskTracker通信的線程。上面的序列圖,所描述的具體處理流程,如下所示:

  1. 創建TaskAttemptContext對象
  2. 通過反射,創建Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>對象
  3. 通過反射,創建InputFormat<INKEY,INVALUE>對象
  4. 根據TaskSplitIndex splitIndex ,創建InputSplit對象
  5. 創建RecordReader<INKEY,INVALUE>對象,用來讀取輸入的InputSplit對應的文件
  6. 創建RecordWriter<K, V>對象,用來將處理后的數據寫入文件系統
  7. 創建Mapper.Context對象,可以在編寫MapReduce程序中,實現Mapper時使用
  8. 初始化RecordReader<INKEY,INVALUE>對象
  9. 執行Mapper的run方法,調用用戶編寫的MapReduce程序的Mapper中的處理邏輯,內部循環調用map方法
  10. 回收資源,關閉相關的流對象

下面,我們詳細看一下,Mapper處理過程中相關的要點:

  • Mapper.Context結構

Mapper.Context是Mapper類的一個內部類,它包含了運行一個MapTask過程中所需要的所有上下文信息,該類的繼承層次結構,如下圖所示:

可以看出,一個Mapper對應的執行上下文信息,繼承了該Mapper對應TaskAttempt的上下文信息,再向上繼承了Job的上下文信息,一個Job包含的配置信息都可以被一個Mapper讀取到。

MapperContext中包含如下信息:

private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;

通過使用一個RecordReader,能夠讀取InputSplit對應的HDFS上的Block文件數據。TaskInputOutputContext中包含如下信息:

private RecordWriter<KEYOUT,VALUEOUT> output;
  private StatusReporter reporter;
  private OutputCommitter committer;

可見,在該層能夠實現將Task運行的統計,通過StatusReporter以Counter的形式收集,并提供進度(Progress)獲取接口。同時,使用RecordWriter將Mapper的輸出寫入到文件系統。OutputCommitter是一個比較重要的對象,該抽象類代碼,如下所示:

public abstract class OutputCommitter {

public abstract void setupJob(JobContext jobContext) throws IOException;

public void commitJob(JobContext jobContext) throws IOException { cleanupJob(jobContext); }

@Deprecated public void cleanupJob(JobContext context) throws IOException { }

public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { cleanupJob(jobContext); }

public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;

public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;

public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;

public abstract void abortTask(TaskAttemptContext taskContext) throws IOException; }</pre>

可以參考實現類FileOutputCommitter,它主要負責,在Job運行過程中,管理Task執行過程中對應的文件或目錄的信息,如開始運行之前創建目錄,運行完成后將臨時有用的文件移動到供Job共享的目錄下,也會執行無用臨時文件的清理。

TaskAttemptContext中主要是有關一個TaskAttempt的相關信息,包括TaskAttemptID及其狀態信息。

JobContext主要包括一個Job的相關配置信息,如MapReduce Job的輸入輸出規格(輸入輸出KV格式)、Mapper類信息、輸入文件格式信息,還有JobID信息。

</div>

  • Mapper處理流程

通過調用mapper.run(mapperContext);進入Mapper的執行流程,我們首先看一下,Mapper提供的接口的聲明,代碼如下所示:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { // Mapper.Context是Mapper運行時上下文對象,包含了運行MapTask所需要的基本信息 public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN,VALUEIN> reader, RecordWriter<KEYOUT,VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } }

/**

  • Called once at the beginning of the task. */ protected void setup(Context context) throws IOException, InterruptedException { // 執行map方法之前,進行初始化工作 // NOTHING }

    /**

  • Called once for each key/value pair in the input split. Most applications
  • should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // 用戶實際實現的map處理邏輯 context.write((KEYOUT) key, (VALUEOUT) value); }

    /**

  • Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // 執行map完成之后,進行清理 // NOTHING }

    /**

  • Expert users can override this method for more complete control over the
  • execution of the Mapper.
  • @param context
  • @throws IOException */ public void run(Context context) throws IOException, InterruptedException { // 該方法是程序的驅動入庫,循環調用map方法處理每一個鍵值對數據 setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }</pre>

    上面run方法中,輸入的InputSplit對應的每一個記錄,循環調用map方法,map方法中是用戶實現的MapReduce的Mapper中的代碼。

    Mapper輸出流程分析

    在runNewMapper方法中可以看到,如果Job有0個ReduceTask,則Mapper將結果直接寫入到HDFS文件;而如果Job有大于0個ReduceTask,則Mapper將結果輸出到本地,是中間結果。代碼如下所示:

    if (job.getNumReduceTasks() == 0) {

    output =
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    

    } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); }</pre>

    上面,NewDirectOutputCollector實際上將Mapper的輸出直接寫入到HDFS文件;而NewOutputCollector將Mapper輸出寫入到本地文件,我們看一下NewOutputCollector是如何實現Mapper輸出落地的,它的構造方法,代碼如下所示:

    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,

                  JobConf job,
                  TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter) throws IOException, ClassNotFoundException {
    

    collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); // partitions = jobContext.getNumReduceTasks(); // 根據ReduceTask個數計算Mapper輸出分區個數 if (partitions > 0) { // 分區個數大于0,創建一個Partitioner對象 partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {

     @Override
     public int getPartition(K key, V value, int numPartitions) {
       return -1;
     }
    

    }; } }</pre>

    可以看出,上面的collector里面的邏輯非常核心,它是MapOutputBuffer類實例,該類實現也是相對復雜的。接著,我們從如下幾個方面詳細分析:

    • 使用Buffer存儲鍵值對數據

    Mapper輸出后,會首先將鍵值對數據寫入到一個Buffer中,通過其它的幾個Buffer來跟蹤鍵值對的在Buffer中偏移位置信息,以及鍵值對所屬的分區(Partition)。我們看一下NewOutputCollector調用write方法輸出鍵值對數據的邏輯:

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
     collector.collect(key, value, partitioner.getPartition(key, value, partitions));
    }

    通過partitioner計算一個輸出鍵值對所屬的分區,然后調用MapOutputBuffer的collect方法實現Mapper中間結果數據輸出。下面,我們看MapOutputBuffer涉及到的一些存儲結構,其中包含3個buffer,如下代碼所示:

    private final int[] kvoffsets;     // indices into kvindices
    private final int[] kvindices;     // partition, k/v offsets into kvbuffer
    private byte[] kvbuffer;

    上面的數組結構,kvbuffer用來存儲Map輸出的鍵值對數據,kvoffsets用來存儲Map輸出的鍵值對數據在kvbuffer中的偏移位置,kvindices用來記錄一個鍵值對對應的分區、鍵開始、值開始位置的數據結構,具體我們會在后面詳細說明。首先,看一下用來定義和限制這些buffer的一些基本默認配置參數:

    io.sort.mb             = 100MB
    io.sort.record.percent = 0.05
    io.sort.spill.percent  = 0.8

sortmb = 100MB rEcper = 0.05 spillper = 0.8

RECSIZE = 16 ACCTSIZE = 3</pre>

基于這些配置的參數,計算得到一些buffer的大小及其限制配置,為了直觀我們通過使用MB來表示大小,如下所示:

maxMemUsage = sortmb <<20 = 104857600 = 100MB
recordCapacity = maxMemUsage  recper = 104857600  0.05 = 5242880 = 5MB
recordCapacity -= recordCapacity % RECSIZE = 5242880 - 5242880 % 16 = 5242880 = 5MB
maxMemUsage – recordCapacity = 104857600 - 5242880 = 99614720 = 95MB

softBufferLimit = ((maxMemUsage – recordCapacity) spillper) = 99614720 0.8 = 79691776 = 76MB softRecordLimit = (recordCapacity spillper) = 5242880 0.8 = 4194304 = 4MB</pre>

現在,我們看一下這些buffer的結構即相關配置參數,如下圖所示:

Mapper分別讀取輸入Split的每一行數據,然后輸出鍵值對數據,這些鍵值對數據在上面講述到的Buffer中的存儲細節,如下圖所示:

上圖中,輸出了2個鍵值對數據,首先kvbuffer中會順序存儲每一個鍵值對數據,同時,kvoffsets會記錄每一個鍵值對的順序編號,或者說,在kvbuffer中的偏移位置,在kvindices中包含了每一個鍵值對的三個屬性信息:所屬分區(kvi partition)、鍵起始偏移量(ki start)、值起始偏移量(vi start)。

</div>

  • 將Buffer中鍵值對數據寫入本地磁盤文件

當內存中的鍵值對數據的存儲量在上述Buffer中達到一定限制(通過配置io.sort.spill.percent決定,默認是大約95MB的Buffer存儲容量達到80%,即大約76MB時會觸發將Buffer中數據寫入磁盤文件)時,就會通過SpillThread線程,將Buffer中的數據寫入本地磁盤文件中,可以在SpillThread類中的sortAndSpill方法中看到具體實現。在將Buffer中的鍵值對數據寫入磁盤之前,先進行一次內存排序,排序的規則是:MapOutputBuffer內部有3個Buffer,排序是對鍵值對偏移位置的Buffer kvoffsets進行排序,保證每一個鍵值對所屬的分區(Partition)按照升序排序,然后再保證每個分區(Partition)中的鍵值對按照鍵進行排序,這樣最后得到的kvoffsets中的鍵值對數據就是按照分區進行分組,并且每個分組中是按照鍵排過序的。

如果用戶編寫的MapReduce程序,指定了Combiner,則再排序之后,寫入磁盤文件之前,調用Combiner對數據進行合并,具體可以參考CombinerRunner實現類。

經過幾次填充Buffer,達到配置寫入容量值會寫入文件,最終得到多個分區的、按照鍵(Key)排序的文件,在所有數據都輸出結束后,會對上述生成的多個文件進行合并,合并成一個大文件,該大文件也是經過分區的、排序的文件。在最后的合并階段,也會檢查用戶是否配置了Combiner,如果是則會對每個分區的數據做一個局部化簡合并,輸出到最終的一個文件中,并將多個小的分區文件刪除。

本文基于 署名-非商業性使用-相同方式共享 4.0 許可協議發布,歡迎轉載、使用、重新發布,但務必保留文章署名時延軍(包含鏈接:http://shiyanjun.cn),不得用于商業目的,基于本文修改后的作品務必以相同的許可發布。如有任何疑問,請與我聯系。

 本文由用戶 jdbz8397 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!