Pig用戶自定義函數(UDF)

jopen 10年前發布 | 18K 次閱讀 Pig 分布式/云計算/大數據

我們以氣溫統計和詞頻統計為例,講解以下三種用戶自定義函數。

用戶自定義函數

什么時候需要用戶自定義函數呢?和其它語言一樣,當你希望簡化程序結構或者需要重用程序代碼時,函數就是你不二選擇。

Pig的用戶自定義函數可以用Java編寫,但是也可以用Python或Javascript編寫。我們接下來以Java為例。

自定義過濾函數

我們仍然以先前的代碼為例:

records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);

valid_records = filter records by temperature!=999;

第二個語句的作用就是篩選合法的數據。如果我們采用用戶自定義函數,則第二個語句可以寫成:

valid_records = filter records by isValid(temperature);

這種寫法更容易理解,也更容易在多個地方重用。接下來的問題就是如何定義這個isValid函數。代碼如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.FilterFunc;

importorg.apache.pig.data.Tuple;

 

public class IsValidTemperature extends FilterFunc {

         @Override

         public Boolean exec(Tuple tuple)throws IOException {            

                   Object object = tuple.get(0);

                   int temperature = (Integer)object;            

                   return temperature != 999;

         }

}

接下來,我們需要:

1)  編譯代碼并打包成jar文件,比如pigudf.jar。

2)  通過register命令將這個jar文件注冊到pig環境:

register/home/user/hadoop_jar/pigudf.jar //參數為jar文件的本地路徑

此時,我們就可以用以下語句調用這個函數:

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

dump valid_records;

看起來這個函數名太長,不便輸入。我們可以用定義別名的方式代替:

define isValid com.oserp.pigudf.IsValidTemperature();

valid_records = filter records by isValid(temperature);

dump valid_records;

回到代碼,我們可發現:

1)  需要定義一個繼承自FilterFunc的類。

2)  重寫這個類的exec方法。這個方法的參數只有一個tuple,但是調用時可以傳遞多個參數,你可以通過索引號獲得對應的參數值,比如tuple.get(1)表示取第二個參數。

3)  調用時,需要使用類的全名。(當然你可以自定義別名)

4)  更多的驗證需要讀者自行在函數中添加,比如判斷是否為null等等。

 

備注:用Eclipse編寫Pig自定義函數時,你可能需要引用到一些Hadoop的庫文件。比較容易的方式是在新建項目時指定項目類型為MapReduce項目,這樣Eclipse就會自動設置庫引用的相關信息。

 

自定義運算函數(Eval function)

仍然以前面的數據文件為例:

1990 21

1990 18

1991 21

1992 30

1992 999

1990 23

假設我們希望通過溫度值獲得一個溫度的分類信息,比如我們把溫度大于劃分為以下類型:

溫度                            分類

x>=30                          hot

x>=10 and x<30        moderate

x<10                                      cool

則我們可以定義以下函數,代碼如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.EvalFunc;

importorg.apache.pig.data.Tuple;

 

public class GetClassification extends EvalFunc<String> {

         @Override

         public String exec(Tuple tuple)throws IOException {               

                   Object object = tuple.get(0);

                   int temperature = (Integer)object;

                  

                   if (temperature >= 30){

                            return "Hot";

                   }

                   else if(temperature >=10){

                            return "Moderate";

                   }

                   else {

                            return "Cool";

                   }                

         }

}

依次輸入以下Pig語句:

records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);

register /home/user/hadoop_jar/pigudf.jar;

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);

dump result;

輸出結果如下:

(1990,Moderate)

(1990,Moderate)

(1991,Moderate)

(1992,Hot)    

(1990,Moderate)

代碼比較簡單,該類繼承自EvalFunc類,且我們要明確定義返回值類型。

 

有些時候其它類庫可能包含有功能相近的Java函數,我們是否可以直接將這些庫函數拿過來使用呢?可以。以下語句調用了trim函數,用于去掉name字段前后的空格:

DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');

B = FOREACH A GENERATE trim(name);

其中的InvokeForString是一個Invoker(不知道該如何翻譯啊),其通過反射機制調用,返回值是String類型。其它類似的還有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

 

自定義加載函數

我們以詞頻統計為例,講解如何自定義加載函數。(統計各個單詞出現的頻率,由高到低排序)

一般情況下,load語句加載數據時,一行會被生成一個tuple。而統計詞頻時,我們希望每個單詞生成一個tuple。我們的測試數據文件只有兩行數據,如下:

Thisis a map a reduce program

mapreduce partition combiner

我們希望load后能得到如下形式的數據,每個單詞一個tuple:

(This)

(is)

(a)

(map)

(a)

(reduce)

 

先看代碼:

package com.oserp.pigudf;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.RecordReader;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.pig.LoadFunc;

importorg.apache.pig.backend.executionengine.ExecException;

importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

import org.apache.pig.data.BagFactory;

import org.apache.pig.data.DataBag;

import org.apache.pig.data.Tuple;

import org.apache.pig.data.TupleFactory;

 

 

public class WordCountLoadFunc extends LoadFunc {

        

         private RecordReader reader;

         TupleFactorytupleFactory = TupleFactory.getInstance();

         BagFactorybagFactory = BagFactory.getInstance();

        

         @Override        

         public InputFormatgetInputFormat() throws IOException {

                   return new TextInputFormat();

         }      

 

         @Override

         public Tuple getNext()throws IOException {

                    

                   try {

                            // 當讀取到分區數據塊的末尾時,返回null表示數據已讀取完

                            if (!reader.nextKeyValue()){

                                     return null;

                                     }

                            Textvalue = (Text)reader.getCurrentValue();

                            Stringline = value.toString();

                            String[]words =  line.split("\\s+");// 斷詞

                           

                            // 因為getNext函數只能返回一個tuple

                            // 而我們希望每個單詞一個單獨的tuple

                            // 所以我們將多個tuple放到一個bag里面,

                            // 然后返回一個包含一個bagtuple

                            // 注:這只是一個用于演示用法的示例,實際中這樣使用不一定合理。

                            List<Tuple>tuples = new ArrayList<Tuple>();                    

                            Tupletuple = null;

                            for (String word : words) {

                                     tuple= tupleFactory.newTuple();

                                     tuple.append(word);

                                     tuples.add(tuple);

                                     }

                           

                            DataBagbag = bagFactory.newDefaultBag(tuples);

                            Tupleresult = tupleFactory.newTuple(bag);

                           

                            return result;

                   }

                   catch (InterruptedException e) {

                            throw new ExecException(e);

                   }

                  

         }

 

         @Override

         public void prepareToRead(RecordReader reader,PigSplit arg1)

                            throws IOException {

                   this.reader = reader;

         }

 

         @Override

         public void setLocation(String location, Job job) throws IOException {

                   FileInputFormat.setInputPaths(job,location);          

         }

 

}

 

依次執行以下命令:

1)       records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});

2)       flatten_records= foreach records generate flatten($0);

3)       grouped_records= group flatten_records by words::w;

4)       result= foreach grouped_records generate group,COUNT(flatten_records);

5)       final_result= order result by $1 desc,$0;

6)       dumpfinal_result;

顯示結果如下:

(a,2)                  

(map,2)            

(reduce,2)        

(This,1)             

(combiner,1)   

(is,1)                           

(partition,1)    

(program,1)     

注意schema的定義格式:(words:bag{word:(w:chararray)})

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!