TaskTracker端啟動Task流程分析
我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。
TaskTracker周期性地向JobTracker發送心跳報告,在RPC調用返回結果后,解析結果得到JobTracker下發的運行Task的指令,即LaunchTaskAction,就會在TaskTracker節點上準備運行這個Task。Task的運行是在一個與TaskTracker進程隔離的JVM實例中執行,該JVM實例是通過org.apache.hadoop.mapred.Child來創建的,所以在創建Child VM實例之前,需要做大量的準備工作來啟動Task運行。一個Task的啟動過程,如下序列圖所示:
通過上圖,結合源碼,我們將一個Task啟動的過程,分為下面3個主要的步驟:
1.初始化跟蹤Task運行的相關數據結構
2. 準備Task運行所共享的Job資源
3. 啟動Task
下面,我們詳細分析上面3個步驟的流程:
初始化跟蹤Task運行的相關數據結構
如果是LaunchTaskAction,則TaskTracker會將該指令加入到一個啟動Task的隊列中,進行一步加載處理,如下所示:
``````````````
根據Task的類型,分別加入到對應類型的TaskLauncher的隊列中。這里需要了解一下TaskLauncher線程類,在TaskTracker中創建了2個TaskLauncher線程,一個是為啟動MapTask,另一個是為啟動ReduceTask。下面是TaskLauncher類的構造方法:
構造方法中,參數taskType表示Task類型,分為MapTask和ReduceTask,參數numSlots表示對每一種類型的Task每個TaskTracker上最多可以啟動的Task的實例數,默認都是2個。在TaskTracker初始化時,會讀取mapred-site.xml配置文件,讀取mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum配置的參數值,分別賦值給maxMapSlots和maxReduceSlots這2個屬性,如下TaskTracker構造方法中初始化這2個屬性:
然后,在TaskTracker創建時,會根據上述maxMapSlots和maxReduceSlots的值來創建并啟動2個TaskLauncher線程:
將LaunchTaskAction加入到TaskLauncher的隊列中,這個是調用TaskLauncher的addToTaskQueue()方法:
上面方法中,最關鍵的就是registerTask()方法,調用該方法來初始化TaskTracker端Task對應TaskInProgress結構,代碼如下所示:
上面方法中,tasks隊列用來記錄該TaskTracker上所有的Task,包括正在運行和已經完成的Task,而隊列runningTasks則表示當前TaskTracker上正在運行的Task。同時,通過mapTotal和reduceTotal來分別記錄當前TaskTracker上運行的總的MapTask和ReduceTask的數量。
根據LaunchTaskAction創建的TaskInProgress結構被加入到隊列tasksToLaunch中,然后通知TaskLauncher線程,在方法run中檢測并取出隊列中TaskInProgress對象,并判斷當前TaskTracker的資源狀態能否啟動一個Task,如果可以則調用startNewTask()方法啟動Task,代碼如下所示:
這樣,當前TaskTracker所在節點的資源狀態,和Task對應的TIP狀態都已經滿足啟動Task的要求,可以啟動一個Task去運行。
準備Task運行所共享的Job資源
調用startNewTask()方法,異步地啟動了一個單獨的線程去啟動Task,該方法如下所示:
如果在一個TaskTracker節點上運行的多個Task都屬于同一個Job(一個TaskTracker上運行的Task按照Job來分組,每一組Task都屬于同一個Job),那么第一次初始化時,還沒有建立一個Task到Job的映射關系,也就是說,在TaskTracker端也要維護Job的狀態,以及屬于該Job的所有Task的狀態信息。比如,如果用戶提交了一個kill掉Job的請求,那么正在運行的屬于該Job的所有Task都應該被kill掉。
上面代碼中調用localizeJob()方法,執行了如下處理:
1.創建一個RunningJob對象,并加入到TaskTracker維護的runningJobs隊列(包含了JobID到RunningJob的映射關系)中,同時將Task對應的TIP對象加入到RunningJob所維護的tasks隊列中。
2. 一個Job完成初始化,還需要將Job相關的信息,如Job配置信息從HDFS上下載到TaskTracker所在節點本地,供該Job的一組Task運行共享。我們知道,在JobClient提交Job時,會將相關資源拷貝到HDFS上的指定目錄中,例如,在HDFS上的/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/目錄下存儲Job相關的資源文件,拷貝到TaskTracker本地目錄下,例如/tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/目錄。
3.調用TaskController的initializeJob()方法初始化Job所包含的相關資源信息,為屬于該Job的一組Task所共享。
這里,TaskController使用的LinuxTaskController實現類,通過調用該方法,實際上構造了一個Shell命令行,用來在TaskTracker節點上初始化目錄和拷貝相關資源,該命令行示例如下所示:
通過工具ShellCommandExecutor來執行上述命令行,啟動一個單獨的JVM實例,完成Job資源初始化,完成即退出。通過上述命令行可以看到,主要的初始化工作都在JobLocalizer中完成的,需要傳入2個參數:用戶、jobid,在JobLocalizer中創建了一個Job所包含的各種資源,供Task在TaskTracker節點上運行共享,這些相關的目錄或資源文件包括:
這樣,在一個TaskTracker節點上運行的一組Task所共享的對應唯一Job相關的資源已經滿足,接下來就可以啟動Task了。
啟動Task
啟動Task的流程相對復雜一些,我們分幾個階段/要點來進行說明:
啟動Task準備
在startNewTask()方法中調用localizeJob()方法,完成了Job資源在TaskTracker節點上的初始化,接著就可以調用launchTaskForJob()方法進入啟動Task的處理流程,代碼如下所示:
通過調用TaskInProgress tip的launchTask()方法來啟動Task,我們看一下該方法實現代碼:
TaskInProgress里面taskStatus維護了一個TIP的狀態,通過上述代碼可以看出,一個Task只有具備下面3個狀態之一:UNASSIGNED、FAILED_UNCLEAN、KILLED_UNCLEAN,才能夠被啟動。
首先要進行Task的初始化,調用localizeTask()方法,如下所示:
在這里,Task可能是MapTask,也可能是ReduceTask,所以調用task.localizeConfiguration()的初始化邏輯稍微有些不同,具體可以查看MapTask和ReduceTask類實現。另外,對于不同類型的Task,也會創建不同類型的TaskRunner線程,分別對應于MapTaskRunner和ReduceTaskRunner,實際所有Task啟動的相關邏輯都是在這2個TaskRunner中實現的。
在TaskRunner中,主要邏輯是在run()方法中實現的,其中在調用launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir)之前,做了一些準備工作:
1.構建setupCmds:讀取系統環境變量,或者hadoop設置的環境變量,LD_LIBRARY_PATH、LD_LIBRARY_PATH、USER、SHELL、LOGNAME、HOME、HADOOP_TOKEN_FILE_LOCATION、HADOOP_ROOT_LOGGER、HADOOP_CLIENT_OPTS、HADOOP_CLIENT_OPTS,這些變量都是鍵值對的形式,最后會通過export在當前環境下導出這些變量配置
2.構建vargs:設置啟動Child VM的配置,讀取mapred-site.xml配置文件中mapred.map.child.java.opts和mapred.reduce.child.java.opts的配置內容,最終會使用org.apache.hadoop.mapred.Child創建一個JVM實例來啟動Task
3. 目錄文件設置:包括2個日志文件stdout和stderr,以及當前啟動JVM所在的目錄workDir
使用JvmManager管理啟動Task相關數據
完成上述準備工作以后,調用launchJvmAndWait()方法,創建Child VM實例,如下所示:
最終是通過JvmManager來實現JVM實例的創建,下面是JvmManager保存的一些數據結構,用來維護JVM相關數據的數據結構,如下圖所示:
可以看到,一個JvmManager對應2個JvmManagerForType,分別負責管理MapTask和ReduceTask啟動對應的Child VM等數據,JvmManager的構造方法,如下所示:
上面調用了jvmManager.launchJvm()方法,其中內部根據Task類型,選擇調用mapJvmManager或reduceJvmManager的reapJvm()方法,如下所示:
上面代碼中,調用setRunningTaskForJvm()很關鍵,實際上把需要啟動的Task與JvmRunner建立映射關系,更新相應的內存數據結構(隊列),如下所示:
該方法,在spawnNewJvm()方法也調用了,spawnNewJvm()方法創建了一個新的JVM,代碼如下所示:
接下來,我們看一下JvmRunner線程類,該線程體run()方法中直接調用了runChild()方法,該方法實現代碼,如下所示:
在JvmRunner線程類中,其中委托TaskController來控制Task的實際啟動。
使用TaskController控制啟動Child VM
下面,我們看TaskController啟動Task的實現方法launchTask(),代碼如下所示:
將構造好的啟動Child的命令行寫入到本地目錄下的文件中,該腳本文件的絕對路徑,示例如下所示:
在TaskController(實際上是LinuxTaskController)的launchTask()方法中,使用ShellCommandExecutor工具執行的命令行,類似如下這樣:
在taskjvm.sh腳本中的內容,才是真正啟動Child VM的命令行,示例如下所示:
至此,一個Task通過Child VM的加載已經啟動,就可以運行一個Task了,我們后續再詳細介紹。
來自:http://www.uml.org.cn/bigdata/201607224.asp