Apache Spark 1.5新特性介紹
Apache Spark社區剛剛發布了 1.5 版本,大家一定想知道這個版本的主要變化,這篇文章告訴你答案。
DataFrame執行后端優化(Tungsten第一階段)
DataFrame可以說是整個Spark項目最核心的部分,在1.5這個開發周期內最大的變化就是Tungsten項目的 第一階段 已經完成。主要的變化是 由 Spark 自己來管理內存而不是使用JVM,這樣可以避免JVM GC帶來的性能損失。內存中的Java對象被存儲成Spark自己的二進制格式,計算直接發生在二進制格式上,省去了序列化和反序列化時間。同時這種格式 也更加緊湊,節省內存空間,而且能更好的估計數據量大小和內存使用情況。如果大家對這部分的代碼感興趣,可以在源代碼里面搜索那些Unsafe開頭的類即 可。在1.4版本只提供UnsafeShuffleManager等少數功能,剩下的大部分都是1.5版本新加入的功能。
其他優化還包括默認使用code generation; cache-aware算法對join, aggregation, shuffle, sorting的增強;window function性能的提高等。
那么性能到底能提升多少呢?可以參考DataBricks給出的這個例子。這是一個16 million行的記錄,有1 million的組合鍵的aggregation查詢分別使用Spark 1.4和1.5版本的性能對比,在這個測試中都是使用的默認配置。
那 么如果我們想自己測試下Tungsten第一階段的性能改如何測試呢?Spark 1.4以前的版本中spark.sql.codegen, spark.sql.unsafe.enabled等幾個參數在1.5版本里面合并成spark.sql.tungsten.enabled并默認為 true,只需要修改這一個參數就可以配置是否開啟tungsten優化(默認是開啟的)。
DataFrame/SQL/Hive
在DataFrame API方面,實現了新的聚合函數接口AggregateFunction2以及7個相應的build-in的聚合函數,同時基于新接口實現了相應的 UDAF接口。新的聚合函數接口把一個聚合函數拆解為三個動作: initialize/update/merge,然后用戶只需要定義其中的邏輯既可以實現不同的聚合函數功能。Spark的這個新的聚合函數實現方法和 Impala 里面非常類似。
Spark內置的 expression function 得 到了很大的增強,實現了100多個這樣的常用函數,例如string, math, unix_timestamp, from_unixtime, to_date等。同時在處理NaN值的一些特性也在增強,例如 NaN = Nan 返回true;NaN大于任何其他值等約定都越來越符合SQL界的規則了。
用戶可以在執行join操作的時候指定把左邊的表或者右邊的表broadcast出去,因為基于 cardinality的估計并不是每次都是很準的,如果用戶對數據了解可以直接指定哪個表更小從而被broadcast出去。
Hive 模塊最大的變化是支持連接Hive 1.2版本的metastore,同時支持metastore partition pruning(通過spark.sql.hive.metastorePartitionPruning=true開啟,默認為false)。因為很多 公司的Hive集群都升級到了1.2以上,那么這個改進對于需要訪問Hive元數據的Spark集群來說非常重要。Spark 1.5支持可以連接Hive 0.13, 0.14, 1.0/0.14.1, 1.1, 1.2的metastore。
在External Data Source方面,Parquet的支持有了很大的加強。Parquet的版本升級到1.7;更快的metadata discovery和schema merging;同時能夠讀取其他工具或者庫生成的非標準合法的parquet文件;以及更快更魯棒的動態分區插入。
由于Parquet升級到1.7,原來的一個重要 bug 被修復,所以Spark SQL的Filter Pushdown默認改為開啟狀態(spark.sql.parquet.filterPushdown=true),能夠幫助查詢過濾掉不必要的IO。
Spark 1.5可以通過指定spark.sql.parquet.output.committer.class參數選擇不同的output committer類,默認是org.apache.parquet.hadoop.ParquetOutputCommitter,用戶可以繼承這個類 實現自己的output committer。由于HDFS和S3這兩種文件存儲系統的區別,如果需要向S3里面寫入數據,可以使用 DirectParquetOutputCommitter,能夠有效提高寫效率,從而加快Job執行速度。
另外還有一些改動,包括:StructType支持排序功能;TimestampType的精度減小到1us;Spark現在的checkpoint是基于HDFS的,從1.5版本開始支持 基于memory和local disk的checkpoint 。這種類型的checkpoint性能更快,雖然不如基于HDFS的可靠,但是對于迭代型機器學習運算還是很有幫助的。
機器學習MLlib
MLlib最大的變化就是從一個機器學習的library開始轉向構建一個機器學習工作流的系統,這些變化發生在ML包里面。MLlib模塊下現在有兩個包:MLlib和ML。ML把整個機器學習的過程抽象成 Pipeline ,一個Pipeline是由多個Stage組成,每個Stage是Transformer或者Estimator。
以前機器學習工程師要花費大量時間在training model之前的feature的抽取、轉換等準備工作。ML提供了多個Transformer,極大提高了這些工作的效率。在1.5版本之后,已經有了25+個feature transformer,其中 CountVectorizer, Discrete Cosine Transformation, MinMaxScaler, NGram, PCA, RFormula, StopWordsRemover, and VectorSlicer這些feature transformer都是1.5版本新添加的,做機器學習的朋友可以看看哪些滿足你的需求。
這里面的一個亮點就是RFormula的支持,目標是使用戶可以把原來用R寫的機器學習程序(目前只支持GLM算法)不用修改直接搬到Spark平臺上來執行。不過目前只支持集中簡單 的R公式(包括'.', '~', '+'和 '-'),社區在接下來的版本中會增強這項功能。
另外越來越多的算法也作為Estimator搬到了ML下面,在1.5版本中新搬過來的有Naive Bayes, K-means, Isotonic Regression等。大家不要以為只是簡單的在ML下面提供一個調用相應算法的API,這里面變換還是挺多的。例如Naive Bayes原來的模型分別用Array[Double]和Array[Array[Double]]來存儲pi和theta,而在ML下面新的API里面 使用的是Vector和Matrix來存儲。從這也可以看出,新的ML框架下所有的數據源都是基于DataFrame,所有的模型也盡量都基于Spark 的數據類型表示。在ML里面的public API下基本上看不到對RDD的直接操作了,這也與Tungsten項目的設計目標是一致的。
除了 這些既有的算法在ML API下的實現,ML里面也增加了幾個新算法:
-
MultilayerPerceptronClassifier(MLPC) 這是一個基于 前饋神經網絡 的分類 器,它是一種在輸入層與輸出層之間含有一層或多層隱含結點的具有正向傳播機制的神經網絡模型,中間的節點使用sigmoid (logistic)函數,輸出層的節點使用softmax函數。輸出層的節點的數目表示分類器有幾類。MLPC學習過程中使用 BP算法 ,優化問題抽象成logistic loss function并使用L-BFGS進行優化。
-
MLlib包里面增加了一個頻繁項挖掘算法PrefixSpan,AssociationRules能夠把FreqItemset生成 關聯式規則 。
-
在MLlib的 統計包里面實現了 Kolmogorov–Smirnov 檢驗, 用以檢驗兩個經驗分布是否不同或一個經驗分布與另一個理想分布是否不同。
另外還有一些現有算法的增強:LDA算法,決策樹和 ensemble算法,GMM算法。
-
ML里面的多個分類模型現在都支持預測結果的概率而不像過去只支持預測結果,像 LogisticRegressionModel, NaiveBayesModel, DecisionTreeClassificationModel, RandomForestClassificationModel, GBTClassificationModel等,分別使用predictRaw, predictProbability, predict分別可以得到原始預測、概率預測和最后的分類預測。同時這些分類模型也支持通過設置thresholds指定各個類的閾值。
-
RandomForestClassificationModel和RandomForestRegressionModel模型都支持輸出feature importance
-
GMM EM算法實現了當feature維度 或者cluster數目比較大的時候的分布式矩陣求逆計算。實驗表明當feature維度>30,cluster數目>10的時候,這個優化性能提升明顯。
-
對于LinearRegressionModel和LogisticRegr essionModel實現了LinearRegressionTrainingSummary和LogisticRegressionTrainingSummary用來記錄模型訓練過程中的一些統計指標。
1.5版本的Python API也在不斷加強,越來越多的算法和功能的Python API基本上與Scala API對等了。此外在tuning和evaluator上也有增強。
其他
從1.5開始,Standalone, YARN和Mesos三種部署方式全部支持了動態資源分配。
SparkR支持運行在YARN集群上,同時DataFrame的函數也提供了一些R風格的別名,可以降低熟悉R的用戶的遷移成本。
在Streaming和Graphx方面也有非常大的改進,在這里不在一一贅述,詳細可以參考 release note 。