MapReduce初級案例
1、數據去重
"數據去重"主要是為了掌握和利用并行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日志中計算訪問地等這些看似龐雜的任務都會涉及數據去重。下面就進入這個實例的MapReduce程序設計。
1.1 實例描述
對數據文件中的數據進行去重。數據文件中的每行都是一個數據。
樣例輸入如下所示:
1)file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2)file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
樣例輸出如下所示:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
1.2 設計思路
        數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。我們自然而然會想到將同一個數據的所有記錄都交給一臺reduce機器,無論這個數據出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以數據作為key,而對value-list則沒有要求。當reduce接收到一個
        在MapReduce流程中,map的輸出
1.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Dedup {
//map將輸入中的value復制到輸出數據的key上,并直接輸出
public static class Map extends Mapper
private static Text line=new Text();//每行數據
//實現map函數
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
line=value;
context.write(line, new Text(""));
}
}
//reduce將輸入中的key復制到輸出數據的key上,并直接輸出
public static class Reduce extends Reducer
{ //實現reduce函數
public void reduce(Text key,Iterable
values,Context context) throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs=new String[]{"dedup_in","dedup_out"};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Deduplication
"); System.exit(2);
}
Job job = new Job(conf, "Data Deduplication");
job.setJarByClass(Dedup.class);
//設置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
//設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"dedup_in"文件夾(備注:"dedup_out"不需要創建。)如圖1.4-1所示,已經成功創建。
圖1.4-1 創建"dedup_in" 圖1.4.2 上傳"file*.txt"
然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/dedup_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖1.4-2所示,成功上傳之后。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。
查看兩個文件的內容如圖1.4-3所示:
圖1.4-3 文件"file*.txt"內容
2)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"dedup_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。
圖1.4-4 運行結果
此時,你可以對比一下和我們之前預期的結果是否一致。
2、數據排序
"數據排序"是許多實際任務執行時要完成的第一項工作,比如學生成績評比、數據建立索引等。這個實例和數據去重類似,都是先對原始數據進行初步處理,為進一步的數據操作打好基礎。下面進入這個示例。
2.1 實例描述
對輸入文件中數據進行排序。輸入文件中的每行內容均為一個數字,即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始數據在原始數據集中的位次,第二個代表原始數據。
樣例輸入:
1)file1:
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
樣例輸出:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
2.2 設計思路
這個實例僅僅要求對輸入數據進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個默認的排序,而不需要自己再實現具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的默認排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數字大小對key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序對字符串排序。
        了解了這個細節,我們就知道應該使用封裝int的IntWritable型數據結構了。也就是在map中將讀入的數據轉化成 IntWritable型,然后作為key值輸出(value任意)。reduce拿到
2.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
//map將輸入中的value化成IntWritable類型,作為輸出的key
public static class Map extends
Mapper
private static IntWritable data=new IntWritable();
//實現map函數
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
String line=value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
//reduce將輸入中的key復制到輸出數據的key上,
//然后根據輸入的value-list中元素的個數決定key的輸出次數
//用全局linenum來代表key的位次
public static class Reduce extends
Reducer
{ 
private static IntWritable linenum = new IntWritable(1);
//實現reduce函數
public void reduce(IntWritable key,Iterable
values,Context context) throws IOException,InterruptedException{
for(IntWritable val:values){
context.write(linenum, key);
linenum = new IntWritable(linenum.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs=new String[]{"sort_in","sort_out"};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Sort
"); System.exit(2);
}
Job job = new Job(conf, "Data Sort");
job.setJarByClass(Sort.class);
//設置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設置輸出類型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"sort_in"文件夾(備注:"sort_out"不需要創建。)如圖2.4-1所示,已經成功創建。
圖2.4-1 創建"sort_in" 圖2.4.2 上傳"file*.txt"
然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/sort_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖2.4-2所示,成功上傳之后。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
查看兩個文件的內容如圖2.4-3所示:
圖2.4-3 文件"file*.txt"內容
2)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"sort_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。
圖2.4-4 運行結果
3、平均成績
"平均成績"主要目的還是在重溫經典"WordCount"例子,可以說是在基礎上的微變化版,該實例主要就是實現一個計算學生平均成績的例子。
3.1 實例描述
對輸入文件中數據進行就算學生平均成績。輸入文件中的每行內容均為一個學生的姓名和他相應的成績,如果有多門學科,則每門學科為一個文件。要求在輸出中每行有兩個間隔的數據,其中,第一個代表學生的姓名,第二個代表其平均成績。
樣本輸入:
1)math:
張三 88
李四 99
王五 66
趙六 77
2)china:
張三 78
李四 89
王五 96
趙六 67
3)english:
張三 80
李四 82
王五 84
趙六 86
樣本輸出:
張三 82
李四 90
王五 82
趙六 76
3.2 設計思路
計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程序的流程。程序包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。
    Map處理的是一個純文本文件, 文件中存放的數據時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的數據是由InputFormat分解過的數據集,其中 InputFormat的作用是將數據集切割成小數據集InputSplit,每一個InputSlit將由一個Mapper負責處理。此 外,InputFormat中還提供了一個RecordReader的實現,并將一個InputSplit解析成
Map的結果會通過partion分發到Reducer,Reducer做完Reduce操作后,將通過以格式OutputFormat輸出。
    Mapper最終處理的結果對
3.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Score {
public static class Map extends
Mapper<LongWritable, Text, Text, IntWritable> {
// 實現map函數
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 將輸入的純文本文件的數據轉化成String
String line = value.toString();
// 將輸入的數據首先按行進行分割
StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
// 分別對每一行進行處理
while (tokenizerArticle.hasMoreElements()) {
// 每行按空格劃分
StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
String strName = tokenizerLine.nextToken();// 學生姓名部分
String strScore = tokenizerLine.nextToken();// 成績部分
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
// 輸出姓名和成績
context.write(name, new IntWritable(scoreInt));
}
}
}
public static class Reduce extends
Reducer
{ // 實現reduce函數
public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator
iterator = values.iterator(); while (iterator.hasNext()) {
sum += iterator.next().get();// 計算總分
count++;// 統計總的科目數
}
int average = (int) sum / count;// 計算平均成績
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[] { "score_in", "score_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Score Average
"); System.exit(2);
}
Job job = new Job(conf, "Score Average");
job.setJarByClass(Score.class);
// 設置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 將輸入的數據集分割成小數據塊splites,提供一個RecordReder的實現
job.setInputFormatClass(TextInputFormat.class);
// 提供一個RecordWriter的實現,負責數據輸出
job.setOutputFormatClass(TextOutputFormat.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"score_in"文件夾(備注:"score_out"不需要創建。)如圖3.4-1所示,已經成功創建。
圖3.4-1 創建"score_in" 圖3.4.2 上傳三門分數
然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/score_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖3.4-2所示,成功上傳之后。
備注:文本文件的編碼為"UTF-8",默認為"ANSI",可以另存為時選擇,不然中文會出現亂碼。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
查看三個文件的內容如圖3.4-3所示:
圖3.4.3 三門成績的內容
2)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"score_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖3.4-4所示。
圖3.4-4 運行結果
4、單表關聯
前面的實例都是在數據上進行一些簡單的處理,為進一步的操作打基礎。"單表關聯"這個實例要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘。下面進入這個實例。
4.1 實例描述
實例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。
樣例輸入如下所示。
file:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
家族樹狀關系譜:
圖4.2-1 家族譜
樣例輸出如下所示。
file:
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
4.2 設計思路
分析這個實例,顯然需要進行單表連接,連接的是左表的parent列和右表的child列,且左表和右表是同一個表。
連接結果中除去連接的兩列就是所需要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現表的自連接;其次就是連接列的設置;最后是結果的整理。
考慮到MapReduce的shuffle過程會將相同的key會連接在一起,所以可以將map結果的key設置成待連接的列,然后列中相同的值就自然會連接在一起了。再與最開始的分析聯系起來:
要連接的是左表的parent列和右表的child列,且左表和右表是同一個表,所以在map階段將讀入數據分割成child和parent之后,會將parent設置成key,child設置成value進行輸出,并作為左表;再將同一對child和parent中的child設置成key,parent設置成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的信息,比如在value的String最開始處加上字符1表示左表,加上字符2表示右表。這樣在map的結果中就形成了左表和右表,然后在shuffle過程中完成連接。reduce接收到連接的結果,其中每個key的value-list就包含了"grandchild--grandparent"關系。取出每個key的value-list進行解析,將左表中的child放入一個數組,右表中的parent放入一個數組,然后對兩個數組求笛卡爾積就是最后的結果了。
4.3 程序代碼
程序代碼如下所示。
package com.hebut.mr;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class STjoin {
public static int time = 0;
/*
* map將輸出分割child和parent,然后正序輸出一次作為右表,
* 反序輸出一次作為左表,需要注意的是在輸出的value中必須
* 加上左右表的區別標識。
*/
public static class Map extends Mapper
// 實現map函數
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String childname = new String();// 孩子名稱
String parentname = new String();// 父母名稱
String relationtype = new String();// 左右表標識
// 輸入的一行預處理文本
StringTokenizer itr=new StringTokenizer(value.toString());
String[] values=new String[2];
int i=0;
while(itr.hasMoreTokens()){
values[i]=itr.nextToken();
i++;
}
if (values[0].compareTo("child") != 0) {
childname = values[0];
parentname = values[1];
// 輸出左表
relationtype = "1";
context.write(new Text(values[1]), new Text(relationtype +
"+"+ childname + "+" + parentname));
// 輸出右表
relationtype = "2";
context.write(new Text(values[0]), new Text(relationtype +
"+"+ childname + "+" + parentname));
}
}
}
public static class Reduce extends Reducer
{ 
// 實現reduce函數
public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
// 輸出表頭
if (0 == time) {
context.write(new Text("grandchild"), new Text("grandparent"));
time++;
}
int grandchildnum = 0;
String[] grandchild = new String[10];
int grandparentnum = 0;
String[] grandparent = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
// 取得左右表標識
char relationtype = record.charAt(0);
// 定義孩子和父母變量
String childname = new String();
String parentname = new String();
// 獲取value-list中value的child
while (record.charAt(i) != '+') {
childname += record.charAt(i);
i++;
}
i = i + 1;
// 獲取value-list中value的parent
while (i < len) {
parentname += record.charAt(i);
i++;
}
// 左表,取出child放入grandchildren
if ('1' == relationtype) {
grandchild[grandchildnum] = childname;
grandchildnum++;
}
// 右表,取出parent放入grandparent
if ('2' == relationtype) {
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
}
// grandchild和grandparent數組求笛卡爾兒積
if (0 != grandchildnum && 0 != grandparentnum) {
for (int m = 0; m < grandchildnum; m++) {
for (int n = 0; n < grandparentnum; n++) {
// 輸出結果
context.write(new Text(grandchild[m]), new Text(grandparent[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[] { "STjoin_in", "STjoin_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Single Table Join
"); System.exit(2);
}
Job job = new Job(conf, "Single Table Join");
job.setJarByClass(STjoin.class);
// 設置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"STjoin_in"文件夾(備注:"STjoin_out"不需要創建。)如圖4.4-1所示,已經成功創建。
圖4.4-1 創建"STjoin_in" 圖4.4.2 上傳"child-parent"表
然后在本地建立一個txt文件,通過Eclipse上傳到"/user/hadoop/STjoin_in"文件夾中,一個txt文件的內容如"實例描述"那個文件一樣。如圖4.4-2所示,成功上傳之后。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的文件,顯示其內容如圖4.4-3所示:
圖4.4-3 表"child-parent"內容
2)運行詳解
(1)Map處理:
map函數輸出結果如下所示。
child parent àà 忽略此行
Tom Lucy àà
Tom Jack àà
Jone Lucy àà
Jone Jack àà
Lucy Mary àà
Lucy Ben àà
Jack Alice àà
Jack Jesse àà
Terry Alice àà
Terry Jesse àà
Philip Terry àà
Philip Alma àà
Mark Terry àà
Mark Alma àà
(2)Shuffle處理
在shuffle過程中完成連接。
| map函數輸出 | 排序結果 | shuffle連接 | 
| ,2+Mark+Terry> ,2+Mark+Alma> | ,2+Mark+Terry> ,2+Mark+Alma> | 1+Terry+Alice , 1+Philip+Alma, 1+Mark+Alma > 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > 1+Terry+Jesse > 2+Jone+Jack> 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> 2+Mark+Terry, 2+Mark+Alma> 2+Philip+Alma> 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> 2+Tom+Jack> | 
(3)Reduce處理
首先由語句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中沒有左表或者右表,則不會做處理,可以根據這條規則去除無效的shuffle連接。
| 無效的shuffle連接 | 有效的shuffle連接 | 
| 1+Terry+Alice , 1+Philip+Alma, 1+Mark+Alma > 1+Terry+Jesse > 2+Jone+Jack> 2+Mark+Terry, 2+Mark+Alma> 2+Philip+Alma> 2+Tom+Jack> | 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> | 
然后根據下面語句進一步對有效的shuffle連接做處理。
// 左表,取出child放入grandchildren
if ('1' == relationtype) {
grandchild[grandchildnum] = childname;
grandchildnum++;
}
// 右表,取出parent放入grandparent
if ('2' == relationtype) {
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
針對一條數據進行分析:
1+Jone+Jack,
2+Jack+Alice,
2+Jack+Jesse >
    分析結果:左表用"字符1"表示,右表用"字符2"表示,上面的
根據上面針對左表與右表不同的處理規則,取得兩個數組的數據如下所示:
| grandchild | Tom、Jone(grandchild[grandchildnum] = childname;) | 
| grandparent | Alice、Jesse(grandparent[grandparentnum] = parentname;) | 
然后根據下面語句進行處理。
for (int m = 0; m < grandchildnum; m++) {
for (int n = 0; n < grandparentnum; n++) {
context.write(new Text(grandchild[m]), new Text(grandparent[n]));
}
}
處理結果如下面所示:
| Tom Jesse Tom Alice Jone Jesse Jone Alice | 
其他的有效shuffle連接處理都是如此。
3)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"STjoin_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖4.4-4所示。
圖4.4-4 運行結果
5、多表關聯
多表關聯和單表關聯類似,它也是通過對原始數據進行一定的處理,從其中挖掘出關心的信息。下面進入這個實例。
5.1 實例描述
輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關系,輸出"工廠名——地址名"表。
樣例輸入如下所示。
1)factory:
factoryname addressed
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
2)address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
樣例輸出如下所示。
factoryname addressname
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
5.2 設計思路
多表關聯和單表關聯相似,都類似于數據庫中的自然連接。相比單表關聯,多表關聯的左右表和連接列更加清楚。所以可以采用和單表關聯的相同的處理方式,map識別出輸入的行屬于哪個表之后,對其進行分割,將連接的列值保存在key中,另一列和左右表標識保存在value中,然后輸出。reduce拿到連接結果之后,解析value內容,根據標志將左右表內容分開存放,然后求笛卡爾積,最后直接輸出。
這個實例的具體分析參考單表關聯實例。下面給出代碼。
5.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MTjoin {
public static int time = 0;
/*
* 在map中先區分輸入行屬于左表還是右表,然后對兩列值進行分割,
* 保存連接列在key值,剩余列和左右表標志在value中,最后輸出
*/
public static class Map extends Mapper
// 實現map函數
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();// 每行文件
String relationtype = new String();// 左右表標識
// 輸入文件首行,不處理
if (line.contains("factoryname") == true
|| line.contains("addressed") == true) {
return;
}
// 輸入的一行預處理文本
StringTokenizer itr = new StringTokenizer(line);
String mapkey = new String();
String mapvalue = new String();
int i = 0;
while (itr.hasMoreTokens()) {
// 先讀取一個單詞
String token = itr.nextToken();
// 判斷該地址ID就把存到"values[0]"
if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
mapkey = token;
if (i > 0) {
relationtype = "1";
} else {
relationtype = "2";
}
continue;
}
// 存工廠名
mapvalue += token + " ";
i++;
}
// 輸出左右表
context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));
}
}
/*
* reduce解析map輸出,將value中數據按照左右表分別保存,
* 然后求出笛卡爾積,并輸出。
*/
public static class Reduce extends Reducer
{ 
// 實現reduce函數
public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
// 輸出表頭
if (0 == time) {
context.write(new Text("factoryname"), new Text("addressname"));
time++;
}
int factorynum = 0;
String[] factory = new String[10];
int addressnum = 0;
String[] address = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
// 取得左右表標識
char relationtype = record.charAt(0);
// 左表
if ('1' == relationtype) {
factory[factorynum] = record.substring(i);
factorynum++;
}
// 右表
if ('2' == relationtype) {
address[addressnum] = record.substring(i);
addressnum++;
}
}
// 求笛卡爾積
if (0 != factorynum && 0 != addressnum) {
for (int m = 0; m < factorynum; m++) {
for (int n = 0; n < addressnum; n++) {
// 輸出結果
context.write(new Text(factory[m]),
new Text(address[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[] { "MTjoin_in", "MTjoin_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Multiple Table Join
"); System.exit(2);
}
Job job = new Job(conf, "Multiple Table Join");
job.setJarByClass(MTjoin.class);
// 設置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"MTjoin_in"文件夾(備注:"MTjoin_out"不需要創建。)如圖5.4-1所示,已經成功創建。
圖5.4-1 創建"MTjoin_in" 圖5.4.2 上傳兩個數據表
然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/MTjoin_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖5.4-2所示,成功上傳之后。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。
圖5.4.3 兩個數據表的內容
2)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"MTjoin_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖5.4-4所示。
圖5.4-4 運行結果
6、倒排索引
"倒排索引"是文檔檢索系統中最常用的數據結構,被廣泛地應用于全文搜索引擎。它主要是用來存儲某個單詞(或詞組)在一個文檔或一組文檔中的存儲位置的映射,即提供了一種根據內容來查找文檔的方式。由于不是根據文檔來確定文檔所包含的內容,而是進行相反的操作,因而稱為倒排索引(Inverted Index)。
6.1 實例描述
通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的ID號,或者是指文檔所在位置的URL,如圖6.1-1所示。
圖6.1-1 倒排索引結構
從圖6.1-1可以看出,單詞1出現在{文檔1,文檔4,文檔13,……}中,單詞2出現在{文檔3,文檔5,文檔15,……}中,而單詞3出現在{文檔1,文檔8,文檔20,……}中。在實際應用中,還需要給每個文檔添加一個權值,用來指出每個文檔與搜索內容的相關度,如圖6.1-2所示。
圖6.1-2 添加權重的倒排索引
最常用的是使用詞頻作為權重, 即記錄單詞在文檔中出現的次數。以英文為例,如圖6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"這個單詞在文本 T0中出現過1次,T1中出現過1次,T2中出現過2次。當搜索條件為"MapReduce"、"is"、"Simple"時,對應的集合為: {T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文檔T0和T1包含了所要索引的單詞,而且只有T0是連續的。
圖6.1-3 倒排索引示例
更復雜的權重還可能要記錄單詞在多少個文檔中出現過,以實現TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考慮單詞在文檔中的位置信息(單詞是否出現在標題中,反映了單詞在文檔中的重要性)等。
樣例輸入如下所示。
1)file1:
MapReduce is simple
2)file2:
MapReduce is powerful is simple
3)file3:
Hello MapReduce bye MapReduce
樣例輸出如下所示。
MapReduce file1.txt:1;file2.txt:1;file3.txt:2;
is file1.txt:1;file2.txt:2;
simple file1.txt:1;file2.txt:1;
powerful file2.txt:1;
Hello file3.txt:1;
bye file3.txt:1;
6.2 設計思路
實現"倒排索引"只要關注的信息為:單詞、文檔URL及詞頻,如圖3-11所示。但是在實現過程中,索引文件的格式與圖6.1-3會略有所不同,以避免重寫OutPutFormat類。下面根據MapReduce的處理過程給出倒排索引的設計思路。
1)Map過程
    首先使用默認的TextInputFormat類對輸入文件進行處理,得到文本中每行的偏移量及其內容。顯然,Map過程首先必須分析輸入的
圖6.2-1 Map過程輸入/輸出
        這里存在兩個問題:第一,
這里講單詞和URL組成key值(如"MapReduce:file1.txt"),將詞頻作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文檔的相同單詞的詞頻組成列表,傳遞給Combine過程,實現類似于WordCount的功能。
2)Combine過程
經過map方法處理后,Combine過程將key值相同的value值累加,得到一個單詞在文檔在文檔中的詞頻,如圖6.2-2所示。如果直接將圖6.2-2所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值無法保證這一點,所以必須修改key值和value值。這次將單詞作為key值,URL和詞頻組成value值(如"file1.txt:1")。這樣做的好處是可以利用MapReduce框架默認的HashPartitioner類完成Shuffle過程,將相同單詞的所有記錄發送給同一個Reducer進行處理。
圖6.2-2 Combine過程輸入/輸出
3)Reduce過程
經過上述兩個過程后,Reduce過程只需將相同key值的value值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給MapReduce框架進行處理了。如圖6.2-3所示。索引文件的內容除分隔符外與圖6.1-3解釋相同。
4)需要解決的問題
本實例設計的倒排索引在文件數目上沒有限制,但是單詞文件不宜過大(具體值與默認HDFS塊大小及相關配置有關),要保證每個文件對應一個split。否則,由于Reduce過程沒有進一步統計詞頻,最終結果可能會出現詞頻未統計完全的單詞。可以通過重寫InputFormat類將每個文件為一個split,避免上述情況。或者執行兩次MapReduce,第一次MapReduce用于統計詞頻,第二次MapReduce用于生成倒排索引。除此之外,還可以利用復合鍵值對等實現包含更多信息的倒排索引。
圖6.2-3 Reduce過程輸入/輸出
6.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class Map extends Mapper
private Text keyInfo = new Text(); // 存儲單詞和URL組合
private Text valueInfo = new Text(); // 存儲詞頻
private FileSplit split; // 存儲Split對象
// 實現map函數
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 獲得
對所屬的FileSplit對象 split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// key值由單詞和URL組成,如"MapReduce:file1.txt"
// 獲取文件的完整路徑
// keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
// 這里為了好看,只獲取文件的名稱。
int splitIndex = split.getPath().toString().indexOf("file");
keyInfo.set(itr.nextToken() + ":"
+ split.getPath().toString().substring(splitIndex));
// 詞頻初始化為1
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static class Combine extends Reducer
{ 
private Text info = new Text();
// 實現reduce函數
public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
// 統計詞頻
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新設置value值由URL和詞頻組成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新設置key值為單詞
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
public static class Reduce extends Reducer
{ 
private Text result = new Text();
// 實現reduce函數
public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
// 生成文檔列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關鍵
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[] { "index_in", "index_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Inverted Index
"); System.exit(2);
}
Job job = new Job(conf, "Inverted Index");
job.setJarByClass(InvertedIndex.class);
// 設置Map、Combine和Reduce處理類
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
// 設置Map輸出類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 設置Reduce輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
6.4 代碼結果
1)準備測試數據
通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"index_in"文件夾(備注:"index_out"不需要創建。)如圖6.4-1所示,已經成功創建。
圖6.4-1 創建"index_in" 圖6.4.2 上傳"file*.txt"
然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/index_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖6.4-2所示,成功上傳之后。
從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
圖6.4.3 三個"file*.txt"的內容
2)查看運行結果
這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"index_out"文件夾,且里面有3個文件,然后打開雙 其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖6.4-4所示。
圖6.4-4 運行結果


































