Flink 原理與實現:Aysnc I/O
Async I/O 是阿里巴巴貢獻給社區的一個呼聲非常高的特性,于1.2版本引入。主要目的是為了解決與外部系統交互時網絡延遲成為了系統瓶頸的問題。
流計算系統中經常需要與外部系統進行交互,比如需要查詢外部數據庫以關聯上用戶的額外信息。通常,我們的實現方式是向數據庫發送用戶 a 的查詢請求,然后等待結果返回,在這之前,我們無法發送用戶 b 的查詢請求。這是一種同步訪問的模式,如下圖左邊所示。
圖片來自官方文檔
圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以并發地處理多個請求和回復。也就是說,你可以連續地向數據庫發送用戶 a 、 b 、 c 等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。
Async I/O
使用 Async I/O 的前提是需要一個支持異步請求的客戶端。當然,沒有異步請求客戶端的話也可以將同步客戶端丟到線程池中執行作為異步客戶端。Flink 提供了非常簡潔的API,讓用戶只需要關注業務邏輯,一些臟活累活比如消息順序性和一致性保證都由框架處理了,多么棒的事情!
使用方式如下方代碼片段所示(來自官網文檔):
/** 'AsyncFunction' 的一個實現,向數據庫發送異步請求并設置回調 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)]{
/** 可以異步請求的特定數據庫的客戶端 */
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/** future 的回調的執行上下文(當前線程) */
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
// 發起一個異步請求,返回結果的 future
val resultFuture: Future[String] = client.query(str)
// 設置請求完成時的回調: 將結果傳遞給 collector
resultFuture.onSuccess {
case result: String => asyncCollector.collect(Iterable((str, result)));
}
}
}
// 創建一個原始的流
val stream: DataStream[String] = ...
// 添加一個 async I/O 的轉換
val resultStream: DataStream[(String, String)] =
AsyncDataStream.(un)orderedWait(
stream, new AsyncDatabaseRequest(),
1000, TimeUnit.MILLISECONDS, // 超時時間
100) // 進行中的異步請求的最大數量
AsyncDataStream 有兩個靜態方法, orderedWait 和 unorderedWait ,對應了兩種輸出模式:有序和無序。
- 有序:消息的發送順序與接受到的順序相同(包括 watermark ),也就是先進先出。
- 無序:
- 在 ProcessingTime 的情況下,完全無序,先返回的結果先發送。
- 在 EventTime 的情況下,watermark 不能超越消息,消息也不能超越 watermark,也就是說 watermark 定義的順序的邊界。在兩個 watermark 之間的消息的發送是無序的,但是在watermark之后的消息不能先于該watermark之前的消息發送。
原理實現
AsyncDataStream.(un)orderedWait 的主要工作就是創建了一個 AsyncWaitOperator 。 AsyncWaitOperator 是支持異步 IO 訪問的算子實現,該算子會運行 AsyncFunction 并處理異步返回的結果,其內部原理如下圖所示。
如圖所示, AsyncWaitOperator 主要由兩部分組成: StreamElementQueue 和 Emitter 。StreamElementQueue 是一個 Promise 隊列,所謂 Promise 是一種異步抽象表示將來會有一個值(參考 Scala Promise 了解更多),這個隊列是未完成的 Promise 隊列,也就是進行中的請求隊列。Emitter 是一個單獨的線程,負責發送消息(收到的異步回復)給下游。
圖中 E5 表示進入該算子的第五個元素(”Element-5”),在執行過程中首先會將其包裝成一個 “Promise” P5 ,然后將 P5 放入隊列。最后調用 AsyncFunction 的 ayncInvoke 方法,該方法會向外部服務發起一個異步的請求,并注冊回調。該回調會在異步請求成功返回時調用 AsyncCollector.collect 方法將返回的結果交給框架處理。實際上 AsyncCollector 是一個 Promise ,也就是 P5 ,在調用 collect 的時候會標記 Promise 為完成狀態,并通知 Emitter 線程有完成的消息可以發送了。Emitter 就會從隊列中拉取完成的 Promise ,并從 Promise 中取出消息發送給下游。
消息的順序性
上文提到 Async I/O 提供了兩種輸出模式。其實細分有三種模式: 有序,ProcessingTime 無序,EventTime 無序。Flink 使用隊列來實現不同的輸出模式,并抽象出一個隊列的接口( StreamElementQueue ),這種分層設計使得 AsyncWaitOperator 和 Emitter 不用關心消息的順序問題。 StreamElementQueue 有兩種具體實現,分別是 OrderedStreamElementQueue 和 UnorderedStreamElementQueue 。 UnorderedStreamElementQueue 比較有意思,它使用了一套邏輯巧妙地實現完全無序和 EventTime 無序。
有序
有序比較簡單,使用一個隊列就能實現。所有新進入該算子的元素(包括 watermark),都會包裝成 Promise 并按到達順序放入該隊列。如下圖所示,盡管 P4 的結果先返回,但并不會發送,只有 P1 (隊首)的結果返回了才會觸發 Emitter 拉取隊首元素進行發送。
ProcessingTime 無序
ProcessingTime 無序也比較簡單,因為沒有 watermark,不需要協調 watermark 與消息的順序性,所以使用兩個隊列就能實現,一個 uncompletedQueue 一個 completedQueue 。所有新進入該算子的元素,同樣的包裝成 Promise 并放入 uncompletedQueue 隊列,當 uncompletedQueue 隊列中任意的Promise返回了數據,則將該 Promise 移到 completedQueue 隊列中,并通知 Emitter 消費。如下圖所示:
EventTime 無序
EventTime 無序類似于有序與 ProcessingTime 無序的結合體。因為有 watermark,需要協調 watermark 與消息之間的順序性,所以 uncompletedQueue 中存放的元素從原先的 Promise 變成了 Promise 集合。如果進入算子的是消息元素,則會包裝成 Promise 放入隊尾的集合中。如果進入算子的是 watermark,也會包裝成 Promise 并放到一個獨立的集合中,再將該集合加入到 uncompletedQueue 隊尾,最后再創建一個空集合加到 uncompletedQueue 隊尾。這樣,watermark 就成了消息順序的邊界。只有處在隊首的集合中的 Promise 返回了數據,才能將該 Promise 移到 completedQueue 隊列中,由 Emitter 消費發往下游。只有隊首集合空了,才能處理第二個集合。這樣就保證了當且僅當某個 watermark 之前所有的消息都已經被發送了,該 watermark 才能被發送。過程如下圖所示:
快照與恢復
分布式快照機制是為了保證狀態的一致性。我們需要分析哪些狀態是需要快照的,哪些是不需要的。首先,已經完成回調并且已經發往下游的元素是不需要快照的。否則,會導致重發,那就不是 exactly-once 了。而已經完成回調且未發往下游的元素,加上未完成回調的元素,就是上述隊列中的所有元素。
所以快照的邏輯也非常簡單,(1)清空原有的狀態存儲,(2)遍歷隊列中的所有 Promise,從中取出 StreamElement (消息或 watermark)并放入狀態存儲中,(3)執行快照操作。
恢復的時候,從快照中讀取所有的元素全部再處理一次,當然包括之前已完成回調的元素。所以在失敗恢復后,會有元素重復請求外部服務,但是每個回調的結果只會被發往下游一次。
本文的原理和實現分析基于 Flink 1.3 版本。
來自:http://wuchong.me/blog/2017/05/17/flink-internals-async-io/