Spark使用CombineTextInputFormat緩解小文件過多導致Task數目過多的問題

FloridaStil 8年前發布 | 30K 次閱讀 Spark 分布式/云計算/大數據

來自: http://www.cnblogs.com/yurunmiao/p/5195754.html

目前平臺使用Kafka + Flume的方式進行實時數據接入,Kafka中的數據由業務方負責寫入,這些數據一部分由Spark Streaming進行流式計算;另一部分數據則經由Flume存儲至HDFS,用于數據挖掘或機器學習。HDFS存儲數據時目錄的最小邏輯單位為“小時”,為了保證數據計算過程中的數據完整性(計算某個小時目錄中的數據時,該目錄的數據全部寫入完畢,且不再變化),我們在Flume中加入了如下策略:

每五分鐘關閉一次正在寫入的文件,即新創建文件進行數據寫入。

這樣的方式可以保證,當前小時的第五分鐘之后就可以開始計算上一小時目錄中的數據,一定程度上提高了離線數據處理的實時性。

隨著業務的增加,開始有業務方反饋:“HDFS中實際被分析的數據量很小,但是Spark App的Task數目卻相當多,不太正常”,我們跟進之后,發現問題的根源在于以下三個方面:

(1)Kafka的實時數據寫入量比較小;

(2)Flume部署多個實例,同時消費Kafka中的數據并寫入HDFS;

(3)Flume每五分鐘會重新創建文件寫入數據(如上所述);

這樣的場景直接導致HDFS中存儲著數目眾多但單個文件數據量很小的情況,間接影響著Spark App Task的數目。

我們以Spark WordCount為例進行說明,Spark版本為1.5.1。

假設HDFS目錄“/user/yurun/spark/textfile”中存在以下文件:

這個目錄下僅三個文件包含少量數據:part-00005、part-00010、part-00015,數據大小均為6 Byte,其余文件數據大小均為0 Byte,符合小文件的場景。

注意:_SUCCESS相當于一個“隱藏”文件,實際處理時通常會被忽略。

常規實現

我們使用SparkContext textFile完成數據輸入,應用運行完成之后,通過Spark History Server的頁面可以看到:應用執行過程中,會產生一個Job,包含兩個Stage,每個Stage包含16個Task,也就是說,Task的總數目為32,如下圖所示:

之所以每個Stage包含16個Task,是因為目錄中存有16個文本文件(_SUCCESS不參與計算)。

優化實現

在這個優化的版本中,我們使用SparkContext newAPIHadoopFile完成數據輸入,需要著重說明一下“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”,這個類可以將多個小文件合并生成一個Split,而一個Split會被一個Task處理,從而減少Task的數目。這個應用的執行過程中,會產生兩個Job,每個Job包含一個Stage,每個Stage包含一個Task,也就是說,Task的總數目為2,如下圖所示:

可以看出,通過使用“org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat”可以很大程度上緩解小文件導致Spark App Task數目過多的問題。 

</div>

 本文由用戶 FloridaStil 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!