Spark中的rollup

jopen 8年前發布 | 8K 次閱讀 分布式/云計算/大數據

Spark中的rollup

在對數據進行小計或合計運算時,rollup和cube一樣,算是常用的操作了。Spark的DataFrame提供了rollup函數支持此功能。

假設準備了如下數據:

trait SalesDataFrameFixture extends DataFrameFixture
with SparkSqlSupport {
  implicit class StringFuncs(str: String) {
    def toTimestamp = new Timestamp(Date.valueOf(str).getTime)
  }

  import sqlContext.implicits._

  val sales = Seq(
    (1, "Widget Co", 1000.00, 0.00, "廣東省", "深圳市", "2014-02-01".toTimestamp),
    (2, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-11".toTimestamp),
    (3, "Acme Widgets", 1000.00, 500.00, "四川省", "綿陽市", "2014-02-12".toTimestamp),
    (4, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-13".toTimestamp),
    (5, "Widget Co", 1000.00, 0.00, "廣東省", "廣州市", "2015-01-01".toTimestamp),
    (6, "Acme Widgets", 1000.00, 500.00, "四川省", "瀘州市", "2015-01-11".toTimestamp),
    (7, "Widgetry", 1000.00, 200.00, "四川省", "成都市", "2015-02-11".toTimestamp),
    (8, "Widgets R Us", 3000.00, 0.0, "四川省", "綿陽市", "2015-02-19".toTimestamp),
    (9, "Widgets R Us", 2000.00, 0.0, "廣東省", "深圳市", "2015-02-20".toTimestamp),
    (10, "Ye Olde Widgete", 3000.00, 0.0, "廣東省", "深圳市", "2015-02-28".toTimestamp),
    (11, "Ye Olde Widgete", 3000.00, 0.0, "廣東省", "廣州市", "2015-02-28".toTimestamp)
  )

  val saleDF = sqlContext.sparkContext.parallelize(sales, 4).toDF("id", "name", "sales", "discount", "province", "city", "saleDate")
}

注冊臨時表,并執行SQL語句:

saleDF.registerTempTable("sales")

val dataFrame = sqlContext.sql("select province,city,sales from sales")
dataFrame.show

執行的結果如下:

| province |city | sales |
|----------|-----|-------|
|     廣東省| 深圳市|1000.0|
|     四川省| 成都市|1000.0|
|     四川省| 綿陽市|1000.0|
|     四川省| 成都市|1000.0|
|     廣東省| 廣州市|1000.0|
|     四川省| 瀘州市|1000.0|
|     四川省| 成都市|1000.0|
|     四川省| 綿陽市|3000.0|
|     廣東省| 深圳市|2000.0|
|     廣東省| 深圳市|3000.0|
|     廣東省| 廣州市|3000.0|

對該DataFrame執行rollup:

val resultDF = dataFrame.rollup($"province", $"city").agg(Map("sales" -> "sum"))
resultDF.show

在這個例子中,rollup操作相當于對dataFrame中的province與city進行分組,并在此基礎上針對sales進行求和運算,故而獲得的結果為:

|province|city|sum(sales)|
|--------|----|----------|
|    null|null|   18000.0|
|     廣東省|null|   10000.0|
|     廣東省| 深圳市|    6000.0|
|     四川省|null|    8000.0|
|     四川省| 成都市|    3000.0|
|     四川省| 綿陽市|    4000.0|
|     廣東省| 廣州市|    4000.0|
|     四川省| 瀘州市|    1000.0|

操作非常簡單,然而遺憾地是并不符合我們產品的場景,因為我們需要根據某些元數據直接組裝為Spark SQL的sql語句。在Spark的hiveContext中,支持這樣的語法:

hiveContext.sql("select province, city, sum(sales) from sales group by province, city with rollup")

可惜,SQLContext并不支持這一功能。我在Spark User Mailing List中咨詢了這個問題。Intel的Cheng Hao(Spark的一位非常活躍的contributer)告訴了我為何不支持的原因。因為在Spark SQL 1.x版本中,對SQL語法的解析采用了Scala的Parser機制。這種實現方式較弱,對語法的解析支持不夠。Spark的Issue #5080嘗試提供此功能,然而并沒有被合并到Master中。Spark并不希望在1.x版本的SQLParser中添加新的關鍵字,它的計劃是在Spark 2.0中用HQL Parser來替代目前較為簡陋的SQL Parser。

如果希望在sql中使用rollup,那么有三個選擇:

  • 使用HQLContext;
  • pull #5080的代碼,自己建立一個Spark的分支;
  • 等待Spark 2.0版本發布。
2015-12-30 21:17 78

來自: http://zhangyi.farbox.com/post/kai-yuan-kuang-jia/rollup-in-spark

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!