hadoop mapreduce核心功能描述
核心功能描述
應用程序通常會通過提供map和reduce來實現 Mapper和Reducer接口,它們組成作業的核心。
Mapper
Mapper將輸入鍵值對(key/value pair)映射到一組中間格式的鍵值對集合。
Map是一類將輸入記錄集轉換為中間格式記錄集的獨立任務。 這種轉換的中間格式記錄集不需要與輸入記錄集的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。
Hadoop Map/Reduce框架為每一個InputSplit產生一個map任務,而每個InputSplit是由該作業的InputFormat產生的。
概括地說,對Mapper的實現者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數,目的是完成Mapper的初始化工作。然后,框架為這個任務的InputSplit中每個鍵值對調用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。應用程序可以通過重寫Closeable.close()方法來執行相應的清理工作。
輸出鍵值對不需要與輸入鍵值對的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。通過調用 OutputCollector.collect(WritableComparable,Writable)可以收集輸出的鍵值對。
應用程序可以使用Reporter報告進度,設定應用級別的狀態消息,更新Counters(計數器),或者僅是表明自己運行正常。
框架隨后會把與一個特定key關聯的所有中間過程的值(value)分成組,然后把它們傳給Reducer以產出最終的結果。用戶可以通過 JobConf.setOutputKeyComparatorClass(Class)來指定具體負責分組的 Comparator。
Mapper的輸出被排序后,就被劃分給每個Reducer。分塊的總數目和一個作業的reduce任務的數目是一樣的。用戶可以通過實現自定義的 Partitioner來控制哪個key被分配給哪個 Reducer。
用戶可選擇通過 JobConf.setCombinerClass(Class)指定一個combiner,它負責對中間過程的輸出進行本地的聚集,這會有助于降低從Mapper到 Reducer數據傳輸量。
這些被排好序的中間過程的輸出結果保存的格式是(key-len, key, value-len, value),應用程序可以通過JobConf控制對這些中間結果是否進行壓縮以及怎么壓縮,使用哪種 CompressionCodec。
需要多少個Map?
Map的數目通常是由輸入數據的大小決定的,一般就是所有輸入文件的總塊(block)數。
Map正常的并行規模大致是每個節點(node)大約10到100個map,對于CPU 消耗較小的map任務可以設到300個左右。由于每個任務初始化需要一定的時間,因此,比較合理的情況是map執行的時間至少超過1分鐘。
這樣,如果你輸入10TB的數據,每個塊(block)的大小是128MB,你將需要大約82,000個map來完成任務,除非使用 setNumMapTasks(int)(注意:這里僅僅是對框架進行了一個提示(hint),實際決定因素見這里)將這個數值設置得更高。
Reducer
Reducer將與一個key關聯的一組中間數值集歸約(reduce)為一個更小的數值集。
用戶可以通過 JobConf.setNumReduceTasks(int)設定一個作業中reduce任務的數目。
概括地說,對Reducer的實現者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數,目的是完成Reducer的初始化工作。然后,框架為成組的輸入數據中的每個<key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,應用程序可以通過重寫Closeable.close()來執行相應的清理工作。
Reducer有3個主要階段:shuffle、sort和reduce。
Shuffle
Reducer的輸入就是Mapper已經排好序的輸出。在這個階段,框架通過HTTP為每個Reducer獲得所有Mapper輸出中與之相關的分塊。
Sort
這個階段,框架將按照key的值對Reducer的輸入進行分組 (因為不同mapper的輸出中可能會有相同的key)。
Shuffle和Sort兩個階段是同時進行的;map的輸出也是一邊被取回一邊被合并的。
Secondary Sort
如果需要中間過程對key的分組規則和reduce前對key的分組規則不同,那么可以通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中間過程的key如何被分組,所以結合兩者可以實現按值的二次排序。
Reduce
在這個階段,框架為已分組的輸入數據中的每個 <key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統的。
應用程序可以使用Reporter報告進度,設定應用程序級別的狀態消息,更新Counters(計數器),或者僅是表明自己運行正常。
Reducer的輸出是沒有排序的。
需要多少個Reduce?
Reduce的數目建議是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。
用0.95,所有reduce可以在maps一完成時就立刻啟動,開始傳輸map的輸出結果。用1.75,速度快的節點可以在完成第一輪reduce任務后,可以開始第二輪,這樣可以得到比較好的負載均衡的效果。
增加reduce的數目會增加整個框架的開銷,但可以改善負載均衡,降低由于執行失敗帶來的負面影響。
上述比例因子比整體數目稍小一些是為了給框架中的推測性任務(speculative-tasks) 或失敗的任務預留一些reduce的資源。
無Reducer
如果沒有歸約要進行,那么設置reduce任務的數目為零是合法的。
這種情況下,map任務的輸出會直接被寫入由 setOutputPath(Path)指定的輸出路徑。框架在把它們寫入FileSystem之前沒有對它們進行排序。
Partitioner
Partitioner用于劃分鍵值空間(key space)。
Partitioner負責控制map輸出結果key的分割。Key(或者一個key子集)被用于產生分區,通常使用的是Hash函數。分區的數目與一個作業的reduce任務的數目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應該發送給m個reduce任務中的哪一個來進行reduce操作。
HashPartitioner是默認的 Partitioner。
Reporter
Reporter是用于Map/Reduce應用程序報告進度,設定應用級別的狀態消息, 更新Counters(計數器)的機制。
Mapper和Reducer的實現可以利用Reporter 來報告進度,或者僅是表明自己運行正常。在那種應用程序需要花很長時間處理個別鍵值對的場景中,這種機制是很關鍵的,因為框架可能會以為這個任務超時了,從而將它強行殺死。另一個避免這種情況發生的方式是,將配置參數mapred.task.timeout設置為一個足夠高的值(或者干脆設置為零,則沒有超時限制了)。
應用程序可以用Reporter來更新Counter(計數器)。
OutputCollector
OutputCollector是一個Map/Reduce框架提供的用于收集 Mapper或Reducer輸出數據的通用機制 (包括中間輸出結果和作業的輸出結果)。
Hadoop Map/Reduce框架附帶了一個包含許多實用型的mapper、reducer和partitioner 的類庫。
作業配置
JobConf代表一個Map/Reduce作業的配置。
JobConf是用戶向Hadoop框架描述一個Map/Reduce作業如何執行的主要接口。框架會按照JobConf描述的信息忠實地去嘗試完成這個作業,然而:
- 一些參數可能會被管理者標記為 final,這意味它們不能被更改。
- 一些作業的參數可以被直截了當地進行設置(例如: setNumReduceTasks(int)),而另一些參數則與框架或者作業的其他參數之間微妙地相互影響,并且設置起來比較復雜(例如: setNumMapTasks(int))。
通常,JobConf會指明Mapper、Combiner(如果有的話)、 Partitioner、Reducer、InputFormat和 OutputFormat的具體實現。JobConf還能指定一組輸入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及輸出文件應該寫在哪兒 (setOutputPath(Path))。
JobConf可選擇地對作業設置一些高級選項,例如:設置Comparator; 放到DistributedCache上的文件;中間結果或者作業輸出結果是否需要壓縮以及怎么壓縮; 利用用戶提供的腳本(setMapDebugScript(String)/setReduceDebugScript(String)) 進行調試;作業是否允許預防性(speculative)任務的執行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每個任務最大的嘗試次數 (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一個作業能容忍的任務失敗的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。
當然,用戶能使用 set(String, String)/get(String, String) 來設置或者取得應用程序需要的任意參數。然而,DistributedCache的使用是面向大規模只讀數據的。
任務的執行和環境
TaskTracker是在一個單獨的jvm上以子進程的形式執行 Mapper/Reducer任務(Task)的。
子任務會繼承父TaskTracker的環境。用戶可以通過JobConf中的 mapred.child.java.opts配置參數來設定子jvm上的附加選項,例如: 通過-Djava.library.path=<> 將一個非標準路徑設為運行時的鏈接用以搜索共享庫,等等。如果mapred.child.java.opts包含一個符號@taskid@, 它會被替換成map/reduce的taskid的值。
下面是一個包含多個參數和替換的例子,其中包括:記錄jvm GC日志; JVM JMX代理程序以無密碼的方式啟動,這樣它就能連接到jconsole上,從而可以查看子進程的內存和線程,得到線程的dump;還把子jvm的最大堆尺寸設置為512MB, 并為子jvm的java.library.path添加了一個附加路徑。
<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
用戶或管理員也可以使用mapred.child.ulimit設定運行的子任務的最大虛擬內存。mapred.child.ulimit的值以(KB)為單位,并且必須大于或等于-Xmx參數傳給JavaVM的值,否則VM會無法啟動。
注意:mapred.child.java.opts只用于設置task tracker啟動的子任務。為守護進程設置內存選項請查看 cluster_setup.html
${mapred.local.dir}/taskTracker/是task tracker的本地目錄, 用于創建本地緩存和job。它可以指定多個目錄(跨越多個磁盤),文件會半隨機的保存到本地路徑下的某個目錄。當job啟動時,task tracker根據配置文檔創建本地job目錄,目錄結構如以下所示:
- ${mapred.local.dir}/taskTracker/archive/ :分布式緩存。這個目錄保存本地的分布式緩存。因此本地分布式緩存是在所有task和job間共享的。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目錄。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目錄。各個任務可以使用這個空間做為暫存空間,用于它們之間共享文件。這個目錄通過job.local.dir 參數暴露給用戶。這個路徑可以通過API JobConf.getJobLocalDir()來訪問。它也可以被做為系統屬性獲得。因此,用戶(比如運行streaming)可以調用System.getProperty("job.local.dir")獲得該目錄。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路徑,用于存放作業的jar文件和展開的jar。job.jar是應用程序的jar文件,它會被自動分發到各臺機器,在task啟動前會被自動展開。使用api JobConf.getJar() 函數可以得到job.jar的位置。使用JobConf.getJar().getParent()可以訪問存放展開的jar包的目錄。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一個job.xml文件,本地的通用的作業配置文件。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每個任務有一個目錄task-id,它里面有如下的目錄結構:
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一個job.xml文件,本地化的任務作業配置文件。任務本地化是指為該task設定特定的屬性值。這些值會在下面具體說明。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一個存放中間過程的輸出文件的目錄。它保存了由framwork產生的臨時map reduce數據,比如map的輸出文件等。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的當前工作目錄。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的臨時目錄。(用戶可以設定屬性mapred.child.tmp 來為map和reduce task設定臨時目錄。缺省值是./tmp。如果這個值不是絕對路徑, 它會把task的工作路徑加到該路徑前面作為task的臨時文件路徑。如果這個值是絕對路徑則直接使用這個值。 如果指定的目錄不存在,會自動創建該目錄。之后,按照選項 -Djava.io.tmpdir='臨時文件的絕對路徑'執行java子任務。 pipes和streaming的臨時文件路徑是通過環境變量TMPDIR='the absolute path of the tmp dir'設定的)。 如果mapred.child.tmp有./tmp值,這個目錄會被創建。
下面的屬性是為每個task執行時使用的本地參數,它們保存在本地化的任務作業配置文件里:
名稱 | 類型 | 描述 |
---|---|---|
mapred.job.id | String | job id |
mapred.jar | String | job目錄下job.jar的位置 |
job.local.dir | String | job指定的共享存儲空間 |
mapred.tip.id | String | task id |
mapred.task.id | String | task嘗試id |
mapred.task.is.map | boolean | 是否是map task |
mapred.task.partition | int | task在job中的id |
map.input.file | String | map讀取的文件名 |
map.input.start | long | map輸入的數據塊的起始位置偏移 |
map.input.length | long | map輸入的數據塊的字節數 |
mapred.work.output.dir | String | task臨時輸出目錄 |
task的標準輸出和錯誤輸出流會被讀到TaskTracker中,并且記錄到 ${HADOOP_LOG_DIR}/userlogs
DistributedCache 可用于map或reduce task中分發jar包和本地庫。子jvm總是把 當前工作目錄 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通過 System.loadLibrary或 System.load裝載緩存的庫。有關使用分布式緩存加載共享庫的細節請參考 native_libraries.html
作業的提交與監控
JobClient是用戶提交的作業與JobTracker交互的主要接口。
JobClient 提供提交作業,追蹤進程,訪問子任務的日志記錄,獲得Map/Reduce集群狀態信息等功能。
作業提交過程包括:
- 檢查作業輸入輸出樣式細節
- 為作業計算InputSplit值。
- 如果需要的話,為作業的DistributedCache建立必須的統計信息。
- 拷貝作業的jar包和配置文件到FileSystem上的Map/Reduce系統目錄下。
- 提交作業到JobTracker并且監控它的狀態。
作業的歷史文件記錄到指定目錄的"_logs/history/"子目錄下。這個指定目錄由hadoop.job.history.user.location設定,默認是作業輸出的目錄。因此默認情況下,文件會存放在mapred.output.dir/_logs/history目錄下。用戶可以設置hadoop.job.history.user.location為none來停止日志記錄。
用戶使用下面的命令可以看到在指定目錄下的歷史日志記錄的摘要。
$ bin/hadoop job -history output-dir
這個命令會打印出作業的細節,以及失敗的和被殺死的任務細節。
要查看有關作業的更多細節例如成功的任務、每個任務嘗試的次數(task attempt)等,可以使用下面的命令
$ bin/hadoop job -history all output-dir
用戶可以使用 OutputLogFilter 從輸出目錄列表中篩選日志文件。
一般情況,用戶利用JobConf創建應用程序并配置作業屬性, 然后用 JobClient 提交作業并監視它的進程。
作業的控制
有時候,用一個單獨的Map/Reduce作業并不能完成一個復雜的任務,用戶也許要鏈接多個Map/Reduce作業才行。這是容易實現的,因為作業通常輸出到分布式文件系統上的,所以可以把這個作業的輸出作為下一個作業的輸入實現串聯。
然而,這也意味著,確保每一作業完成(成功或失敗)的責任就直接落在了客戶身上。在這種情況下,可以用的控制作業的選項有:
- runJob(JobConf):提交作業,僅當作業完成時返回。
- submitJob(JobConf):只提交作業,之后需要你輪詢它返回的 RunningJob句柄的狀態,并根據情況調度。
- JobConf.setJobEndNotificationURI(String):設置一個作業完成通知,可避免輪詢。
作業的輸入
InputFormat 為Map/Reduce作業描述輸入的細節規范。
Map/Reduce框架根據作業的InputFormat來:
- 檢查作業輸入的有效性。
- 把輸入文件切分成多個邏輯InputSplit實例, 并把每一實例分別分發給一個 Mapper。
- 提供RecordReader的實現,這個RecordReader從邏輯InputSplit中獲得輸入記錄, 這些記錄將由Mapper處理。
基于文件的InputFormat實現(通常是 FileInputFormat的子類) 默認行為是按照輸入文件的字節大小,把輸入數據切分成邏輯分塊(logical InputSplit )。 其中輸入文件所在的FileSystem的數據塊尺寸是分塊大小的上限。下限可以設置mapred.min.split.size 的值。
考慮到邊界情況,對于很多應用程序來說,很明顯按照文件大小進行邏輯分割是不能滿足需求的。 在這種情況下,應用程序需要實現一個RecordReader來處理記錄的邊界并為每個任務提供一個邏輯分塊的面向記錄的視圖。
TextInputFormat 是默認的InputFormat。
如果一個作業的Inputformat是TextInputFormat, 并且框架檢測到輸入文件的后綴是.gz和.lzo,就會使用對應的CompressionCodec自動解壓縮這些文件。 但是需要注意,上述帶后綴的壓縮文件不會被切分,并且整個壓縮文件會分給一個mapper來處理。
InputSplit
InputSplit 是一個單獨的Mapper要處理的數據塊。
一般的InputSplit 是字節樣式輸入,然后由RecordReader處理并轉化成記錄樣式。
FileSplit 是默認的InputSplit。 它把 map.input.file 設定為輸入文件的路徑,輸入文件是邏輯分塊文件。
RecordReader
RecordReader 從InputSlit讀入<key, value>對。
一般的,RecordReader 把由InputSplit 提供的字節樣式的輸入文件,轉化成由Mapper處理的記錄樣式的文件。 因此RecordReader負責處理記錄的邊界情況和把數據表示成keys/values對形式。
作業的輸出
OutputFormat 描述Map/Reduce作業的輸出樣式。
Map/Reduce框架根據作業的OutputFormat來:
- 檢驗作業的輸出,例如檢查輸出路徑是否已經存在。
- 提供一個RecordWriter的實現,用來輸出作業結果。 輸出文件保存在FileSystem上。
TextOutputFormat是默認的 OutputFormat。
任務的Side-Effect File
在一些應用程序中,子任務需要產生一些side-file,這些文件與作業實際輸出結果的文件不同。
在這種情況下,同一個Mapper或者Reducer的兩個實例(比如預防性任務)同時打開或者寫 FileSystem上的同一文件就會產生沖突。因此應用程序在寫文件的時候需要為每次任務嘗試(不僅僅是每次任務,每個任務可以嘗試執行很多次)選取一個獨一無二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。
為了避免沖突,Map/Reduce框架為每次嘗試執行任務都建立和維護一個特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目錄,這個目錄位于本次嘗試執行任務輸出結果所在的FileSystem上,可以通過 ${mapred.work.output.dir}來訪問這個子目錄。 對于成功完成的任務嘗試,只有${mapred.output.dir}/_temporary/_${taskid}下的文件會移動到${mapred.output.dir}。當然,框架會丟棄那些失敗的任務嘗試的子目錄。這種處理過程對于應用程序來說是完全透明的。
在任務執行期間,應用程序在寫文件時可以利用這個特性,比如 通過 FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}目錄, 并在其下創建任意任務執行時所需的side-file,框架在任務嘗試成功時會馬上移動這些文件,因此不需要在程序內為每次任務嘗試選取一個獨一無二的名字。
注意:在每次任務嘗試執行期間,${mapred.work.output.dir} 的值實際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個值是Map/Reduce框架創建的。 所以使用這個特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路徑下創建side-file即可。
對于只使用map不使用reduce的作業,這個結論也成立。這種情況下,map的輸出結果直接生成到HDFS上。
RecordWriter
RecordWriter 生成<key, value> 對到輸出文件。
RecordWriter的實現把作業的輸出結果寫到 FileSystem。
其他有用的特性
Counters
Counters 是多個由Map/Reduce框架或者應用程序定義的全局計數器。 每一個Counter可以是任何一種 Enum類型。同一特定Enum類型的Counter可以匯集到一個組,其類型為Counters.Group。
應用程序可以定義任意(Enum類型)的Counters并且可以通過 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架會匯總這些全局counters。
DistributedCache
DistributedCache 可將具體應用相關的、大尺寸的、只讀的文件有效地分布放置。
DistributedCache 是Map/Reduce框架提供的功能,能夠緩存應用程序所需的文件 (包括文本,檔案文件,jar文件等)。
應用程序在JobConf中通過url(hdfs://)指定需要被緩存的文件。 DistributedCache假定由hdfs://格式url指定的文件已經在 FileSystem上了。
Map-Redcue框架在作業所有任務執行之前會把必要的文件拷貝到slave節點上。 它運行高效是因為每個作業的文件只拷貝一次并且為那些沒有文檔的slave節點緩存文檔。
DistributedCache 根據緩存文檔修改的時間戳進行追蹤。 在作業執行期間,當前應用程序或者外部程序不能修改緩存文件。
distributedCache可以分發簡單的只讀數據或文本文件,也可以分發復雜類型的文件例如歸檔文件和jar文件。歸檔文件(zip,tar,tgz和tar.gz文件)在slave節點上會被解檔(un-archived)。 這些文件可以設置執行權限。
用戶可以通過設置mapred.cache.{files|archives}來分發文件。 如果要分發多個文件,可以使用逗號分隔文件所在路徑。也可以利用API來設置該屬性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通過命令行選項 -cacheFile/-cacheArchive 分發文件。
用戶可以通過 DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當前工作目錄下創建到緩存文件的符號鏈接。 或者通過設置配置文件屬性mapred.create.symlink為yes。 分布式緩存會截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當前工作目錄會有名為lib.so的鏈接, 它會鏈接分布式緩存中的lib.so.1。
DistributedCache可在map/reduce任務中作為 一種基礎軟件分發機制使用。它可以被用于分發jar包和本地庫(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用于 緩存文件和jar包,并把它們加入子jvm的classpath。也可以通過設置配置文檔里的屬性 mapred.job.classpath.{files|archives}達到相同的效果。緩存文件可用于分發和裝載本地庫。
Tool
Tool 接口支持處理常用的Hadoop命令行選項。
Tool 是Map/Reduce工具或應用的標準。應用程序應只處理其定制參數, 要把標準命令行選項通過 ToolRunner.run(Tool, String[]) 委托給 GenericOptionsParser處理。
Hadoop命令行的常用選項有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>
IsolationRunner
IsolationRunner 是幫助調試Map/Reduce程序的工具。
使用IsolationRunner的方法是,首先設置 keep.failed.tasks.files屬性為true (同時參考keep.tasks.files.pattern)。
然后,登錄到任務運行失敗的節點上,進入 TaskTracker的本地路徑運行 IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner會把失敗的任務放在單獨的一個能夠調試的jvm上運行,并且采用和之前完全一樣的輸入數據。
Profiling
Profiling是一個工具,它使用內置的java profiler工具進行分析獲得(2-3個)map或reduce樣例運行分析報告。
用戶可以通過設置屬性mapred.task.profile指定系統是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改屬性值。如果設為true, 則開啟profiling功能。profiler信息保存在用戶日志目錄下。缺省情況,profiling功能是關閉的。
如果用戶設定使用profiling功能,可以使用配置文檔里的屬性 mapred.task.profile.{maps|reduces} 設置要profile map/reduce task的范圍。設置該屬性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范圍的缺省值是0-2。
用戶可以通過設定配置文檔里的屬性mapred.task.profile.params 來指定profiler配置參數。修改屬性要使用api JobConf.setProfileParams(String)。當運行task時,如果字符串包含%s。 它會被替換成profileing的輸出文件名。這些參數會在命令行里傳遞到子JVM中。缺省的profiling 參數是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
調試
Map/Reduce框架能夠運行用戶提供的用于調試的腳本程序。 當map/reduce任務失敗時,用戶可以通過運行腳本在任務日志(例如任務的標準輸出、標準錯誤、系統日志以及作業配置文件)上做后續處理工作。用戶提供的調試腳本程序的標準輸出和標準錯誤會輸出為診斷文件。如果需要的話這些輸出結果也可以打印在用戶界面上。
在接下來的章節,我們討論如何與作業一起提交調試腳本。為了提交調試腳本, 首先要把這個腳本分發出去,而且還要在配置文件里設置。
如何分發腳本文件:
用戶要用 DistributedCache 機制來分發和鏈接腳本文件
如何提交腳本:
一個快速提交調試腳本的方法是分別為需要調試的map任務和reduce任務設置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 屬性的值。這些屬性也可以通過 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API來設置。對于streaming, 可以分別為需要調試的map任務和reduce任務使用命令行選項-mapdebug 和 -reducedegug來提交調試腳本。
腳本的參數是任務的標準輸出、標準錯誤、系統日志以及作業配置文件。在運行map/reduce失敗的節點上運行調試命令是:
$script $stdout $stderr $syslog $jobconf
Pipes 程序根據第五個參數獲得c++程序名。 因此調試pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program
默認行為
對于pipes,默認的腳本會用gdb處理core dump, 打印 stack trace并且給出正在運行線程的信息。
JobControl
JobControl是一個工具,它封裝了一組Map/Reduce作業以及他們之間的依賴關系。
數據壓縮
Hadoop Map/Reduce框架為應用程序的寫入文件操作提供壓縮工具,這些工具可以為map輸出的中間數據和作業最終輸出數據(例如reduce的輸出)提供支持。它還附帶了一些 CompressionCodec的實現,比如實現了 zlib和lzo壓縮算法。 Hadoop同樣支持gzip文件格式。
考慮到性能問題(zlib)以及Java類庫的缺失(lzo)等因素,Hadoop也為上述壓縮解壓算法提供本地庫的實現。更多的細節請參考 這里。
中間輸出
應用程序可以通過 JobConf.setCompressMapOutput(boolean)api控制map輸出的中間結果,并且可以通過 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。
作業輸出
應用程序可以通過 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制輸出是否需要壓縮并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。
如果作業輸出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,來設定 SequenceFile.CompressionType(i.e. RECORD / BLOCK - 默認是RECORD)。
轉自:http://blog.csdn.net/larrylgq/article/details/7581359