Spark性能調優
通常我們對一個系統進行性能優化無怪乎兩個步驟——性能監控和參數調整,本文主要分享的也是這兩方面內容。
性能監控工具
【Spark監控工具】
Spark提供了一些基本的Web監控頁面,對于日常監控十分有用。
1. Application Web UI
http://master:4040(默認端口是4040,可以通過spark.ui.port修改)可獲得這些信息:(1)stages和tasks調度情況;(2)RDD大小及內存使用;(3)系統環境信息;(4)正在執行的executor信息。
2. history server
當Spark應用退出后,仍可以獲得歷史Spark應用的stages和tasks執行信息,便于分析程序不明原因掛掉的情況。配置方法如下:
(1)$SPARK_HOME/conf/spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50
Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"
說明:spark.history.retainedApplica-tions僅顯示最近50個應用spark.history.fs.logDirectory:Spark History Server頁面只展示該路徑下的信息。
(2)$SPARK_HOME/conf/spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop000:8020/directory #應用在運行過程中所有的信息均記錄在該屬性指定的路徑下
3. spark.eventLog.compress true
(1)HistoryServer啟動
$SPARK_HOMR/bin/start-histrory-server.sh
(2)HistoryServer停止
$SPARK_HOMR/bin/stop-histrory-server.sh
4. ganglia
通過配置ganglia,可以分析集群的使用狀況和資源瓶頸,但是默認情況下ganglia是未被打包的,需要在mvn編譯時添加-Pspark-ganglia-lgpl,并修改配置文件$SPARK_HOME/conf/metrics.properties。
5. Executor logs
Standalone模式:$SPARK_HOME/logs
YARN模式:在yarn-site.xml文件中配置了YARN日志的存放位置:yarn.nodemanager.log-dirs,或使用命令獲取yarn logs -applicationId。
【其他監控工具】
1. Nmon (http://www.ibm.com/developerworks/aix/library/au-analyze_aix/)
Nmon 輸入:c:CPU n:網絡 m:內存 d:磁盤
2. Jmeter (http://jmeter. apache.org/)
通常使用Jmeter做系統性能參數的實時展示,JMeter的安裝非常簡單,從官方網站上下載,解壓之后即可使用。運行命令在%JMETER_HOME%/bin下,對于 Windows 用戶,直接使用jmeter.bat。
啟動jmeter:創建測試計劃,設置線程組設置循環次數。
添加監聽器:jp@gc - PerfMon Metrics Collector。
設置監聽器:監聽主機端口及監聽內容,例如CPU。
啟動監聽:可以實時獲得節點的CPU狀態信息,從圖4可看出CPU已出現瓶頸。
3. Jprofiler (http://www.ej-technologies.com/products/jprofiler/overview.html)
JProfiler是一個全功能的Java剖析工具(profiler),專用于分析J2SE和J2EE應用程式。它把CPU、線程和內存的剖析 組合在一個強大的應用中。JProfiler的GUI可以更方便地找到性能瓶頸、抓住內存泄漏(memory leaks),并解決多線程的問題。例如分析哪個對象占用的內存比較多;哪個方法占用較大的CPU資源等;我們通常使用Jprofiler來監控 Spark應用在local模式下運行時的性能瓶頸和內存泄漏情況。
上述幾個工具可以直接通過提供的鏈接了解詳細的使用方法。
Spark調優
【Spark集群并行度】
在Spark集群環境下,只有足夠高的并行度才能使系統資源得到充分的利用,可以通過修改spark-env.sh來調整Executor的數量和使用資源,Standalone和YARN方式資源的調度管理是不同的。
在Standalone模式下:
1. 每個節點使用的最大內存數:SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY;
2. 每個節點的最大并發task數:SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES。
在YARN模式下:
1. 集群task并行度:SPARK_ EXECUTOR_INSTANCES* SPARK_EXECUTOR_CORES;
2. 集群內存總量:(executor個數) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+ (SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)。
重點強調:Spark對Executor和Driver額外添加堆內存大小,Executor端:由 spark.yarn.executor.memoryOverhead設置,默認值executorMemory * 0.07與384的最大值。Driver端:由spark.yarn.driver.memoryOverhead設置,默認值driverMemory * 0.07與384的最大值。
通過調整上述參數,可以提高集群并行度,讓系統同時執行的任務更多,那么對于相同的任務,并行度高了,可以減少輪詢次數。舉例說明:如果一個stage有100task,并行度為50,那么執行完這次任務,需要輪詢兩次才能完成,如果并行度為100,那么一次就可以了。
但是在資源相同的情況,并行度高了,相應的Executor內存就會減少,所以需要根據實際實況協調內存和core。此外,Spark能夠非常有 效的支持短時間任務(例如:200ms),因為會對所有的任務復用JVM,這樣能減小任務啟動的消耗,Standalone模式下,core可以允許 1-2倍于物理core的數量進行超配。
【Spark任務數量調整】
Spark的任務數由stage中的起始的所有RDD的partition之和數量決定,所以需要了解每個RDD的partition的計算方 法。以Spark應用從HDFS讀取數據為例,HadoopRDD的partition切分方法完全繼承于MapReduce中的 FileInputFormat,具體的partition數量由HDFS的塊大小、mapred.min.split.size的大小、文件的壓縮方式 等多個因素決定,詳情需要參見FileInputFormat的代碼。
【Spark內存調優】
內存優化有三個方面的考慮:對象所占用的內存,訪問對象的消耗以及垃圾回收所占用的開銷。
1. 對象所占內存,優化數據結構
Spark 默認使用Java序列化對象,雖然Java對象的訪問速度更快,但其占用的空間通常比其內部的屬性數據大2-5倍。為了減少內存的使用,減少Java序列 化后的額外開銷,下面列舉一些Spark官網(http://spark.apache.org/docs/latest /tuning.html#tuning-data-structures)提供的方法。
(1)使用對象數組以及原始類型(primitive type)數組以替代Java或者Scala集合類(collection class)。fastutil 庫為原始數據類型提供了非常方便的集合類,且兼容Java標準類庫。
(2)盡可能地避免采用含有指針的嵌套數據結構來保存小對象。
(3)考慮采用數字ID或者枚舉類型以便替代String類型的主鍵。
(4)如果內存少于32GB,設置JVM參數-XX:+UseCompressedOops以便將8字節指針修改成4字節。與此同時,在 Java 7或者更高版本,設置JVM參數-XX:+UseCompressedStrings以便采用8比特來編碼每一個ASCII字符。
2. 內存回收
(1)獲取內存統計信息:優化內存前需要了解集群的內存回收頻率、內存回收耗費時間等信息,可以在spark-env.sh中設置 SPARK_JAVA_OPTS=“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps $ SPARK_JAVA_OPTS”來獲取每一次內存回收的信息。
(2)優化緩存大小:默認情況Spark采用運行內存(spark.executor.memory)的60%來進行RDD緩存。這表明在任務執 行期間,有40%的內存可以用來進行對象創建。如果任務運行速度變慢且JVM頻繁進行內存回收,或者內存空間不足,那么降低緩存大小設置可以減少內存消 耗,可以降低spark.storage.memoryFraction的大小。
3. 頻繁GC或者OOM
針對這種情況,首先要確定現象是發生在Driver端還是在Executor端,然后在分別處理。
Driver端:通常由于計算過大的結果集被回收到Driver端導致,需要調大Driver端的內存解決,或者進一步減少結果集的數量。
Executor端:
(1)以外部數據作為輸入的Stage:這類Stage中出現GC通常是因為在Map側進行map-side-combine時,由于group 過多引起的。解決方法可以增加partition的數量(即task的數量)來減少每個task要處理的數據,來減少GC的可能性。
(2)以shuffle作為輸入的Stage:這類Stage中出現GC的通常原因也是和shuffle有關,常見原因是某一個或多個group 的數據過多,也就是所謂的數據傾斜,最簡單的辦法就是增加shuffle的task數量,比如在SparkSQL中設置SET spark.sql.shuffle.partitions=400,如果調大shuffle的task無法解決問題,說明你的數據傾斜很嚴重,某一個 group的數據遠遠大于其他的group,需要你在業務邏輯上進行調整,預先針對較大的group做單獨處理。
【修改序列化】
使用Kryo序列化,因為Kryo序列化結果比Java標準序列化更小,更快速。具體方法:spark-default.conf 里設置spark.serializer為org.apache.spark.serializer.KryoSerializer 。
參考官方文檔(http://spark.apache.org/docs/latest/tuning.html#summary):對于大多數程序而言,采用Kryo框架以及序列化能夠解決性能相關的大部分問題。
【Spark 磁盤調優】
在集群環境下,如果數據分布不均勻,造成節點間任務分布不均勻,也會導致節點間源數據不必要的網絡傳輸,從而大大影響系統性能,那么對于磁盤調優最好先將數據資源分布均勻。除此之外,還可以對源數據做一定的處理:
1. 在內存允許范圍內,將頻繁訪問的文件或數據置于內存中;
2. 如果磁盤充裕,可以適當增加源數據在HDFS上的備份數以減少網絡傳輸;
3. Spark支持多種文件格式及壓縮方式,根據不同的應用環境進行合理的選擇。如果每次計算只需要其中的某幾列,可以使用列式文件格式,以減少磁盤I/O, 常用的列式有parquet、rcfile。如果文件過大,將原文件壓縮可以減少磁盤I/O,例如:gzip、snappy、lzo。
【其他】
廣播變量(broadcast)
當task中需要訪問一個Driver端較大的數據時,可以通過使用SparkContext的廣播變量來減小每一個任務的大小以及在集群中啟動 作業的消耗。參考官方文檔http://spark.apache.org/docs/latest /tuning.html#broadcasting-large-variables。
開啟推測機制
推測機制后,如果集群中,某一臺機器的幾個task特別慢,推測機制會將任務分配到其他機器執行,最后Spark會選取最快的作為最終結果。
在spark-default.conf 中添加:spark.speculation true
推測機制與以下幾個參數有關:
1. spark.speculation.interval 100:檢測周期,單位毫秒;
2. spark.speculation.quantile 0.75:完成task的百分比時啟動推測;
3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啟動推測。
總結
Spark系統的性能調優是一個很復雜的過程,需要對Spark以及Hadoop有足夠的知識儲備。從業務應用平臺(Spark)、存儲 (HDFS)、操作系統、硬件等多個層面都會對性能產生很大的影響。借助于多種性能監控工具,我們可以很好地了解系統的性能表現,并根據上面介紹的經驗進 行調整。
作者簡介:田毅,亞信科技大數據平臺部門研發經理,Spark Contributor,北京Spark Meetup發起人,主要關注SparkSQL與Spark Streaming。
