MapReduce中combine、partition、shuffle的作用是什么

jopen 8年前發布 | 18K 次閱讀 分布式/云計算/大數據

http://www.aboutyun.com/thread-8927-1-1.html


Mapreduce在hadoop中是一個比較難以的概念。下面需要用心看,然后自己就能總結出來了。

概括:</span>
combine和partition都是函數,中間的步驟應該只有shuffle!

1.combine
combine分為map端和reduce端,作用是把同一個key的鍵值對合并在一起,可以自定義的。
combine函數把一個map函數產生的<key,value>對(多個key,value)合并成一個新的<key2,value2>.將新的<key2,value2>作為輸入到reduce函數中
這個value2亦可稱之為values,因為有多個。這個合并的目的是為了減少網絡傳輸。

具體實現是由Combine類。
實現combine函數,該類的主要功能是合并相同的key鍵,通過job.setCombinerClass()方法設置,默認為null,不合并中間結果。實現map函數
具體調用:(下圖是調用reduce,合并map的個數)

難點:不知道這個reduce和mapreduce中的reduce區別是什么?</span>
下面簡單說一下:后面慢慢琢磨:
在mapreduce中,map多,reduce少。
在reduce中由于數據量比較多,所以干脆,我們先把自己map里面的數據歸類,這樣到了reduce的時候就減輕了壓力。</span>

這里舉個例子:
map與reduce的例子
map理解為銷售人員,reduce理解為銷售經理。
每個人(map)只管銷售,賺了多少錢銷售人員不統計,也就是說這個銷售人員沒有Combine,那么這個銷售經理就累垮了,因為每個人都沒有統計,它需要統計所有人員賣了多少件賺錢了多少錢。</span>
這樣是不行的,所以銷售經理(reduce)為了減輕壓力,每個人(map)都必須統計自己賣了多少錢,賺了多少錢(</span>Combine),</span>然后經理所做的事情就是統計每個人統計之后的結果。這樣經理就輕松多了。所以Combine在map所做的事情,減輕了reduce的事情。
(這就是為什么說map的Combine干reduce的事情,相信你應該明白了)</span>

public  static void main(String[] args)throws IOException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setCombinerClass(reduce.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputFormatClass(TextOutFormat.class);
    }
}
</div>


2.partition
partition是分割map每個節點的結果,按照key分別映射給不同的reduce,也是可以自定義的。這里其實可以理解歸類。
我們對于錯綜復雜的數據歸類。比如在動物園里有牛羊雞鴨鵝,他們都是混在一起的,但是到了晚上他們就各自牛回牛棚,羊回羊圈,雞回雞窩。partition的作用就是把這些數據歸類。只不過在寫程序的時候,mapreduce使用哈希</span></span>HashPartitioner</span></span>幫我們歸類了。這個我們也可以自定義。

        HashPartitioner是mapreduce的默認partitioner。計算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到當前的目的reducer。
</div>
下面在看該如何自定義,該如何調用:(下面便是自定義了一個Partition函數,紅字部分是算法的核心,也就是分區的核心

public static class Partition extends Partitioner<IntWritable, IntWritable> {
                @Override
                public int getPartition(IntWritable key, IntWritable value,
                                int numPartitions) {
                        int Maxnumber = 65223;
                        int bound = Maxnumber / numPartitions + 1;
                        int keynumber = key.get();
                        for (int i = 0; i < numPartitions; i++) {
                                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                                        return i - 1;
                                }

                        }
                        return 0;
                }

        }
</div>
那么我們該如何調用:(下面調用之后,你的分區函數就生效了)

public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
FileOutputFormat
.setOutputPath(job, new Path("/home/asheng/hadoop/out"));
job.waitForCompletion(true);
}
}
</div>


3.shuffle

shuffle就是map和reduce之間的過程,包含了兩端的combine和partition。它比較難以理解,因為我們摸不著,看不到它,它只是理論存在的,而且確實存在,它屬于mapreduce的框架,編程的時候,我們用不到它,它屬于mapreduce框架。詳細可以看通過實例讓你真正明白mapreduce---填空式、分布(分割)編程

3.1shuffle的作用是
Map的結果,會通過partition分發到Reducer上,Reducer做完Reduce操作后,通過OutputFormat,進行輸出
shuffle階段的主要函數是fetchOutputs(),這個函數的功能就是將map階段的輸出,copy到reduce 節點本地。

</td> </tr> </tbody> </table>

來自: http://blog.csdn.net//mrcharles/article/details/50458637

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
  • sesese色