mapreduce源碼分析總結
一 MapReduce概述
Map/Reduce是一個用于大規模數據處理的分布式計算模型,它最初是由Google工程師設計并實現的,Google已經將它完整的MapReduce論 文公開發布了。其中對它的定義是,Map/Reduce是一個編程模型(programmingmodel),是一個用于處理和生成大規模數據集 (processing and generating large data sets)的相關的實現。用戶定義一個map函數來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數將所 有這些中間的有著相同key的values合并起來。很多現實世界中的任務都可用這個模型來表達。
二 MapReduce工作原理
Map-Reduce框架的運作完全基于
一個Map-Reduce任務的執行過程以及數據輸入輸出的類型如下所示:
Map:
Reduce:
下面通過一個的例子來詳細說明這個過程。
WordCount是Hadoop自帶的一個例子,目標是統計文本文件中單詞的個數。假設有如下的兩個文本文件來運行WorkCount程序:
Hello World Bye World
Hello Hadoop GoodBye Hadoop
1 map數據輸入
Hadoop針對文本文件缺省使用LineRecordReader類來實現讀取,一行一個key/value對,key取偏移量,value為行內容。
如下是map1的輸入數據:
Key1 |
Value1 |
0 |
Hello World Bye World |
如下是map2的輸入數據:
Key1 |
Value1 |
0 |
Hello Hadoop GoodBye Hadoop |
2 map輸出/combine輸入
如下是map1的輸出結果
Key2 |
Value2 |
Hello |
1 |
World |
1 |
Bye |
1 |
World |
1 |
如下是map2的輸出結果
Key2 |
Value2 |
Hello |
1 |
Hadoop |
1 |
GoodBye |
1 |
Hadoop |
1 |
3 combine輸出
Combiner類實現將相同key的值合并起來,它也是一個Reducer的實現。
如下是combine1的輸出
Key2 |
Value2 |
Hello |
1 |
World |
2 |
Bye |
1 |
如下是combine2的輸出
Key2 |
Value2 |
Hello |
1 |
Hadoop |
2 |
GoodBye |
1 |
4 reduce輸出
Reducer類實現將相同key的值合并起來。
如下是reduce的輸出
Key2 |
Value2 |
Hello |
2 |
World |
2 |
Bye |
1 |
Hadoop |
2 |
GoodBye |
1 |
三 MapReduce框架結構
1 角色
1.1 JobTracker
JobTracker是一個master服務, JobTracker負責調度job的每一個子任務task運行于TaskTracker上,并監控它們,如果發現有失敗的task就重新運行它。一般情況應該把JobTracker部署在單獨的機器上。
1.2 TaskTracker
TaskTracker是運行于多個節點上的slaver服務。TaskTracker則負責直接執行每一個task。TaskTracker都需要運行在HDFS的DataNode上,
1.3 JobClient
每一個job都會在用戶端通過JobClient類將應用程序以及配置參數打包成jar文件存儲在HDFS,并把路徑提交到JobTracker,然后由JobTracker創建每一個Task(即MapTask和ReduceTask)并將它們分發到各個TaskTracker服務中去執行。
2 數據結構
2.1 Mapper和Reducer
運行于Hadoop的MapReduce應用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創建JobConf的執行程序,在一些應用中還可以包括一個Combiner類,它實際也是Reducer的實現。
2.2 JobInProgress
JobClient提交job后,JobTracker會創建一個JobInProgress來跟蹤和調度這個job,并把它添加到job隊列里。JobInProgress會根據提交的job jar中定義的輸入數據集(已分解成FileSplit)創建對應的一批TaskInProgress用于監控和調度MapTask,同時在創建指定數目的TaskInProgress用于監控和調度ReduceTask,缺省為1個ReduceTask。
2.3 TaskInProgress
JobTracker啟動任務時通過每一個TaskInProgress來launchTask,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到后會創建對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,作用類似)用于監控和調度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載jobjar,并設置好環境變量后啟動一個獨立的java child進程來執行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。
2.4 MapTask和ReduceTask
一個完整的job會自動依次執行Mapper、Combiner(在JobConf指定了Combiner時執行)和Reducer,其中Mapper和Combiner是由MapTask調用執行,Reducer則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據jobjar中定義的輸入數據集按
下圖描述了Map/Reduce框架中主要組成和它們之間的關系:
3 流程
一道MapRedcue作業是通過JobClient.rubJob(job)向master節點的JobTracker提交的, JobTracker接到JobClient的請求后把其加入作業隊列中。JobTracker一直在等待JobClient通過RPC提交作業,而 TaskTracker一直通過RPC向 JobTracker發送心跳heartbeat詢問有沒有任務可做,如果有,讓其派發任務給它執行。如果JobTracker的作業隊列不為空, 則TaskTracker發送的心跳將會獲得JobTracker給它派發的任務。這是一道pull過程。slave節點的TaskTracker接到任 務后在其本地發起Task,執行任務。以下是簡略示意圖:
下面詳細介紹一下Map/Reduce處理一個工作的流程。
四JobClient
在編寫MapReduce程序時通常是上是這樣寫的:
Configuration conf = new Configuration();// 讀取hadoop配置
Job job = new Job(conf, "作業名稱"); // 實例化一道作業
job.setMapperClass(Mapper類型);
job.setCombinerClass(Combiner類型);
job.setReducerClass(Reducer類型);
job.setOutputKeyClass(輸出Key的類型);
job.setOutputValueClass(輸出Value的類型);
FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑));
FileOutputFormat.setOutputPath(job, newPath(輸出hdfs路徑));
// 其它初始化配置
JobClient.runJob(job);
1配置Job
JobConf是用戶描述一個job的接口。下面的信息是MapReduce過程中一些較關鍵的定制信息:
2 JobClient.runJob():運行Job并分解輸入數據集
一個MapReduce的Job會通過JobClient類根據用戶在JobConf類中定義的InputFormat實現類來將輸入的數據集分解成一批小的數據集,每一個小數據集會對應創建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調用FileInputFormat.getSplits()方法生成小數據集,如果判斷數據文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統一打包到jobFile的jar中。
JobClient然后使用submitJob(job)方法向 master提交作業。submitJob(job)內部是通過submitJobInternal(job)方法完成實質性的作業提交。 submitJobInternal(job)方法首先會向hadoop分布系統文件系統hdfs依次上傳三個文件: job.jar, job.split和job.xml。
job.xml: 作業配置,例如Mapper,Combiner, Reducer的類型,輸入輸出格式的類型等。
job.jar: jar包,里面包含了執行此任務需要的各種類,比如 Mapper,Reducer等實現。
job.split: 文件分塊的相關信息,比如有數據分多少個塊,塊的大小(默認64m)等。
這三個文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統路徑mapred.system.dir屬性 + jobid決定。mapred.system.dir屬性默認是/tmp/hadoop-user_name/mapred/system。寫完這三個文 件之后, 此方法會通過RPC調用master節點上的JobTracker.submitJob(job)方法,此時作業已經提交完成。
3提交Job
jobFile的提交過程是通過RPC模塊(有單獨一章來詳細介紹)來實現的。大致過程是,JobClient類中通過RPC實現的Proxy接口調用JobTracker的submitJob()方法,而JobTracker必須實現JobSubmissionProtocol接口。
JobTracker創建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態信息,如執行時間、Map和Reduce任務完成的比例等。JobClient會根據這個JobStatus對象創建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執行過程的統計數據來監控并打印到用戶的控制臺。
與創建Job過程相關的類和方法如下圖所示
五 JobTracker
上面已經提到,job是統一由JobTracker來調度的,具體的Task分發給各個TaskTracker節點來執行。下面來詳細解析執行過程,首先先從JobTracker收到JobClient的提交請求開始。
1JobTracker初始化Job
1.1JobTracker.submitJob() 收到請求
當JobTracker接收到新的job請求(即submitJob()函數被調用)后,會創建一個JobInProgress對象并通過它來管理和調度任務。JobInProgress在創建的時候會初始化一系列與任務有關的參數,調用到FileSystem,把在JobClient端上傳的所有任務文件下載到本地的文件系統中的臨時目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。
1.2JobTracker.JobInitThread 通知初始化線程
JobTracker 中的監聽器類EagerTaskInitializationListener負責任務Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一個專門管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。resortInitQueue方法根據作業的優先級排序。然后調用notifyAll()函數,會喚起一個用于初始化job的線程JobInitThread來處理。JobInitThread收到信號后即取出最靠前的job,即優先級別最高的job,調用TaskTrackerManager的initJob最終調用JobInProgress.initTasks()執行真正的初始化工作。
1.3JobInProgress.initTasks() 初始化TaskInProgress
任務Task分兩種: MapTask 和reduceTask,它們的管理對象都是TaskInProgress 。
首先JobInProgress會創建Map的監控對象。在initTasks()函數里通過調用JobClient的readSplitFile()獲得已分解的輸入數據的RawSplit列表,然后根據這個列表創建對應數目的Map執行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應的所有在HDFS里的blocks所在的DataNode節點的host,這個會在RawSplit創建時通過FileSplit的getLocations()函數獲取,該函數會調用DistributedFileSystem的getFileCacheHints()獲得(這個細節會在HDFS中講解)。當然如果是存儲在本地文件系統中,即使用LocalFileSystem時當然只有一個location即“localhost”了。
創建這些TaskInProgress對象完畢后,initTasks()方法會通過createCache()方法為這些TaskInProgress對象產生一個未執行任務的Map緩存nonRunningMapCache。slave端的TaskTracker向master發送心跳時,就可以直接從這個cache中取任務去執行。
其次JobInProgress會創建Reduce的監控對象,這個比較簡單,根據JobConf里指定的Reduce數目創建,缺省只創建1個Reduce任務。監控和調度Reduce任務的是TaskInProgress類,不過構造方法有所不同,TaskInProgress會根據不同參數分別創建具體的MapTask或者ReduceTask。同樣地,initTasks()也會通過createCache()方法產生nonRunningReduceCache成員。
JobInProgress創建完TaskInProgress后,最后構造JobStatus并記錄job正在執行中,然后再調用JobHistory.JobInfo.logStarted()記錄job的執行日志。到這里JobTracker里初始化job的過程全部結束。
2 JobTracker調度Job
hadoop默認的調度器是FIFO策略的JobQueueTaskScheduler,它有兩個成員變量 jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。 JobQueueJobInProgressListener是JobTracker的另一個監聽器類,它包含了一個映射,用來管理和調度所有的 JobInProgress。jobAdded(job)同時會加入job到JobQueueJobInProgressListener中的映射。
JobQueueTaskScheduler最重要的方法是assignTasks,他實現了工作調度。具體實 現:JobTracker 接到TaskTracker的heartbeat() 調用后,首先會檢查上一個心跳響應是否完成,是沒要求啟動或重啟任務,如果一切正常,則會處理心跳。首先它會檢查 TaskTracker 端還可以做多少個 map 和 reduce 任務,將要派發的任務數是否超出這個數,是否超出集群的任務平均剩余可負載數。如果都沒超出,則為此TaskTracker 分配一個 MapTask 或 ReduceTask 。產生 Map 任務使用 JobInProgress 的obtainNewMapTask() 方法,實質上最后調用了 JobInProgress 的 findNewMapTask() 訪問nonRunningMapCache 。
上面講解任務初始化時說過,createCache()方法會在網絡拓撲結構上掛上需要執行的TaskInProgress。 findNewMapTask()從近到遠一層一層地尋找,首先是同一節點,然后在尋找同一機柜上的節點,接著尋找相同數據中心下的節點,直到找了 maxLevel層結束。這樣的話,在JobTracker給TaskTracker派發任務的時候,可以迅速找到最近的TaskTracker,讓它執 行任務。
最終生成一個Task類對象,該對象被封裝在一個LanuchTaskAction中,發回給TaskTracker,讓它去執行任務。
產生 Reduce 任務過程類似,使用JobInProgress.obtainNewReduceTask() 方法,實質上最后調用了JobInProgress 的 findNewReduceTask() 訪問 nonRuningReduceCache。
六 TaskTracker
1TaskTracker加載Task到子進程
Task的執行實際是由TaskTracker發起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等。如果發現有自己需要執行的新任務也會在這時啟動,即是在TaskTracker調用JobTracker的heartbeat()方法時進行,此調用底層是通過IPC層調用Proxy接口實現。下面一一簡單介紹下每個步驟。
1.1TaskTracker.run() 連接JobTracker
TaskTracker的啟動過程會初始化一系列參數和服務,然后嘗試連接JobTracker(即必須實現InterTrackerProtocol接口),如果連接斷開,則會循環嘗試連接JobTracker,并重新初始化所有成員和參數。
1.2TaskTracker.offerService() 主循環
如果連接JobTracker服務成功,TaskTracker就會調用offerService()函數進入主執行循環中。這個循環會每隔10秒與JobTracker通訊一次,調用transmitHeartBeat(),獲得HeartbeatResponse信息。然后調用HeartbeatResponse的getActions()函數獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數組。再遍歷這個數組,如果是一個新任務指令即LaunchTaskAction則調用調用addToTaskQueue加入到待執行隊列,否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執行KillJobAction或者KillTaskAction等。
1.3TaskTracker.transmitHeartBeat() 獲取JobTracker指令
在transmitHeartBeat()函數處理中,TaskTracker會創建一個新的TaskTrackerStatus對象記錄目前任務的執行狀況,檢查目前執行的Task數目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設置heartbeat()的askForNewTask參數為true。然后通過IPC接口調用JobTracker的heartbeat()方法發送過去,heartbeat()返回值TaskTrackerAction數組。
1.4 TaskTracker.addToTaskQueue,交給TaskLauncher處理
TaskLauncher是用來處理新任務的線程類,包含了一個待運行任務的隊列 tasksToLaunch。TaskTracker.addToTaskQueue會調用TaskTracker的registerTask,創建 TaskInProgress對象來調度和監控任務,并把它加入到runningTasks隊列中。同時將這個TaskInProgress加到 tasksToLaunch中,并notifyAll()喚醒一個線程運行,該線程從隊列tasksToLaunch取出一個待運行任務,調用 TaskTracker的startNewTask運行任務。
1.5 TaskTracker.startNewTask() 啟動新任務
調用localizeJob()真正初始化Task并開始執行。
1.6 TaskTracker.localizeJob() 初始化job目錄等
此函數主要任務是初始化工作目錄workDir,再將job jar包從HDFS復制到本地文件系統中,調用RunJar.unJar()將包解壓到工作目錄。然后創建一個RunningJob并調用 addTaskToJob()函數將它添加到runningJobs監控隊列中。addTaskToJob方法把一個任務加入到該任務屬于的 runningJob的tasks列表中。如果該任務屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調用 launchTaskForJob()開始執行Task。
1.7 TaskTracker.launchTaskForJob()執行任務
啟動Task的工作實際是調用TaskTracker$TaskInProgress的launchTask()函數來執行的。
1.8 TaskTracker$TaskInProgress.launchTask()執行任務
執行任務前先調用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調用Task的createRunner()方法創建TaskRunner對象并調用其start()方法最后啟動Task獨立的java執行子進程。
1.9 Task.createRunner()創建啟動Runner對象
Task有兩個實現版本,即MapTask和ReduceTask,它們分別用于創建Map和Reduce任務。MapTask會創建MapTaskRunner來啟動Task子進程,而ReduceTask則創建ReduceTaskRunner來啟動。
1.10 TaskRunner.start()啟動子進程
TaskRunner負責將一個任務放到一個進程里面來執行。它會調用run()函數來處理,主要的工作就是初始化啟動java子進程的一系列環境變量, 包括設定工作目錄workDir,設置CLASSPATH環境變量等。然后裝載job jar包。JvmManager用于管理該TaskTracker上所有運行的Task子進程。每一個進程都是由JvmRunner來管理的,它也是位于 單獨線程中的。JvmManager的launchJvm方法,根據任務是map還是reduce,生成對應的JvmRunner并放到對應 JvmManagerForType的進程容器中進行管理。JvmManagerForType的reapJvm()
分配一個新的JVM進程。如果JvmManagerForType槽滿,就尋找idle的進程,如果是同Job的直接放進去,否則殺死這個進程,用 一個新的進程代替。 如果槽沒有滿,那么就啟動新的子進程。生成新的進程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner線程的run方 法,run方法用于生成一個新的進程并運行它,具體實現是調用runChild。
2 子進程執行MapTask
真實的執行載體,是Child,它包含一個 main函數,進程執行,會將相關參數傳進來,它會拆解這些參數,通過getTask(jvmId)向父進程索取任務,并且構造出相關的Task實例,然后使用Task的run()啟動任務。
2.1 run
方法相當簡單,配置完系統的TaskReporter后,就根據情況執行 runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或執行Mapper。由于 MapReduce現在有兩套API,MapTask需要支持這兩套API,使得MapTask執行Mapper分為runNewMapper和 runOldMapper,我們分析runOldMapper。
2.2 runOldMapper
runOldMapper最開始部分是構造Mapper處理的InputSplit,然后就開始創建Mapper的RecordReader,最終得到 map的輸入。之后構造Mapper的輸出,是通過MapOutputCollector進行的,也分兩種情況,如果沒有Reducer,那么,用 DirectMapOutputCollector,否則,用MapOutputBuffer。
構造完Mapper的輸入輸出,通過構造配置文件中配置的MapRunnable,就可以執行Mapper了。目前系統有兩個 MapRunnable:MapRunner和MultithreadedMapRunner。MapRunner是單線程執行器,比較簡單,他會使用反 射機制生成用戶定義的Mapper接口實現類,作為他的一個成員。
2.3 MapRunner的run方法
會先創建對應的key,value對象,然后,對InputSplit的每一對
2.4 OutputCollector
OutputCollector的作用是收集每次調用map后得到的新的kv對,寧把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。
MapOutputCollector 有兩個子類:MapOutputBuffer和DirectMapOutputCollector。 DirectMapOutputCollector用在不需要Reduce階段的時候。如果Mapper后續有reduce任務,系統會使用 MapOutputBuffer做為輸出, MapOutputBuffer使用了一個緩沖區對map的處理結果進行緩存,放在內存中,又使用幾個數組對這個緩沖區進行管理。
在適當的時機,緩沖區中的數據會被spill到硬盤中。
向硬盤中寫數據的時機:
(1)當內存緩沖區不能容下一個太大的kv對時。spillSingleRecord方法。
(2)內存緩沖區已滿時。SpillThread線程。
(3)Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。
2.5 spillThread線程:將緩沖區中的數據spill到硬盤中。
(1)需要spill時調用函數sortAndSpill,按照partition和key做排序。默認使用的是快速排序QuickSort。
(2)如果沒有combiner,則直接輸出記錄,否則,調用CombinerRunner的combine,先做combin然后輸出。
3 子進程執行ReduceTask
ReduceTask.run方法開始和MapTask類似,包括initialize()初始 化,runJobCleanupTask(),runJobSetupTask(),runTaskCleanupTask()。之后進入正式的工作,主 要有這么三個步驟:Copy、Sort、Reduce。
3.1 Copy
就是從執行各個Map任務的服務器那里,收羅到map的輸出文件。拷貝的任務,是由ReduceTask.ReduceCopier 類來負責。
3.1.1 類圖:
3.1.2 流程: 使用ReduceCopier.fetchOutputs開始
(1)索取任務。使用GetMapEventsThread線程。該線程的run方法不停的調用getMapCompletionEvents方法,該方 法又使用RPC調用TaskUmbilicalProtocol協議的getMapCompletionEvents,方法使用所屬的jobID向其父 TaskTracker詢問此作業個Map任務的完成狀況(TaskTracker要向JobTracker詢問后再轉告給它...)。返回一個數組 TaskCompletionEventevents[]。TaskCompletionEvent包含taskid和ip地址之類的信息。 (2)當獲取到相關Map任務執行服務器的信息后,有一個線程MapOutputCopier開啟,做具體的拷貝工作。 它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。MapOutputCopier的run循環調用 copyOutput,copyOutput又調用getMapOutput,使用HTTP遠程拷貝。
(3)getMapOutput遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。
(4) 同時,還有一個內存Merger線程InMemFSMergeThread和一個文件Merger線程LocalFSMerger在同步工作,它們將下載 過來的文件(可能在內存中,簡單的統稱為文件...),做著歸并排序,以此,節約時間,降低輸入文件的數量,為后續的排序工作減 負。InMemFSMergeThread的run循環調用doInMemMerge,該方法使用工具類Merger實現歸并,如果需要combine, 則combinerRunner.combine。
3.2 Sort
排序工作,就相當于上述排序工作的一個延續。它會在所有的文件都拷貝完畢后進行。使用工具類Merger歸并所有的文件。經過這一個流程,一個合并了所有所需Map任務輸出文件的新文件產生了。而那些從其他各個服務器網羅過來的 Map任務輸出文件,全部刪除了。
3.3Reduce
Reduce任務的最后一個階段。他會準備好 keyClass("mapred.output.key.class" 或"mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class" 或"mapred.output.value.class")和 Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后調用runOldReducer方法。(也是兩套API,我們分析 runOldReducer)
3.3.1 runOldReducer
(1)輸出方面。
它會準備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個 RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統,基本都是分布式文件系 統,或者說是HDFS。
(2)輸入方面,ReduceTask會用準備好的KeyClass、ValueClass、KeyComparator等等之類的自定義類,構造出Reducer所需的鍵類型,和值的迭代類型Iterator(一個鍵到了這里一般是對應一組值)。
(3)有了輸入,有了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。