解讀2015之Spark篇:新生態系統的形成
2015年的Spark社區的進展實在是太快了,我發現1月份出版的一本參考書到現在已經有很多內容是過時的了。社區大踏步前行的同時,用戶和應用案例也越來越多,應用行業越來越廣泛。到年底了我們來梳理下Spark這快速發展的一年。
先從全局有個認識,我嘗試用三句話來概括下Spark最主要的變化,然后在接下來的篇幅選取一些重點內容展開。
- Spark生態系統漸趨完善。支持的外部數據源越來越多,支持的算子越來越豐富,自身的機器學習算法越來越完善。同時在API支持上也有很大進步,新增加的R語言API使得Spark能被更多的行業所接受。
- Spark的應用范圍和規模在不斷擴大。在互聯網和電子商務行業的應用不斷增多、規模不斷擴大的基礎上,越來越多的金融、電信、制造業等傳統行業也開始使用Spark解決他們遇到的大數據問題。
- Spark自身的性能和穩定性在不斷提升。Tungsten項目讓Spark跑的越來越快,越來越多的代碼貢獻者和使用經驗讓Spark越來越穩定。相信明年發布的Spark 2.0將是一個里程碑式的版本。
轉向以DataFrame為核心
在傳統意義上Spark的核心是RDD和RDD之上的各種transformation和action,也就是各種算子,RDD可以認為是分布式的Java對象的集合。2013年推出了DataFrame,可以看做分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點就是有執行計劃的優化器,這樣用戶只需要指定自己的操作邏輯,DataFrame的優化器會幫助用戶選擇一條效率最優的執行路徑。同時Tungsten優化(下一章重點講)使得DataFrame的存儲和計算效率比RDD高很多。Spark的機器學習項目MLlib的ML pipeline就是完全基于DataFrame的,而且未來Streaming也會以DataFrame為核心。
(圖片引自Databricks)
Tungsten讓Spark越來越快
那么為什么DataFrame比RDD在存儲和計算上的效率更高呢?這主要得益于Tungsten項目。Tungsten做的優化概括起來說就是由Spark自己來管理內存而不是使用JVM,這樣可以避免JVM GC帶來的性能損失;內存中的Java對象被存儲成Spark自己的二進制格式,更加緊湊,節省內存空間,而且能更好的估計數據量大小和內存使用情況;計算直接發生在二進制格式上,省去了序列化和反序列化時間。
像傳統的Hadoop/Hive系統,磁盤IO是一個很大的瓶頸。而對于像Spark這樣的計算框架,主要的瓶頸在于CPU和內存。下面看看Tungsten主要做了哪些優化:
1,基于JVM的語言帶來的問題:GC問題和Java對象的內存開銷。例如一個字符串”abcd”理論上只有4個bytes,但是用Java String類型來存儲卻需要48個bytes。Spark的改進就是自己管理內存,不用JVM來管理了,使用的工具是sun.misc.Unsafe。DataFrame的每一行就是一個UnsafeRow,這塊內存存的啥東西只有Spark自己能讀懂。有了這種特有的二進制存儲格式后,DataFrame的算子直接操控二進制數據,同時又省去了很多序列化和反序列化的開銷。
2,Cache-aware的計算。現在Spark已經是內存計算引擎了,但是能不能更進一步呢,能不能更好的利用CPU的L1/L2/L3緩存的優勢呢,因為CPU緩存的訪問效率更高。這個優化點也不是意淫出來的,是在profile了很多Spark應用之后得到的結論,發現很多CPU的時間浪費在等待從內存中取數據的過程。所以在Tungsten中就設計和實現了一系列的cache-friendly的算法和數據結構來加速這個過程,例如aggregations, joins和shuffle操作中進行快速排序和hash操作。
以sort為例,Spark已經實現了cache-aware的sort算法,比原來的性能提升至少有3倍。在傳統的排序中是通過指針來索引數據的,但是缺點就是CPU cache命中率不夠高,因為我們需要隨機訪問record做比較。實際上quicksort算法是能夠非常好的利用cache的,主要是我們的record不是連續存儲的。Spark的優化就是存儲一個key prefix和指針在一起,那么就可以通過比較key prefix來直接實現排序,這樣CPU cache的命中率就會高很多。例如如果我們需要排序的列是一個string類型,那么我們可以拿這個string的UTF-8編碼的前8個字節來做key prefix,并進行排序。
關于這個優化可以參見SPARK-9457 和org.apache.spark.shuffle.sort下面的類,最重要的是ShuffleExternalSorter和ShuffleInMemorySorter兩個類。
3,運行時代碼生成
運行時代碼生成能免去昂貴的虛函數調用,同時也省去了對Java基本類型裝箱之類的操作了。Spark SQL將運行時代碼生成用于表達式的求值,效果顯著。
除了這些優化,我認為還有兩個很重要的變化:
1,Unified Memory Management
在以前Spark的內存顯式的被分為三部分:execution,storage和其他。execution內存用于shuffle, join, sort和aggregation等操作,而storage內存主要用于cache數據。在1.6版本之前是通過spark.shuffle.memoryFraction和spark.storage.memoryFraction兩個參數來配置用于execution和storage的內存份額。從1.6開始這兩部分內存合在一起統一管理了,也就是說如果現在沒有execution的需要,那么所有的內存都可以給storage用,反過來也是一樣的。同時execution可以evict storage的部分內存,但是反過來不行。在新的內存管理框架上使用兩個參數來控制spark.memory.fraction和spark.memory.storageFraction。
2,Adaptive query execution
這個特性說大了就是所有數據庫最核心的一個功能query execution optimization,可以做的東西非常多。我們自己寫Spark程序中經常會碰到一個job跑到最后每個分區的數據量很小的情況,這是因為以前的Spark不會估計下游RDD的每個分區的數據量大小,并根據數據量大小來調整分區個數。以前遇到這種問題就需要手工repartition,用戶自己要心里有數到哪個階段的RDD的partition數據變多了還是變少了,需要跟著調整分區的數目,非常不靈活。從1.6版本開始有了部分支持,主要是能夠估計在join和aggregate操作中Shuffle之后的分區的數目,動態調整下游task的數目,從而提高執行效率。
DataFrame和SQL API
Spark從API的角度看,可以分為兩大類:
- 類似于Python的Pandas和R語言的DataFrame API,用戶可以使用Scala/Java/Python/R四種語言調用這個API處理數據;
- SQL語言API。又分為兩種:一個是普通的Spark SQL,一種是Hive SQL。
雖然API不同,但是背后解析出來的算子是一樣的,DataFrame的各種算子其實就是各種SQL的語法。Spark在SQL語法的支持越來越豐富的同時內置的SQL函數得到了很大的增強,目前已經有超過100個這樣的常用函數(string, math, date, time, type conversion, condition),可以說最常見的SQL內置函數都有了。
作為一個類SQL的分析工具,聚合函數是非常核心的。Spark 1.5和1.6在聚合函數上都有很大改進:實現了一個新的聚合函數接口,支持了一些build-in的聚合函數(例如max/min/count/sum/avg/first/corr/stddev/variance/skewness/kurtosis以及一些窗口函數等),同時基于新接口實現了相應的UDAF接口。新的聚合函數接口是AggregateFunction,有兩種具體的實現:ImperativeAggregate和DeclarativeAggregate。ImperativeAggregate類型的聚合操作就是通過用戶定義三個動作 initialize/update/merge的邏輯來實現聚合的;而DeclarativeAggregate則是通過指定initialValues/updateExpressions/mergeExpressions這三個表達式然后通過代碼生成的方式來做聚合的操作。這兩種方式各有利弊,一般來說代碼生成效率更高,但是像variance/stddev/skewness/kurtosis這樣的多個表達式需要依賴同一個中間表達式的場景下,代碼生成的執行路徑由于不能共享中間的結果,從而導致其不如ImperativeAggregate效率更高,所以在Spark內部的實現中這幾個聚合函數也是通過ImperativeAggregate來實現的。
SQL API上另一個變化是可以直接在文件上進行SQL操作,不需要把這個文件注冊成一個table。例如支持”select a, b from json.`path/to/json/files`”這樣的語法,這個應該是從Apache Drill借鑒過來的。
另外一個里程碑式的特性就是Dataset API(SPARK-9999)。Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record存儲的是一個強類型值而不是一個Row。這個強類型的值是以編碼的二進制形式被存儲的,這種存儲格式可以不用反序列化就直接可以被上面的算子(例如sort,Shuffle等)操作。所以在創建Dataset的時候需要指定用于這個編碼工作的Encoder。
這樣一些需要強類型的地方就可以使用Dataset API,不失DataFrame的那些優點,同時又可以幫我們做類型檢查。所以從某種角度上說這個Dataset API在將來是要替換掉RDD的。
外部數據源和Hive支持
這個feature可以說是建立起Spark生態系統的基礎,使得Spark與大數據生態圈的其他組件聯系起來了。可以這么理解,你無論數據是在HDFS上,還是在Cassandra里面,抑或關系型數據庫里面,我Spark都可以拿過來做分析和處理,或者機器學習,我這邊處理完了你讓我寫到哪去我就可以寫出去。這個特性使得Spark成為了大數據處理的核心一環。目前Spark支持的外部數據源有很多種,主流的像Parquet,JSON,JDBC,ORC,AVRO,HBase,Cassandra,AWS S3,AWS Redshift等。
在這些外部數據源中,Parquet是其中最核心的,Spark的Parquet支持也有了很大的改進:修復了越來越多的bug,Parquet的版本升級到1.7;更快的metadata discovery和schema merging;能夠讀取其他工具或者庫生成的非標準的parquet文件;以及更快更魯棒的動態分區插入;對于flat schema的Parquet格式的數據的讀性能提升了大約1倍(SPARK-11787)。
另外在Hive支持方面,越來越多的Hive特有的SQL語法被加入到Spark中,例如DISTRIBUTE BY... SORT等。支持連接Hive 1.2版本的metastore,同時支持metastore partition pruning(通過spark.sql.hive.metastorePartitionPruning=true開啟,默認為false)。因為很多公司的Hive集群都升級到了1.2以上,那么這個改進對于需要訪問Hive元數據的Spark集群來說非常重要。
機器學習算法
Spark在機器學習方面的發展很快,目前已經支持了主流的統計和機器學習算法。雖然和單機的機器學習庫相比MLlib還有一定的差距;但是縱觀所有基于分布式架構的開源機器學習庫,MLlib是我認為的計算效率最高的。下面列出了目前MLlib支持的主要的機器學習算法:
Discrete |
Continous |
|
Supervised |
Classification LogisticRegression(with Elastic-Net) SVM DecisionTree RandomForest GBT NaiveBayes MultilayerPerceptron OneVsRest |
Regression LinearRegression(with Elastic-Net) DecisionTree RandomForest GBT AFTSurvivalRegression IsotonicRegression |
Unsupervised |
Clustering KMeans GaussianMixture LDA PowerIterationClustering BisectingKMeans |
Dimensionality Reduction, matrix factorization PCA SVD ALS WLS |
下面簡單說下其中一些亮點:
1,MLlib已經有了對Generialized Linear Model(GLM)的初步支持:GLM是統計學里一系列應用非常廣泛的模型,指定不同的family可以得到不同的模型。例如“Gaussian” family相當于LinearRegression模型, “Bionomial” family相當于LogisticRegression模型,“Poisson” family相當于SurvivalRegression模型。目前MLlib已經提供了這三種模型的機器學習解法和LinearRegression的normal equation解法。
下面詳細說說這兩種解法:一種是利用WeightedLeastSquares(WLS)優化方法;另一種是利用L-BFGS優化方法。前者是通過解normal equation的思路求解,只需要對所有的數據過一遍(不需要迭代)即可得到最后的模型(好像很神奇),同時可以算出像coefficients standard errors, p-value, t-value等統計指標幫助用戶理解模型,這種解法是目前LinearRegression的默認解法。不過有個限制就是樣本feature的維度不能超過4096,但對樣本數目沒有限制。后者就是傳統機器學習的解法,是通過迭代尋找最優解的方法。
而且目前MLlib也在進一步研究IterativelyReweightedLeastSquares(IRLS)算法,然后結合WLS和IRLS就可以使Spark支持類似R GLM的大多數功能。這樣對于前面所有的三種模型都將提供兩種解法,用戶可以根據實際情況選擇合適的解法。這個全部做完預計要在下一個版本Spark 2.0了。
2,ALS算法可以說是目前MLlib被應用最多的算法,ALS的數學原理其實比較容易理解,但是如何在分布式系統中高效實現是一個比較復雜的問題。MLlib里使用分塊計算的思路,合理的設計數據分區和 RDD 緩存來減少數據交換,有效的降低了通信復雜度。這個算法的實現思路充分說明了一個問題:同樣的算法,在分布式系統上實現時,不同的選擇會帶來性能上巨大的差異。
3,目前的主要的分類模型都支持使用predictRaw, predictProbability, predict分別可以得到原始預測、概率預測和最后的分類預測。同時這些分類模型也支持通過設置thresholds指定各個類的閾值。不過目前這些預測函數還只支持批量預測,也就是對一個DataFrame進行預測,不支持對單個instance進行預測。不過單個instance的預測的支持也已經在roadmap中了。
4,RandomForestClassificationModel和RandomForestRegressionModel模型都支持輸出feature importance
5,GMM EM算法實現了當feature維度或者cluster數目比較大的時候的分布式矩陣求逆計算。實驗表明當feature維度>30,cluster數目>10的時候,這個優化性能提升明顯。
6,在深度學習方面,MLlib已經實現了神經網絡算法MultilayerPerceptronClassifier(MLPC)。 這是一個基于 前饋神經網絡 的分類器,它是一種在輸入層與輸出層之間含有一層或多層隱含結點的具有正向傳播機制的神經網絡模型,中間的節點使用sigmoid (logistic)函數,輸出層的節點使用softmax函數。輸出層的節點的數目表示分類器有幾類。MLPC學習過程中使用 BP算法 ,優化問題抽象成logistic loss function并使用L-BFGS進行優化。
7,除了我們經常說的機器學習和數據挖掘算法,MLlib在統計檢驗算法上也有很大的進步。例如 A/B test , Kolmogorov–Smirnov 檢驗等。A/B測試可以說是很多大數據應用的基礎,很多結論最終都是通過A/B測試得到的。
機器學習Pipeline
MLlib最大的變化就是從一個機器學習的library開始轉向構建一個機器學習工作流的系統,這些變化發生在ML包里面。MLlib模塊下現在有兩個包:MLlib和ML。ML把整個機器學習的過程抽象成 Pipeline ,一個Pipeline是由多個Stage組成,每個Stage是Transformer或者Estimator。
以前機器學習工程師要花費大量時間在training model之前的feature的抽取、轉換等準備工作。ML提供了多個Transformer,極大提高了這些工作的效率。在1.6版本之后,已經有了30+個feature transformer,像OneHotEncoder, StringIndexer, PCA, VectorSlicer, VectorAssembler, SQLTransformer, HashingTF, IDF, Word2Vec等。這些工具使得各種feature提取、轉換等準備工作。
原始數據經過上面的各種feature transformer之后就可以丟到算法里去訓練模型了,這些算法叫Estimator,得到的模型是Transformer。上一章節已經提到了主流的算法支持,所以現在可以得到模型了。得到的模型怎么評價好壞呢?MLlib提供了像BinaryClassificationEvaluator等一系列的evaluation工具,這些工具里面定義了像AUC等一系列的指標用于評價模型的好壞。
這樣一個機器學習的pipeline就可以跑起來了,MLlib進一步提供了像CrossValidator這樣的工具幫助我們做模型和pipeline的調優,選出最佳的模型參數等。另外一個重要特性就是pipeline的持久化。現在的ML pipeline和大多數的Transformers/Estimators都支持了持久化,可以save/load。這樣就可以把模型或者pipeline存儲、導出到其他應用,掃除了MLlib在生產環境應用的最后一個障礙。
另外一個顯著變化就是ML框架下所有的數據源都是基于DataFrame,所有的模型也盡量都基于Spark的數據類型(Vector, Matrix)表示。在ML里面的public API下基本上看不到對RDD的直接操作了,這也與前面講的Spark的未來變化是一致的。
Python是機器學習領域最流行的語言,ML/MLlib的Python API也在不斷加強,越來越多的算法和功能的Python API基本上與Scala API對等了。
SparkR
R語言在統計領域應用非常廣泛,R語言本身的架構可以比較容易支持其他執行后端。SparkR就是把Spark作為R語言的執行后端。目前SparkR提供的接口包括了DataFrame的絕大多數函數以及機器學習GLM函數,未來還會支持更多的功能。SparkR既可以利用R接口的易用性以及在傳統行業的既有市場空間,又可以利用Spark強大的分布式數據處理能力。SparkR在推出不久就獲得了很高的用戶關注度和使用率。筆者在和一些傳統行業的大數據從業者交流中經常會被問到這樣的問題:我以前用R寫的程序,現在數據量大了,傳統R跑不動了,能不能直接放到Spark上跑。相信SparkR就是在朝著解決這個問題的方向努力。
在統計和機器學習方面一個重要的feature就是R formula的支持,R用戶可以用他們非常熟悉的formula來定義一個GLM模型。目前已經支持最基本的'.', '~', ':', '+'和 '-',未來還會增強這項功能。同時SparkR的GLM函數不只提供了訓練一個GLM模型和預測的能力,也提供了對模型的R類似的統計指標輸出。目前SparkR跑出的GLM模型和傳統R跑出的模型的結果是一樣的,同時也會輸出coefficients standard errors, p-values, t-values等統計信息。
Spark 2.0
Spark streaming等組件在這一年也有很大的變化,筆者由于精力有限,在此不一一列出。Spark 2.0預計明年三四月份發布,將會確立以DataFrame和Dataset為核心的體系架構,RDD會慢慢退出歷史舞臺;同時在各方面的性能上會有很大的提升,當然我們也期待著穩定性方面的提升。從1.x到2.x還會放棄Hadoop 1.x的支持,RPC系統從Akka遷移到Netty等。快速發展的社區,越來越多的應用,性能和穩定性方面的不斷提升使得Spark在未來的若干年內還是大數據處理工具的首選。
作者介紹:
梁堰波,明略數據技術合伙人,開源愛好者,Apache Spark項目核心貢獻者。北京航空航天大學計算機碩士,曾就職于Yahoo!、美團網、法國電信從事機器學習和推薦系統相關的工作,在大數據、機器學習和分布式系統領域具備豐富的項目經驗。
來自: http://www.infoq.com/cn/articles/2015-Review-Spark