Pig 常用操作

jopen 10年前發布 | 12K 次閱讀 Pig 分布式/云計算/大數據

我們看看Pig的常用操作。

所有命令和腳本都在Pig 0.12.0 & Hadoop 2.2.0下測試通過。

準備兩個數據文件:

1)student.txt 結構為(班級號,學號,成績),字段間逗號分隔。

C01,N0101,82

C01,N0102,59

C01,N0103,65

C02,N0201,81

C02,N0202,82

C02,N0203,79

C03,N0301,56

C03,N0302,92

C03,N0306,72

2)teacher.txt 結構為(班級號,教師),字段間逗號分隔。

C01,Zhang

C02,Sun

C03,Wang

C04,Dong

 

加載和存儲(Load,Store)

執行以下命令

records = load'hdfs://localhost:9000/input/student.txt' using PigStorage(',') as(classNo:chararray, studNo:chararray, score:int);

dump records;

store records into ' hdfs://localhost:9000/input/student_out' using PigStorage(':');

然后查看hdfs://localhost:9000/input/student_out目錄下的part-m-00000文件,其內容如下:

C01:N0101:82

C01:N0102:59

C01:N0103:65

C02:N0201:81

C02:N0202:82

C02:N0203:79

C03:N0301:56

C03:N0302:92

C03:N0306:72

其中的load是加載操作,store是存儲操作。他們分別可以指定其分隔符,比如上例中的逗號和分號。

篩選(Filter)

執行以下命令:

records_c01 = filter records byclassNo=='C01';

dump records_c01;

結果如下:

(C01,N0101,82)

(C01,N0102,59)

(C01,N0103,65)

注意:判斷是否相等要用兩個等號。

Foreach Generate

Foreach對關系中的每一個記錄循環,然后按指定模式生成一個新的關系。

執行以下命令:

score_c01 = foreach records_c01generate 'Teacher',$1,score;

dump score_c01;

結果如下:

(Teacher,N0101,82)

(Teacher,N0102,59)

(Teacher,N0103,65)

生成的新的關系中包括三個字段,第一個字段是常量,第二個字段是學號(我們是通過索引號引用的),第三個字段是分數(我們通過字段名引用的)。

分組(group)

執行以下命令:

grouped_records = group recordsby classNo parallel 2;

dump grouped_records;

結果如下:

(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)})

(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)})

(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)})

其中的Paraller 2表示啟用2個Reduce操作。

如何統計每個班級及格和優秀的學生人數呢?執行以下兩個命令:

result = foreach grouped_records {

         fail =filter records by score < 60;

         excellent =filter records by score >=90;

         generategroup, COUNT(fail) as fail, COUNT(excellent) as excellent;

};

dump result;

結果如下:

(C01,1,0)

(C02,0,0)

(C03,1,1)

 

題外話:

flatten操作,可以將數據格式扁平化。我們分別通過tuple和bag來看看flatten的作用:

1)  Flatten對tuple的作用

執行以下命令:

a= foreach records generate $0,($1,$2);

dumpa;

輸出結果如下:

(C01,(N0101,82))

(C01,(N0102,59))

(C01,(N0103,65))

(C02,(N0201,81))

(C02,(N0202,82))

(C02,(N0203,79))

(C03,(N0301,56))

(C03,(N0302,92))

(C03,(N0306,72))

然后,執行:

b = foreach a generate $0,flatten($1);

dump b;

結果如下:

(C01,N0101,82)

(C01,N0102,59)

(C01,N0103,65)

(C02,N0201,81)

(C02,N0202,82)

(C02,N0203,79)

(C03,N0301,56)

(C03,N0302,92)

(C03,N0306,72)

由此看見,flatten作用于tuple時,將flatten對應的字段(tuple)中的字段扁平化為關系中的字段。(不知道該如何解釋比較好)

2)  Flatten對bag的作用

執行以下命令

c = foreach records generate $0,{($1),($1,$2)};

dump c;

結果如下:

(C01,{(N0101),(N0101,82)})

(C01,{(N0102),(N0102,59)})

(C01,{(N0103),(N0103,65)})

(C02,{(N0201),(N0201,81)})

(C02,{(N0202),(N0202,82)})

(C02,{(N0203),(N0203,79)})

(C03,{(N0301),(N0301,56)})

(C03,{(N0302),(N0302,92)})

(C03,{(N0306),(N0306,72)})

接下來執行:

d = foreach c generate $0,flatten($1);

dump d;

結果如下:

(C01,N0101)

(C01,N0101,82)

(C01,N0102)

(C01,N0102,59)

(C01,N0103)

(C01,N0103,65)

(C02,N0201)

(C02,N0201,81)

(C02,N0202)

(C02,N0202,82)

(C02,N0203)

(C02,N0203,79)

(C03,N0301)

(C03,N0301,56)

(C03,N0302)

(C03,N0302,92)

(C03,N0306)

(C03,N0306,72)

可以看出,flatten作用于bag時,會消除嵌套關系,生成類似于笛卡爾乘積的結果。(不好表達,讀者可以細細體會)。

Stream操作

可以將Python程序嵌入到Pig中使用。

建立一個Python文件pass.py,內容如下:

#! /usr/bin/envpython

import sys

 

for line insys.stdin:

         (c,n,s) = line.split()  

         if int(s) >= 60:

                   print "%s\t%s\t%s"%(c,n,s)

 

執行以下命令:

define pass `pass.py` SHIP('/home/user/pass.py');

records_pass = stream records through pass as(classNo:chararray, studNo:chararray, score:int);

dump records_pass;

結果如下:

(C01,N0101,82)

(C01,N0103,65)

(C02,N0201,81)

(C02,N0202,82)

(C02,N0203,79)

(C03,N0302,92)

(C03,N0306,72)

可以看出,統計結果為所有及格的記錄(>=60)。

其中,ship用于將python程序提交到Hadoop集群中去。

請注意第一個命令中的`pass.py`不是用單引號括起來的,是用鍵盤1左邊的那個鍵上的字符括起來的。(不知道這個字符怎么稱呼,只知道是一種標注符號)

Join

先執行以下兩條命令:

r_student = load'hdfs://localhost:9000/input/student.txt' using PigStorage(',') as (classNo:chararray, studNo: chararray, score: int);

r_teacher2 = load'hdfs://localhost:9000/input/teacher.txt' using PigStorage(',') as (classNo:chararray, teacher: chararray);

回到本文開頭,我們有兩個數據文件,分別為學生(班級,學號,成績);老師(班級,姓名)。

執行以下命令:

r_joined = join r_student by classNo,r_teacher by classNo;

dump r_joined;

(C01,N0103,65,C01,Zhang)

(C01,N0102,59,C01,Zhang)

(C01,N0101,82,C01,Zhang)

(C02,N0203,79,C02,Sun)

(C02,N0202,82,C02,Sun)

(C02,N0201,81,C02,Sun)

(C03,N0306,72,C03,Wang)

(C03,N0302,92,C03,Wang)

(C03,N0301,56,C03,Wang)

類似于SQL中的內連接Inner Join。當然你也可以使用外連接,比如:

r_joined = join r_student by classNo left outer,r_teacher by classNo;

dump r_joined;

注意:left outer/right outer要寫在第一個關系名的后面。以下語法是錯誤的:

r_joined = join r_student by classNo, r_teacher by classNo leftouter; //錯誤

 

COGROUP

Join的操作結果是平面的(一組元組),而COGROUP的結果是有嵌套結構的。

運行以下命令:

r1 = cogroup r_student by classNo,r_teacher by classNo;

dump r1;

結果如下:

(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})

(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})

(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})

(C04,{},{(C04,Dong)})

由結果可以看出:

1)  cogroup和join操作類似。

2)  生成的關系有3個字段。第一個字段為連接字段;第二個字段是一個包,值為關系1中的滿足匹配關系的所有元組;第三個字段也是一個包,值為關系2中的滿足匹配關系的所有元組。

3)  類似于Join的外連接。比如結果中的第四個記錄,第二個字段值為空包,因為關系1中沒有滿足條件的記錄。實際上第一條語句和以下語句等同:

r1= cogroup r_student by classNo outer,r_teacher by classNo outer;

 

如果你希望關系1或2中沒有匹配記錄時不在結果中出現,則可以分別在關系中使用inner而關鍵字進行排除。

執行以下語句:

r1 = cogroup r_student by classNo inner,r_teacher byclassNo outer;

dump r1;

結果為:

(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})

(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})

(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})

 

如先前我們講到的flatten,執行以下命令:

r2 = foreach r1 generate flatten($1),flatten($2);

dump r2;

結果如下:

(C01,N0103,65,C01,Zhang)

(C01,N0102,59,C01,Zhang)

(C01,N0101,82,C01,Zhang)

(C02,N0203,79,C02,Sun)

(C02,N0202,82,C02,Sun)

(C02,N0201,81,C02,Sun)

(C03,N0306,72,C03,Wang)

(C03,N0302,92,C03,Wang)

(C03,N0301,56,C03,Wang)

 

Cross

執行以下命令:

r = cross r_student,r_teacher;

dump r;

結果如下:

(C03,N0306,72,C04,Dong)

(C03,N0306,72,C03,Wang)

(C03,N0306,72,C02,Sun)

(C03,N0306,72,C01,Zhang)

(C03,N0302,92,C04,Dong)

(C03,N0302,92,C03,Wang)

(C03,N0302,92,C02,Sun)

(C03,N0302,92,C01,Zhang)

(C03,N0301,56,C04,Dong)

(C03,N0301,56,C03,Wang)

(C03,N0301,56,C02,Sun)

(C03,N0301,56,C01,Zhang)

(C02,N0203,79,C04,Dong)

(C02,N0203,79,C03,Wang)

(C02,N0203,79,C02,Sun)

(C02,N0203,79,C01,Zhang)

(C02,N0202,82,C04,Dong)

(C02,N0202,82,C03,Wang)

(C02,N0202,82,C02,Sun)

(C02,N0202,82,C01,Zhang)

(C02,N0201,81,C04,Dong)

(C02,N0201,81,C03,Wang)

(C02,N0201,81,C02,Sun)

(C02,N0201,81,C01,Zhang)

(C01,N0103,65,C04,Dong)

(C01,N0103,65,C03,Wang)

(C01,N0103,65,C02,Sun)

(C01,N0103,65,C01,Zhang)

(C01,N0102,59,C04,Dong)

(C01,N0102,59,C03,Wang)

(C01,N0102,59,C02,Sun)

(C01,N0102,59,C01,Zhang)

(C01,N0101,82,C04,Dong)

(C01,N0101,82,C03,Wang)

(C01,N0101,82,C02,Sun)

(C01,N0101,82,C01,Zhang)

由此可以看出,cross類似于笛卡爾乘積。一般情況下不建議直接使用cross,而應該事前對數據集進行篩選,提高效率。

排序(Order)

執行以下命令:

r = order r_student by score desc, classNo asc;

dump r;

結果如下:

(C03,N0302,92)

(C01,N0101,82)

(C02,N0202,82)

(C02,N0201,81)

(C02,N0203,79)

(C03,N0306,72)

(C01,N0103,65)

(C01,N0102,59)

(C03,N0301,56)

聯合(Union)

執行以下語句:

r_union = union r_student, r_teacher;

dump r_union;

結果如下:

(C01,N0101,82)

(C01,N0102,59)

(C01,N0103,65)

(C02,N0201,81)

(C02,N0202,82)

(C02,N0203,79)

(C03,N0301,56)

(C03,N0302,92)

(C03,N0306,72)

(C01,Zhang)

(C02,Sun)

(C03,Wang)

(C04,Dong)

可以看出:

1)  union是取兩個記錄集的并集。

2)  關系r_union的schema為未知(unknown),這是因為被union的兩個關系的schema是不一樣的。如果兩個關系的schema是一致的,則union后的關系將和被union的關系的schema一致。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!