Apache Flink 容錯機制

Introduce

Apache Flink 提供了可以恢復數據流應用到一致狀態的容錯機制。確保在發生故障時,程序的每條記錄只會作用于狀態一次(exactly-once),當然也可以降級為至少一次(at-least-once)。

容錯機制通過持續創建分布式數據流的快照來實現。對于狀態占用空間小的流應用,這些快照非常輕量,可以高頻率創建而對性能影響很小。流計算應用的狀態保存在一個可配置的環境,如:master 節點或者 HDFS上。

在遇到程序故障時(如機器、網絡、軟件等故障),Flink 停止分布式數據流。系統重啟所有 operator ,重置其到最近成功的 checkpoint。輸入重置到相應的狀態快照位置。保證被重啟的并行數據流中處理的任何一個 record 都不是 checkpoint 狀態之前的一部分。

注意: 為了容錯機制生效,數據源(例如 queue 或者 broker)需要能重放數據流。 Apache Kafka 有這個特性,Flink 中 Kafka 的 connector 利用了這個功能。

注意: 由于 Flink 的 checkpoint 是通過分布式快照實現的,接下來我們將 snapshot 和 checkpoint 這兩個詞交替使用。

Checkpointing

Flink 容錯機制的核心就是持續創建分布式數據流及其狀態的一致快照。這些快照在系統遇到故障時,充當可以回退的一致性檢查點(checkpoint)。 Lightweight Asynchronous Snapshots for Distributed Dataflows 描述了Flink 創建快照的機制。此論文是受分布式快照算法 Chandy-Lamport 啟發,并針對 Flink 執行模型量身定制。

Barriers

Flink 分布式快照的核心概念之一就是數據柵欄(barrier)。這些 barrier 被插入到數據流中,作為數據流的一部分和數據一起向下流動。Barrier 不會干擾正常數據,數據流嚴格有序。一個 barrier 把數據流分割成兩部分:一部分進入到當前快照,另一部分進入下一個快照。每一個 barrier 都帶有快照 ID,并且 barrier 之前的數據都進入了此快照。Barrier 不會干擾數據流處理,所以非常輕量。多個不同快照的多個 barrier 會在流中同時出現,即多個快照可能同時創建。

Barrier 在數據源端插入,當 snapshot n 的 barrier 插入后,系統會記錄當前 snapshot 位置值 n (用Sn表示)。例如,在 Apache Kafka 中,這個變量表示某個分區中最后一條數據的偏移量。這個位置值 Sn 會被發送到一個稱為 checkpoint coordinator 的模塊。(即 Flink 的 JobManager).

然后 barrier 繼續往下流動,當一個 operator 從其輸入流接收到所有標識 snapshot n 的 barrier 時,它會向其所有輸出流插入一個標識 snapshot n 的 barrier。當 sink operator (DAG 流的終點)從其輸入流接收到所有 barrier n 時,它向 the checkpoint coordinator 確認 snapshot n 已完成。當所有 sink 都確認了這個快照,快照就被標識為完成。

接收超過一個輸入流的 operator 需要基于 barrier 對齊(align)輸入。參見上圖:

  • operator 只要一接收到某個輸入流的 barrier n,它就不能繼續處理此數據流后續的數據,直到 operator 接收到其余流的 barrier n。否則會將屬于 snapshot n 的數據和 snapshot n+1的搞混

  • barrier n 所屬的數據流先不處理,從這些數據流中接收到的數據被放入接收緩存里(input buffer)

  • 當從最后一個流中提取到 barrier n 時,operator 會發射出所有等待向后發送的數據,然后發射snapshot n 所屬的 barrier

  • 經過以上步驟,operator 恢復所有輸入流數據的處理,優先處理輸入緩存中的數據

State

operator 包含任何形式的狀態,這些狀態都必須包含在快照中。狀態有很多種形式:

  • 用戶自定義狀態:由 transformation 函數例如( map() 或者 filter())直接創建或者修改的狀態。用戶自定義狀態可以是:轉換函數中的 Java 對象的一個簡單變量或者函數關聯的 key/value 狀態。

  • 系統狀態:這種狀態是指作為 operator 計算中一部分緩存數據。典型例子就是: 窗口緩存(window buffers),系統收集窗口對應數據到其中,直到窗口計算和發射。

operator 在收到所有輸入數據流中的 barrier 之后,在發射 barrier 到其輸出流之前對其狀態進行快照。此時,在 barrier 之前的數據對狀態的更新已經完成,不會再依賴 barrier 之前數據。由于快照可能非常大,所以后端存儲系統可配置。默認是存儲到 JobManager 的內存中,但是對于生產系統,需要配置成一個可靠的分布式存儲系統(例如 HDFS)。狀態存儲完成后,operator 會確認其 checkpoint 完成,發射出 barrier 到后續輸出流。

快照現在包含了:

  • 對于并行輸入數據源:快照創建時數據流中的位置偏移

  • 對于 operator:存儲在快照中的狀態指針

Exactly Once vs. At Least Once

對齊操作可能會對流程序增加延遲。通常,這種額外的延遲在幾毫秒的數量級,但是我們也遇到過延遲顯著增加的異常情況。針對那些需要對所有輸入都保持毫秒級的應用,Flink 提供了在 checkpoint 時關閉對齊的方法。當 operator 接收到一個 barrier 時,就會打一個快照,而不會等待其他 barrier。

跳過對齊操作使得即使在 barrier 到達時,Operator 依然繼續處理輸入。這就是說:operator 在 checkpoint n 創建之前,繼續處理屬于 checkpoint n+1 的數據。所以當異常恢復時,這部分數據就會重復,因為它們被包含在了 checkpoint n 中,同時也會在之后再次被處理。

注意: 對齊操作只會發生在擁有多輸入運算(join)或者多個輸出的 operator(重分區、分流)的場景下。所以,對于自由 map(), flatmap(), fliter() 等的并行操作即使在至少一次的模式中仍然會保證嚴格一次。

Asynchronous State Snapshots

我們注意到上面描述的機制意味著當 operator 向后端存儲快照時,會停止處理輸入的數據。這種同步操作會在每次快照創建時引入延遲。

我們完全可以在存儲快照時,讓 operator 繼續處理數據,讓快照存儲在后臺異步運行。為了做到這一點,operator 必須能夠生成一個后續修改不影響之前狀態的狀態對象。例如 RocksDB 中使用的寫時復制( copy-on-write )類型的數據結構。

接收到輸入的 barrier 時,operator 異步快照復制出的狀態。然后立即發射 barrier 到輸出流,繼續正常的流處理。一旦后臺異步快照完成,它就會向 checkpoint coordinator(JobManager)確認 checkpoint 完成。現在 checkpoint 完成的充分條件是:所有 sink 接收到了 barrier,所有有狀態 operator 都確認完成了狀態備份(可能會比 sink 接收到 barrier 晚)。

Recovery

在這種容錯機制下的錯誤回復很明顯:一旦遇到故障,Flink 選擇最近一個完成的 checkpoint k。系統重新部署整個分布式數據流,重置所有 operator 的狀態到 checkpoint k。數據源被置為從 Sk 位置讀取。例如在 Apache Kafka 中,意味著讓消費者從 Sk 處偏移開始讀取。

如果是增量快照,operator 需要從最新的全量快照回復,然后對此狀態進行一系列增量更新。

Operator Snapshot Implementation

當 operator 快照創建時有兩部分操作:同步操作和異步操作。

operator 和后端存儲將快照以 Java FutureTask 的方式提供。這個 task 包含了同步操作已經完成,異步操作還在等待的狀態(state)。異步操作在后臺線程中被執行。

完全同步的 operator 返回一個已經完成的 FutureTask 。如果異步操作需要執行,FutureTask 中的 run() 方法會被調用。

為了釋放流和其他資源的消耗,可以取消這些 task。

 

來自:https://segmentfault.com/a/1190000008129552

 

 本文由用戶 mthj8765 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!