從 MapReduce 到 Hive —— 一次遷移過程小記

jopen 11年前發布 | 32K 次閱讀 MapReduce Hive 分布式/云計算/大數據

1、背景介紹

早先的工作中,有很多比較復雜的分析工作,當時對hive還不熟悉,但是java比較熟悉,所以在進行處理的時候,優先選擇了MR.
但是隨著工作的數據內容越來越多,越來越復雜,對應的調整也越來越多,越來越復雜.純使用MR方式整個流程就比較復雜,如果需要修改某個部分,那首先需要修改代碼中的邏輯,然后把代碼打包上傳到某個可訪問路徑上(一般就是hdfs),然后在調度平臺內執行.如果改動較大的情況,可能還會需要在測試環境中多次調試. 總之就是會花比較多的時間在非業務邏輯改動的工作上.
考慮到維護的成本的增大,慢慢的開始準備將MR的作業,逐漸的移植到一些腳本平臺上去,hive成了我們的首選。

2、實戰場景

先看這樣一個場景. 每一個用戶在登錄到網站上的時候會帶有一個ip地址,多次登錄可能會有多個不同的ip地址.
假設,我們已經有一個 用戶->ip地址這樣的一份數據.我們需要對此進行分析,得到一份來自相同ip的用戶的關系表,數據格式類似
用戶->用戶,具體的ip我們不保留了。

第1步 用udf取最頻繁ip

我們先看一下原始數據的字段,是user_id,ips,我們再來看看ips內容的格式,我們執行
Select * from iptable limit 100
你會發現,雖然我們limit了100而且是沒有任何復雜條件的查詢,hive竟然也會去掃描所有的數據,這非常奇怪也很浪費。原來hive的limit在默認的情況下的執行過程就是把所有數據都跑出來,然后再一個reduce上,進行limit。這是為了保證在某些情況下篩選條件對結果的影響。
但是我們可以通過打開一個hive.limit.optimize.enable=true來簡化這個查詢,當這個選項打開以后hive會讀取hive.limit.row.max.size,hive.limit.optimize.limit.file的默認值來進行小數據量的計算。</span>
我們看到ips的原始數據的格式是ip,ip,… 用逗號分隔的多個ip字符串。
我們要從用戶->[ip地址] 這樣的數據中得到一個用戶使用最多的ip地址作為用戶的最常用ip。這里我們會使用hive的自定義udf來完成這一步的工作。
那么udf是什么呢,udf就是user define function的縮寫.通過它我們可以對hive進行擴展,hive本身已經帶了很多的基本的udf了,比如length(),sin(),unix_timestamp(),regexp_replace()等等.
這些都是一些比較通用的處理,如果有的時候我們要在字段上做一些特殊的邏輯就要自己動手寫了.
下面就是我們用來實現這個功能的udf代碼

@Description(name = “freq
ips”, value = “find most frequence ips from all login ip”, extended = “”)
public class FindFreqIps extends UDF {
    public String evaluate(String content, int limit) {
       // 計算最常用ip的代碼邏輯,并返回結果
       Return result;
    }
}
里面的邏輯主要就是找到前limit個最長使用的Ip,我們看到我們的類需要繼承自hive包中的UDF類,然后任意的定義輸入類型和返回類型,但是方法的名字一定要叫evaluate,hive會使用反射來得到這個方法的輸入輸出。當我們要在hive中使用它的時候,我們要首先把這個類打成jar包,然后讓hive可以訪問到。一般可以直接放在hdfs上,然后使用

Add jar hdfs_path/myjar.jar;
Create temporary function FindFreqIps as ‘FindFreqIps’
Select user_id, FindFreqIps(ips) as freqIps from tablexxx
另外還有一種是繼承自genericUDF,這種方式可以自由的控制輸入和返回類型處理,比起UDF來說更加的靈活些。但是我們這里普通的udf就足夠了。

第2步 列轉行,進行join

從第一步,我們得到了用戶最常用的N個ip,我們這里假設值3個。然后我們要找到這些用戶之間的關聯,即相同的ip的關系。
那么非常直接的方式,我們直接對用戶的ip進行join,但是現在ip是3個連在一起字符串的形式,無法直接join。那么我們就先把ip都分解開。
我們把這個ips的字段進行一個列轉行的轉換,如下

Select user_id,ip from tablexxx
Lateral view explode(split(ips, “,”))
subview as ip
這樣就會得到 user->ip的單條的記錄。這里的
這下要join就方便了,假設上面的結果表是singleIP我們

Select a.user_id as fromid, b.user_id as
toid
SingleIP a
Join  SingleIP b
On a.ip = b.ip and a.user_id <>
b.user_id;
什么,報錯了!
a.user_id <> b.user_id這個部分會報錯,因為hive中join的時候,是只能指定等式來進行匹配的,不支持不等式的條件。如果使用了不等式,會使join的數量變的非常大。
于是,我們就只能曲線救國了。

Select * from
(Select a.user_id as fromid, b.user_id as
toid
SingleIP a  Join
SingleIP b
On a.ip = b.ip) m
Where m.fromid <> m.toid;
你會發現,執行了1次join,2次select使用的mr的步驟還是一步。一般總感覺嵌套了一次select以后也會對應的產生2次mr,難道是hive自己進行了優化嗎?那么我們借助hive的分析工具來看看hive是如何執行的呢。
我們在剛才的語句前加上explain,來看看這個select的執行計劃。
Hive會通過antlr來對輸入的sql語句進行語法分析,產生一個執行計劃。
執行計劃會有三個部分

第一部分是ABSTRACT SYNTAX

TREE抽象語法樹
這里面顯示了hive把這個sql解析成什么樣的各個token。
類似這樣(TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))表示

第二部分是STAGE DEPENDENCIES

一個hive的執行過程會包含多個stage,他們之間有互相依賴的關系。比如下面

Stage-1 is a root
stage

Stage-0 depends on stages: Stage-1

Stage-3 depends on stages: Stage-0</pre>這里的stage-1是root stage。而0依賴于1,3依賴于0。

第三個部分是STAGE PLANS, 就是每個stage中的具體執行的步驟。

在stage plans里面每一個stage和普通的hadoop程序一樣會有map和reduce的過程。我們截取一段map過程的計劃看下。

Stage: Stage-1
    Map Reduce
      Alias ->
Map Operator Tree:
        a
          TableScan
            alias:
a
            Reduce
Output Operator
              key expressions:
expr: ip
type: string

         sort

order: + Map-reduce partition columns: expr: ip type: string tag: 0 value expressions: expr: user_id type: string</pre>這里是對a表也就是SingleIP表的一個map階段的操作。Reduce output operator這里會顯示使用ip作為key,自增排序,因為是string的所以是字典序的自增。Partition使用ip作為分發字段。tag指的是類似一個來源的概念,因為這里的join采用的是reduce join的方式,每一個從不同的map來的數據最后在reduce進行匯合,他們會被打上一個標記,代表他們的來源。然后就是value的內容,user_id。

然后再來看看reduce過程的計劃

Reduce Operator Tree:

    Join

Operator

      condition

map:

Inner Join 0 to 1

      condition

expressions:

        0

{VALUE._col0}

        1

{VALUE._col0}

handleSkewJoin: false

outputColumnNames: _col0, _col2</pre>這里顯示一個join的操作。這里表示把0的內容加到1上。后面有一個handleSkewJoin,這個是hive的一個應對數據傾斜的一種處理方式,默認是關閉的,我們后面再來詳細看。
這里也可以用explain extended,輸出的信息會更加詳細。那么看了這個我們再比較一下我們之前的第二個查詢計劃,我們來看看加上了嵌套查詢以后的執行計劃有什么變化呢?會發現hive在reduce的執行計劃里面會加上一段

Filter
Operator
predicate:
expr: (_col0 <> _col2)
type: boolean
在reduce最后輸出之前,進行了一個過濾的操作,過濾的條件就是外部的查詢的where條件。正如我們所料,hive發現這個過程是可以一次性完成的,所以進行了優化,放在了reduce階段來作了。
另外如果hive中有多張表進行join,如果他們的join key是一樣的,那么hive就會把他們都放在一次mr中完成。

第3步 數據傾斜

上一步中,我們計算出了所有的相同ip的人的點對點關系。但是這個結果集會有不少問題,比如如果某個ip是一個公共出口,那么就會出現同一個ip有上萬人都在使用,他們互相join展開以后,結果的數據量會非常大,時間上很慢不說,最終得到的數據實際上很多我們也用不上(這個是基于業務上得考慮),甚至有可能,在展開的時候會出現各種問題,導致計算時間過長,算不出來。這種情況,我們在hive里面稱之為數據傾斜。

在group by的時候,如果出現某一個reduce上得數據量過大的情況,hive有一個默認的hive.groupby.skewindata選項,當把它設置為true的時候,hive會將原來的一次MR變成2次,第一次,數據在reduce的時候會隨機分發到每個reduce,做部分的聚合,然后第二次的時候再按照group by的key進行分發。這樣可以有效的處理一般的傾斜情況。

而在join的時候,如果join的其中某個key的值非常的多,也會導致傾斜。有的時候,如果有null值,在hive看來null和null是相等的,它也會對他們進行join,也會錯誤的傾斜。由于join的時候,hive會把第一張表的內容放到一個內容map中,然后不斷的讀取后表的內容來進行join,所以如果左邊的表示小表這個過程就會非常的高效。當然使用mapjoin也一種有效的方式,直接把一張足夠小的表完全放到內存來后另一張表進行join。類似這樣

SELECT /*+ MAPJOIN(b) */
a.key, a.value FROM a join b on a.key = b.key;
我們的ip計算使用的是自己join自己,所有也沒有大小表之分,同時單表的數據量也大到無法完全放進內存,那么是不是就要進行硬算呢?在實際中,因為ip的分布沒有傾斜到太過火的程度,硬算也確實可以,但是這里我們換一種方式來稍稍優化一下。
首先我們采用bucket的方式來保存之間的用戶->ip的數據。使用ip來作為分桶鍵。

CREATE TABLE userip(user_id bigint, ip STRING)
CLUSTERED BY (ip) INTO 128 BUCKETS;
然后set hive.enforce.bucketing  = true;開啟bucket計算
from tableaaa
insert overwrite table tablebbb
select user_id, ip;
結果將會被保存到128個不同的桶中,默認根據ip的hashcode來取模。這樣每個桶內的數據基本大概是原數據量的1/100。當然如果原始數據量太大,還可以分桶更加多一些。
這個地方如果我們不開啟enforce.bucketing的話,也可以通過設置
set mapred.reduce.tasks=128.然后在查詢中cluster by來強制指定進行分桶。這步完成之后,我們再來進行設置
set hive.optimize.bucketmapjoin=true;
set  hive.optimize.bucketmapjoin.sortedmerge=true;
然后hive就能對每個分塊數據進行mapjoin。

第4步 用udaf取top N

好了,現在我們已經有所有的user->user的數據,我們希望要一個user->[users]的一對多的記錄,但是這個數據量有點大,實際上每個用戶大概關聯1000個已經足夠了。首先對數據進行排序,排序的依據就是按照用戶的相同的ip的數量。然后去最前面的1000個,不足的按實際數量取。
這個地方比較容易想到的就是,先group by fromid,toid,然后count一個總數作為新字段,如下
這里想到一種做法是用淘寶的一個類sql的row_number實現,然后用row_number來對fromid做主鍵,給按照count從大到小寫上序列編號seq。最后做一個嵌套查詢,只取seq<=1000的數據。Row_number的話標準的hive中沒有。那么這里就可以讓自定義udaf上場了。
Udaf顧名思義就是一個Aggregate的udf,和之前的udf的區別就是他一般是用來group by的場合中。

@Description(name = “myudaf”, value = “calc users has most same ips ” )

public class GenericUDAFCollect extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { return new MyUDAFEvaluator(); } }</pre>

自己定義一個evaluator,并且實現其中的一些方法。

public static class MyUDAFEvaluator extends GenericUDAFEvaluator {
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {

}
@Override
public void reset(AggregationBuffer agg) throws HiveException {

}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {

}
// Mapside
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {

}
// Mapside
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {

}
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {

}
// Reduceside
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {

}

}</pre>在init階段會傳一個Mode進來,這個Mode中定義了以下的幾個階段
PARTIAL1: 這個是map階段,這個階段會調用iterate(),和terminatePartial()
PARTIAL2:  這個是map段得combiner階段,會將map端的數據進行合并,也可能沒有這個階段。會執行merge()和terminatePartial()

FINAL: 這個是reduce階段,會調用merge()和terminate()

COMPLETE: 這是純map處理,無reduce的情況出現的階段,它會調用iterate()和terminate()
而從函數方面來說,init是初始化,他會傳入mode作為參數。可以根據不同的階段采取不同的處理。getNewAggregationBuffer的處理是hive為了內存的復用,減少gc,他并不是每一次處理一條記錄都會新申請空間,而是在處理一批數據的時候重復使用一批內存。Terminate就是最終的輸出了。
Ok,了解了udaf,那么可以動手了。Sql如下

Select fromid, getTopN(toid,n) from tablexx3
Group by fromid
其中的getTopN首先在map端,將每一個fromid的關聯的toid的次數都記錄下來,記錄條數代表重復的ip數量,然后按照這個次數進行倒序排序,截取前n個。
在reduce端,將各個map端的結果再按照次數倒序排序,再進行截取n個并進行合并。最終輸出的就是每個fromid對應的toid的列表了。
從這次從mr轉換到hive的過程中,對我們目前的mr和hive進行了一些比較

3、mr和hive比較

1. 運算資源消耗

無論從時間,數據量,計算量上來看,一般情況下mr都是優于或者等于hive的。mr的靈活性是毋庸置疑的。在轉換到hive的過程中,會有一些為了實現某些場景的需求而不得不用多步hive來實現的時候。

2. 開發成本/維護成本

毫無疑問,hive的開發成本是遠低于mr的。如果能熟練的運用udf和transform會更加提高hvie開發的效率。另外對于數據的操作也非常的直觀,對于全世界程序員都喜聞樂見的sql語法的繼承也讓它更加的容易上手。
   hive獨有的分區管理,方便進行數據的管理。
   代碼的管理也很方便,就是直接的文本。</span>
   邏輯的修改和生效很方便。</span>
   但是當出現異常錯誤的時候,hive的調試會比較麻煩。特別是在大的生產集群上面的時候。</span>

3. 底層相關性

在使用hive以后,讀取文件的時候,再也不用關心文件的格式,文件的分隔符,只要指定一次,hive就會保存好。相比mr來說方便了很多。
當側重關心與業務相關的內容的時候,用hive會比較有優勢。而在一些性能要求高,算法研究的時候,mr會更加適合。

原文地址:http://rdc.taobao.org/?p=1457

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!