Apache Flink源碼解析之stream-source
今天我們來解讀一下Flink stream里的 source 模塊。它是整個stream的入口,也是我們了解其流處理體系的入口。
SourceFunction
SourceFunction是所有stream source的根接口。
它繼承自一個標記接口(空接口) Function 。
SourceFunction 定義了兩個接口方法:
- run : 啟動一個source,即對接一個外部數據源然后emit元素形成stream(大部分情況下會通過在該方法里運行一個while循環的形式來產生stream)。
- cancel : 取消一個source,也即將run中的循環emit元素的行為終止。
正常情況下,一個 SourceFunction 實現這兩個接口方法就可以了。其實這兩個接口方法也固化了一種實現 模板 。
比如,實現一個XXXSourceFunction,那么大致的模板是這樣的:
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<T> ctx) throws Exception {
while (isRunning && otherCondition == true) {
ctx.collect(getElement());
}
}
@Override
public void cancel() {
isRunning = false;
}
SourceContext
Flink將Source的運行機制跟其如何emit元素進行了分離。具體如何emit元素,取決于另外一個獨立的接口 SourceContext 。 SourceFunction 以內部接口的方式定義了該上下文接口對象,將具體的實現拋給具體的sourceFunction。該接口中定義了emit元素的接口方法:
- collect : 從source emit一個元素,該元素的時間戳被自動設置為本地時鐘( System#currentTimeMillis() ),這種由當前source自動追加的時間戳,在Flink里稱之為 Ingress Time (即攝入時間)。
- collectWithTimestamp : 根據用戶提供的自定義的時間戳emit一個元素,這種被稱之為 Event Time (即用戶自行設置的事件時間)。
- emitWatermark : 手動發射一個 Watermark 。
這里有幾個時間概念可參考我之前的文章: http://vinoyang.com/2016/05/02/flink-concepts/#時間
Watermark:Flink用 Watermark 來對上面的 Event Time 類型的事件進行窗口處理。所謂的 Watermark 是一個時間基準。WaterMark包含一個時間戳,Flink使用 WaterMark 標記所有小于該時間戳的消息都已流入,Flink的數據源在確認所有小于某個時間戳的消息都已輸出到Flink流處理系統后,會生成一個包含該時間戳的 WaterMark ,插入到消息流中輸出到Flink流處理系統中,Flink操作符按照時間窗口緩存所有流入的消息,當operator處理到WaterMark時,它對所有小于該WaterMark時間戳的時間窗口數據進行處理并發送到下一個operator節點,然后也將WaterMark發送到下一個operator節點。
內置的SourceFunction實現
source相關的完整類圖如下:
RichSourceFunction
一個抽象類,繼承自 AbstractRichFunction 。為實現一個 Rich SourceFunction提供基礎能力(其實所謂的Rich,主要是提供某種范式或者模板幫助你完成一部分基礎實現)。該類的子類有兩個,不過他們仍然是抽象類,只是在此基礎上提供了更具體的實現:
- MessageAcknowledgingSourceBase :它針對的是數據源是消息隊列的場景并且提供了基于ID的應答機制。
- MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基礎上針對ID應答機制進行了更為細分的處理,支持兩種ID應答模型: session id 和 unique message id 。
ParallelSourceFunction
該接口只是個標記接口,用于標識繼承該接口的Source都是并行執行的。其直接實現類是 RichParallelSourceFunction ,它是一個抽象類并繼承自 AbstractRichFunction (從名稱可以看出,它應該兼具 rich 和 parallel 兩個特性,這里的 rich 體現在它定義了 open 和 close 這兩個方法)。
繼承 RichParallelSourceFunction 的那些SourceFunction意味著它們都是并行執行的并且可能有一些資源需要open/close,Flink提供了這么幾個實現:
- FileSourceFunction : 以文件為數據源的Source,它根據給定的 InputFormat 作為數據源記錄的生產器(它可以接收一個file path來基于文件生產記錄),根據給定的 TypeInformation 來產生序列化器,再結合內部創建的 splitIterator 實現了一個基于文件的sourceFunction。
- ConnectorSource : 抽象類,沒有具體的實現。通過其構造器注入了一個屬性 DeserializationSchema ,該屬性是一個協議接口,用于定義如何將二進制數據反序列化為Java/Scala對象。
- StatefulSequenceSource :有狀態的序列Source。它接收 start 和 end 作為一個發射序列的區間,然后根據一定的算法算得需要發射的時間間隔,并保證區間內的元素送達具有 exactly once 的強一致性,具體的計算方式需要結合當前task的subtask的數量以及當前subtask在集合中的索引計算得出。
- FromSplittableIteratorFunction :根據給定的 SplittableIterator (它是一個全局的iterator)結合當前task運行時subtask的數量,以及該subtask在所有subtask中的序號計算出分區(partition)從而產生一個細分的 Iterator 。通過 Iterator 迭代來發射元素。
FileMonitoringFunction
該Source是以監控給定 path 位置的文件為手段,根據給定的 interval 作為時間間隔,emit的內容依賴監控文件的變。Flink為這種形式的Source提供了三種watchtype :
public enum WatchType {
ONLY_NEW_FILES, //僅關注新文件產生
REPROCESS_WITH_APPENDED, //當有文件產生變更,該文件的所有內容都需要被重新處理
PROCESS_ONLY_APPENDED //當有文件產生變更,只有變更的內容需要被處理
}
該類型的Source始終發射的是一個三元組(Tuple3),它包含三個元素:
- filePath : 標識文件路徑
- offset : 偏移量
- fileSize : 文件大小
watchtype的不同主要影響發射元素的內容。當WatchType的類型為 ONLY_NEW_FILES 或 REPROCESS_WITH_APPENDED 類型時, offset 會被設置為0, fileSize 被設置為-1。而WatchType類型為 PROCESS_ONLY_APPENDED ,則三個值都為其對應的真實值。
SocketTextStreamFunction
根據給定的 hostname 和 port ,以socket的方式進行通信并獲取數據,以 delimiter 參數給定的字符作為終止標識符。
FromIteratorFunction
該Source接收一個迭代器,然后在發射循環體中,依次迭代發射數據。
FromElementsFunction
該Source接收一個元素迭代器(一組元素的集合),以Flink的類型序列化機制將其序列化為二進制數據,然后在發射元素的循環體中,進行反序列化為初始類型,再發射數據。
這里先序列化為二進制,再從二進制反序列化為最初的對象類型。不是特別容易理解,乍一看多此一舉,讓人匪夷所思。其實,這么做是有原因的,是因為Flink的序列化機制是其自定義的,并且跟其自主管理內存緊密聯系在一起(想了解其自主內存管理的可參看我之前的系列文章)。而自主內存管理又涉及到二進制數據的存儲。 FromElementsFunction 支持從某個 check point 部分恢復,所以必須先還原其原先的存儲位置(通過序列化),然后跳過不需要emit的元素,然后再發射需要發射的元素(將這些元素反序列化)。
常見連接器中的Source
Flink自身提供了一些針對第三方主流開源系統的連接器支持,它們有:
- elasticsearch
- flume
- kafka(0.8/0.9版本)
- nifi
- rabbitmq
- 推ter
這些連接器有些可以同時作為 source 和 sink 。因為我們今天的主題是source,所以我們先來看看以上這些被支持的連接器它們的source都是繼承自剛剛我們談到的哪些接口或者類。
- kafka : RichParallelSourceFunction
- nifi : RichParallelSourceFunction
- rabbitmq : MultipleIdsMessageAcknowledgingSourceBase(因為rabbitmq具備非常成熟的ack機制,所以繼承這個類是順其自然的)
小結
這篇文章我們主要談及了Flink的stream source相關的設計、實現。當然這個主題還沒有完全談完,還會有后續篇幅繼續解讀。