Controlling Reduce Output in MapReduce Programs


1. In Hadoop, a reduce can write to multiple output files, and the file names are controllable: extend the MultipleTextOutputFormat class and override its generateFileNameForKeyValue method:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzopCodec; // assumes the hadoop-lzo library is on the classpath

public class LzoHandleLogMr extends Configured implements Tool {

    // Emits the first comma-separated field as the key and the whole line as the value.
    static class LzoHandleLogMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            try {
                String[] sp = value.toString().split(",");
                output.collect(new Text(sp[0]), value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Writes each line back out unchanged; the key is only used for file naming.
    static class LzoHandleLogReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, NullWritable> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(values.next(), NullWritable.get());
            }
        }
    }

    // Derives the output file name from the record key instead of the default part-NNNNN.
    public static class LogNameMultipleTextOutputFormat
            extends MultipleTextOutputFormat<Text, NullWritable> {

        @Override
        protected String generateFileNameForKeyValue(Text key,
                NullWritable value, String name) {
            String[] sp = key.toString().split(",");
            String filename = sp[0];
            // Fall back to a fixed name when the field is not a clean identifier.
            if (sp[0].contains(".")) {
                filename = "000000000000";
            }
            return filename;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf jobconf = new JobConf(LzoHandleLogMr.class);
        jobconf.setMapperClass(LzoHandleLogMapper.class);
        jobconf.setReducerClass(LzoHandleLogReducer.class);
        jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(NullWritable.class);
        // The map output value type (Text) differs from the job output value type.
        jobconf.setMapOutputValueClass(Text.class);
        jobconf.setNumReduceTasks(12);

        FileInputFormat.setInputPaths(jobconf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
        FileOutputFormat.setCompressOutput(jobconf, true);
        FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);

        JobClient.runJob(jobconf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LzoHandleLogMr(), args));
    }
}

In the newer Hadoop API, job parameters are set through the Job class. However, when I called Job.setOutputFormatClass() to use MultipleTextOutputFormat, it failed: the class passed there must extend org.apache.hadoop.mapreduce.OutputFormat, which MultipleTextOutputFormat (from the old mapred API) does not. This is one of the more serious shortcomings of 0.20.2; upgrading to 0.21 resolves it.
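Under the new API, the usual replacement is org.apache.hadoop.mapreduce.lib.output.MultipleOutputs, whose write(key, value, baseOutputPath) overload controls file names much like generateFileNameForKeyValue does above. A minimal sketch, assuming Hadoop 0.21 or later; the class name LogNameReducer is illustrative, not from the original post:

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Illustrative new-API counterpart of LzoHandleLogReducer above.
    public class LogNameReducer extends Reducer<Text, Text, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<Text, NullWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Derive the file name from the key, like generateFileNameForKeyValue above.
            String filename = key.toString().split(",")[0];
            for (Text value : values) {
                // The third argument is a base output path, relative to the job output dir.
                mos.write(value, NullWritable.get(), filename);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

In practice this is often paired with LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) in the driver, so the job does not also create empty default part files.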

 

2. If the same line of data needs to be written to several files at once, we can use the MultipleOutputs class:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.MultipleOutputs;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MultiFile extends Configured implements Tool {

        public static class MapClass extends MapReduceBase
                implements Mapper<LongWritable, Text, NullWritable, Text> {

            private MultipleOutputs mos;
            private OutputCollector<NullWritable, Text> collector;

            public void configure(JobConf conf) {
                mos = new MultipleOutputs(conf);
            }

            public void map(LongWritable key, Text value,
                    OutputCollector<NullWritable, Text> output,
                    Reporter reporter) throws IOException {
                String[] arr = value.toString().split(",", -1);
                String chrono = arr[0] + "," + arr[1] + "," + arr[2];
                String geo = arr[0] + "," + arr[4] + "," + arr[5];
                // Fetch the collector for each named output and write the derived record.
                collector = mos.getCollector("chrono", reporter);
                collector.collect(NullWritable.get(), new Text(chrono));
                collector = mos.getCollector("geo", reporter);
                collector.collect(NullWritable.get(), new Text(geo));
            }

            public void close() throws IOException {
                // Closes all underlying record writers.
                mos.close();
            }
        }

        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, MultiFile.class);
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("MultiFile");
            job.setMapperClass(MapClass.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(0); // map-only job

            // Declare the named outputs up front; each becomes its own set of files.
            MultipleOutputs.addNamedOutput(job, "chrono",
                    TextOutputFormat.class, NullWritable.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "geo",
                    TextOutputFormat.class, NullWritable.class, Text.class);

            JobClient.runJob(job);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MultiFile(), args));
        }
    }

MultipleOutputs maintains a <name, OutputCollector> map. We register the named outputs in the job configuration, then fetch the corresponding collector in the map (or reduce) method and call its collect() method.
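For comparison, the same two named outputs can be written under the new (org.apache.hadoop.mapreduce) API. A minimal sketch, assuming Hadoop 0.21 or later; MultiFileNewApiMapper is an illustrative name, not from the original post:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class MultiFileNewApiMapper
            extends Mapper<LongWritable, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(",", -1);
            // Route each derived record to its named output, as in the old-API example.
            mos.write("chrono", NullWritable.get(),
                    new Text(arr[0] + "," + arr[1] + "," + arr[2]));
            mos.write("geo", NullWritable.get(),
                    new Text(arr[0] + "," + arr[4] + "," + arr[5]));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    // In the driver, the named outputs are declared on the Job, e.g.:
    //   MultipleOutputs.addNamedOutput(job, "chrono",
    //           TextOutputFormat.class, NullWritable.class, Text.class);
    //   MultipleOutputs.addNamedOutput(job, "geo",
    //           TextOutputFormat.class, NullWritable.class, Text.class);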

Source: http://blog.csdn.net/liuzhoulong/article/details/7294397
