Implementing Multi-File Output in Hadoop


For example, suppose word.txt contains the following:

aaa bbb aba abc

bba bbd bbbc

cc ccd cce

The requirement is to group the words by their first letter and write each group to its own output file.
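
With this input every word occurs exactly once, so the job below (which uses "," as the key/value separator and the default single reduce task) should produce one file per starting letter, roughly:

a.txt: aaa,1 aba,1 abc,1

b.txt: bba,1 bbb,1 bbbc,1 bbd,1

c.txt: cc,1 ccd,1 cce,1

(in the actual files, each key/value pair is written on its own line)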

The code is as follows:

LineRecordWriter

This writer emits one key/value pair per line, with a configurable separator between key and value; it is essentially the same line writer that TextOutputFormat uses internally.

package com.hadoop.multi;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

private static final String utf8 = "UTF-8";

private static final byte[] newline;

static {
    try {
        newline = "\n".getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8
                + " encoding");
    }
}

protected DataOutputStream out;
private final byte[] keyValueSeparator;

public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    this.out = out;
    try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8
                + " encoding");
    }
}

public LineRecordWriter(DataOutputStream out) {
    this(out, "\t");
}

private void writeObject(Object o) throws IOException {
    if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
    } else {
        out.write(o.toString().getBytes(utf8));
    }
}

public synchronized void write(K key, V value) throws IOException {
    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
        return;
    }
    if (!nullKey) {
        writeObject(key);
    }
    if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
    }
    if (!nullValue) {
        writeObject(value);
    }
    out.write(newline);
}

public synchronized void close(TaskAttemptContext context)
        throws IOException {
    out.close();
}

}


MultipleOutputFormat

An abstract FileOutputFormat whose MultiRecordWriter routes each record to a file chosen per record: subclasses implement generateFileNameForKeyValue(), and one LineRecordWriter is opened and cached per distinct file name.

package com.hadoop.multi;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> {

private MultiRecordWriter writer = null; 

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,   
       InterruptedException {   
    if (writer == null) {   
        writer = new MultiRecordWriter(job, getTaskOutputPath(job));   
    }   
    return writer;   
}

private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {   
    Path workPath = null;   
    OutputCommitter committer = super.getOutputCommitter(conf);   
    if (committer instanceof FileOutputCommitter) {   
        workPath = ((FileOutputCommitter) committer).getWorkPath();   
    } else {   
        Path outputPath = super.getOutputPath(conf);   
        if (outputPath == null) {   
            throw new IOException("Undefined job output-path");   
        }   
        workPath = outputPath;   
    }   
    return workPath;   
} 

protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);   

public class MultiRecordWriter extends RecordWriter<K, V> {   

    private HashMap<String, RecordWriter<K, V>> recordWriters = null;   
    private TaskAttemptContext job = null;   

    private Path workPath = null;   
    public MultiRecordWriter(TaskAttemptContext job, Path workPath) {   
        super();   
        this.job = job;   
        this.workPath = workPath;   
        recordWriters = new HashMap<String, RecordWriter<K, V>>();   
    }   
    @Override   
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {   
        Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();   
        while (values.hasNext()) {   
            values.next().close(context);   
        }   
        this.recordWriters.clear();   
    }   
    @Override   
    public void write(K key, V value) throws IOException, InterruptedException {   
        // determine the output file name for this record
        String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());   
        RecordWriter<K, V> rw = this.recordWriters.get(baseName);   
        if (rw == null) {   
            rw = getBaseRecordWriter(job, baseName);   
            this.recordWriters.put(baseName, rw);   
        }   
        rw.write(key, value);   
    }   

    private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)   
            throws IOException, InterruptedException {   
        Configuration conf = job.getConfiguration();   
        boolean isCompressed = getCompressOutput(job);   
        String keyValueSeparator = ",";   
        RecordWriter<K, V> recordWriter = null;   
        if (isCompressed) {   
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,   
                    GzipCodec.class);   
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);   
            Path file = new Path(workPath, baseName + codec.getDefaultExtension());   
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
            recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec   
                    .createOutputStream(fileOut)), keyValueSeparator);   
        } else {   
            Path file = new Path(workPath, baseName);   
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
            recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);   
        }   
        return recordWriter;   
    }   
}   

}


MultiFileOutPut

The driver class: a standard word-count job whose AlphabetOutputFormat subclass names each output file after the key's first letter.

package com.hadoop.multi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.hadoop.multi.MultipleOutputFormat;

public class MultiFileOutPut {

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, 
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

}

public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {

    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
        char c = key.toString().toLowerCase().charAt(0);
        if (c >= 'a' && c <= 'z') {
            return c + ".txt";
        }
        return "other.txt";
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: MultiFileOutPut <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(MultiFileOutPut.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(AlphabetOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

Source: http://blog.csdn.net/zyuc_wangxw/article/details/9304461
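
One possible way to run the job (the jar name multi.jar and the HDFS paths below are just placeholders for illustration):

hadoop jar multi.jar com.hadoop.multi.MultiFileOutPut /user/hadoop/word /user/hadoop/out

hadoop fs -ls /user/hadoop/out

The output directory should then contain a.txt, b.txt and c.txt (plus the usual _SUCCESS marker) instead of the default part-r-00000 file.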
