Spark的性能調優
基本概念和原則
首先,要搞清楚Spark的幾個基本概念和原則,否則系統的性能調優無從談起:
- 每一臺host上面可以并行N個worker,每一個worker下面可以并行M個executor,task們會被分配到executor上面去執行。Stage指的是一組并行運行的task,stage內部是不能出現shuffle的,因為shuffle的就像籬笆一樣阻止了并行task的運行,遇到shuffle就意味著到了stage的邊界。
- CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor并不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一臺host上面增加更多的worker來增加并行執行的executor的數量,從而增加CPU利用率。但是增加executor的時候需要考慮好內存消耗,因為一臺機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。
- partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。
- 上面這兩條原理上看起來很簡單,但是卻非常重要,根據硬件和任務的情況選擇不同的取值。想要取一個放之四海而皆準的配置是不現實的。看這樣幾個例子:(1)實踐中跑的EMR Spark job,有的特別慢,查看CPU利用率很低,我們就嘗試減少每個executor占用CPU core的數量,增加并行的executor數量,同時配合增加分片,整體上增加了CPU的利用率,加快數據處理速度。(2)發現某job很容易發生內存溢出,我們就增大分片數量,從而減少了每片數據的規模,同時還減少并行的executor數量,這樣相同的內存資源分配給數量更少的executor,相當于增加了每個task的內存分配,這樣運行速度可能慢了些,但是總比OOM強。(3)數據量特別少,有大量的小文件生成,就減少文件分片,沒必要創建那么多task,這種情況,如果只是最原始的input比較小,一般都能被注意到;但是,如果是在運算過程中,比如應用某個reduceBy或者某個filter以后,數據大量減少,這種低效情況就很少被留意到。
- 最后再補充一點,隨著參數和配置的變化,性能的瓶頸是變化的,在分析問題的時候不要忘記。例如在每臺機器上部署的executor數量增加的時候,性能一開始是增加的,同時也觀察到CPU的平均使用率在增加;但是隨著單臺機器上的executor越來越多,性能下降了,因為隨著executor的數量增加,被分配到每個executor的內存數量減小,在內存里直接操作的越來越少,spill over到磁盤上的數據越來越多,自然性能就變差了。
下面給這樣一個直觀的例子,當前總的cpu利用率并不高:
但是經過根據上述原則的的調整之后,可以顯著發現cpu總利用率增加了:
其次,涉及性能調優我們經常要改配置,在Spark里面有三種常見的配置方式,雖然有些參數的配置是可以互相替代,但是作為最佳實踐,還是需要遵循不同的情形下使用不同的配置:
- 設置環境變量 ,這種方式主要用于和環境、硬件相關的配置;
- 命令行參數,這種方式主要用于不同次的運行會發生變化的參數,用雙橫線開頭;
- 代碼里面(比如Scala)顯式設置(SparkConf對象),這種配置通常是application級別的配置,一般不改變。
舉一個配置的具體例子。slave、worker和executor之間的比例調整。我們經常需要調整并行的executor的數量,那么簡單說有兩種方式:
- 每個worker內始終跑一個executor,但是調整單臺slave上并行的worker的數量。比如, SPARK_WORKER_INSTANCES 可以設置每個slave的worker的數量,但是在改變這個參數的時候,比如改成2,一定要相應設置SPARK_WORKER_CORES的值,讓每個worker使用原有一半的core,這樣才能讓兩個worker一同工作;
- 每臺slave內始終只部署一個worker,但是worker內部署多個executor。我們是在YARN框架下采用這個調整來實現executor數量改變的,一種典型辦法是,一個host只跑一個worker,然后配置spark.executor.cores為host上CPU core的N分之一,同時也設置spark.executor.memory為host上分配給Spark計算內存的N分之一,這樣這個host上就能夠啟動N個executor。
有的配置在不同的MR框架/工具下是不一樣的,比如YARN下有的參數的默認取值就不同,這點需要注意。
明確這些基礎的事情以后,再來一項一項看性能調優的要點。
內存
Memory Tuning,Java對象會占用原始數據2~5倍甚至更多的空間。最好的檢測對象內存消耗的辦法就是創建RDD,然后放到cache里面去,然后在UI上面看storage的變化;當然也可以使用SizeEstimator來估算。使用-XX:+UseCompressedOops選項可以壓縮指針(8字節變成4字節)。在調用collect等等API的時候也要小心——大塊數據往內存拷貝的時候心里要清楚。內存要留一些給操作系統,比如20%,這里面也包括了OS的buffer cache,如果預留得太少了,會見到這樣的錯誤:
“ Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.
或者干脆就沒有這樣的錯誤,但是依然有因為內存不足導致的問題,有的會有警告,比如這個:
“ 16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
有的時候連這樣的日志都見不到,而是見到一些不清楚原因的executor丟失信息:
“ Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)
Reduce Task的內存使用。在某些情況下reduce task特別消耗內存,比如當shuffle出現的時候,比如sortByKey、groupByKey、reduceByKey和join等,要在內存里面建立一個巨大的hash table。其中一個解決辦法是增大level of parallelism,這樣每個task的輸入規模就相應減小。另外,注意shuffle的內存上限設置,有時候有足夠的內存,但是shuffle內存不夠的話,性能也是上不去的。我們在有大量數據join等操作的時候,shuffle的內存上限經常配置到executor的50%。
注意原始input的大小,有很多操作始終都是需要某類全集數據在內存里面完成的,那么并非拼命增加parallelism和partition的值就可以把內存占用減得非常小的。我們遇到過某些性能低下甚至OOM的問題,是改變這兩個參數所難以緩解的。但是可以通過增加每臺機器的內存,或者增加機器的數量都可以直接或間接增加內存總量來解決。
在選擇EC2機器類型的時候,要明確瓶頸(可以借由測試來明確),比如我們遇到的情況就是使用r3.8 xlarge和c3.8 xlarge選擇的問題,運算能力相當,前者比后者貴50%,但是內存是后者的5倍。
另外,有一些RDD的API,比如cache,persist,都會把數據強制放到內存里面,如果并不明確這樣做帶來的好處,就不要用它們。
CPU
Level of Parallelism。指定它以后,在進行reduce類型操作的時候,默認partition的數量就被指定了。這個參數在實際工程中通常是必不可少的,一般都要根據input和每個executor內存的大小來確定。設置level of parallelism或者屬性spark.default.parallelism來改變并行級別,通常來說,每一個CPU核可以分配2~3個task。
CPU core的訪問模式是共享還是獨占。即CPU核是被同一host上的executor共享還是瓜分并獨占。比如,一臺機器上共有32個CPU core的資源,同時部署了兩個executor,總內存是50G,那么一種方式是配置spark.executor.cores為16,spark.executor.memory為20G,這樣由于內存的限制,這臺機器上會部署兩個executor,每個都使用20G內存,并且各使用“獨占”的16個CPU core資源;而在內存資源不變的前提下,也可以讓這兩個executor“共享”這32個core。根據我的測試,獨占模式的性能要略好與共享模式。
GC調優。打印GC信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。要記得默認60%的executor內存可以被用來作為RDD的緩存,因此只有40%的內存可以被用來作為對象創建的空間,這一點可以通過設置spark.storage.memoryFraction改變。如果有很多小對象創建,但是這些對象在不完全GC的過程中就可以回收,那么增大Eden區會有一定幫助。如果有任務從HDFS拷貝數據,內存消耗有一個簡單的估算公式——比如HDFS的block size是64MB,工作區內有4個task拷貝數據,而解壓縮一個block要增大3倍大小,那么估算內存消耗就是:4*3*64MB。另外,工作中遇到過這樣的一個問題:GC默認情況下有一個限制,默認是GC時間不能超過2%的CPU時間,但是如果大量對象創建(在Spark里很容易出現,代碼模式就是一個RDD轉下一個RDD),就會導致大量的GC時間,從而出現“OutOfMemoryError: GC overhead limit exceeded”,對于這個,可以通過設置-XX:-UseGCOverheadLimit關掉它。
序列化和傳輸
Data Serialization,默認使用的是Java Serialization,這個程序員最熟悉,但是性能、空間表現都比較差。還有一個選項是Kryo Serialization,更快,壓縮率也更高,但是并非支持任意類的序列化。在Spark UI上能夠看到序列化占用總時間開銷的比例,如果這個比例高的話可以考慮優化內存使用和序列化。
Broadcasting Large Variables。在task使用靜態大對象的時候,可以把它broadcast出去。Spark會打印序列化后的大小,通常來說如果它超過20KB就值得這么做。有一種常見情形是,一個大表join一個小表,把小表broadcast后,大表的數據就不需要在各個node之間瘋跑,安安靜靜地呆在本地等小表broadcast過來就好了。
Data Locality。數據和代碼要放到一起才能處理,通常代碼總比數據要小一些,因此把代碼送到各處會更快。Data Locality是數據和處理的代碼在屋里空間上接近的程度:PROCESS_LOCAL(同一個JVM)、NODE_LOCAL(同一個node,比如數據在HDFS上,但是和代碼在同一個node)、NO_PREF、RACK_LOCAL(不在同一個server,但在同一個機架)、ANY。當然優先級從高到低,但是如果在空閑的executor上面沒有未處理數據了,那么就有兩個選擇:
(1)要么等如今繁忙的CPU閑下來處理盡可能“本地”的數據,
(2)要么就不等直接啟動task去處理相對遠程的數據。
默認當這種情況發生Spark會等一會兒(spark.locality),即策略(1),如果繁忙的CPU停不下來,就會執行策略(2)。
代碼里對大對象的引用。在task里面引用大對象的時候要小心,因為它會隨著task序列化到每個節點上去,引發性能問題。只要序列化的過程不拋出異常,引用對象序列化的問題事實上很少被人重視。如果,這個大對象確實是需要的,那么就不如干脆把它變成RDD好了。絕大多數時候,對于大對象的序列化行為,是不知不覺發生的,或者說是預期之外的,比如在我們的項目中有這樣一段代碼:
rdd.map(r => { println(BackfillTypeIndex) })
其實呢,它等價于這樣:
rdd.map(r => { println(this.BackfillTypeIndex) })
不要小看了這個this,有時候它的序列化是非常大的開銷。
對于這樣的問題,一種最直接的解決方法就是:
val dereferencedVariable = this.BackfillTypeIndex rdd.map(r => println(dereferencedVariable)) // "this" is not serialized
相關地,注解@transient用來標識某變量不要被序列化,這對于將大對象從序列化的陷阱中排除掉是很有用的。另外,注意class之間的繼承層級關系,有時候一個小的case class可能來自一棵大樹。
文件讀寫
文件存儲和讀取的優化。比如對于一些case而言,如果只需要某幾列,使用rcfile和parquet這樣的格式會大大減少文件讀取成本。再有就是存儲文件到S3上或者HDFS上,可以根據情況選擇更合適的格式,比如壓縮率更高的格式。另外,特別是對于shuffle特別多的情況,考慮留下一定量的額外內存給操作系統作為操作系統的buffer cache,比如總共50G的內存,JVM最多分配到40G多一點。
文件分片。比如在S3上面就支持文件以分片形式存放,后綴是partXX。使用coalesce方法來設置分成多少片,這個調整成并行級別或者其整數倍可以提高讀寫性能。但是太高太低都不好,太低了沒法充分利用S3并行讀寫的能力,太高了則是小文件太多,預處理、合并、連接建立等等都是時間開銷啊,讀寫還容易超過throttle。
任務
Spark的Speculation。通過設置spark.speculation等幾個相關選項,可以讓Spark在發現某些task執行特別慢的時候,可以在不等待完成的情況下被重新執行,最后相同的task只要有一個執行完了,那么最快執行完的那個結果就會被采納。
減少Shuffle。其實Spark的計算往往很快,但是大量開銷都花在網絡和IO上面,而shuffle就是一個典型。舉個例子,如果(k, v1) join (k, v2) => (k, v3),那么,這種情況其實Spark是優化得非常好的,因為需要join的都在一個node的一個partition里面,join很快完成,結果也是在同一個node(這一系列操作可以被放在同一個stage里面)。但是如果數據結構被設計為(obj1) join (obj2) => (obj3),而其中的join條件為obj1.column1 == obj2.column1,這個時候往往就被迫shuffle了,因為不再有同一個key使得數據在同一個node上的強保證。在一定要shuffle的情況下,盡可能減少shuffle前的數據規模,比如 這個避免groupByKey的例子 。下面這個比較的圖片來自 Spark Summit 2013的一個演講 ,講的是同一件事情:
Repartition。運算過程中數據量時大時小,選擇合適的partition數量關系重大,如果太多partition就導致有很多小任務和空任務產生;如果太少則導致運算資源沒法充分利用,必要時候可以使用repartition來調整,不過它也不是沒有代價的,其中一個最主要代價就是shuffle。再有一個常見問題是數據大小差異太大,這種情況主要是數據的partition的key其實取值并不均勻造成的(默認使用HashPartitioner),需要改進這一點,比如重寫hash算法。測試的時候想知道partition的數量可以調用rdd.partitions().size()獲知。
Task時間分布。關注Spark UI,在Stage的詳情頁面上,可以看得到shuffle寫的總開銷,GC時間,當前方法棧,還有task的時間花費。如果你發現task的時間花費分布太散,就是說有的花費時間很長,有的很短,這就說明計算分布不均,需要重新審視數據分片、key的hash、task內部的計算邏輯等等,瓶頸出現在耗時長的task上面。
重用資源。有的資源申請開銷巨大,而且往往相當有限,比如建立連接,可以考慮在partition建立的時候就創建好(比如使用mapPartition方法),這樣對于每個partition內的每個元素的操作,就只要重用這個連接就好了,不需要重新建立連接。
編者按:本文由作者授權轉載, 原文地址 ,基于作者本身的經驗以及官方和別的工程師的總結,作者提供了可供參考的文檔:官方調優文檔 Tuning Spark ,Spark配置的 官方文檔 ,Spark Programming Guide ,JVMGC 調優文檔 ,JVM性能調優文檔,How-to: Tune Your Apache Spark Jobs part-1 & part-2 。
感謝杜小芳對本文的審校。
給InfoQ中文站投稿或者參與內容翻譯工作,請郵件至editors@cn.infoq.com。也歡迎大家通過新浪微博(@InfoQ,@丁曉昀),微信(微信號: InfoQChina )關注我們,并與我們的編輯和其他讀者朋友交流(歡迎加入InfoQ讀者交流群 (已滿),InfoQ讀者交流群(#2)
)。
來自: http://www.infoq.com/cn/news/2016/01/Spark-performance-tuning