關于CarbonData+Spark SQL的一些應用實踐和調優經驗分享
大數據時代,中大型企業數據的爆發式增長,幾乎每天都能產生約 100GB 到 10TB 的數據。而企業數據分系統構建與擴張,導致不同應用場景下大數據冗余嚴重。行業亟需一個高效、統一的融合數倉,從海量數據中快速獲取有效信息,從而洞察機遇、規避風險。
在這樣的現狀下,CarbonData 誕生了,作為首個由中國貢獻給Apache社區的頂級開源項目,CarbonData 提供了一種新的融合數據存儲方案,以一份數據同時支持多種大數據應用場景,并通過豐富的索引技術、字典編碼、列存等特性提升了 IO 掃描和計算性能,實現了PB數據級的秒級響應。
為了幫助開發者深入了解并學習這項大數據開源技術,華為 CarbonData PMC 陳亮牽頭,攜手技術社區的核心開發者及合作伙伴,舉辦了一場Apache CarbonData+Spark 主題的技術交流會,就 CarbonData+Spark 的重要特性和使用介紹,做了全面而細致的分享,本文簡單整理了其中的部分精彩內容,同時,作為本次活動的承辦方,InfoQ整理上傳了所有講師的演講PPT+演講視頻,感興趣的同學可以免費獲取現場完整資料 。
Spark SQL的發展史概述 (講師PPT下載)
來自美國Databricks公司的范文臣首先講述了Spark SQL的發展史,范文臣同時也是Apache Spark PMC member,主導 Spark SQL 一些主要功能的設計和研發,定期審計項目代碼質量等。現場,他將Spark SQL過去的發展分為四個階段:
- 2009年,著名的Spark框架誕生。 它是一個圍繞速度、易用性和復雜分析構建的大數據處理框架,由伯克AMP實驗室創建。相比于當時流行的Hadoop,Spark提供了更高效的MapReduce模型,減少數據落地,也降低了編程難度。
- 2011年,Spark團隊將Hive的底層物理執行模塊從Hadoop切換成Shark,啟動了Shark項目。 然而,由于Hive自身的代碼復雜性以及和Hadoop MapReduce的耦合,Shark的開發舉步維艱,進展緩慢。
- 2014年,Spark團隊舍棄Shark,重新建立了一套完整的查詢框架Catalyst。 Catalyst利用了函數式風格的不可變特性,使Query Plan不可變,優化器通過遍歷優化策略生成新的 Query Plan。這樣優化規則之間的影響更容易理解,提升了代碼的可讀性和可維護性,也方便了新特性的開發。下圖為Spark SQL控制框架:
- 2015年,Spark團隊提出了鎢絲計劃,通過建立Tungsten格式、后端優化、代碼生成等手段,將Spark的查詢性能和執行速度提升到了一個新的臺階。
- 2017年,持續探索中……
那么,沿著查詢性能這條路,Spark的未來還會有哪些優化方向?范文臣在最后的演講中總結到:Spark的愿景是管理各種不同性質數據集和數據源的大數據處理的需求。Spark這樣一個角色,只關注于計算層,快速查詢處理是Spark唯一的衡量標準,也是未來不變的發展方向。也因此,在之后的Spark2.3里面,在計算框架下如何更快的和儲存系統橋接、Spark代碼生成都是未來著重關注的方向。
CarbonData應用實踐+2.0新技術規劃介紹 (講師PPT下載)
CarbonData誕生之初是希望以一份數據去滿足企業各種各樣的場景需求,包括詳單過濾和海量數倉以及數據集式操作等。那么,開發者該如何正確使用CarbonData技術?華為CarbonData總設計師李昆結合實際案例,詳細講解了CarbonData應用實踐+2.0新技術規劃。
CarbonData大數據生態
Carbondata在數據查詢方面選擇和Spark結合,據李昆現場介紹,Carbondata+Spark可以打造一個相對于傳統系統來說,更好的交互分析體驗,目前Carbondata和Spark1.5、1.6、2.1,Hive,Presto都做了集成,未來還將對Spark2.2做支持;在接口方面,Carbondata提供SQL接口,也支持Spark DataFrame API;在操作方面,支持查詢、數據管理如批量入庫、更新、刪除等操作。
隨后,李昆就CarbonData索引建立、CarbonData表格與物理存儲、SQL引擎對接、數據管理過程等技術內容做了詳細介紹。由于篇幅限制,本文不在此介紹,感興趣的讀者可以下載講師PPT對CarbonData的存儲原理進行深入了解。
成功案例介紹
隨后,李昆通過電信詳單分析場景的舉例介紹,詳細說明CarbonData如何以一份數據支持多種應用場景的。李昆表示,在電信跟金融領域經常需要明細數據分析,優化之前,老的系統需要用Impala和Hbase兩個系統,建立4個二級索引才可以完成業務需要的性能。這其中,Impala用來做報表輸出,Hbase做關鍵維度查詢。這兩個系統有各自存在不足:Impala沒有辦法很好的擴展,HBase要做很多二級索引,無法使用yarn統一資源管理,只能是一個個集群單獨維護。
用Carbondata+Spark數據優化后,可以解決既要點查又要處理報表的情況。下圖是一個從2000億到1萬億的性能測試數據,Q1是過濾查詢,Q2也是過濾查詢,Q1跟Q2數據查詢因為用了Carbondata索引,需要掃描的數據不會增長很多,數據量增長5倍,查詢時間增長不到1倍。第三個查詢是full scan查詢,主要考察的是spark和carbon的可擴展性,測試過程中發現擴展性是非常線性的,scalability很好。
CarbonData2.0未來規劃
現在,Carbondata的主要特性是對多場景的支持,不過在大數據時代,更多的場景正撲面而來。包括SQL分析、時間序列分析、位置軌跡、文本檢索、圖查詢和機器學習等。這就需要Carbondata2.0在各領域的應用上有更多的準備。包括:
- 入庫方面,需要考慮實時事件的流式入庫、歷史事件的批量入庫等;
- 存儲方面分三層,一層是界面,每一個領域有自己的術語,會針對領域常見操作做些SQL上的擴展;二是數據組織層,對不同領域做不同的分區、索引和預處理等,以便于它更高效地存儲領域數據;三是存儲格式層,Carbondata目前是列存,為了支撐更多查詢和分析,數據格式本身也需要具有擴展能力,比如行存、時序、面向AI的格式等;
Spark 2.2 核心特性CBO介紹 (講師PPT下載)
在Spark SQL的Catalyst優化器中,許多基于規則的優化技術已經實現,但優化器本身仍然有很大的改進空間。Spark 2.2在Spark SQL引擎內添加了一個基于成本的優化器框架,此框架通過可靠的統計和精確的估算,能夠在以下領域做出好的判定:選擇散列連接操作的正確構建端,選擇正確的連接算法,調整連接的順序等等,這個基于成本的優化器就是CBO。據華為研究工程師王振華介紹,CBO的目標是希望優化器能夠自動為用戶選擇最優的執行計劃,要達到這件事情,需要以下三個步驟:
第一步收集、推斷和傳播關于源/中間數據的表/列統計信息。用戶運行 ANALYZE TABLE 命令會收集表格信息比如表的行數、大小,列的統計信息比如最大值、最小值、不同值個數等,并將這些信息存儲到metastore里面。
第二步Cardinality Estimation,根據收集到的信息,計算每個操作符的成本,包括輸出行數、輸出大小等。如做filter時寫一個過濾條件,給定的條件會基于條件里面涉及列的統計信息,估算過濾條件執行完了以后,Operator有多少數據。
如下圖,為一個A小于等于某數字的估算,如果A的value比A的最小值更小,或者是比A的最大值更大,那么過濾率肯定是0或者100%,當落在定義域中間的時候,假設是均勻分布,概率則是A.min到B的區間所占A的定義域的百分比,這個是Filter條件最終的selectivity,有了selectivity,即可再相應的更新filter以后的統計信息。
第三步根據成本計算,選擇最優的查詢執行計劃。通過建造方選擇(Build Side Selection)、散列連接實現:廣播與洗牌(Hash Join Implementation: Broadcast vs. Shuffle)、多路連接重新排序(Multi-way Join Reorder)、連接成本計算公式(Join Cost Formula)四個方面闡述了最優計劃的選擇過程。
其中,在多路連接重新排序方法上,采用了動態規劃算法。以四表連接為例,首先,將所有項(基本連接節點)放到0級;然后,從第0級的計劃中構建所有的兩表連接;第三,從以前的層級(單節點和兩表連接)中構建出可能的三表連接;最后,構建所有的4路連接,并在其中選出最優的計劃。而在構建m-路徑連接時,只需保留同一組m項的最佳計劃(最優子解決方案)。如,對于A、B、C的三表連接順序,只保留三個候選計劃:(A J B)J C,(A J C)J B和(B J C)J A 當中最優的計劃。
Join cost計算方式如下,首先Cost一般來說傳統的數據庫里是基于CPU和IO,這兩個Cost是線性加合。在Spark中,用Cardinality模擬CPU的開銷,用size模擬IO的開銷。
王振華最后介紹到,華為在2016年7月份開始將CBO貢獻給Spark社區,并建立了umbrella ticket - SPARK-16026。截至目前為止,創建了超過40個sub-tasks、提交了50余個pull requests并被合入,同時吸引了十余個社區貢獻者的參與。
CBO的第一個版本已經在Spark 2.2中發布,感興趣的開發者和使用者,如要使用CBO,可以在收集統計信息之后,打開spark.sql.cbo.enable來使用CBO。
Partition 功能詳解+上汽實踐分享 (講師PPT下載)
CarbonData的partition特性將在Apache CarbonData 1.2.0版本里正式發布,此特性將顯著提升大數據查詢性能。上汽集團大數據將CarbonData作為平臺基礎組件,以應對迅猛增長的數據量,那么上汽集團在使用CarbonData過程中遇到了哪些問題?上汽集團大數據平臺開發經理曹魯就CarbonData的partition特性以及上汽集團在CarbonData項目的實踐和測試數據做了分享。
曹魯首先介紹了文件結構,索引生成過程,初次性能測試等主題內容,引出Partition特性帶來改變,主要包括兩點:1、數據將基于Partition列更為集中存儲,查詢時可過濾掉大量block,減少spark task數量;2、可以使其他列在排序中更靠前,提升查詢性能。
Partition Table的數據加載及查詢過程詳解
隨后,曹魯詳細介紹了CarbonData Partition相關的DDL語法,如Create Partition Table、Show Partition等,以及CarbonData Partition Table的數據加載以及查詢過程。下圖可以很清晰的看到CarbonData Partition的整個數據加載過程。
關于CarbonData Partition Table查詢過程,大概分為兩個部分:
- 根據SQL中的過濾條件=, <=, <, >, >=, in, not in以及表達式右值確定命中的partitionId
- 如果有其他在排過序的維度列有過濾條件,則在driver端根據B-tree索引獲取blocklet 所在的文件名,如沒有則獲取全部,再根據文件名中的partitionId,篩選得到需要讀取的文件,最后再下發spark task進行讀取;
之后,曹魯就Partition的新增(add)、拆分(split)及刪除(drop)功能的語法和實現過程展開了分析,其中重點區分了Drop Partition但保留數據RangePartition/ListPartition兩種Drop Partition類型的不同語法與實現,感興趣的讀者可以下載講師PPT深入了解。
上汽在CarbonData項目的實踐分享
在案例分享環節,曹魯以上汽的數據作為測試數據,分析了CarbonData Partition table和非Partition table條件下的加載性能和查詢性能對比。并給出了CarbonData Partition的性能調優建議。本文為大家展示其中的無排序維度列作為過濾條件,有partition列上的范圍過濾條件的聚合查詢情況的對比結果,如圖不難看出,原始查詢方式的耗時是添加partition性能查詢方式耗時的25倍。
曹魯給出的CarbonData Partition的性能調優建議:1、 選擇最合適的Partition列;2、盡可能的使用Partition列作為過濾條件,例如Partition列為A,開發者根據業務需求在Column B上有篩選條件,但注意到A與B列之間存在某種固定的mapping關系,這時就可以根據B列的過濾條件再新增一個partition列的過濾條件,以提高查詢效率。
現場精彩問答整理
Q: 客戶在使用Spark時不愿意編寫代碼,更喜歡給他一個頁面能能夠直接生成SQL,Spark后面會不會更多的偏向于業務人員做一些更易應用的東西出來,比如可以直接出來一個頁面?
A:Spark本身不會往這方面走,因為Spark只專注于做計算這層,這個模式一般是另外一個項目,比如有項目zpplin是專門做供應GIU的,可以在zpplin上面調Spark的一些接口,這些會單獨立項,而不是在Spark里面做。
Q:剛才提到carbon有一個目標,能夠盡量多的支持各種場景,目前我們也做過一些測試,某些特定情況下,不同的場景可能在響應速度和并發性上有比較大的差距,這一點后面有沒有改善?
A:這方面需要跟Spark一起聯合做優化,因為Spark是端到端的,從元數據查詢到SQL優化到DAG調度執行,有很多中間過程處理會耗時,建議你做一下打點分析,看主要瓶頸是哪一塊,同時carbon和spark我們也可以做一些聯合優化,相信基于社區的努力后面會有改善。
Q:如果有新的數據添加進來,CarbonData統計信息如何更新?
A:有兩種方式,一種是比較簡單的,每次數據表更新重新計算增量,這樣比較精確但是會比較慢,另外一種方式是增量的更新統計信息,這種方式較前一種可能會稍微復雜一些。
Q:在用Spark寫Carbondata Partition的時候,并行比較高,導致每個分區下出現很多小文件,這樣有什么好的解決辦法?
A:在CarbonData中每一個Block的大小是可以設置的,Blocklet也可以設置的,在load數據的時候,寫滿一個block的默認大小就會重新再寫一個文件,所以可以設置Block大小來解決這個問題。另外定期使用CarbonData的compaction功能也可以合并一些小文件,當然后面我們也會考慮開發merge partition的功能來給用戶提供更多選擇。
來自:http://www.infoq.com/cn/news/2017/09/CarbonData-Spark-huawei