MapReduce的組合式,迭代式,鏈式

jopen 10年前發布 | 16K 次閱讀 MapReduce 分布式/云計算/大數據

1.比如我們輸出的mapreduce結果,需要進入下一個mapreduce,該怎么解決?可以使用迭代式 2.那么什么是迭代式? 3.什么是依賴式? 4.什么是鏈式? 5.三種模式各自的應用場景是什么?

1.迭代式mapreduce
        一些復雜的任務難以用一次MapReduce處理完成,需要多次 MapReduce 才能完成任務,例如Pagrank,K-means算法都需要多次的迭代,關于 MapReduce 迭代在Mahout中運用較多。有興趣的可以參考一下Mahout的源碼。
             在MapReduce的迭代思想,類似for循環,前一個 MapReduce的輸出結果,作為下一個 MapReduce的輸入,任務完成后中間結果都可以刪除。
        代碼示例:

Configuration conf1 = new Configuration();

Job job1 = new Job(conf1,"job1");

.....

FileInputFormat.addInputPath(job1,InputPaht1);

FileOutputFromat.setOoutputPath(job1,Outpath1);

job1.waitForCompletion(true);

//sub Mapreduce

Configuration conf2 = new Configuration();

Job job2 = new Job(conf1,"job1");

.....

FileInputFormat.addInputPath(job2,Outpath1);

FileOutputFromat.setOoutputPath(job2,Outpath2);

job2.waitForCompletion(true);

//sub Mapreduce

Configuration conf3 = new Configuration();

Job job3 = new Job(conf1,"job1");

.....

FileInputFormat.addInputPath(job3,Outpath2);

FileOutputFromat.setOoutputPath(job3,Outpath3);

job3.waitForCompletion(true);

.....

 

關鍵點:
上面滿眼的代碼,下面列出關鍵代碼:
第一個job的輸出路徑為Outpath1
FileOutputFromat.setOoutputPath(job1,Outpath1);

第二個job的輸入路徑為Outpath1,輸出路徑為Outpath2
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOoutputPath(job2,Outpath2);

第三個job的輸入路徑為Outpath2
FileInputFormat.addInputPath(job3,Outpath2);
換句話說:第一個job的輸出路徑為第二個job的輸入路徑,以此類推。

上面采用的是一種直線式,那么他們能不能更省事,成為循環樣式
成為循環式,是可以的,但是有不少需要解決的問題:這里只是舉例,你可能還會碰到其它問題。
1.需要他們的key,value等值是完全一致的,也就是說兩個job或則說job之間必須是一致的。
2.輸入輸出路徑需要區分等。


2.依賴關系組合式MapReduce

我們可以設想一下MapReduce有3個子任務job1,job2,job3構成,其中job1和job2相互獨立,job3要在job1和job2完成之后才執行。這種關系就叫復雜數據依賴關系的組合時mapreduce。hadoop為這種組合關系提供了一種執行和控制機制,hadoop通過job和 jobControl類提供具體的編程方法。Job除了維護子任務的配置信息,還維護子任務的依賴關系,而jobControl控制整個作業流程,把所有的子任務作業加入到JobControl中,執行JobControl的run()方法即可運行程序。

下面給出偽代碼:

Configuration job1conf = new Configuration();

Job job1 = new Job(job1conf,"Job1");

.........//job1 其他設置

Configuration job2conf = new Configuration();

Job job2 = new Job(job2conf,"Job2");

.........//job2 其他設置

Configuration job3conf = new Configuration();

Job job3 = new Job(job3conf,"Job3");

.........//job3 其他設置

job3.addDepending(job1);//設置job3和job1的依賴關系

job3.addDepending(job2);

JobControl JC = new JobControl("123");

JC.addJob(job1);//把三個job加入到jobcontorl中

JC.addJob(job2);

JC.addJob(job3);

JC.run();

 

關鍵點:
下面代碼:addDepending()這個函數,作用的是建立兩個job之間的依賴關系。那么如何建立,看下面兩行

 

//下面面代碼的作用是設置job3和job1的依賴關系

job3.addDepending(job1);

//下面代碼的作用是設置job3和job2的依賴關系

job3.addDepending(job2);

 

建立依賴關系之后,還有一步驟,也就是還有一個類需要我們了解JobControl,通過這個類來控制他們之間的依賴關系。如何做到,通過下面代碼:

 

//實例化

JobControl JC = new JobControl("123");

//把三個job加入到jobcontorl中

JC.addJob(job1);

JC.addJob(job2);

JC.addJob(job3);

JC.run();

 

3.鏈式MapReduce


首先看一下例子,來說明為什么要有鏈式MapReduce,假設在統計單詞是,會出現這樣的詞,make,made,making等,他們都屬于一個詞,在單詞累加的時候,都歸于一個詞。解決的方法為用一個單獨的Mapreduce任務可以實現,單增加了多個Mapreduce作業,將增加整個作業處理的周期,還增加了I/O操作,因而處理效率不高。

一個較好的辦法就是在核心的MapReduce之外,增加一個輔助的Map過程,然后將這個輔助的Map過程和核心的Mapreudce過程合并為一個鏈式的Mapreduce,從而完成整個作業。hadoop提供了專門的鏈式 ChainMapper和ChainReducer來處理鏈式任務,ChainMapper允許一個Map任務中添加多個Map的子任務,ChainReducer可以在Reducer執行之后,在加入多個Map的子任務。其調用形式如下:

 

ChainMapper.addMapper(...);

    ChainReducer.addMapper(...);

    //addMapper()調用的方法形式如下:

    public static void addMapper(JOb job,

            Class<? extends Mapper> mclass,

            Class<?> inputKeyClass,

            Class<?> inputValueClass,

            Class<?> outputKeyClass,

            Class<?> outputValueClass,

            Configuration conf

    )

其中,ChainReducer專門提供了一個setRreducer()方法來設置整個作業唯一的Reducer。


note:這些Mapper和Reducer之間傳遞的鍵和值都必須保持一致。


下面舉個例子:用ChainMapper把Map1加如并執行,然后用ChainReducer把Reduce和Map2加入到Reduce過程中。代碼如下:Map1.class 要實現map方法


public void function throws IOException {

        Configuration conf = new Configuration();

        Job job = new Job(conf);

        job.setJobName("ChianJOb");

        // 在ChainMapper里面添加Map1

        Configuration map1conf = new Configuration(false);

        ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,

                Text.class, Text.class, true, map1conf);

        // 在ChainReduce中加入Reducer,Map2;

        Configuration reduceConf = new Configuration(false);

        ChainReducer.setReducer(job, Reduce.class, LongWritable.class,

                Text.class, Text.class, Text.class, true, map1conf);

        Configuration map2Conf = new Configuration();

        ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,

                Text.class, Text.class, true, map1conf);

        job.waitForCompletion(true);

    }


關鍵點:

鏈式,那么什么是鏈式,鏈式是mapreduce中存在多個map.

那么是怎么實現的?通過鏈式ChainMapper和ChainReducer實現。


ChainMapper允許一個Map任務中添加多個Map的子任務,ChainReducer可以在Reducer執行之后,在加入多個Map的子任務。

下面為ChainMapper、ChainReducer:的具體實現。



ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,

Text.class, Text.class, true, map1conf);

ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,

Text.class, Text.class, true, map1conf);

 

hadoop本身就不適合做迭代運算,所以在實際運用中,應適當優化程序,減少MR迭代次數。如需進行大量迭代性工作,建議使用spark。

</div>

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!