MapReduce業務 - 圖片關聯計算

jopen 9年前發布 | 14K 次閱讀 MapReduce 分布式/云計算/大數據

 

1.概述

最近在和人交流時談到數據相似度和數據共性問題,而剛好在業務層面有類似的需求,今天和大家分享這類問題的解決思路,分享目錄如下所示:

  • 業務背景
  • 編碼實踐
  • 預覽截圖

下面開始今天的內容分享。

2.業務背景

目前有這樣一個背景,在一大堆數據中,里面存放著圖片的相關信息,如下圖所示:

MapReduce業務 - 圖片關聯計算

上圖只是給大家列舉的一個示例數據格式,第一列表示自身圖片,第二、第三......等列表示與第一列相關聯的圖片信息。那么我們從這堆數據中如何找出他們擁有相同圖片信息的圖片。

2.1 實現思路

那么,我們在明確了上述需求后,下面我們來分析它的實現思路。首先,我們通過上圖所要實現的目標結果,其最終計算結果如下所示:

pic_001pic_002 pic_003,pic_004,pic_005
pic_001pic_003 pic_002,pic_005
pic_001pic_004 pic_002,pic_005
pic_001pic_005 pic_002,pic_003,pic_004
......

結果如上所示,找出兩兩圖片之間的共性圖片,結果未列完整,只是列舉了部分,具體結果大家可以參考截圖預覽的相關信息。

下面給大家介紹解決思路,通過觀察數據,我們可以發現在上述數據當中,我們要計算圖片兩兩的共性圖片,可以從關聯圖片入手,在關聯圖片中我們可 以找到共性圖片的關聯信息,比如:我們要計算pic001pic002圖片的共性圖片,我們可以在關聯圖片中找到兩者(pic001pic002組合)后 對應的自身圖片(key),最后在將所有的key求并集即為兩者的共性圖片信息,具體信息如下圖所示:

MapReduce業務 - 圖片關聯計算

通過上圖,我們可以知道具體的實現思路,步驟如下所示:

  • 第一步:拆分數據,關聯數據兩兩組合作為Key輸出。
  • 第二步:將相同Key分組,然后求并集得到計算結果。

這里使用一個MR來完成此項工作,在明白了實現思路后,我們接下來去實現對應的編碼。

3.編碼實踐

  • 拆分數據,兩兩組合。
public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
      throws IOException, InterruptedException {
  StringTokenizer strToken = new StringTokenizer(value.toString());
  Text owner = new Text();
  Set<String> set = new TreeSet<String>();
  owner.set(strToken.nextToken());
  while (strToken.hasMoreTokens()) {
      set.add(strToken.nextToken());
  }
  String[] relations = new String[set.size()];
  relations = set.toArray(relations);
  for (int i = 0; i < relations.length; i++) {
      for (int j = i + 1; j < relations.length; j++) {
    String outPutKey = relations[i] + relations[j];
    context.write(new Text(outPutKey), owner);
      }
  }
    }
}

按Key分組,求并集
public static class PictureReduce extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
      throws IOException, InterruptedException {
    String common = "";
    for (Text val : values) {
      if (common == "") {
        common = val.toString();
      } else {
        common = common + "," + val.toString();
      }
    }
    context.write(key, new Text(common));
  }
}

完整示例
package cn.hadoop.hdfs.example;
import java.io.IOException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hadoop.hdfs.util.HDFSUtils;
import cn.hadoop.hdfs.util.SystemConfig;
/**
 * @Date Aug 31, 2015
 *
 * @Author dengjie
 *
 * @Note Find picture relations
 */
public class PictureRelations extends Configured implements Tool {
  private static Logger log = LoggerFactory.getLogger(PictureRelations.class);
  private static Configuration conf;
  static {
    String tag = SystemConfig.getProperty("dev.tag");
    String[] hosts = SystemConfig.getPropertyArray(tag + ".hdfs.host", ",");
    conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://cluster1");
    conf.set("dfs.nameservices", "cluster1");
    conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
    conf.set("dfs.namenode.rpc-address.cluster1.nna", hosts[0]);
    conf.set("dfs.namenode.rpc-address.cluster1.nns", hosts[1]);
    conf.set("dfs.client.failover.proxy.provider.cluster1",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
  }
  public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
      StringTokenizer strToken = new StringTokenizer(value.toString());
      Text owner = new Text();
      Set<String> set = new TreeSet<String>();
      owner.set(strToken.nextToken());
      while (strToken.hasMoreTokens()) {
        set.add(strToken.nextToken());
      }
      String[] relations = new String[set.size()];
      relations = set.toArray(relations);
      for (int i = 0; i < relations.length; i++) {
        for (int j = i + 1; j < relations.length; j++) {
          String outPutKey = relations[i] + relations[j];
          context.write(new Text(outPutKey), owner);
        }
      }
    }
  }
  public static class PictureReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
      String common = "";
      for (Text val : values) {
        if (common == "") {
          common = val.toString();
        } else {
          common = common + "," + val.toString();
        }
      }
      context.write(key, new Text(common));
    }
  }
  public int run(String[] args) throws Exception {
    final Job job = Job.getInstance(conf);
    job.setJarByClass(PictureMap.class);
    job.setMapperClass(PictureMap.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(PictureReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    int status = job.waitForCompletion(true) ? 0 : 1;
    return status;
  }
  public static void main(String[] args) {
    try {
      if (args.length != 1) {
        log.warn("args length must be 1 and as date param");
        return;
      }
      String tmpIn = SystemConfig.getProperty("hdfs.input.path.v2");
      String tmpOut = SystemConfig.getProperty("hdfs.output.path.v2");
      String inPath = String.format(tmpIn, "t_pic_20150801.log");
      String outPath = String.format(tmpOut, "meta/" + args[0]);
      // bak dfs file to old
      HDFSUtils.bak(tmpOut, outPath, "meta/" + args[0] + "-old", conf);
      args = new String[] { inPath, outPath };
      int res = ToolRunner.run(new Configuration(), new PictureRelations(), args);
      System.exit(res);
    } catch (Exception ex) {
      ex.printStackTrace();
      log.error("Same friend task has error,msg is" + ex.getMessage());
    }
  }
}

4.截圖預覽

關于計算結果,如下圖所示:

MapReduce業務 - 圖片關聯計算

5.總結

本篇博客只是從思路上實現了圖片關聯計算,在數據量大的情況下,是有待優化的,這里就不多做贅述了,后續有時間在為大家分析其中的細節。

6.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!