Spark中的rollup
Spark中的rollup
在對數據進行小計或合計運算時,rollup和cube一樣,算是常用的操作了。Spark的DataFrame提供了rollup函數支持此功能。
假設準備了如下數據:
trait SalesDataFrameFixture extends DataFrameFixture with SparkSqlSupport { implicit class StringFuncs(str: String) { def toTimestamp = new Timestamp(Date.valueOf(str).getTime) } import sqlContext.implicits._ val sales = Seq( (1, "Widget Co", 1000.00, 0.00, "廣東省", "深圳市", "2014-02-01".toTimestamp), (2, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-11".toTimestamp), (3, "Acme Widgets", 1000.00, 500.00, "四川省", "綿陽市", "2014-02-12".toTimestamp), (4, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-13".toTimestamp), (5, "Widget Co", 1000.00, 0.00, "廣東省", "廣州市", "2015-01-01".toTimestamp), (6, "Acme Widgets", 1000.00, 500.00, "四川省", "瀘州市", "2015-01-11".toTimestamp), (7, "Widgetry", 1000.00, 200.00, "四川省", "成都市", "2015-02-11".toTimestamp), (8, "Widgets R Us", 3000.00, 0.0, "四川省", "綿陽市", "2015-02-19".toTimestamp), (9, "Widgets R Us", 2000.00, 0.0, "廣東省", "深圳市", "2015-02-20".toTimestamp), (10, "Ye Olde Widgete", 3000.00, 0.0, "廣東省", "深圳市", "2015-02-28".toTimestamp), (11, "Ye Olde Widgete", 3000.00, 0.0, "廣東省", "廣州市", "2015-02-28".toTimestamp) ) val saleDF = sqlContext.sparkContext.parallelize(sales, 4).toDF("id", "name", "sales", "discount", "province", "city", "saleDate") }
注冊臨時表,并執行SQL語句:
saleDF.registerTempTable("sales") val dataFrame = sqlContext.sql("select province,city,sales from sales") dataFrame.show
執行的結果如下:
| province |city | sales | |----------|-----|-------| | 廣東省| 深圳市|1000.0| | 四川省| 成都市|1000.0| | 四川省| 綿陽市|1000.0| | 四川省| 成都市|1000.0| | 廣東省| 廣州市|1000.0| | 四川省| 瀘州市|1000.0| | 四川省| 成都市|1000.0| | 四川省| 綿陽市|3000.0| | 廣東省| 深圳市|2000.0| | 廣東省| 深圳市|3000.0| | 廣東省| 廣州市|3000.0|
對該DataFrame執行rollup:
val resultDF = dataFrame.rollup($"province", $"city").agg(Map("sales" -> "sum")) resultDF.show
在這個例子中,rollup操作相當于對dataFrame中的province與city進行分組,并在此基礎上針對sales進行求和運算,故而獲得的結果為:
|province|city|sum(sales)| |--------|----|----------| | null|null| 18000.0| | 廣東省|null| 10000.0| | 廣東省| 深圳市| 6000.0| | 四川省|null| 8000.0| | 四川省| 成都市| 3000.0| | 四川省| 綿陽市| 4000.0| | 廣東省| 廣州市| 4000.0| | 四川省| 瀘州市| 1000.0|
操作非常簡單,然而遺憾地是并不符合我們產品的場景,因為我們需要根據某些元數據直接組裝為Spark SQL的sql語句。在Spark的hiveContext中,支持這樣的語法:
hiveContext.sql("select province, city, sum(sales) from sales group by province, city with rollup")
可惜,SQLContext并不支持這一功能。我在Spark User Mailing List中咨詢了這個問題。Intel的Cheng Hao(Spark的一位非常活躍的contributer)告訴了我為何不支持的原因。因為在Spark SQL 1.x版本中,對SQL語法的解析采用了Scala的Parser機制。這種實現方式較弱,對語法的解析支持不夠。Spark的Issue #5080嘗試提供此功能,然而并沒有被合并到Master中。Spark并不希望在1.x版本的SQLParser中添加新的關鍵字,它的計劃是在Spark 2.0中用HQL Parser來替代目前較為簡陋的SQL Parser。
如果希望在sql中使用rollup,那么有三個選擇:
- 使用HQLContext;
- pull #5080的代碼,自己建立一個Spark的分支;
- 等待Spark 2.0版本發布。
來自: http://zhangyi.farbox.com/post/kai-yuan-kuang-jia/rollup-in-spark
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!