pig學習筆記
Pig是一種探索大規模數據集的腳本語言。
pig是在HDFS和MapReduce之上的數據流處理語言,它將數據流處理翻譯成多個map和reduce函數,提供更高層次的抽象將程序員從具體的編
程中解放出來。
Pig包括兩部分:用于描述數據流的語言,稱為Pig Latin;和用于運行Pig Latin程序的執行環境。
Pig Latin程序有一系列的operation和transformation組成。每個操作或變換對輸入進行數據處理,然后產生輸出結果。這些操作整體上
描述了一個數據流。Pig內部,這些變換操作被轉換成一系列的MapReduce作業。
Pig不適合所有的數據處理任務,和MapReduce一樣,它是為數據批處理而設計的。如果只想查詢大數據集中的一小部分數據,pig的實現
不會很好,因為它要掃描整個數據集或絕大部分。
1. Pig的運行
Pig是作為客戶端運行的程序,你需要將其連接到本地Hadoop或者集群上。當安裝Pig之后,有三種執行pig程序的方法:pig腳本
(將程序寫入.pig文件中),Grunt(運行Pig命令的交互式shell環境)和嵌入式方式。
records = Load ‘sample.txt’ as (year:chararray, temperature:int, quality:int);
filter_records = FILTER records BY temperature != 9999 AND quality == 0;
group_records = GROUP filter_records BY year;
max_temp = FOREACH group_records GENERATE group, MAX(filter_records.temperature);
DUMP max_temp;
生成上面程序的創建的數據集結構: grunt> ILLUSTRATE max_temp;
Pig和數據庫的比較:
1)Pig是數據流編程語言,而SQL是一種描述型編程語言。Pig是相對于輸入的一步步操作,其中每一步都是對數據的一個簡單的變換;
而SQL語句是一個約束的集合,這些約束結合在一起定義了輸出。Pig更像RDBMS中的查詢規劃器。
2)RDBMS把數據存儲在嚴格定義了模式的表內,但pig對數據的要求更寬松,可以在運行時定義模式,而且是可選的。
3)pig對復雜、嵌套數據結構的支持更強;
4)Pig不支持事務和索引,也不支持隨機讀和幾十毫秒級別的查詢,它是針對數據批量處理的。
5)Hive是介于Pig和RDBMS之間的系統。Hive以HDFS為存儲,但是查詢語言是基于SQL的,而且Hive要求所有數據必須存儲在表中,
表必須有模式,而模式由Hive管理。但Hive允許為預先存在HDFS中的數據關聯一個模式,因此數據加載步驟是可選的。
2 .Pig Latin
程序有一系列語句構成。操作和命令是大小寫無關的,而別名和函數名是大小寫敏感的。
Pig處理多行語句時,在整個程序邏輯計劃沒有構造完畢前,pig并不處理數據。
Pig Latin關系操作
類型 操作 描述
加載與存儲
LOAD 將數據從外部文件或其它存儲中加載數據,存入關系
STORE 將一個關系存放到文件系統或其它存儲中
DUMP 將關系打印到控制臺
過濾
FILTER 從關系中刪除不需要的行
DISTINCT 從關系中刪除重復的行
FOREACH…GENERATE 對于集合的每個元素,生成或刪除字段
STREAM 使用外部程序對關系進行變換
SAMPLE 從關系中隨機取樣
分組與連接
JOIN 連接兩個或多個關系
COGROUP 在兩個或多個關系中分組
GROUP 在一個關系中對數據分組
CROSS 獲取兩個或更多關系的乘積(叉乘)
排序
ORDER 根據一個或多個字段對某個關系進行排序
LIMIT 限制關系的元組個數
合并與分割
UNION 合并兩個或多個關系
SPLIT 把某個關系切分成兩個或多個關系
Pig Latin的診斷操作
操作 描述
DESCRIBE 打印關系的模式
EXPLAIN 打印邏輯和物理計劃
ILLUSTRATE 使用生成的輸入子集顯示邏輯計劃的試運行結果
Pig Latin UDF語句
REGISTER 在Pig運行時環境中注冊一個JAR文件
DEFINE 為UDF、流式腳本或命令規范新建別名
Pig Latin命令類型
kill 中止某個MapReduce任務
exec 在一個新的Grunt shell程序中以批處理模式運行一個腳本
run 在當前Grunt外殼程序中運行程序
quit 退出解釋器
set 設置Pig選項
Pig Latin表達式
類型 表達式 描述 示例
字段 $n 第n個字段 $0
字段 f 字段名f year
投影 c.$n, c.f 在關系、包或元組中的字段 records.$0, records.year
Map查找 m#k 在映射m中鍵k對應的值 items’Coat’
類型轉換 (t)f 將字段t轉換成f類型 (int)year
函數型平面化 fn(f1, f2, …) 在字段上應用函數 fn isGood(quality)
FLATTEN(f) 從包和元組中去除嵌套 flatten(group)
其它的表達式,如算術、條件、比較和布爾型類似其它語言,不詳述.
Pig Latin類型
數據類型包括int (32位有符號整數), long(64位有符號整數), float(32位浮點數), double(64位浮點數),
chararray(UTF16格式的字符數組), Bytearray(字節數組), tuple(元組), bag(包), map(鍵值對).
tuple: (1, ‘hello’) //任何類型的字段序列
bag: {(1, ‘hello’), (2)} //元組的無序多重集合(允許重復元組)
map: [‘a’ ‘hello’] //一組鍵值對,鍵必須是字符數組
關系和包在概念上是相同的,但是有細微差別。關系是頂層構造結構,只能從上表中的關系操作中創建關系,包必須在某個關系中。
舉例:
A = {(1, 2), (3, 4)} //錯,使用load語句從文件中加載數據
B = A.$0 //錯, B = foreach A generate $0;
模式(Schema)
Pig的一個關系可以有一個關聯的模式,模式為關系的字段指定名稱和類型。Pig的這種模式聲明方式和SQL數據庫要求數據加載前必須
先聲明模式截然不同,Pig設計的目的是用于分析不包含數據類型信息的純文本輸入文件的。但是盡量定義模式,會讓程序運行地更高效。
缺點:在查詢中聲明模式的方式是靈活的,但不利于模式重用。每個查詢中維護重復出現的模式會很困難。處理這一問題的辦法是寫
自己的加載函數來封裝模式。
SQL數據庫在加載數據時,會強制檢查表模式中的約束。在pig中,如果一個值無法被強制轉換為模式中申明的類型,pig會用空值null代替,
顯示一個空位。大數據集普遍都有被損壞的值、無效值或意料之外的值,簡單的方法是過濾掉無效值:
grunt>good_records = filter records by temperature is not null;
另一種技巧是使用SPLIT操作把數據劃分成好和壞兩個關系,然后在分別進行分析:
grunt> split records into good_records if temperature is not null,
bad_records if temperature is null;
grunt> dump good_records;
在Pig中,不用為數據流中的每個新產生的關系聲明模式。大多數情況下,Pig能夠根據關系操作的輸入關系的模式來確定輸出結果的模式。
有些操作不改變模式,如Limit。而Union會自動生成新的模式。
如果要重新定義一個關系的模式,可以使用帶as子句的FOREACH…GENERATE操作來定義輸入關系的一部分或全部字段的模式。
函數
Pig的函數分為計算函數,過濾函數,加載函數和存儲函數。
計算函數: AVG, COUNT, CONCAT, COUNTSTAR, DIFF, MAX, MIN, SIZE, SUM, TOKENIZE
過濾函數: IsEmpty
加載/存儲函數:PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump
3 用戶自定義函數(UDF)
public abstract class EvalFunc<T> {
public abstract T exec(Tuple input) throws IOException;
public List<FuncSpec> getAvgToFuncMapping() throws FrontendException;
public FuncSpec outputSchema() throws FrontendException;
}
輸入元組的字段包含傳遞給函數的表達式,輸出是泛型;對于過濾函數輸出就是Boolean類型。建議盡量在
getAvgToFuncMapping()/outputSchema()申明輸入和輸出數據的類型,以便Pig進行類型轉換或過濾不匹配類型的錯誤值。
Grunt>REGISTER pig-examples.jar;
DEFINE isGood org.hadoopbook.pig.IsGoodQuality();
加載UDF
public LoadFunc {
public void setLocation(String location, Job job);
public InputFormat getInputFormat();
public void prepareToRead(RecordReader reader, PigSplit split);
public Tuple next() throws IOException;
}
類似Hadoop,Pig的數據加載先于mapper的運行,所以保證數據可以被分割成能被各個mapper獨立處理的部分非常重要。從Pig 0.7開始,
加載和存儲函數接口已經進行了大幅修改,以便與Hadoop的InputFormat和OutputFormat類基本一致。
Grunt>Register loadfunc.jar
Define customLoad org.hadoopbook.pig.loadfunc()
records = load ‘input/sample.txt’ using customLoad(‘16-19, 88-92, 93-93’)
as (year:int, temperature:int, quality:int);
4 數據處理操作
加載和存儲數據: store A into ‘out’ using pigStorage(‘:’) ; // 將元組存儲為以分號分隔的純文本值
過濾數據
Foreach … generate // 逐個處理一個關系中的行,來生成一個新的關系包含部分或全部的字段
例子: B = foreach A generate $0, $2+1, ‘Constant’;
分組與連接數據
Join 連接
C = join A by $0, B by $1; // 默認為內連接,將A的第一個字段和B的第二個字段連接,輸出匹配的字段
// 連接后新關系的字段為輸入關系的字段和
C = join A by $0, B by $1 using “replicated”; // 分段復制鏈接,B表中的數據將會放在內存中
C= join A by $0 left outer, B by $1; // 左外連接,左邊的沒有匹配項也輸出
Cogroup 多關系分組
類似于Join,但默認是外連接,連接鍵為第一個字段,第二個字段為匹配的第一個關系中的所有元組的包,第三個字段為第二個表中匹配的
所有元組的包。
D = COGROUP A by $0, B by $1; // 新的關系的元組個數為連接鍵的并集(去除重復);
D= COGROUP A by $0 inner, B by $1 inner; // 新關系的元組個數是連接鍵取交集的個數(只輸出匹配的)。每個元組中的第二個和
第三個字段都是一個包含一個元組的包
COGROUP,inner和flatten組合使用相當于實現了內連接:
G = COGROUP A by $0 innner, B by $1 inner;
H = foreach G generate flatten($1), flatten($2)
// H和join A by $0, B by $1相同
cross叉乘
I = cross A, B; // 所有可能m*n行
Group 分組
B = group A by $0; // 第一個字段為group字段,第二個字段為一個包,包含元組的其它字段
B = group A by size($1); // 長度為第一個字段,第二個字段為一個包,包含所有長度為第一個字段的元組
C = group A all; // 只有一行,第一個字段為all,第二個字段為A中所有元組的包
D = group A any; // 對關系中的元組隨機分組,對取樣非常有用
排序數據
Pig按什么順序來處理關系的行是不確定的,只能在輸出前排序。
B = order A by $0, $1 DESC;
C = Limit B 2;
組合和切分數據
Union可以將幾個關系合在一起,即所有元組的集合,當關系的模式不匹配時,新關系就沒有模式。
C = union A, B;
Split 可以將一個關系的元組按某種條件分成幾個子集。
Split A into B if $0 is null, C if $0 is not null;
5 pig實用技巧
并行處理: 可以在很多語句中指定reducer的數量
group, join, cogroup, cross, distinct, order
(復習:reduce的任務個數設置為稍小于集群中的reduce任務槽數)
參數替換:在pig語句中使用$加變量名的方式使用外部定義的變量值,在運行時可以通過"-param input=”設置變量的值,
或者通過"-param_file ”來指定參數文件。
動態參數:很多Unix shell用反引號引用的命令來替換實際值,如`date “+%Y-%m-%d” `會按規定格式輸出日期。
這個可以放在-param或參數文件中來動態得到一個值。
=====================================================================
各種SQL在PIG中實現
我這里以Mysql 5.1.x為例,Pig的版本是0.8
同時我將數據放在了兩個文件,存放在/tmp/data_file_1和/tmp/data_file_2中.文件內容如下:
tmp_file_1:
Txt代碼
zhangsan 23 1
lisi 24 1
wangmazi 30 1
meinv 18 0
dama 55 0
tmp_file_2:
Txt代碼
1 a
23 bb
50 ccc
30 dddd
66 eeeee
1.從文件導入數據
1)Mysql (Mysql需要先創建表).
CREATE TABLE TMP_TABLE(USER VARCHAR(32),AGE INT,IS_MALE BOOLEAN);
CREATE TABLE TMP_TABLE_2(AGE INT,OPTIONS VARCHAR(50)); -- 用于Join
LOAD DATA LOCAL INFILE '/tmp/data_file_1' INTO TABLE TMP_TABLE ;
LOAD DATA LOCAL INFILE '/tmp/data_file_2' INTO TABLE TMP_TABLE_2;
2)Pig
tmp_table = LOAD '/tmp/data_file_1' USING PigStorage('\t') AS (user:chararray, age:int,is_male:int);
tmp_table_2= LOAD '/tmp/data_file_2' USING PigStorage('\t') AS (age:int,options:chararray);
2.查詢整張表
1)Mysql
SELECT * FROM TMP_TABLE;
2)Pig
DUMP tmp_table;
3. 查詢前50行
1)Mysql
SELECT * FROM TMP_TABLE LIMIT 50;
2)Pig
tmp_table_limit = LIMIT tmp_table 50;
DUMP tmp_table_limit;
4.查詢某些列
1)Mysql
SELECT USER FROM TMP_TABLE;
2)Pig
tmp_table_user = FOREACH tmp_table GENERATE user;
DUMP tmp_table_user;
5. 給列取別名
1)Mysql
SELECT USER AS USER_NAME,AGE AS USER_AGE FROM TMP_TABLE;
2)Pig
tmp_table_column_alias = FOREACH tmp_table GENERATE user AS user_name,age AS user_age;
DUMP tmp_table_column_alias;
6.排序
1)Mysql
SELECT * FROM TMP_TABLE ORDER BY AGE;
2)Pig
tmp_table_order = ORDER tmp_table BY age ASC;
DUMP tmp_table_order;
7.條件查詢
1)Mysql
SELECT * FROM TMP_TABLE WHERE AGE>20;
2) Pig
tmp_table_where = FILTER tmp_table by age > 20;
DUMP tmp_table_where;
8.內連接Inner Join
1)Mysql
SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_inner_join = JOIN tmp_table BY age,tmp_table_2 BY age;
DUMP tmp_table_inner_join;
9.左連接Left Join
1)Mysql
SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_left_join = JOIN tmp_table BY age LEFT OUTER,tmp_table_2 BY age;
DUMP tmp_table_left_join;
10.右連接Right Join
1)Mysql
SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_right_join = JOIN tmp_table BY age RIGHT OUTER,tmp_table_2 BY age;
DUMP tmp_table_right_join;
11.全連接Full Join
1)Mysql
SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE
UNION SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE
UNION SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_full_join = JOIN tmp_table BY age FULL OUTER,tmp_table_2 BY age;
DUMP tmp_table_full_join;
2.同時對多張表交叉查詢
1)Mysql
SELECT * FROM TMP_TABLE,TMP_TABLE_2;
2)Pig
tmp_table_cross = CROSS tmp_table,tmp_table_2;
DUMP tmp_table_cross;
3.分組GROUP BY
1)Mysql
SELECT * FROM TMP_TABLE GROUP BY IS_MALE;
2)Pig
tmp_table_group = GROUP tmp_table BY is_male;
DUMP tmp_table_group;
14.分組并統計
1)Mysql
SELECT IS_MALE,COUNT(*) FROM TMP_TABLE GROUP BY IS_MALE;
2)Pig
tmp_table_group_count = GROUP tmp_table BY is_male;
tmp_table_group_count = FOREACH tmp_table_group_count GENERATE group,COUNT($1);
DUMP tmp_table_group_count;
15.查詢去重DISTINCT
1)MYSQL
SELECT DISTINCT IS_MALE FROM TMP_TABLE;
2)Pig
tmp_table_distinct = FOREACH tmp_table GENERATE is_male;
tmp_table_distinct = DISTINCT tmp_table_distinct;
DUMP tmp_table_distinct;