Building a Movie Recommendation System with Hadoop
This post is part of a series on the Hadoop family of products. The series covers the commonly used projects Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, and Chukwa, as well as newer additions such as YARN, HCatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, and Hue.
Since 2011, China has entered an era of booming big data, and the Hadoop family of software has come to dominate big-data processing. Open-source projects and vendors alike have aligned their data software with Hadoop, which has grown from a niche technology into the de facto standard for big-data development. On top of the original Hadoop core, a whole family of products has emerged, continually innovating around the "big data" concept and pushing the technology forward.
As developers in the IT industry, we need to keep pace, seize the opportunity, and rise together with Hadoop!
About the author:
- Zhang Dan (Conan), programmer: Java, R, PHP, JavaScript
- Weibo: @Conan_Z
- Blog: http://blog.fens.me
- Email: bsspirit@gmail.com
Please credit the original source when reposting:
http://blog.fens.me/hadoop-mapreduce-recommend/
Preface
Netflix's million-dollar movie recommendation contest turned "recommendation" into one of the hottest data-mining topics of its time. The contest also brought industry and academia into much deeper technical exchange and set off a wave of "recommendation" features on all kinds of websites. The age of personalization has arrived.
Contents
- Overview of recommender systems
- Requirements analysis: designing the metrics of a recommender system
- Algorithm model: a parallel algorithm on Hadoop
- Architecture design: recommender system architecture
- Implementation: the MapReduce program
1. Overview of Recommender Systems
E-commerce sites are among the most important application areas for personalized recommendation. Amazon has been an active adopter and promoter of recommender systems, applying them across every category of goods on its site, and recommendations bring in at least 30% of Amazon's sales.
Recommendation is not limited to e-commerce; it is everywhere: friend suggestions on QQ and Renren, "people you may be interested in" on Sina Weibo, movie recommendations on Youku and Tudou, book recommendations on Douban, restaurant recommendations on Dianping, match suggestions on Jiayuan.com, and job recommendations on Tianji.com.
Classification of recommendation algorithms:
By the data they use:
- Collaborative filtering: UserCF, ItemCF, ModelCF
- Content-based recommendation: based on user profile attributes and item content attributes
- Social filtering: based on the user's social network
By model:
- Nearest-neighbor models: distance-based collaborative filtering
- Latent factor models (SVD): models based on matrix factorization
- Graph models: social network graph models
User-based collaborative filtering (UserCF)
User-based collaborative filtering measures the similarity between users from the ratings they give to items, and makes recommendations based on that user-to-user similarity. In short: recommend to a user the items that other users with similar tastes have liked.
Example:
For the implementation and usage details, see the article: Mahout推薦算法API詳解 (the Mahout recommendation API explained).
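Before going distributed, UserCF can be tried out on a single machine with Mahout's Taste API. The following is only a minimal sketch under that assumption; the class name UserCFDemo, the local file path, and the neighborhood size of 2 are illustrative choices, not part of the original post:
import java.io.File;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;
public class UserCFDemo {
public static void main(String[] args) throws Exception {
// same "userID,itemID,pref" format as the small.csv data set used later in this post
DataModel model = new FileDataModel(new File("logfile/small.csv"));
UserSimilarity similarity = new EuclideanDistanceSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
// top-3 recommendations for user 1, based on the 2 most similar users
for (RecommendedItem item : recommender.recommend(1, 3)) {
System.out.println(item);
}
}
}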
Item-based collaborative filtering (ItemCF)
Item-based collaborative filtering measures the similarity between items from the ratings different users give them, and makes recommendations based on that item-to-item similarity. In short: recommend to a user items similar to the ones they have liked before.
Example:
For the implementation and usage details, see the article: Mahout推薦算法API詳解 (the Mahout recommendation API explained).
Note: item-based collaborative filtering is currently the most widely used recommendation algorithm in commercial systems.
A collaborative filtering implementation breaks down into two steps (a single-machine sketch follows this list):
- 1. Compute the similarity between items
- 2. Use the item similarities and the user's historical behavior to generate a recommendation list for the user
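A minimal single-machine sketch of these two steps, again using Mahout's Taste API; the class name ItemCFDemo and the file path are illustrative only, and Tanimoto similarity is just one possible choice of item similarity:
import java.io.File;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.TanimotoCoefficientSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
public class ItemCFDemo {
public static void main(String[] args) throws Exception {
DataModel model = new FileDataModel(new File("logfile/small.csv"));
// Step 1: item-item similarity, here measured by how often items co-occur across users
ItemSimilarity similarity = new TanimotoCoefficientSimilarity(model);
// Step 2: combine the item similarities with the user's own rating history
GenericItemBasedRecommender recommender = new GenericItemBasedRecommender(model, similarity);
for (RecommendedItem item : recommender.recommend(1, 3)) {
System.out.println(item);
}
}
}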
For another article on collaborative filtering, see: RHadoop實踐系列之三 R實現MapReduce的協同過濾算法 (RHadoop in practice, part 3: collaborative filtering with MapReduce in R).
2. Requirements Analysis: Designing the Metrics of a Recommender System
Below, we start from a company case study to explain, end to end, how to design the metrics of a recommender system.
Case study
The Netflix Prize movie recommendation contest, http://www.netflixprize.com/
Netflix official site: www.netflix.com
When Netflix organized the contest in 2006, it was a company whose business was online movie rental. Netflix used members' ratings to predict which movies a user was likely to enjoy, combined that with the movies the member had already watched and their stated taste preferences, and mixed genres accordingly to meet demand.
After collecting member information and producing personalized recommendations, many obscure titles unexpectedly made it onto members' rental lists. From the standpoint of content cost, popular movies are generally more expensive, so if Netflix could raise the share of less popular movies among its rentals, it would naturally improve its profitability.
Netflix once claimed that about 60% of members arranged their rental queues according to the recommendation list. If the recommender could not accurately guess which kinds of movies a member liked, members who repeatedly rented obscure titles that did not match their taste were likely to churn. To recommend movies more effectively, Netflix kept improving its personalized recommendation service, and in 2006 it announced a prize of one million US dollars for whoever could best improve its recommendation algorithm. The prize was finally claimed in 2009 by a seven-person team, and Netflix immediately announced a second million-dollar challenge. This shows how valuable a good recommendation algorithm is, and how difficult it is to build one.
The figure above shows the leaderboard of the contest teams.
Additional notes:
- 1. The Netflix contest was based on static data: a fixed training set matched against a pre-built validation ("result") set. That is quite different from a system running in production every day.
- 2. The contest data set was small — the full set is only about 666 MB — whereas a real recommender system has to work with large volumes of historical data, easily reaching GB or TB scale.
Netflix data downloads
Partial training set: http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.train_.gz
Partial validation set: http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.validate.gz
Full data set: http://www.lifecrunch.biz/wp-content/uploads/2011/04/nf_prize_dataset.tar.gz
So when we design a recommender for a real environment, we have to weigh data volume, algorithm performance, and result accuracy together.
- Algorithm choice: item-based collaborative filtering (ItemCF), implemented in parallel
- Data volume: built on Hadoop, supporting data at GB, TB, and PB scale
- Algorithm evaluation: judged with metrics such as precision, recall, coverage, and popularity (a small evaluation sketch follows this list)
- Result interpretation: explain the results in terms of how ItemCF is defined
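To illustrate two of these metrics, precision and recall for a top-N list can be computed against a held-out set of items the user actually liked. The sketch below is generic and not tied to the Netflix data; the class name and the item IDs in main() are made up for the example:
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class TopNEvaluation {
// precision = hits / #recommended, recall = hits / #liked
public static double[] precisionRecall(List<Long> recommended, Set<Long> liked) {
int hits = 0;
for (Long itemID : recommended) {
if (liked.contains(itemID)) hits++;
}
double precision = recommended.isEmpty() ? 0.0 : (double) hits / recommended.size();
double recall = liked.isEmpty() ? 0.0 : (double) hits / liked.size();
return new double[] { precision, recall };
}
public static void main(String[] args) {
Set<Long> liked = new HashSet<Long>(Arrays.asList(104L, 106L)); // held-out "truth"
List<Long> recommended = Arrays.asList(104L, 105L, 106L); // top-3 list to evaluate
double[] pr = precisionRecall(recommended, liked);
System.out.printf("precision=%.2f recall=%.2f%n", pr[0], pr[1]); // precision=0.67 recall=1.00
}
}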
3. Algorithm Model: A Parallel Algorithm on Hadoop
Here I implement the distributed item-based collaborative filtering algorithm described in "Mahout in Action", Chapter 6: Distributing recommendation computations.
Test data set: small.csv
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.0
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
Each line has three fields: user ID, movie ID, and the user's rating of the movie (0 to 5, in steps of 0.5).
The idea of the algorithm:
- 1. Build the item co-occurrence matrix
- 2. Build the user-item rating matrix
- 3. Multiply the matrices to produce the recommendations
1). Build the item co-occurrence matrix
Group the ratings by user, list the items each user rated, and count each single item as well as each pair of items.
[101] [102] [103] [104] [105] [106] [107]
[101] 5 3 4 4 2 2 1
[102] 3 3 3 2 1 1 0
[103] 4 3 4 3 1 2 0
[104] 4 2 3 4 2 2 1
[105] 2 1 1 2 2 1 1
[106] 2 1 2 2 1 2 0
[107] 1 0 0 1 1 0 1
2). Build the user-item rating matrix
Group by user and list the items each user rated, together with the rating each item received.
U3
[101] 2.0
[102] 0.0
[103] 0.0
[104] 4.0
[105] 4.5
[106] 0.0
[107] 5.0
3). Multiply the matrices to produce the recommendations
Co-occurrence matrix × rating matrix = recommendation result
Image from "Mahout in Action"
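To make the multiplication concrete: user 3's predicted score for item 103 is the dot product of row [103] of the co-occurrence matrix with user 3's rating vector, i.e. 4×2.0 + 3×0.0 + 4×0.0 + 3×4.0 + 1×4.5 + 2×0.0 + 0×5.0 = 24.5, which is exactly the value that appears for user 3, item 103 in the Step 4 output at the end of this post.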
MapReduce job design
Image from "Mahout in Action"
Reading the MapReduce jobs:
- Step 1: group by user and build the list of items each user rated, producing the user-item rating matrix
- Step 2: count the item pairs in those lists to build the item co-occurrence matrix
- Step 3: combine the co-occurrence matrix and the rating matrix
- Step 4: compute the recommendation list
4. Architecture Design: Recommender System Architecture
In the figure above, the left side is the application (business) system and the right side is Hadoop's HDFS and MapReduce.
- The business system records user behavior and the ratings users give to items
- A system cron job imports data incrementally into HDFS every xx hours (userid, itemid, value, time)
- When the import finishes, a scheduled job starts the MapReduce program and runs the recommendation algorithm
- When the computation finishes, a scheduled job exports the recommendation results from HDFS into a database, so they can be queried quickly later (a sketch of such an export follows this list)
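A minimal sketch of that last export step, reading the Step 4 output from HDFS and writing it into a relational table over JDBC. The MySQL URL, credentials, and the recommend(userid, itemid, score) table are hypothetical placeholders; the HDFS address and output path match the ones used in the code later in this post:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ExportToDB {
private static final Pattern DELIMITER = Pattern.compile("[\t,]");
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.210:9000"), conf);
Connection conn = DriverManager.getConnection(
"jdbc:mysql://127.0.0.1:3306/recommend_db", "user", "password"); // hypothetical database
PreparedStatement ps = conn.prepareStatement(
"INSERT INTO recommend (userid, itemid, score) VALUES (?, ?, ?)"); // hypothetical table
BufferedReader reader = new BufferedReader(new InputStreamReader(
fs.open(new Path("/user/hdfs/recommend/step4/part-00000"))));
String line;
while ((line = reader.readLine()) != null) {
String[] t = DELIMITER.split(line); // "userID <TAB> itemID,score" -> [userID, itemID, score]
ps.setInt(1, Integer.parseInt(t[0]));
ps.setInt(2, Integer.parseInt(t[1]));
ps.setDouble(3, Double.parseDouble(t[2]));
ps.executeUpdate();
}
reader.close();
ps.close();
conn.close();
fs.close();
}
}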
5. Implementation: The MapReduce Program
For the Windows 7 development environment and the Hadoop runtime environment, see the article: 用Maven構建Hadoop項目 (building a Hadoop project with Maven).
New Java classes:
- Recommend.java: the main driver that launches all the jobs
- Step1.java: group by user and build the list of items each user rated, producing the user-item rating matrix
- Step2.java: count the item pairs and build the item co-occurrence matrix
- Step3.java: combine the co-occurrence matrix and the rating matrix
- Step4.java: compute the recommendation list
- HdfsDAO.java: a utility class for HDFS operations
1). Recommend.java: the main driver
Source code:
package org.conan.myhadoop.recommend;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
public class Recommend {
public static final String HDFS = "hdfs://192.168.1.210:9000";
public static final Pattern DELIMITER = Pattern.compile("[\t,]");
public static void main(String[] args) throws Exception {
Map<String, String> path = new HashMap<String, String>();
path.put("data", "logfile/small.csv");
path.put("Step1Input", HDFS + "/user/hdfs/recommend");
path.put("Step1Output", path.get("Step1Input") + "/step1");
path.put("Step2Input", path.get("Step1Output"));
path.put("Step2Output", path.get("Step1Input") + "/step2");
path.put("Step3Input1", path.get("Step1Output"));
path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
path.put("Step3Input2", path.get("Step2Output"));
path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
path.put("Step4Input1", path.get("Step3Output1"));
path.put("Step4Input2", path.get("Step3Output2"));
path.put("Step4Output", path.get("Step1Input") + "/step4");
Step1.run(path);
Step2.run(path);
Step3.run1(path);
Step3.run2(path);
Step4.run(path);
System.exit(0);
}
public static JobConf config() {
JobConf conf = new JobConf(Recommend.class);
conf.setJobName("Recommend");
conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");
return conf;
}
}
2). Step1.java: group by user and build the list of items each user rated, producing the user-item rating matrix
Source code:
package org.conan.myhadoop.recommend;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class Step1 {
public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
private final static IntWritable k = new IntWritable();
private final static Text v = new Text();
@Override
public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(value.toString());
int userID = Integer.parseInt(tokens[0]);
String itemID = tokens[1];
String pref = tokens[2];
k.set(userID);
v.set(itemID + ":" + pref);
output.collect(k, v);
}
}
public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
private final static Text v = new Text();
@Override
public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
StringBuilder sb = new StringBuilder();
while (values.hasNext()) {
sb.append("," + values.next());
}
v.set(sb.toString().replaceFirst(",", ""));
output.collect(key, v);
}
}
public static void run(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step1Input");
String output = path.get("Step1Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(input);
hdfs.mkdirs(input);
hdfs.copyFile(path.get("data"), input);
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Step1_ToItemPreMapper.class);
conf.setCombinerClass(Step1_ToUserVectorReducer.class);
conf.setReducerClass(Step1_ToUserVectorReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
Output:
~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000
1 102:3.0,103:2.5,101:5.0
2 101:2.0,102:2.5,103:5.0,104:2.0
3 107:5.0,101:2.0,104:4.0,105:4.5
4 101:5.0,103:3.0,104:4.5,106:4.0
5 101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0
3). Step2.java: count the item pairs and build the item co-occurrence matrix
Source code:
package org.conan.myhadoop.recommend;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class Step2 {
public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static Text k = new Text();
private final static IntWritable v = new IntWritable(1);
@Override
public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
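// Each input line from Step 1 looks like "userID<TAB>item:pref,item:pref,...",
// so tokens[0] is the user ID and tokens[1..n] are the item:pref entries.
// The nested loops below emit every ordered item pair (including an item paired
// with itself) once per user; summing these counts in the reducer yields the
// co-occurrence matrix.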
for (int i = 1; i < tokens.length; i++) {
String itemID = tokens[i].split(":")[0];
for (int j = 1; j < tokens.length; j++) {
String itemID2 = tokens[j].split(":")[0];
k.set(itemID + ":" + itemID2);
output.collect(k, v);
}
}
}
}
public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
result.set(sum);
output.collect(key, result);
}
}
public static void run(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step2Input");
String output = path.get("Step2Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
Output:
~ hadoop fs -cat /user/hdfs/recommend/step2/part-00000
101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 3
102:103 3
102:104 2
102:105 1
102:106 1
103:101 4
103:102 3
103:103 4
103:104 3
103:105 1
103:106 2
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 1
105:103 1
105:104 2
105:105 2
105:106 1
105:107 1
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:104 1
107:105 1
107:107 1
4). Step3.java: combine the co-occurrence matrix and the rating matrix
Source code:
package org.conan.myhadoop.recommend;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class Step3 {
public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable k = new IntWritable();
private final static Text v = new Text();
@Override
public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
for (int i = 1; i < tokens.length; i++) {
String[] vector = tokens[i].split(":");
int itemID = Integer.parseInt(vector[0]);
String pref = vector[1];
k.set(itemID);
v.set(tokens[0] + ":" + pref);
output.collect(k, v);
}
}
}
public static void run1(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step3Input1");
String output = path.get("Step3Output1");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Step31_UserVectorSplitterMapper.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static Text k = new Text();
private final static IntWritable v = new IntWritable();
@Override
public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
k.set(tokens[0]);
v.set(Integer.parseInt(tokens[1]));
output.collect(k, v);
}
}
public static void run2(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step3Input2");
String output = path.get("Step3Output2");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
Output:
~ hadoop fs -cat /user/hdfs/recommend/step3_1/part-00000
101 5:4.0
101 1:5.0
101 2:2.0
101 3:2.0
101 4:5.0
102 1:3.0
102 5:3.0
102 2:2.5
103 2:5.0
103 5:2.0
103 1:2.5
103 4:3.0
104 2:2.0
104 5:4.0
104 3:4.0
104 4:4.5
105 3:4.5
105 5:3.5
106 5:4.0
106 4:4.0
107 3:5.0
~ hadoop fs -cat /user/hdfs/recommend/step3_2/part-00000
101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 3
102:103 3
102:104 2
102:105 1
102:106 1
103:101 4
103:102 3
103:103 4
103:104 3
103:105 1
103:106 2
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 1
105:103 1
105:104 2
105:105 2
105:106 1
105:107 1
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:104 1
107:105 1
107:107 1
5). Step4.java: compute the recommendation list
Source code:
package org.conan.myhadoop.recommend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class Step4 {
public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable k = new IntWritable();
private final static Text v = new Text();
private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();
@Override
public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
String[] v1 = tokens[0].split(":");
String[] v2 = tokens[1].split(":");
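// Lines from the two inputs are told apart by where the ":" appears:
// co-occurrence lines look like "101:102<TAB>3" (v1 has two parts),
// user-vector lines look like "101<TAB>5:4.0" (v2 has two parts).
// Note: caching the co-occurrence rows in a static map only works if the same
// JVM sees the co-occurrence records before the user-vector records that need
// them (as in a local, single-process run); a fully distributed implementation
// would normally join the two data sets in the reducer instead.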
if (v1.length > 1) {// cooccurrence
int itemID1 = Integer.parseInt(v1[0]);
int itemID2 = Integer.parseInt(v1[1]);
int num = Integer.parseInt(tokens[1]);
List<Cooccurrence> list = null;
if (!cooccurrenceMatrix.containsKey(itemID1)) {
list = new ArrayList<Cooccurrence>();
} else {
list = cooccurrenceMatrix.get(itemID1);
}
list.add(new Cooccurrence(itemID1, itemID2, num));
cooccurrenceMatrix.put(itemID1, list);
}
if (v2.length > 1) {// userVector
int itemID = Integer.parseInt(tokens[0]);
int userID = Integer.parseInt(v2[0]);
double pref = Double.parseDouble(v2[1]);
k.set(userID);
for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
v.set(co.getItemID2() + "," + pref * co.getNum());
output.collect(k, v);
}
}
}
}
public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
private final static Text v = new Text();
@Override
public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
Map<String, Double> result = new HashMap<String, Double>();
while (values.hasNext()) {
String[] str = values.next().toString().split(",");
if (result.containsKey(str[0])) {
result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
} else {
result.put(str[0], Double.parseDouble(str[1]));
}
}
Iterator<String> iter = result.keySet().iterator();
while (iter.hasNext()) {
String itemID = iter.next();
double score = result.get(itemID);
v.set(itemID + "," + score);
output.collect(key, v);
}
}
}
public static void run(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input1 = path.get("Step4Input1");
String input2 = path.get("Step4Input2");
String output = path.get("Step4Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Step4_PartialMultiplyMapper.class);
conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
class Cooccurrence {
private int itemID1;
private int itemID2;
private int num;
public Cooccurrence(int itemID1, int itemID2, int num) {
super();
this.itemID1 = itemID1;
this.itemID2 = itemID2;
this.num = num;
}
public int getItemID1() {
return itemID1;
}
public void setItemID1(int itemID1) {
this.itemID1 = itemID1;
}
public int getItemID2() {
return itemID2;
}
public void setItemID2(int itemID2) {
this.itemID2 = itemID2;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
Output:
~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000
1 107,5.0
1 106,18.0
1 105,15.5
1 104,33.5
1 103,39.0
1 102,31.5
1 101,44.0
2 107,4.0
2 106,20.5
2 105,15.5
2 104,36.0
2 103,41.5
2 102,32.5
2 101,45.5
3 107,15.5
3 106,16.5
3 105,26.0
3 104,38.0
3 103,24.5
3 102,18.5
3 101,40.0
4 107,9.5
4 106,33.0
4 105,26.0
4 104,55.0
4 103,53.5
4 102,37.0
4 101,63.0
5 107,11.5
5 106,34.5
5 105,32.0
5 104,59.0
5 103,56.5
5 102,42.5
5 101,68.0
6). HdfsDAO.java: a utility class for HDFS operations
For a detailed explanation, see the article: Hadoop編程調用HDFS (calling HDFS programmatically from Hadoop).
Source code:
package org.conan.myhadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
public class HdfsDAO {
private static final String HDFS = "hdfs://192.168.1.210:9000/";
public HdfsDAO(Configuration conf) {
this(HDFS, conf);
}
public HdfsDAO(String hdfs, Configuration conf) {
this.hdfsPath = hdfs;
this.conf = conf;
}
private String hdfsPath;
private Configuration conf;
public static void main(String[] args) throws IOException {
JobConf conf = config();
HdfsDAO hdfs = new HdfsDAO(conf);
hdfs.copyFile("datafile/item.csv", "/tmp/new");
hdfs.ls("/tmp/new");
}
public static JobConf config(){
JobConf conf = new JobConf(HdfsDAO.class);
conf.setJobName("HdfsDAO");
conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");
return conf;
}
public void mkdirs(String folder) throws IOException {
Path path = new Path(folder);
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
if (!fs.exists(path)) {
fs.mkdirs(path);
System.out.println("Create: " + folder);
}
fs.close();
}
public void rmr(String folder) throws IOException {
Path path = new Path(folder);
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
fs.deleteOnExit(path);
System.out.println("Delete: " + folder);
fs.close();
}
public void ls(String folder) throws IOException {
Path path = new Path(folder);
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
FileStatus[] list = fs.listStatus(path);
System.out.println("ls: " + folder);
System.out.println("==========================================================");
for (FileStatus f : list) {
System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
}
System.out.println("==========================================================");
fs.close();
}
public void createFile(String file, String content) throws IOException {
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
byte[] buff = content.getBytes();
FSDataOutputStream os = null;
try {
os = fs.create(new Path(file));
os.write(buff, 0, buff.length);
System.out.println("Create: " + file);
} finally {
if (os != null)
os.close();
}
fs.close();
}
public void copyFile(String local, String remote) throws IOException {
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
fs.copyFromLocalFile(new Path(local), new Path(remote));
System.out.println("copy from: " + local + " to " + remote);
fs.close();
}
public void download(String remote, String local) throws IOException {
Path path = new Path(remote);
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
fs.copyToLocalFile(path, new Path(local));
System.out.println("download: from" + remote + " to " + local);
fs.close();
}
public void cat(String remoteFile) throws IOException {
Path path = new Path(remoteFile);
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
FSDataInputStream fsdis = null;
System.out.println("cat: " + remoteFile);
try {
fsdis =fs.open(path);
IOUtils.copyBytes(fsdis, System.out, 4096, false);
} finally {
IOUtils.closeStream(fsdis);
fs.close();
}
}
}
With that, we have implemented the item-based collaborative filtering algorithm ourselves as a set of MapReduce jobs.
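One caveat when reading the Step 4 output: it still contains items the user has already rated (user 1, for example, gets 101, 102, and 103 back). Before serving the list, a production system would normally drop those. A minimal sketch of such a post-processing step, assuming the user's rated items and the computed scores have been loaded into memory (class and method names are illustrative):
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class FilterRated {
// keep only the scores for items the user has not rated yet
public static Map<Integer, Double> filter(Set<Integer> ratedItems, Map<Integer, Double> scores) {
Map<Integer, Double> result = new HashMap<Integer, Double>();
for (Map.Entry<Integer, Double> entry : scores.entrySet()) {
if (!ratedItems.contains(entry.getKey())) {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
}
}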
For the RHadoop implementation, see the article: RHadoop實踐系列之三 R實現MapReduce的協同過濾算法 (RHadoop in practice, part 3: collaborative filtering with MapReduce in R).
For the Mahout implementation, see the article: Mahout分步式程序開發 基于物品的協同過濾ItemCF (distributed development with Mahout: item-based collaborative filtering, ItemCF).
I have published the complete MapReduce implementation on GitHub:
https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend