Spark的性能調優

jopen 8年前發布 | 21K 次閱讀 Spark 分布式/云計算/大數據

下面這些關于Spark的性能調優項,有的是來自官方的,有的是來自別的的工程師,有的則是我自己總結的。

Data Serialization,默認使用的是Java Serialization,這個程序員最熟悉,但是性能、空間表現都比較差。還有一個選項是Kryo Serialization,更快,壓縮率也更高,但是并非支持任意類的序列化。

Memory Tuning,Java對象會占用原始數據2~5倍甚至更多的空間。最好的檢測對象內存消耗的辦法就是創建RDD,然后放到cache里面去,然后在UI 上面看storage的變化;當然也可以使用SizeEstimator來估算。使用-XX:+UseCompressedOops選項可以壓縮指針(8 字節變成4字節)。在調用collect等等API的時候也要小心——大塊數據往內存拷貝的時候心里要清楚。

GC調優。打印GC信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。默認60%的executor內存可以被用來作為RDD的緩存,因此只有40%的內存可以被用來作為對象創建的空間,這一點可以通過設置spark.storage.memoryFraction改變。如果有很多小對象創建,但是這些對象在不完全GC的過程中就可以回收,那么增大Eden區會有一定幫助。如果有任務從HDFS拷貝數據,內存消耗有一個簡單的估算公式——比如HDFS的block size是64MB,工作區內有4個task拷貝數據,而解壓縮一個block要增大3倍大小,那么內存消耗就是:4*3*64MB。另外,工作中遇到過這樣的一個問題:GC默認情況下有一個限制,默認是GC時間不能超過2%的CPU時間,但是如果大量對象創建(在Spark里很容易出現,代碼模式就是一個RDD轉下一個RDD),就會導致大量的GC時間,從而出現OutOfMemoryError: GC overhead limit exceeded,可以通過設置-XX:-UseGCOverheadLimit關掉它。

Level of Parallelism。Spark根據要處理的文件大小設置map task的數量(也可以通過SparkContext.textFile顯式指定),并且使用最大的parent RDD的分區數量來執行reduce操作。設置level of parallelism或者屬性spark.default.parallelism來改變并行級別,通常來說,每一個CPU核可以分配2~3個task

Reduce Task的內存使用。在某些情況下reduce task特別消耗內存,比如當shuffle出現的時候,比如sortByKey、groupByKey、reduceByKey和join等,要在內存里面建立一個巨大的hash table。其中一個解決辦法是增大level of parallelism,這樣每個task的輸入規模就相應減小。

Broadcasting Large Variables。在task使用靜態大對象的時候,可以把它broadcast出去。Spark會打印序列化后的大小,通常來說如果它超過20KB就值得這么做。有一種常見情形是,一個大表join一個小表,把小表broadcast后,大表的數據就不需要在各個node之間瘋跑,安安靜靜地呆在本地等小表broadcast過來就好了

Data Locality。數據和代碼要放到一起才能處理,通常代碼總比數據要小一些,因此把代碼送到各處會更快。Data Locality是數據和處理的代碼在屋里空間上接近的程度:PROCESS_LOCAL(同一個JVM)、NODE_LOCAL(同一個node,比如數據在HDFS上,但是和代碼在同一個node)、NO_PREF、RACK_LOCAL(不在同一個server,但在同一個機架)、ANY。當然優先級從高到低,但是如果在空閑的executor上面沒有未處理數據了,那么就有兩個選擇:(1)要么等如今繁忙的CPU閑下來處理盡可能“本地”的數據,(1)要么就不等直接啟動task去處理相對遠程的數據。默認當這種情況發生Spark會等一會兒(spark.locality),即策略(1),如果繁忙的CPU停不下來,就會執行策略(2)。

文件存儲和讀取的優化。比如對于一些case而言,如果只需要某幾列,使用rcfile和parquet這樣的格式會大大減少文件讀取成本。再有就是存儲文件到S3上或者HDFS上,可以根據情況選擇更合適的格式,比如壓縮率更高的格式。

文件分片。比如在S3上面就支持文件以分片形式存放,后綴是partXX。使用coalesce方法來設置分成多少片,這個調整成并行級別或者其整數倍可以提高讀寫性能。但是太高太低都不好,太低了沒法充分利用S3并行讀寫的能力,太高了則是小文件太多,預處理、合并、連接建立等等都是時間開銷啊,讀寫還容易超過throttle。

Spark的Speculation。通過設置spark.speculation等幾個相關選項,可以讓Spark在發現某些task執行特別慢的時候,可以在不等待完成的情況下被重新執行,最后相同的task只要有一個執行完了,那么最快執行完的那個結果就會被采納。

減少Shuffle。其實Spark的計算往往很快,但是大量開銷都花在網絡和IO上面,而shuffle就是一個典型。舉個例子,如果(k, v1) join (k, v2) => (k, v3),那么,這種情況其實Spark是優化得非常好的,因為需要join的都在一個node的一個partition里面,join很快完成,結果也是在同一個node(這一系列操作可以被放在同一個stage里面)。但是如果數據結構被設計為(obj1) join (obj2) => (obj3),而其中的join條件為obj1.column1 == obj2.column1,這個時候往往就被迫shuffle了,因為不再有同一個key使得數據在同一個node上的強保證。在一定要shuffle的情況下,盡可能減少shuffle前的數據規模,比如這個避免groupByKey的例子

合理的partition。運算過程中數據量時大時小,選擇合適的partition數量關系重大,如果太多partition就導致有很多小任務和空任務產生;如果太少則導致運算資源沒法充分利用,必要時候可以使用repartition來調整,不過它也不是沒有代價的,其中一個最主要代價就是shuffle。再有一個常見問題是數據大小差異太大,這種情況主要是數據的partition的key其實取值并不均勻造成的(默認使用 HashPartitioner),需要改進這一點,比如重寫hash算法。測試的時候想知道partition的數量可以調用 rdd.partitions().size()獲知。

其它一些內容。同事發現Spark1.0.1的速度居然比Spark1.1和1.2快很多,而Spark1.2則比前幾個版本要吃掉多得多的內存。

可供參考的文檔:官方調優文檔Tuning Spark,Spark配置的官方文檔,Spark Programming Guide,JVMGC調優文檔,JVM性能調優文檔,How-to: Tune Your Apache Spark Jobs part-1 & part-2

來源鏈接《四火的嘮叨》

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!