MapReduce-Hadoop分布式計算模型

nbd2 10年前發布 | 38K 次閱讀 Hadoop 分布式/云計算/大數據

MapReduce概述

MapReduce是一種分布式計算模型,由Google提出,主要用于搜索領域,解決海量數據的計算問題。

MR由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數,即可實現分布式計算,非常簡單。這兩個函數的形參是key、value對,表示函數的輸入信息。

MapReduce實現原理

 MapReduce-Hadoop分布式計算模型 ddd
執行步驟:
1.map任務處理

1.1 讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成key、value對。每一個鍵值對調用次map函數。

1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。

1.3 對輸出的key、value進行分區。

1.4 對不同分區的數據,按照key進行排序、分組。相同key的value放到一個集合中。

1.5 (可選)分組后的數據進行歸約。

2.reduce任務處理

2.1 對多個map任務的輸出,按照不同的分區,通過網絡copy到不同的reduce節點。

2.2 對多個map任務的輸出進行合并、排序。寫reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。

2.3 把reduce的輸出保存到文件中。

序列化

在MapReduce中,序列化是一個很重要的步驟。

序列化就是把結構化的對象轉化為字節流。

反序列化就是把字節流轉回結構化對象。

hadoop中的Partitioner分區

Hadoop中的MapReduce支持對key進行分區,從而可以使map出來的數據均勻分布在reduce上。
框架自帶了一個默認的分區類,HashPartitioner,先看看這個類,就知道怎么自定義key分區了,
public class HashPartitioner<K, V> extends Partitioner<K, V> {

/* Use {@link Object#hashCode()} to partition. / public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }

}</pre>
先解釋一下這個HashPartitioner做的事情,

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

將key均勻分布在ReduceTasks上,舉例如果Key為Text的話,Text的hashcode方法跟String的基本一致,都是采用 的Horner公式計算,得到一個int,string太大的話這個int值可能會溢出變成負數,所以與上Integer.MAX_VALUE(即 0111111111111111),然后再對reduce個數取余,這樣就可以讓key均勻分布在reduce上。

實現分區的步驟:

  1. 先分析一下具體的業務邏輯,確定大概有多少個分區
  2. 首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
  3. 重寫public int getPartition這個方法,根據具體邏輯,讀數據庫或者配置返回相同的數字
  4. 在main方法中設置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
  5. 設置Reducer的數量,job.setNumReduceTasks(6);
  6. </ol> 舉例,
    public static class ProviderPartitioner extends Partitioner<Text, DataBean> {

        @Override
        public int getPartition(Text key, DataBean value, int arg2) {
    
            String account = key.toString();
            String sub_acc = account.substring(0,3);
            Integer code = 0;
            if(sub_acc.equals("aaa")){
    
                code = 1;
            }
            return code;
        }   
    }</pre><br />
    

    MapReduce中的Combiners編程

    每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的數據量。

    combiner最基本是實現本地key的歸并,combiner具有類似本地的reduce功能。

    如果不用combiner,那么,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。

    注意:Combiner的輸出是Reducer的輸入,如果Combiner是可插拔的,添加Combiner絕不能改變最終的計算結果。所以 Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值 等。
    舉例:
    public static class Combine extends Reducer<Text,Text,Text,Text> {

       // Reduce Method  
       public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
           double sum = 0;  
           int count = 0;  
           for (Text value : values) {  
               String fields[] = value.toString().split(",");  
               sum += Double.parseDouble(fields[0]);  
               count += Integer.parseInt(fields[1]);  
           }  
           context.write(key, new Text(sum+","+count));  
       }  
    

    }</pre>

    在main方法中設置Combiner的類,

    </tr> </tbody> </table>

    Shuffle-MapReduce的核心

    首先讓我們看一下下面這張圖,
     MapReduce-Hadoop分布式計算模型 ddd
    Mapper處理過程:

    1. 一個輸入切片對應一個Mapper, 也就是一個Mapper任務讀取文件的一部分;
    2. 每一個Mapper都會對應一個環形緩沖區,用來存儲Mapper的輸出,默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個后臺線程把內容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件;
    3. 在寫入磁盤之前要對數據進行分區、排序;
    4. 等最后記錄寫完,合并全部溢出寫文件為一個分區且排序的文件。
    5. </ol>

      Reducer處理過程:

      1. Reducer通過Http方式得到輸出文件的分區,每個Reducer會取相對應分區的數據;
      2. Reducer取到數據之后,首先會進行排序,之后合并過的數據會再一此進行排序;
      3. 排序階段合并map輸出,然后走Reduce階段。
      4. </ol>

        總結

        MapReduce是一種編程模型,用于大規模數據集(大于1TB)的并行運算。它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統上。

        轉自:http://longliqiang88.github.io/2015/07/12/MapReduce-Hadoop%E5%88%86%E5%B8%83%E5%BC%8F%E8%AE%A1%E7%AE%97%E6%A8%A1%E5%9E%8B/

         本文由用戶 nbd2 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
         轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
         本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
    1 
    job.setCombinerClass(Combine.class); 
sesese色