Pig用戶自定義函數(UDF)
我們以氣溫統計和詞頻統計為例,講解以下三種用戶自定義函數。
用戶自定義函數
什么時候需要用戶自定義函數呢?和其它語言一樣,當你希望簡化程序結構或者需要重用程序代碼時,函數就是你不二選擇。
Pig的用戶自定義函數可以用Java編寫,但是也可以用Python或Javascript編寫。我們接下來以Java為例。
自定義過濾函數
我們仍然以先前的代碼為例:
records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);
valid_records = filter records by temperature!=999;
第二個語句的作用就是篩選合法的數據。如果我們采用用戶自定義函數,則第二個語句可以寫成:
valid_records = filter records by isValid(temperature);
這種寫法更容易理解,也更容易在多個地方重用。接下來的問題就是如何定義這個isValid函數。代碼如下:
packagecom.oserp.pigudf;
importjava.io.IOException;
importorg.apache.pig.FilterFunc;
importorg.apache.pig.data.Tuple;
public class IsValidTemperature extends FilterFunc {
@Override
public Boolean exec(Tuple tuple)throws IOException {
Object object = tuple.get(0);
int temperature = (Integer)object;
return temperature != 999;
}
}
接下來,我們需要:
1) 編譯代碼并打包成jar文件,比如pigudf.jar。
2) 通過register命令將這個jar文件注冊到pig環境:
register/home/user/hadoop_jar/pigudf.jar //參數為jar文件的本地路徑
此時,我們就可以用以下語句調用這個函數:
valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
dump valid_records;
看起來這個函數名太長,不便輸入。我們可以用定義別名的方式代替:
define isValid com.oserp.pigudf.IsValidTemperature();
valid_records = filter records by isValid(temperature);
dump valid_records;
回到代碼,我們可發現:
1) 需要定義一個繼承自FilterFunc的類。
2) 重寫這個類的exec方法。這個方法的參數只有一個tuple,但是調用時可以傳遞多個參數,你可以通過索引號獲得對應的參數值,比如tuple.get(1)表示取第二個參數。
3) 調用時,需要使用類的全名。(當然你可以自定義別名)
4) 更多的驗證需要讀者自行在函數中添加,比如判斷是否為null等等。
備注:用Eclipse編寫Pig自定義函數時,你可能需要引用到一些Hadoop的庫文件。比較容易的方式是在新建項目時指定項目類型為MapReduce項目,這樣Eclipse就會自動設置庫引用的相關信息。
自定義運算函數(Eval function)
仍然以前面的數據文件為例:
1990 21
1990 18
1991 21
1992 30
1992 999
1990 23
假設我們希望通過溫度值獲得一個溫度的分類信息,比如我們把溫度大于劃分為以下類型:
溫度 分類
x>=30 hot
x>=10 and x<30 moderate
x<10 cool
則我們可以定義以下函數,代碼如下:
packagecom.oserp.pigudf;
importjava.io.IOException;
importorg.apache.pig.EvalFunc;
importorg.apache.pig.data.Tuple;
public class GetClassification extends EvalFunc<String> {
@Override
public String exec(Tuple tuple)throws IOException {
Object object = tuple.get(0);
int temperature = (Integer)object;
if (temperature >= 30){
return "Hot";
}
else if(temperature >=10){
return "Moderate";
}
else {
return "Cool";
}
}
}
依次輸入以下Pig語句:
records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);
register /home/user/hadoop_jar/pigudf.jar;
valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);
dump result;
輸出結果如下:
(1990,Moderate)
(1990,Moderate)
(1991,Moderate)
(1992,Hot)
(1990,Moderate)
代碼比較簡單,該類繼承自EvalFunc類,且我們要明確定義返回值類型。
有些時候其它類庫可能包含有功能相近的Java函數,我們是否可以直接將這些庫函數拿過來使用呢?可以。以下語句調用了trim函數,用于去掉name字段前后的空格:
DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');
B = FOREACH A GENERATE trim(name);
其中的InvokeForString是一個Invoker(不知道該如何翻譯啊),其通過反射機制調用,返回值是String類型。其它類似的還有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。
自定義加載函數
我們以詞頻統計為例,講解如何自定義加載函數。(統計各個單詞出現的頻率,由高到低排序)
一般情況下,load語句加載數據時,一行會被生成一個tuple。而統計詞頻時,我們希望每個單詞生成一個tuple。我們的測試數據文件只有兩行數據,如下:
Thisis a map a reduce program
mapreduce partition combiner
我們希望load后能得到如下形式的數據,每個單詞一個tuple:
(This)
(is)
(a)
(map)
(a)
(reduce)
先看代碼:
package com.oserp.pigudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.RecordReader;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
importorg.apache.pig.backend.executionengine.ExecException;
importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class WordCountLoadFunc extends LoadFunc {
private RecordReader reader;
TupleFactorytupleFactory = TupleFactory.getInstance();
BagFactorybagFactory = BagFactory.getInstance();
@Override
public InputFormatgetInputFormat() throws IOException {
return new TextInputFormat();
}
@Override
public Tuple getNext()throws IOException {
try {
// 當讀取到分區數據塊的末尾時,返回null表示數據已讀取完
if (!reader.nextKeyValue()){
return null;
}
Textvalue = (Text)reader.getCurrentValue();
Stringline = value.toString();
String[]words = line.split("\\s+");// 斷詞
// 因為getNext函數只能返回一個tuple,
// 而我們希望每個單詞一個單獨的tuple,
// 所以我們將多個tuple放到一個bag里面,
// 然后返回一個包含一個bag的tuple。
// 注:這只是一個用于演示用法的示例,實際中這樣使用不一定合理。
List<Tuple>tuples = new ArrayList<Tuple>();
Tupletuple = null;
for (String word : words) {
tuple= tupleFactory.newTuple();
tuple.append(word);
tuples.add(tuple);
}
DataBagbag = bagFactory.newDefaultBag(tuples);
Tupleresult = tupleFactory.newTuple(bag);
return result;
}
catch (InterruptedException e) {
throw new ExecException(e);
}
}
@Override
public void prepareToRead(RecordReader reader,PigSplit arg1)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job,location);
}
}
依次執行以下命令:
1) records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});
2) flatten_records= foreach records generate flatten($0);
3) grouped_records= group flatten_records by words::w;
4) result= foreach grouped_records generate group,COUNT(flatten_records);
5) final_result= order result by $1 desc,$0;
6) dumpfinal_result;
顯示結果如下:
(a,2)
(map,2)
(reduce,2)
(This,1)
(combiner,1)
(is,1)
(partition,1)
(program,1)
注意schema的定義格式:(words:bag{word:(w:chararray)})