Implementing a Global Sort with MapReduce
The code is attached directly below; the explanations are in the source comments. In short, SamplerInputFormat samples keys from the input splits and writes the sorted sample points to a _partition.lst file, and the custom TotalOrderPartitioner in SamplerSort reads that file and routes each key to the reducer whose key range it falls into, so concatenating the reducer outputs in order yields a globally sorted result.
package com.hadoop.totalsort;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;

public class SamplerInputFormat extends FileInputFormat<Text, Text> {

    static final String PARTITION_FILENAME = "_partition.lst";
    static final String SAMPLE_SIZE = "terasort.partitions.sample";

    private static JobConf lastConf = null;
    private static InputSplit[] lastResult = null;

    // Collects sampled keys and sorts them to derive the partition boundaries.
    static class TextSampler implements IndexedSortable {

        public ArrayList<Text> records = new ArrayList<Text>();

        public int compare(int arg0, int arg1) {
            Text right = records.get(arg0);
            Text left = records.get(arg1);
            return right.compareTo(left);
        }

        public void swap(int arg0, int arg1) {
            Text right = records.get(arg0);
            Text left = records.get(arg1);
            records.set(arg0, left);
            records.set(arg1, right);
        }

        public void addKey(Text key) {
            records.add(new Text(key));
        }

        // Sort the sampled keys and pick numPartitions - 1 of them, evenly spaced,
        // as the split points between reducers.
        public Text[] createPartitions(int numPartitions) {
            int numRecords = records.size();
            if (numPartitions > numRecords) {
                throw new IllegalArgumentException("Requested more partitions than input keys ("
                        + numPartitions + " > " + numRecords + ")");
            }
            new QuickSort().sort(this, 0, records.size());
            float stepSize = numRecords / (float) numPartitions;
            // Roughly 100 keys were sampled from 10 splits; take numPartitions - 1 of them here.
            Text[] result = new Text[numPartitions - 1];
            for (int i = 1; i < numPartitions; ++i) {
                result[i - 1] = records.get(Math.round(stepSize * i));
            }
            return result;
        }
    }

    public static void writePartitionFile(JobConf conf, Path partFile) throws IOException {
        // The first part samples keys from the input splits and stores them,
        // via sampler.addKey, in the records list of TextSampler.
        SamplerInputFormat inputFormat = new SamplerInputFormat();
        TextSampler sampler = new TextSampler();
        Text key = new Text();
        Text value = new Text();

        int partitions = conf.getNumReduceTasks();                // number of reduce tasks
        long sampleSize = conf.getLong(SAMPLE_SIZE, 100);         // total number of key-value pairs to sample
        InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks()); // input splits
        int samples = Math.min(10, splits.length);                // number of splits to sample (at most 10)
        long recordsPerSample = sampleSize / samples;             // keys sampled per split
        int sampleStep = splits.length / samples;                 // step between sampled splits: total splits / sampled splits
        long records = 0;

        for (int i = 0; i < samples; i++) {  // iterate over the sampled splits
            RecordReader<Text, Text> reader =
                    inputFormat.getRecordReader(splits[sampleStep * i], conf, null);
            while (reader.next(key, value)) {
                sampler.addKey(key);  // add the key to the sampler's records list
                records += 1;
                if ((i + 1) * recordsPerSample <= records) {
                    // Keep the sampling even across splits: after the i-th split, the total
                    // number of collected keys must not exceed (i + 1) * recordsPerSample.
                    break;
                }
            }
        }

        FileSystem outFs = partFile.getFileSystem(conf);
        if (outFs.exists(partFile)) {
            outFs.delete(partFile, false);
        }
        SequenceFile.Writer writer =
                SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        for (Text split : sampler.createPartitions(partitions)) {
            // createPartitions sorts the sampled keys and returns partitions - 1 split points.
            writer.append(split, nullValue);
        }
        writer.close();
    }

    static class TeraRecordReader implements RecordReader<Text, Text> {

        private LineRecordReader in;
        private LongWritable junk = new LongWritable();
        private Text line = new Text();
        private static int KEY_LENGTH = 10;

        public TeraRecordReader(Configuration job, FileSplit split) throws IOException {
            in = new LineRecordReader(job, split);
        }

        public void close() throws IOException {
            in.close();
        }

        public Text createKey() {
            return new Text();
        }

        public Text createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return in.getPos();
        }

        public float getProgress() throws IOException {
            return in.getProgress();
        }

        public boolean next(Text arg0, Text arg1) throws IOException {
            if (in.next(junk, line)) {  // delegate to LineRecordReader and use the whole line as the key
                // if (line.getLength() < KEY_LENGTH) {
                arg0.set(line);
                arg1.clear();
                // } else {
                //     byte[] bytes = line.getBytes();
                //     // Alternatively, use the first 10 bytes of the line as the key
                //     // and the remaining bytes as the value.
                //     arg0.set(bytes, 0, KEY_LENGTH);
                //     arg1.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
                // }
                return true;
            } else {
                return false;
            }
        }
    }

    @Override
    public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
        if (conf == lastConf) {
            return lastResult;
        }
        lastConf = conf;
        lastResult = super.getSplits(lastConf, splits);
        return lastResult;
    }

    public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
            Reporter arg2) throws IOException {
        return new TeraRecordReader(arg1, (FileSplit) arg0);
    }
}
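The partition file written by writePartitionFile is just a SequenceFile of Text split points with NullWritable values. Below is a minimal sketch, not part of the original code, for dumping that file so you can check that the boundaries look sensible before running the full job; the class name DumpPartitions and the command-line path are illustrative assumptions:

package com.hadoop.totalsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical helper: prints the split points written to _partition.lst.
public class DumpPartitions {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path partFile = new Path(args[0]);  // path to the partition file, e.g. <input dir>/_partition.lst
        FileSystem fs = partFile.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, partFile, conf);
        Text key = new Text();
        NullWritable value = NullWritable.get();
        int i = 0;
        while (reader.next(key, value)) {
            System.out.println("split point " + (i++) + ": " + key);
        }
        reader.close();
    }
}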
package com.hadoop.totalsort;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SamplerSort extends Configured implements Tool {

    // Custom Partitioner: routes each key to the reducer whose key range contains it.
    public static class TotalOrderPartitioner implements Partitioner<Text, Text> {

        private Text[] splitPoints;

        public TotalOrderPartitioner() {
        }

        public int getPartition(Text arg0, Text arg1, int arg2) {
            return findPartition(arg0);
        }

        public void configure(JobConf arg0) {
            try {
                FileSystem fs = FileSystem.getLocal(arg0);
                Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);
                splitPoints = readPartitions(fs, partFile, arg0);  // read the sampled split points
            } catch (IOException ie) {
                throw new IllegalArgumentException("can't read partitions file", ie);
            }
        }

        // Find the reducer a key belongs to by comparing it against the split points.
        public int findPartition(Text key) {
            int len = splitPoints.length;
            for (int i = 0; i < len; i++) {
                int res = key.compareTo(splitPoints[i]);
                if (res > 0 && i < len - 1) {
                    continue;
                } else if (res == 0) {
                    return i;
                } else if (res < 0) {
                    return i;
                } else if (res > 0 && i == len - 1) {
                    return i + 1;
                }
            }
            return 0;
        }

        private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException {
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
            List<Text> parts = new ArrayList<Text>();
            Text key = new Text();
            NullWritable value = NullWritable.get();
            while (reader.next(key, value)) {
                parts.add(new Text(key));  // copy: the reader reuses the same Text instance
            }
            reader.close();
            return parts.toArray(new Text[parts.size()]);
        }
    }

    public int run(String[] args) throws Exception {
        JobConf job = (JobConf) getConf();

        Path inputDir = new Path(args[0]);
        inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
        Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);
        URI partitionUri = new URI(partitionFile.toString() + "#" + SamplerInputFormat.PARTITION_FILENAME);

        SamplerInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJobName("SamplerTotalSort");
        job.setJarByClass(SamplerSort.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormat(SamplerInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        job.setNumReduceTasks(4);

        SamplerInputFormat.writePartitionFile(job, partitionFile);  // sample the input and write the partition file
        DistributedCache.addCacheFile(partitionUri, job);           // ship the partition file to every task
        DistributedCache.createSymlink(job);                        // symlink it into the task working directory

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);
        System.exit(res);
    }
}
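The job is submitted in the usual way, for example hadoop jar totalsort.jar com.hadoop.totalsort.SamplerSort <input dir> <output dir> (the jar name here is an assumption).

For comparison, recent Hadoop releases ship this whole mechanism out of the box: org.apache.hadoop.mapreduce.lib.partition.InputSampler writes the partition file and the built-in TotalOrderPartitioner reads it. The sketch below shows the same global sort with the new mapreduce API and identity map/reduce; the class name, the choice of KeyValueTextInputFormat, the sampler parameters, and the partition-file path are illustrative assumptions, not taken from the code above:

package com.hadoop.totalsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Hypothetical driver using Hadoop's built-in sampler and partitioner.
public class BuiltInTotalSort {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "BuiltInTotalSort");
        job.setJarByClass(BuiltInTotalSort.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);  // key = text before the first tab
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Tell the partitioner where the partition file lives (an absolute HDFS path is safest),
        // then sample up to 10000 keys at 10% frequency from at most 10 splits.
        Path partitionFile = new Path(args[0], "_partitions");
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        job.setPartitionerClass(TotalOrderPartitioner.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Either way, the key idea is the same: sample the input to estimate the key distribution, derive reducer boundaries from the sample, and let the partitioner enforce those boundaries so each reducer's output file covers a disjoint, ordered key range.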