Flink 原理與實現:Table & SQL API

ThaliaXGJZ 7年前發布 | 57K 次閱讀 SQL API

Flink 已經擁有了強大的 DataStream/DataSet API,可以基本滿足流計算和批計算中的所有需求。為什么還需要 Table & SQL API 呢?

首先 Table API 是一種關系型API,類 SQL 的API,用戶可以像操作表一樣地操作數據,非常的直觀和方便。用戶只需要說需要什么東西,系統就會自動地幫你決定如何最高效地計算它,而不需要像 DataStream 一樣寫一大堆 Function,優化還得純靠手工調優。另外,SQL 作為一個“人所皆知”的語言,如果一個引擎提供 SQL,它將很容易被人們接受。這已經是業界很常見的現象了。值得學習的是,Flink 的 Table API 與 SQL API 的實現,有 80% 的代碼是共用的。所以當我們討論 Table API 時,常常是指 Table & SQL API。

Table & SQL API 還有另一個職責,就是流處理和批處理統一的API層。Flink 在runtime層是統一的,因為Flink將批任務看做流的一種特例來執行,這也是 Flink 向外鼓吹的一點。然而在編程模型上,Flink 卻為批和流提供了兩套API (DataSet 和 DataStream)。為什么 runtime 統一,而編程模型不統一呢? 在我看來,這是本末倒置的事情。用戶才不管你 runtime 層是否統一,用戶更關心的是寫一套代碼。這也是為什么現在 Apache Beam 能這么火的原因。所以 Table & SQL API 就扛起了統一API的大旗,批上的查詢會隨著輸入數據的結束而結束并生成有限結果集,流上的查詢會一直運行并生成結果流。Table & SQL API 做到了批與流上的查詢具有同樣的語法,因此不用改代碼就能同時在批和流上跑。

聊聊歷史

Table API 始于 Flink 0.9,Flink 0.9 是一個類庫百花齊放的版本,眾所周知的 Table API, Gelly, FlinkML 都是在這個版本加進去的。Flink 0.9 大概是在2015年6月正式發布的,在 Flink 0.9 發布之前,社區對 SQL 展開過好幾次爭論,不過當時社區認為應該首先完善 Table API 的功能,再去搞SQL,如果兩頭一起搞很容易什么都做不好。而且在整個Hadoop生態圈中已經有大量的所謂 “SQL-on-Hadoop” 的解決方案,譬如 Apache Hive , Apache Drill , Apache Impala 。”SQL-on-Flink”的事情也可以像 Hadoop 一樣丟給其他社區去搞。

不過,隨著 Flink 0.9 的發布,意味著抽象語法樹、代碼生成、運行時函數等都已經成熟,這為SQL的集成鋪好了前進道路。另一方面,用戶對 SQL 的呼聲越來越高。2015年下半年,Timo 大神也加入了 dataArtisans,于是對Table API的改造開始了。2016 年初的時候,改造基本上完成了。我們也是在這個時間點發現了 Table API 的潛力,并加入了社區。經過這一年的努力,Flink 已經發展成 Apache 中最火熱的項目之一,而 Flink 中最活躍的類庫目前非 Table API 莫屬。這其中離不開國內公司的支持,Table API 的貢獻者絕大多數都來自于阿里巴巴和華為,并且主導著 Table API 的發展方向,這是非常令國人自豪的。而我在社區貢獻了一年后,幸運地成為了 Flink Committer。

Table API & SQL 長什么樣?

這里不會詳細介紹 Table API & SQL 的使用,只是做一個展示。

下面這個例子展示了如何用 Table API 處理溫度傳感器數據。計算每天每個以 room 開頭的location的平均溫度。例子中涉及了如何使用window,event-time等。

val sensorData: DataStream[(String, Long, Double)] = ???

// convert DataSet into Table
val sensorTable: Table = sensorData
 .toTable(tableEnv, 'location, 'time, 'tempF)

// define query on Table
val avgTempCTable: Table = sensorTable 
 .window(Tumble over 1.day on 'rowtime as 'w) 
 .groupBy('location, 'w)
 .select('w.start as 'day, 'location, (('tempF.avg - 32) * 0.556) as 'avgTempC)
 .where('location like "room%")

下面的例子是展示了如何用 SQL 來實現。

val sensorData: DataStream[(String, Long, Double)] = ???

// register DataStream
tableEnv.registerDataStream("sensorData", sensorData, 'location, ’time, 'tempF)

// query registered Table
val avgTempCTable: Table = tableEnv.sql("""
 SELECT FLOOR(rowtime() TO DAY) AS day, location, 
 AVG((tempF - 32) * 0.556) AS avgTempC
 FROM sensorData
 WHERE location LIKE 'room%'
 GROUP BY location, FLOOR(rowtime() TO DAY) """)

Table API & SQL 原理

Flink 非常明智,沒有像Spark那樣重復造輪子(Spark Catalyst),而是將 SQL 校驗、SQL 解析以及 SQL 優化交給了 Apache Calcite 。Calcite 在其他很多開源項目里也都應用到了,譬如Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架構中處于核心的地位,如下圖所示。

新的架構中,構建抽象語法樹的事情全部交給了 Calcite 去做。SQL query 會經過 Calcite 解析器轉變成 SQL 節點樹,通過驗證后構建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)。另一邊,Table API 上的調用會構建成 Table API 的抽象語法樹,并通過 Calcite 提供的 RelBuilder 轉變成 Calcite 的抽象語法樹。

以上面的溫度計代碼為樣例,Table API 和 SQL 的轉換流程如下,綠色的節點代表 Flink Table Nodes,藍色的節點代表 Calcite Logical Nodes。最終都轉化成了相同的 Logical Plan 表現形式。

之后會進入優化器,Calcite 會基于優化規則來優化這些 Logical Plan,根據運行環境的不同會應用不同的優化規則(Flink提供了批的優化規則,和流的優化規則)。這里的優化規則分為兩類,一類是Calcite提供的內置優化規則(如條件下推,剪枝等),另一類是是將Logical Node轉變成 Flink Node 的規則。這兩類規則的應用體現為下圖中的①和②步驟,這兩步驟都屬于 Calcite 的優化階段。得到的 DataStream Plan 封裝了如何將節點翻譯成對應 DataStream/DataSet 程序的邏輯。步驟③就是將不同的 DataStream/DataSet Node 通過代碼生成(CodeGen)翻譯成最終可執行的 DataStream/DataSet 程序。

代碼生成是 Table API & SQL 中最核心的一塊內容。表達式、條件、內置函數等等是需要CodeGen出具體的Function 代碼的,這部分跟Spark SQL的結構很相似。CodeGen 出的Function以字符串的形式存在。在提交任務后會分發到各個 TaskManager 中運行,在運行時會使用 Janino 編譯器編譯代碼后運行。

Table API & SQL 現狀

目前 Table API 對于批和流都已經支持了基本的Selection, Projection, Union,以及 Window 操作(包括固定窗口、滑動窗口、會話窗口)。SQL 的話由于 Calcite 在最近的版本中才支持 Window 語法,所以目前 Flink SQL 還不支持 Window 的語法。并且 Table API 和 SQL 都支持了UDF,UDTF,UDAF(開發中)。

Table API & SQL 未來

  1. Dynamic Tables

    Dynamic Table 就是傳統意義上的表,只不過表中的數據是會變化更新的。Flink 提出 Stream <–> Dynamic Table 之間是可以等價轉換的。不過這需要引入Retraction機制。有機會的話,我會專門寫一篇文章來介紹。

  2. Joins

    包括了支持流與流的 Join,以及流與表的 Join。

  3. SQL 客戶端

    目前 SQL 是需要內嵌到 Java/Scala 代碼中運行的,不是純 SQL 的使用方式。未來需要支持 SQL 客戶端執行提交 SQL 純文本運行任務。

  4. 并行度設置

    目前 Table API & SQL 是無法設置并行度的,這使得 Table API 看起來仍像個玩具。

在我看來,Flink 的 Table & SQL API 是走在時代前沿的,在很多方面在做著定義業界標準的事情,比如 SQL 上Window的表達,時間語義的表達,流和批語義的統一等。在我看來,SQL 擁有更天然的流與批統一的特性,并且能夠自動幫用戶做很多SQL優化(下推、剪枝等),這是 Beam 所做不到的地方。當然,未來如果 Table & SQL API 發展成熟的話,剝離出來作為業界標準的流與批統一的API也不是不可能(叫BeamTable,BeamSQL ?),哈哈。這也是我非常看好 Table & SQL API,認為其大有潛力的一個原因。當然就目前來說,需要走的路還很長,Table API 現在還只是個玩具。

參考文獻

 

來自:http://wuchong.me/blog/2017/03/30/flink-internals-table-and-sql-api/

 

 本文由用戶 ThaliaXGJZ 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!