Implementing the classic "people who viewed this item also viewed" feature with MapReduce


Input:

date    cookie id    item id
xx      xx           xx

Output:

item id    related item ids (comma-separated, sorted by relevance)
xx         xx

For example:

id1    id3,id0,id4,id2
id2    id0,id5

The computation takes four steps:

1. Extract the date, cookie id, and item id from the raw logs, computed per day. The output format is:

item id-0    item id-1
xx           xx

One optimization here: item id-0 is always smaller than item id-1, which halves the storage; the pairs are transposed back when the data is aggregated at the end. The reducer does the local sorting and de-duplication.

 

2. Aggregate the step-1 results, still per day:

item id-0    item id-1    score (the number of users who viewed both items)
xx           xx           xx

 

3. Aggregate the last three months of data with time decay: the older a record is, the less its score contributes. Output the pairwise item scores, including the transposed pairs (see the decay sketch after this list).

 

4. Pivot rows to columns to produce the final recommendation data, each item's related items sorted by score.
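
The decay used in step 3 (and implemented in the third job below) is factor = 1 / (1 + (day - 1) / 10), where day is the record's age in days. A minimal demo of how it behaves (the class name DecayDemo is ours, for illustration only):

public class DecayDemo {
    static float factor(int day) {
        return 1f / (1f + (day - 1) / 10f);
    }
    public static void main(String[] args) {
        // contribution halves after ~10 days and keeps shrinking
        for (int day : new int[]{1, 11, 31, 91}) {
            System.out.printf("day=%d  factor=%.2f%n", day, factor(day));
        }
        // prints: day=1 1.00, day=11 0.50, day=31 0.25, day=91 0.10
    }
}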

 

First MR

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;


/*
 * Input: raw data, may contain duplicates
 * date  cookie  listing id
 *
 * Output:
 * date  listing id1  listing id2   // listing id1 is always smaller than listing id2; grouped by (date, cookie)
 *
 */

public class HouseMergeAndSplit {

    public static class Partitioner1 extends Partitioner<TextPair, Text> {
          @Override
          public int getPartition(TextPair key, Text value, int numPartition) {
              // Partition by (date, cookie), matching the grouping comparator below.
              // Masking with Integer.MAX_VALUE avoids the Math.abs(Integer.MIN_VALUE) overflow.
              return (new Text(key.getFirst().toString() + key.getSecond().toString()).hashCode() * 127
                      & Integer.MAX_VALUE) % numPartition;
          }
    }
          // Grouping comparator: one reduce() call per (date, cookie); the full key
          // still sorts by (date, cookie, listing id) via TextPair.compareTo.
          public static class Comp1 extends WritableComparator {
              public Comp1() {
                  super(TextPair.class, true);
              }
              @SuppressWarnings("unchecked")
              public int compare(WritableComparable a, WritableComparable b) {
                  TextPair t1 = (TextPair) a;
                  TextPair t2 = (TextPair) b;
                  int comp = t1.getFirst().compareTo(t2.getFirst());
                  if (comp != 0)
                      return comp;
                  return t1.getSecond().compareTo(t2.getSecond());
              }
          }
      public static class TokenizerMapper
           extends Mapper<LongWritable, Text, TextPair, Text> {
          Text val = new Text("test"); // placeholder value; only the key carries information
          public void map(LongWritable key, Text value, Context context
                          ) throws IOException, InterruptedException {
              String s[] = value.toString().split("\001");
              TextPair tp = new TextPair(s[0], s[1], s[4] + s[3]); // date, cookie, city + listing id
              context.write(tp, val);
          }
      }

      public static class IntSumReducer
           extends Reducer<TextPair, Text, Text, Text> {
          private static String comparedColumn[] = new String[3];
          ArrayList<String> houselist = new ArrayList<String>();
          private static Text keyv = new Text();
          private static Text valuev = new Text();
          static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());

        public void reduce(TextPair key, Iterable<Text> values,
                           Context context
                           ) throws IOException, InterruptedException {

            houselist.clear();
            String thedate = key.getFirst().toString();
            String cookie = key.getSecond().toString();

            for (int i = 0; i < 3; i++)
                comparedColumn[i] = "";

            // (first, second) is the grouping key, so reduce() is called once per (date, cookie).
            // Hadoop reuses the key object while iterating, so key.getThree() changes per value;
            // the listing ids arrive sorted, and equal neighbours are skipped to de-duplicate.
            for (Text val : values) {
                if (thedate.equals(comparedColumn[0]) && cookie.equals(comparedColumn[1])
                        && !key.getThree().toString().equals(comparedColumn[2])) {
                    houselist.add(key.getThree().toString());
                    comparedColumn[0] = key.getFirst().toString();
                    comparedColumn[1] = key.getSecond().toString();
                    comparedColumn[2] = key.getThree().toString();
                }
                if (!thedate.equals(comparedColumn[0]) || !cookie.equals(comparedColumn[1])) {
                    houselist.add(key.getThree().toString());
                    comparedColumn[0] = key.getFirst().toString();
                    comparedColumn[1] = key.getSecond().toString();
                    comparedColumn[2] = key.getThree().toString();
                }
            }

            keyv.set(comparedColumn[0]); // the date

            // Emit every ordered pair (i < j): houselist is sorted, so listing id-0 is
            // always smaller than listing id-1. Join with a tab so the downstream jobs
            // can split the line on "\t".
            for (int i = 0; i < houselist.size() - 1; i++) {
                for (int j = i + 1; j < houselist.size(); j++) {
                    valuev.set(houselist.get(i) + "\t" + houselist.get(j)); // related listings
                    context.write(keyv, valuev);
                }
            }
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: HouseMergeAndSplit <in> <out>");
          System.exit(2);
        }

        FileSystem fstm = FileSystem.get(conf);
        Path outDir = new Path(otherArgs[1]);
        fstm.delete(outDir, true);

        conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
        Job job = new Job(conf, "HouseMergeAndSplit");
        job.setNumReduceTasks(4);
        job.setJarByClass(HouseMergeAndSplit.class);
        job.setMapperClass(TokenizerMapper.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        // set the partitioner
        job.setPartitionerClass(Partitioner1.class);
        // group records after partitioning
        job.setGroupingComparatorClass(Comp1.class);
        // set the reducer and its output types
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}
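
For example (hypothetical data): if on 2013-09-01 the cookie cookieA viewed the listings sh001, sh002 and sh003, this job emits each ordered pair once, tab-separated:

2013-09-01	sh001	sh002
2013-09-01	sh001	sh003
2013-09-01	sh002	sh003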
TextPair
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;
    private Text three;
    public TextPair() {
      set(new Text(), new Text(),new Text());
    }
    public TextPair(String first, String second,String three) {
      set(new Text(first), new Text(second),new Text(three));
    }
    public TextPair(Text first, Text second, Text three) {
      set(first, second, three);
    }
    public void set(Text first, Text second,Text three) {
      this.first = first;
      this.second = second;
      this.three=three;
    }
    public Text getFirst() {
      return first;
    }
    public Text getSecond() {
      return second;
    }
    public Text getThree() {
          return three;
        }
    public void write(DataOutput out) throws IOException {
      first.write(out);
      second.write(out);
      three.write(out);
    }
    public void readFields(DataInput in) throws IOException {
      first.readFields(in);
      second.readFields(in);
      three.readFields(in);
    }
    public int compareTo(TextPair tp) {
      int cmp = first.compareTo(tp.first);
      if (cmp != 0) {
       return cmp;
      }
      cmp= second.compareTo(tp.second);
      if (cmp != 0) {
           return cmp;
          }
      return three.compareTo(tp.three);
    }
    }
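
A side note: TextPair implements compareTo but not hashCode/equals. The job above supplies its own Partitioner, so Hadoop never consults hashCode here, but overriding them keeps the class safe with the default HashPartitioner and hash-based collections. A minimal sketch (not in the original; these methods would go inside TextPair):

    @Override
    public int hashCode() {
        return (first.hashCode() * 163 + second.hashCode()) * 163 + three.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextPair)) return false;
        TextPair tp = (TextPair) o;
        return first.equals(tp.first) && second.equals(tp.second) && three.equals(tp.three);
    }
    @Override
    public String toString() {
        return first + "\t" + second + "\t" + three;
    }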
TextPairSecond
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPairSecond implements WritableComparable<TextPairSecond> {
    private Text first;
    private FloatWritable second;
    public TextPairSecond() {
      set(new Text(), new FloatWritable());
    }
    public TextPairSecond(String first, float second) {
      set(new Text(first), new FloatWritable(second));
    }
    public TextPairSecond(Text first, FloatWritable second) {
      set(first, second);
    }
    public void set(Text first, FloatWritable second) {
      this.first = first;
      this.second = second;
    }
    public Text getFirst() {
      return first;
    }
    public FloatWritable getSecond() {
      return second;
    }
    public void write(DataOutput out) throws IOException {
      first.write(out);
      second.write(out);
    }
    public void readFields(DataInput in) throws IOException {
      first.readFields(in);
      second.readFields(in);
    }
    public int compareTo(TextPairSecond tp) {
      int cmp = first.compareTo(tp.first);
      if (cmp != 0) {
       return cmp;
      }
      return second.compareTo(tp.second);
    }

    }

Second MR

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


/*
 * Count how many times each pair of listings co-occurs.
 * Input:
 * date  listing1  listing2
 *
 * Output:
 * date  listing1  listing2  co-occurrence count
 *
 */

public class HouseCount {


      public static class TokenizerMapper
           extends Mapper<LongWritable, Text, Text, IntWritable> {

        IntWritable iw = new IntWritable(1);
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
          // The whole "date \t listing1 \t listing2" line is the key; classic word count.
          context.write(value, iw);
        }
      }

      public static class IntSumReducer 
           extends Reducer<Text,IntWritable,Text,IntWritable> {

         IntWritable result=new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context
                           ) throws IOException, InterruptedException {

             int sum=0;
             for (IntWritable iw:values)
             {
                 sum+=iw.get();
             }
             result.set(sum);
         context.write(key, result) ;

        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: HouseCount <in> <out>");
          System.exit(2);
        }

        FileSystem fstm = FileSystem.get(conf);
        Path outDir = new Path(otherArgs[1]);
        fstm.delete(outDir, true);

        conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
        Job job = new Job(conf, "HouseCount");
        job.setNumReduceTasks(2);
        job.setJarByClass(HouseCount.class);
        job.setMapperClass(TokenizerMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set the reducer and its output types
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}
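
For instance, given these hypothetical step-1 output lines:

2013-09-01	id1	id3
2013-09-01	id1	id3
2013-09-01	id2	id5

the job outputs:

2013-09-01	id1	id3	2
2013-09-01	id2	id5	1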
Third MR
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


/*
 * Aggregate the last three months of co-occurrence counts with a decay factor
 * (older records contribute less), and finally emit each pair (a, b)
 * transposed as (b, a) as well.
 * Input:
 * date  listing1  listing2  co-occurrence count
 *
 * Output:
 * listing1  listing2  decayed co-occurrence score (the decay factor differs per day)
 *
 */

public class HouseCountHz {


      public static class HouseCountHzMapper
           extends Mapper<LongWritable, Text, Text, FloatWritable> {

        Text keyv = new Text();
        FloatWritable valuev = new FloatWritable();
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {

          String[] s = value.toString().split("\t");
          keyv.set(s[1] + "\t" + s[2]); // listing1, listing2; tab-joined so the reducer can split on "\t"
          Calendar now = Calendar.getInstance();
          Calendar recordDay = Calendar.getInstance();

          Date parsed = null;
          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
          try {
            parsed = sdf.parse(s[0]);
          } catch (ParseException e) {
            e.printStackTrace();
          }
          recordDay.setTime(parsed);
          long diffMillis = now.getTimeInMillis() - recordDay.getTimeInMillis();
          int day = (int) (diffMillis / (3600 * 24 * 1000)); // age of the record in days
          float factor = 1 / (1 + (float) (day - 1) / 10);   // decay factor
          valuev.set(Float.parseFloat(s[3]) * factor);

          context.write(keyv, valuev);
        }
      }

      public static class HouseCountHzReducer
           extends Reducer<Text, FloatWritable, Text, FloatWritable> {

          FloatWritable result = new FloatWritable();
          Text keyreverse = new Text();
        public void reduce(Text key, Iterable<FloatWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {

             float sum = 0;
             for (FloatWritable iw : values) {
                 sum += iw.get();
             }
             result.set(sum);
             // Also emit the transposed pair, so step 4 sees both (a, b) and (b, a).
             String[] keys = key.toString().split("\t");
             keyreverse.set(keys[1] + "\t" + keys[0]);
             context.write(key, result);
             context.write(keyreverse, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: HouseCountHz <in> <out>");
          System.exit(2);
        }

        FileSystem fstm = FileSystem.get(conf);
        Path outDir = new Path(otherArgs[1]);
        fstm.delete(outDir, true);

        conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
        Job job = new Job(conf, "HouseCountHz");
        job.setNumReduceTasks(2);
        job.setJarByClass(HouseCountHz.class);
        job.setMapperClass(HouseCountHzMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);

        // set the reducer and its output types
        job.setReducerClass(HouseCountHzReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}
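
A side note: the Calendar arithmetic in the mapper truncates partial days and mixes the current time of day into the difference. On Java 8+, a simpler equivalent is available; a sketch only, not part of the original code:

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not in the original code.
class DayDiff {
    // Age in whole days of an ISO "yyyy-MM-dd" date string, ignoring time of day.
    static int ageInDays(String isoDate) {
        return (int) ChronoUnit.DAYS.between(LocalDate.parse(isoDate), LocalDate.now());
    }
}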
Fourth MR
import java.io.IOException;
import java.util.Iterator;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;



/*
 * Input:
 * listing1  listing2  decayed co-occurrence score
 *
 * Output:
 * listing1  listing2,listing3,listing4  (sorted by score, descending)
 */

public class HouseRowToCol {

    public static class Partitioner1 extends Partitioner<TextPairSecond, Text> {
          @Override
          // Partition by the first field only: grouping below is also by the first
          // field, so all records of one listing must reach the same reducer.
          // (Partitioning by first+second, as the first job does, would scatter a
          // listing's records across reducers here.) Masking avoids the
          // Math.abs(Integer.MIN_VALUE) overflow.
          public int getPartition(TextPairSecond key, Text value, int numPartition) {
              return (key.getFirst().hashCode() * 127 & Integer.MAX_VALUE) % numPartition;
          }
    }
          // Grouping comparator: one reduce() call per listing1 (the first field only).
          public static class Comp1 extends WritableComparator {
              public Comp1() {
                  super(TextPairSecond.class, true);
              }
              @SuppressWarnings("unchecked")
              public int compare(WritableComparable a, WritableComparable b) {
                  TextPairSecond t1 = (TextPairSecond) a;
                  TextPairSecond t2 = (TextPairSecond) b;
                  return t1.getFirst().compareTo(t2.getFirst());
              }
          }

          // Sort comparator: listing1 ascending, then score descending, so each
          // reduce() call receives a listing's related listings best-first.
          public static class KeyComp extends WritableComparator {
              public KeyComp() {
                  super(TextPairSecond.class, true);
              }
              @SuppressWarnings("unchecked")
              public int compare(WritableComparable a, WritableComparable b) {
                  TextPairSecond t1 = (TextPairSecond) a;
                  TextPairSecond t2 = (TextPairSecond) b;
                  int comp = t1.getFirst().compareTo(t2.getFirst());
                  if (comp != 0)
                      return comp;
                  return -t1.getSecond().compareTo(t2.getSecond()); // descending by score
              }
          }
      public static class HouseRowToColMapper
           extends Mapper<LongWritable, Text, TextPairSecond, Text> {

          Text houseid1 = new Text();
          Text houseid2 = new Text();
          FloatWritable weight = new FloatWritable();
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {

          String s[] = value.toString().split("\t");
          weight.set(Float.parseFloat(s[2]));
          houseid1.set(s[0]);
          houseid2.set(s[1]);
          TextPairSecond tp = new TextPairSecond(houseid1, weight);
          context.write(tp, houseid2);
        }
      }

      public static class HouseRowToColReducer
           extends Reducer<TextPairSecond, Text, Text, Text> {

       Text valuev = new Text();
        public void reduce(TextPairSecond key, Iterable<Text> values,
                           Context context
                           ) throws IOException, InterruptedException {
            Text keyv = key.getFirst();
            // Values arrive already sorted by score (descending) thanks to KeyComp;
            // join them into a comma-separated list.
            Iterator<Text> it = values.iterator();
            StringBuilder sb = new StringBuilder(it.next().toString());
            while (it.hasNext()) {
                sb.append(",").append(it.next().toString());
            }
            valuev.set(sb.toString());
            context.write(keyv, valuev);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: HouseRowToCol <in> <out>");
          System.exit(2);
        }

        FileSystem fstm = FileSystem.get(conf);
        Path outDir = new Path(otherArgs[1]);
        fstm.delete(outDir, true);

        conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
        Job job = new Job(conf, "HouseRowToCol");
        job.setNumReduceTasks(4);
        job.setJarByClass(HouseRowToCol.class);
        job.setMapperClass(HouseRowToColMapper.class);

        job.setMapOutputKeyClass(TextPairSecond.class);
        job.setMapOutputValueClass(Text.class);
        // set the partitioner
        job.setPartitionerClass(Partitioner1.class);
        // group records after partitioning
        job.setGroupingComparatorClass(Comp1.class);
        job.setSortComparatorClass(KeyComp.class);
        // set the reducer and its output types
        job.setReducerClass(HouseRowToColReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}
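
Each main() above ends with System.exit, so the four jobs cannot be chained in one JVM as written; in practice they are launched as four hadoop jar invocations, each reading the previous job's output directory. A minimal driver sketch, assuming each job were refactored to expose a static boolean runJob(Configuration conf, String in, String out) that builds the Job exactly as its main() does and returns waitForCompletion(true) (hypothetical names and paths, not in the original):

import org.apache.hadoop.conf.Configuration;

public class RecoPipeline {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String base = "/tmp/reco"; // placeholder HDFS working directory
        // step 1: raw logs -> per-day de-duplicated listing pairs
        if (!HouseMergeAndSplit.runJob(conf, "/logs/raw", base + "/pairs"))   System.exit(1);
        // step 2: per-day pair counts
        if (!HouseCount.runJob(conf, base + "/pairs", base + "/counts"))      System.exit(1);
        // step 3: three-month aggregation with decay, plus transposed pairs
        if (!HouseCountHz.runJob(conf, base + "/counts", base + "/decayed"))  System.exit(1);
        // step 4: pivot to one comma-separated recommendation list per listing
        if (!HouseRowToCol.runJob(conf, base + "/decayed", base + "/reco"))   System.exit(1);
    }
}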
Source: http://blog.csdn.net/u011750989/article/details/12004065
