hadoop 里執行 MapReduce 任務的幾種方式

jopen 12年前發布 | 40K 次閱讀 Hadoop 分布式/云計算/大數據

說明:

測試文件:
echo -e "aa\tbb \tcc
bb\tcc\tdd" > 3.txt

hadoop fs -mkdir /data
hadoop fs -put 3.txt /data

全文的例子均以該文件做測試用例,統計單詞出現的次數(WordCount)。

1、最原始的方式:java 源碼編譯打包成jar包后,由 hadoop 腳本調度執行,類似:

bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/input /tmp/output
java 代碼 100 多行,我就不貼代碼了,具體請見官方范例:

2、MR 腳本開發語言:pig
A1 = load '/data/3.txt';
A = stream A1 through `sed "s/\t/ /g"`;
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
dump E;

注意:不同分隔符對load及后面的$0的影響。
詳情請見:
https://gist.github.com/186460
http://www.slideshare.net/erikeldridge/a-brief-handson-introduction-to-hadoop-pig

3、構建數據倉庫的類 SQL 開發語言:hive
create table textlines(text string);
load data inpath '/data/3.txt' overwrite into table textlines;
SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text,'\t+')) wordTable AS wordColumn GROUP BY wordColumn;

詳情請見:
http://my.oschina.net/leejun2005/blog/83045
http://blog.csdn.net/techdo/article/details/7433222

4、跨平臺的腳本語言:python

map:
#!/usr/bin/python
import os,re,sys
for line in sys.stdin:
    for i in line.strip().split("\t"):
        print i

reduce:
#!/usr/bin/python
import os,re,sys
arr = {}
for words in sys.stdin:
    word = words.strip()
    if word not in arr:
        arr[word] = 1
    else:
        arr[word] += 1
for k, v in arr.items():
    print str(k) + ": " + str(v)

最后在shell下執行:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py

注意:腳本開頭需要顯示指定何種解釋器以及賦予腳本執行權限
詳情請見:
http://blog.csdn.net/jiedushi/article/details/7390015

5、Linux 下的瑞士軍刀:shell 腳本
map:
#!/bin/bash
tr '\t' '\n'

reduce:
#!/bin/bash
sort|uniq -c

最后在shell下執行:

june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null
12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]
12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:
12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041
12/10/14 21:57:01 INFO streaming.StreamJob:  map 0%  reduce 0%
12/10/14 21:57:13 INFO streaming.StreamJob:  map 67%  reduce 0%
12/10/14 21:57:19 INFO streaming.StreamJob:  map 100%  reduce 0%
12/10/14 21:57:22 INFO streaming.StreamJob:  map 100%  reduce 22%
12/10/14 21:57:31 INFO streaming.StreamJob:  map 100%  reduce 100%
12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041
12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop fs -cat /data/py/part-00000
      1 aa  
      1 bb  
      1 bb  
      2 cc  
      1 dd  
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>

特別提示:上述有些方法對字段后的空格忽略或計算,請注意仔細甄別。

說明:列舉了上述幾種方法主要是給大家一個不同的思路,
在解決問題的過程中,開發效率、執行效率都是我們需要考慮的,不要太局限某一種方法了。
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!