Spark使用CombineTextInputFormat緩解小文件過多導致Task數目過多的問題
來自: http://www.cnblogs.com/yurunmiao/p/5195754.html
目前平臺使用Kafka + Flume的方式進行實時數據接入,Kafka中的數據由業務方負責寫入,這些數據一部分由Spark Streaming進行流式計算;另一部分數據則經由Flume存儲至HDFS,用于數據挖掘或機器學習。HDFS存儲數據時目錄的最小邏輯單位為“小時”,為了保證數據計算過程中的數據完整性(計算某個小時目錄中的數據時,該目錄的數據全部寫入完畢,且不再變化),我們在Flume中加入了如下策略:
每五分鐘關閉一次正在寫入的文件,即新創建文件進行數據寫入。
這樣的方式可以保證,當前小時的第五分鐘之后就可以開始計算上一小時目錄中的數據,一定程度上提高了離線數據處理的實時性。
隨著業務的增加,開始有業務方反饋:“HDFS中實際被分析的數據量很小,但是Spark App的Task數目卻相當多,不太正常”,我們跟進之后,發現問題的根源在于以下三個方面:
(1)Kafka的實時數據寫入量比較小;
(2)Flume部署多個實例,同時消費Kafka中的數據并寫入HDFS;
(3)Flume每五分鐘會重新創建文件寫入數據(如上所述);
這樣的場景直接導致HDFS中存儲著數目眾多但單個文件數據量很小的情況,間接影響著Spark App Task的數目。
我們以Spark WordCount為例進行說明,Spark版本為1.5.1。
假設HDFS目錄“/user/yurun/spark/textfile”中存在以下文件:
這個目錄下僅三個文件包含少量數據:part-00005、part-00010、part-00015,數據大小均為6 Byte,其余文件數據大小均為0 Byte,符合小文件的場景。
注意:_SUCCESS相當于一個“隱藏”文件,實際處理時通常會被忽略。
常規實現
我們使用SparkContext textFile完成數據輸入,應用運行完成之后,通過Spark History Server的頁面可以看到:應用執行過程中,會產生一個Job,包含兩個Stage,每個Stage包含16個Task,也就是說,Task的總數目為32,如下圖所示:
之所以每個Stage包含16個Task,是因為目錄中存有16個文本文件(_SUCCESS不參與計算)。
優化實現
在這個優化的版本中,我們使用SparkContext newAPIHadoopFile完成數據輸入,需要著重說明一下“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”,這個類可以將多個小文件合并生成一個Split,而一個Split會被一個Task處理,從而減少Task的數目。這個應用的執行過程中,會產生兩個Job,每個Job包含一個Stage,每個Stage包含一個Task,也就是說,Task的總數目為2,如下圖所示:
可以看出,通過使用“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”可以很大程度上緩解小文件導致Spark App Task數目過多的問題。
</div>