MapReduce-Hadoop分布式計算模型
MapReduce概述
MapReduce是一種分布式計算模型,由Google提出,主要用于搜索領域,解決海量數據的計算問題。
MR由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數,即可實現分布式計算,非常簡單。這兩個函數的形參是key、value對,表示函數的輸入信息。
MapReduce實現原理
1.1 讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成key、value對。每一個鍵值對調用次map函數。
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3 對輸出的key、value進行分區。
1.4 對不同分區的數據,按照key進行排序、分組。相同key的value放到一個集合中。
1.5 (可選)分組后的數據進行歸約。
2.reduce任務處理
2.1 對多個map任務的輸出,按照不同的分區,通過網絡copy到不同的reduce節點。
2.2 對多個map任務的輸出進行合并、排序。寫reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
2.3 把reduce的輸出保存到文件中。
序列化
在MapReduce中,序列化是一個很重要的步驟。
序列化就是把結構化的對象轉化為字節流。
反序列化就是把字節流轉回結構化對象。
hadoop中的Partitioner分區
Hadoop中的MapReduce支持對key進行分區,從而可以使map出來的數據均勻分布在reduce上。框架自帶了一個默認的分區類,HashPartitioner,先看看這個類,就知道怎么自定義key分區了,
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/* Use {@link Object#hashCode()} to partition. /
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}</pre>
先解釋一下這個HashPartitioner做的事情,
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
將key均勻分布在ReduceTasks上,舉例如果Key為Text的話,Text的hashcode方法跟String的基本一致,都是采用 的Horner公式計算,得到一個int,string太大的話這個int值可能會溢出變成負數,所以與上Integer.MAX_VALUE(即 0111111111111111),然后再對reduce個數取余,這樣就可以讓key均勻分布在reduce上。
實現分區的步驟:
- 先分析一下具體的業務邏輯,確定大概有多少個分區
- 首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
- 重寫public int getPartition這個方法,根據具體邏輯,讀數據庫或者配置返回相同的數字
- 在main方法中設置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
- 設置Reducer的數量,job.setNumReduceTasks(6);
</ol>
舉例, public static class ProviderPartitioner extends Partitioner<Text, DataBean> {
@Override
public int getPartition(Text key, DataBean value, int arg2) {
String account = key.toString();
String sub_acc = account.substring(0,3);
Integer code = 0;
if(sub_acc.equals("aaa")){
code = 1;
}
return code;
}
}</pre><br />
MapReduce中的Combiners編程
每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的數據量。
combiner最基本是實現本地key的歸并,combiner具有類似本地的reduce功能。
如果不用combiner,那么,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。
注意:Combiner的輸出是Reducer的輸入,如果Combiner是可插拔的,添加Combiner絕不能改變最終的計算結果。所以 Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值 等。
舉例: public static class Combine extends Reducer<Text,Text,Text,Text> {
// Reduce Method
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (Text value : values) {
String fields[] = value.toString().split(",");
sum += Double.parseDouble(fields[0]);
count += Integer.parseInt(fields[1]);
}
context.write(key, new Text(sum+","+count));
}
}</pre>
在main方法中設置Combiner的類,
1
job.setCombinerClass(Combine.class);
</tr>
</tbody>
</table>
Shuffle-MapReduce的核心
首先讓我們看一下下面這張圖,
ddd
Mapper處理過程:
- 一個輸入切片對應一個Mapper, 也就是一個Mapper任務讀取文件的一部分;
- 每一個Mapper都會對應一個環形緩沖區,用來存儲Mapper的輸出,默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個后臺線程把內容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件;
- 在寫入磁盤之前要對數據進行分區、排序;
- 等最后記錄寫完,合并全部溢出寫文件為一個分區且排序的文件。
</ol>
Reducer處理過程:
- Reducer通過Http方式得到輸出文件的分區,每個Reducer會取相對應分區的數據;
- Reducer取到數據之后,首先會進行排序,之后合并過的數據會再一此進行排序;
- 排序階段合并map輸出,然后走Reduce階段。
</ol>
總結
MapReduce是一種編程模型,用于大規模數據集(大于1TB)的并行運算。它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統上。
轉自:http://longliqiang88.github.io/2015/07/12/MapReduce-Hadoop%E5%88%86%E5%B8%83%E5%BC%8F%E8%AE%A1%E7%AE%97%E6%A8%A1%E5%9E%8B/
本文由用戶 nbd2 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
相關資訊
sesese色
