An Introduction to MapReduce by Example
The article "MapReduce原理與設計思想" (MapReduce: Principles and Design Ideas) analyzed the principles behind MapReduce in detail; this article instead examines MapReduce through a concrete example.
1. MapReduce Overview
Hadoop Map/Reduce is an easy-to-use software framework. Applications written on top of it can run on large clusters of thousands of commodity machines and process terabyte-scale datasets in parallel in a reliable, fault-tolerant way.

A Map/Reduce job usually splits the input dataset into independent chunks, which are processed by the map tasks in a completely parallel fashion. The framework sorts the map outputs and then feeds them to the reduce tasks. Typically both the input and the output of a job are stored in a file system. The framework takes care of scheduling and monitoring the tasks and re-executing any tasks that fail.

Typically the Map/Reduce framework and the distributed file system run on the same set of nodes; that is, the compute nodes and the storage nodes are co-located. This configuration allows the framework to schedule tasks on the nodes where the data already resides, which makes very efficient use of the cluster's aggregate network bandwidth.

The Map/Reduce framework consists of a single master, the JobTracker, and one slave TaskTracker per cluster node. The master is responsible for scheduling all the tasks that make up a job onto the slaves, monitoring their execution, and re-executing tasks that fail; the slaves simply execute the tasks assigned to them by the master.

An application should, at a minimum, specify the input/output locations (paths) and supply the map and reduce functions by implementing the appropriate interfaces or abstract classes. These, together with other job parameters, make up the job configuration. The Hadoop job client then submits the job (jar file, executable, etc.) and the configuration to the JobTracker, which distributes the code and configuration to the slaves, schedules and monitors the tasks, and provides status and diagnostic information back to the job client.

Although the Hadoop framework itself is implemented in Java, Map/Reduce applications do not have to be written in Java (for example, Hadoop Streaming lets the map and reduce logic be supplied as arbitrary executables, and Hadoop Pipes offers a C++ API).
2. Example Analysis: Word Count

1. WordCount source code analysis

Word count is one of the simplest programs, and one of the best illustrations of the MapReduce idea. Its complete source code can be found under the src/examples directory of the Hadoop distribution.

The word count program counts how many times each word appears across a set of text files, as illustrated below.
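For concreteness, the sample run later in this article uses two small input files. Based on that run log, the input and the expected output look roughly like this (the file names are only illustrative):

file01:
Hello World
Bye World

file02:
Hello Hadoop
Bye Hadoop

Expected output:
Bye	2
Hadoop	2
Hello	2
World	2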
(1) The Map phase

The Map phase is implemented by extending the Mapper class from the org.apache.hadoop.mapreduce package and overriding its map method.

By adding two statements to the map method that print the key and value to the console, we can observe that the value passed to map holds one line of the text file (terminated by the newline character), and that the key is the offset of the first character of that line from the beginning of the file. A StringTokenizer then splits each line into individual words, and a <word, 1> pair is emitted for each word as the output of the map method; all remaining work is handled by the MapReduce framework. IntWritable and Text are Hadoop's wrappers around int and String; they are serializable, which makes it easy to exchange data in a distributed environment.

The implementation of TokenizerMapper is as follows:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("key = " + key.toString());     // added to inspect the key
        System.out.println("value = " + value.toString()); // added to inspect the value
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
(2) The Reduce phase

The Reduce phase is implemented by extending the Reducer class from the org.apache.hadoop.mapreduce package and overriding its reduce method.

The key parameter of the reduce method is a single word, and values is the list of counts for that word collected from the Mappers, so iterating over values and summing them gives the total number of occurrences of the word.

The implementation of the IntSumReducer class is as follows:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
(3) Running the MapReduce job

In MapReduce, a Job object is responsible for managing and running a computation, and its methods are used to set the job's parameters. Here TokenizerMapper is configured to carry out the Map phase, and IntSumReducer to carry out both the Combine and the Reduce phases (using the reducer as the combiner is safe here because summation is commutative and associative, so partially combining counts on the map side does not change the final result). The output types of the Map and Reduce phases are also configured: Text for the key and IntWritable for the value. The input and output paths of the job are taken from the command line and set through FileInputFormat and FileOutputFormat respectively. Once the parameters are set, job.waitForCompletion() is called to run the job. The main function is implemented as follows:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(wordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

The run produces the following output:
14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
key = 0
value = Hello World
key = 12
value = Bye World
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 05:53:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
key = 0
value = Hello Hadoop
key = 13
value = Bye Hadoop
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 05:53:27 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 05:53:27 INFO mapred.JobClient: Counters: 14
14/12/17 05:53:27 INFO mapred.JobClient: FileSystemCounters
14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=17886
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=52932
14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54239
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=71431
14/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=4
14/12/17 05:53:27 INFO mapred.JobClient: Combine output records=6
14/12/17 05:53:27 INFO mapred.JobClient: Map input records=4
14/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=4
14/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=12
14/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=78
14/12/17 05:53:27 INFO mapred.JobClient: Combine input records=8
14/12/17 05:53:27 INFO mapred.JobClient: Map output records=8
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=6

2. The WordCount processing flow
The discussion above covered the design and the source code of WordCount, but glossed over the details; the following is a closer look at how WordCount is actually processed:

(1) The input files are split into splits. Because the test files are small, each file becomes a single split, and each split is further broken into <key, value> pairs line by line. This step is performed automatically by the MapReduce framework; note that the offsets used as keys include the newline characters of preceding lines.

(2) The resulting <key, value> pairs are handed to the user-defined map method, which produces new <key, value> pairs.

(3) Once the map output has been produced, the Mapper sorts the pairs by key and runs the Combine step, which adds up the values of pairs that share the same key; this yields the Mapper's final output.

(4) The Reducer first sorts the data it receives from the Mappers and then passes it to the user-defined reduce method, which produces new <key, value> pairs that form the final output of WordCount. The whole data flow is illustrated below.
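Using the two sample files from the run log above, the intermediate <key, value> pairs at each stage can be reconstructed as follows (this picture is consistent with the counters reported in the log: 8 map output records, 6 combine output records, 4 reduce input groups, 4 reduce output records):

Split / map input (key = byte offset of the line):
  file01: <0, "Hello World">,  <12, "Bye World">
  file02: <0, "Hello Hadoop">, <13, "Bye Hadoop">

Map output (8 records):
  file01: <Hello, 1>, <World, 1>, <Bye, 1>, <World, 1>
  file02: <Hello, 1>, <Hadoop, 1>, <Bye, 1>, <Hadoop, 1>

Sorted and combined output per Mapper (6 records):
  file01: <Bye, 1>, <Hello, 1>, <World, 2>
  file02: <Bye, 1>, <Hadoop, 2>, <Hello, 1>

Reduce output (4 records):
  <Bye, 2>, <Hadoop, 2>, <Hello, 2>, <World, 2>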
3. How Well Do You Really Know MapReduce?

The MapReduce framework quietly does a great deal of work behind the scenes. What happens if we do not override the map and reduce methods at all?

To find out, let's build a stripped-down MapReduce program. Create a class LazyMapReduce that performs only the necessary job initialization and sets the input/output paths, leaving every other parameter at its default value.

The code is as follows:
public class LazyMapReduce {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The run produces the following output:
14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%
14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 23:04:19 INFO mapred.LocalJobRunner:
14/12/17 23:04:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14
14/12/17 23:04:20 INFO mapred.JobClient: FileSystemCounters
14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=46040
14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=51471
14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=52808
14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=98132
14/12/17 23:04:20 INFO mapred.JobClient: Map-Reduce Framework
14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=3
14/12/17 23:04:20 INFO mapred.JobClient: Combine output records=0
14/12/17 23:04:20 INFO mapred.JobClient: Map input records=4
14/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/17 23:04:20 INFO mapred.JobClient: Reduce output records=4
14/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=8
14/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=78
14/12/17 23:04:20 INFO mapred.JobClient: Combine input records=0
14/12/17 23:04:20 INFO mapred.JobClient: Map output records=4
14/12/17 23:04:20 INFO mapred.JobClient: Reduce input records=4

As this run shows, by default MapReduce simply writes the input <key, value> pairs to the output unchanged.
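For reference: with the default TextInputFormat, the map input key is the line's byte offset (a LongWritable) and the value is the line itself; the default Mapper and Reducer are identity functions, so the output file should contain something close to the following (the relative order of the two lines that share key 0 is not guaranteed):

0	Hello World
0	Hello Hadoop
12	Bye World
13	Bye Hadoop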
Below are some of the components involved in a MapReduce job and their default behavior:

(1) InputFormat class

Splits the input data into splits and breaks each split into <key, value> pairs, which become the input of the map function.

(2) Mapper class

Implements the map function, which produces intermediate results from the input <key, value> pairs.

(3) Combiner class

Implements the combine function, which merges intermediate pairs that share the same key.

(4) Partitioner class

Implements the getPartition function, which during the Shuffle phase divides the intermediate data into R partitions by key, each partition being handled by one Reduce task (a minimal sketch is given after this list).

(5) Reducer class

Implements the reduce function, which merges the intermediate results into the final result.

(6) OutputFormat class

Responsible for writing out the final result.
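As an illustration of the Partitioner contract mentioned in (4), the following minimal sketch uses the same strategy as Hadoop's default HashPartitioner (the class name here is only for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then take
        // the remainder modulo R, the number of reduce tasks.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Because all records with the same key land in the same partition, each Reduce task sees every value for the keys it owns.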
Spelling out these defaults explicitly, the LazyMapReduce program above can be rewritten as:
public class LazyMapReduce {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class); // the default output format; FileOutputFormat itself is abstract and cannot be used here
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note that, depending on the Hadoop version, the IDE may flag some of these classes and constructors as deprecated.
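For example, in Hadoop 2.x the Job constructor used above is deprecated; assuming a 2.x dependency, the equivalent construction is:

Job job = Job.getInstance(conf, "LazyMapReduce");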
References

Liu Peng (劉鵬), 《實戰Hadoop:開啟通向云計算的捷徑》 (Hadoop in Practice: A Shortcut to Cloud Computing)
http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html
Original source: codingwu