Nutch的HDFS文件輸出

jopen 10年前發布 | 24K 次閱讀 搜索引擎 nutch

以1.7為例,之前Nutch的輸出可以自定義其它存儲系統中,具體原理不贅述。

項目有個需求,就是文件仍然保存在HDFS中,而不是索引到其它存儲系統中。

也就是說,不用寫

public class XXX implements IndexWriter

這樣的插件了,

那么,問題來了,怎么修改Nutch的源碼,使得結果順利存在HDFS中呢?

----------那就讓我們從源頭一步一步來修改,碰到問題就解決問題。

首先Crawl.java中,原先在索引階段有這么一些代碼。

if (i > 0) {
      linkDbTool.invert(linkDb, segments, true, true, false); // invert links

      if (solrUrl != null) {
        // index, dedup & merge
        FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));

        IndexingJob indexer = new IndexingJob(getConf());
        indexer.index(crawlDb, linkDb, 
                Arrays.asList(HadoopFSUtil.getPaths(fstats)));

        SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();
        dedup.setConf(getConf());
        dedup.dedup(solrUrl);
      }

最關鍵的是

IndexingJob indexer = new IndexingJob(getConf());
        indexer.index(crawlDb, linkDb, 
                Arrays.asList(HadoopFSUtil.getPaths(fstats)));

也就是說,這里是索引的入口處。

這里把這些代碼屏蔽掉,我個人的方法是 if (solrUrl != null) {------》if (false) {

這樣還能保持原先的代碼存在,這樣如果后面的代碼有問題還可以恢復此代碼。

---------------接下來呢?添加我們自己的索引任務代碼如下:

if (true) {
  // add my index job
  // index, dedup & merge
  FileStatus[] fstats = fs.listStatus(segments,
  HadoopFSUtil.getPassDirectoriesFilter(fs));
  IndexingJob indexer = new IndexingJob(getConf());
  indexer.index(crawlDb, linkDb,Arrays.asList(HadoopFSUtil.getPaths(fstats)), true,false, null);
}

這樣,就完成了索引任務外圍的改造,這里只是改了個外觀,還沒傷筋動骨。

下面我們開始對內部進行改造!

-------------------------------------------------------------------------------

首先,我們得找到MR的方法吧,入口在哪呢?

IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);

這句話,跟進去,就能看到具體的MR類,如下:

 job.setMapperClass(IndexerMapReduce.class);
 job.setReducerClass(IndexerMapReduce.class);

也就是說,MR類都是IndexerMapReduce.class.

那么我們就開始分析這個類的map和reduce函數。

備注: 我的URL文件的格式是 url   \t   sender=xxx   \t   receiver=xxx   \t   oldname=xxx     \t   newname=xxx   \n

---------------

改動的幾個地方如下:

1 對于reduce的函數聲明

  public void reduce(Text key, Iterator<NutchWritable> values,
                     OutputCollector<Text, NutchIndexAction> output, Reporter reporter)

修改為

  public void reduce(Text key, Iterator<NutchWritable> values,
                     OutputCollector<Text, Text> output, Reporter reporter)

這會導致出現3個錯誤,把這3個地方屏蔽掉即可。

2 看reduce的最后兩行

NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
output.collect(key, action);

這里需要做一個改動如下:

// NutchIndexAction action = new NutchIndexAction(doc,
        // NutchIndexAction.ADD);
        // output.collect(key, action);
        Object senderObject = doc.getFieldValue("sender");
        Object receiverObject = doc.getFieldValue("receiver");
        Object singerObject = doc.getFieldValue("singer");
        if (null != senderObject && null != receiverObject
                && null != singerObject) {
            String sender = senderObject.toString();
            String receiver = receiverObject.toString();
            String singer = singerObject.toString();
            // output it
            output.collect(new Text(sender), new Text(singer));
            output.collect(new Text(receiver), new Text(singer));
        }

 如果此時進行ant編譯,自然會報錯,如下:

    [javac] /usr/local/music_Name_to_Singer/nutch-1.7/src/java/org/apache/nutch/indexer/IndexerMapReduce.java:53: error: IndexerMapReduce is not abstract and does not override abstract method reduce(Text,Iterator<NutchWritable>,OutputCollector<Text,NutchIndexAction>,Reporter) in Reducer
    [javac] public class IndexerMapReduce extends Configured implements
    [javac]        ^
    [javac] 1 error
    [javac] 1 warning

那是因為我們需要修改一個地方:

IndexerMapReduce.java中的

原先的代碼:

job.setOutputFormat(IndexerOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NutchWritable.class);
job.setOutputValueClass(NutchWritable.class);

現在要修改為:

 

job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NutchWritable.class);
job.setOutputValueClass(Text.class);


以及

public class IndexerMapReduce extends Configured implements
  Mapper<Text, Writable, Text, NutchWritable>,
  Reducer<Text, NutchWritable, Text, NutchIndexAction> {

修改為

public class IndexerMapReduce extends Configured implements
  Mapper<Text, Writable, Text, NutchWritable>,
  Reducer<Text, NutchWritable, Text, Text> {

然后ant

就可以看到

BUILD SUCCESSFUL
Total time: 15 seconds

表明編譯成功!

別急著運行,還有一個地方需要修改!

---------------------- 在InexingJob中有如下一些代碼:

 final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
                + new Random().nextInt());

        FileOutputFormat.setOutputPath(job, tmp);
        try {
            JobClient.runJob(job);
            // do the commits once and for all the reducers in one go
            if (!noCommit) {
                writers.open(job,"commit");
                writers.commit();
            }
            long end = System.currentTimeMillis();
            LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
                    + TimingUtil.elapsedTime(start, end));
        } finally {
            FileSystem.get(job).delete(tmp, true);
        }

表明,Nutch1.7默認是把輸出導向到其它輸出的,而不是本地HDFS.

所以FileSystem.get(job).delete(tmp, true);是用來刪除此文件的,此時我們需要修改這個地方來保留文件。

不然咱辛辛苦苦寫的文件,全被一句話任性的刪掉了。

------------------------代碼如下:

注意:我這里的需求是輸出為當天的目錄。所以代碼為:

//final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
               // + new Random().nextInt());
        Calendar cal = Calendar.getInstance();
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1;
        int day = cal.get(Calendar.DAY_OF_MONTH);
        final Path tmp = new Path(getConf().get("pathPrefix"),"year="+year+"/month="+month+"/day="+day);

        FileOutputFormat.setOutputPath(job, tmp);
        try {
            JobClient.runJob(job);
            // do the commits once and for all the reducers in one go
            if (!noCommit) {
                writers.open(job,"commit");
                writers.commit();
            }
            long end = System.currentTimeMillis();
            LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
                    + TimingUtil.elapsedTime(start, end));
        } finally {
            //FileSystem.get(job).delete(tmp, true);
        }

此時編譯是可以通過的。

好,暫時就是這樣,效果圖:

184940_gqwq_1382024.jpg

 

來自:http://my.oschina.net/qiangzigege/blog/355014

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!