基于 HBase 構建可伸縮的分布式事務隊列
一個實時流處理框架通常需要兩個基礎架構:處理器和隊列。處理器從隊列中讀取事件,執行用戶的處理代碼,如果要繼續對結果進行處理,處理器還會把事件寫到 另外一個隊列。隊列由框架提供并管理。隊列做為處理器之間的緩沖,傳輸數據和事件,這樣處理器可以單獨操作和擴展。例如,一個web 服務訪問日志處理應用,可能是這樣的:
框架之間的主要區別在于隊列語義,通常不同之處有以下幾點:
-
調度保障機制: 至少一次,至多一次,只有一次。
-
容災機制: 失敗對用戶和自動恢復是透明的。
-
可用性: 數據在出現錯誤后可以保存并重啟。
-
可擴展性:產品/用戶增加時的局限性。
-
性能Performance: 隊列操作的吞吐量和延遲。
我們想在開源的 Cask Data Application Platform (CDAP)上 提供一個動態可擴展,強一致性并且有一次性交易機制的實時流處理框架,在這個強大的機制保護下,開發者可以自由操作任何形式的數據操作而不用擔心不一致 性,潛在的返工和失敗。它可以幫助開發者在沒有分布式系統背景的情況下建立他們的大數據應用。此外,如果需要可以關閉這種強大的保護機制換取高性能。它總 是比其他方式更容易使用。
可擴展隊列
隊列有兩種基本操作:入隊和出隊。生產者將消息寫到隊頭(入隊),消費者從隊尾讀取數據(出隊)。如果做為一個整體你添加更多生成者時的入隊速度和添加更多消費者時的出隊速度足夠快,我們說這個隊列是可擴展的。
理想狀態下,擴展是線性的,這意味著兩倍的生產者 /消費者,會產生兩位速度的出隊/入隊,增長只受集群的規模限制。為了支持生產者的線性擴展,隊列需要一個存儲系統并且需要當前寫入者的數量線性擴展。為 了應對消費者的線性擴展,隊列可以分區,例如一個消費者只處理隊列中的一段數據。
隊列擴展的另一個方面是它應該可以橫向擴展。這意味著隊列性能的上限可以通過增加集群結點的方式來提升。這是很重要的,它可以保證隊列不受當前集群大小限制根據數據的增長而擴展。
分區的 HBase 隊列
我們選擇 Apache HBase 做為隊列的存儲層。它為存儲強一致性,可橫向擴展的行數據做了設計和優化。它的并發寫操作性能非常好,并提供了有序掃描以支持分區消費者。我們使用 HBase Coprocessors 的高效掃描濾波和隊列清洗。為了在隊列上使用一次性語義,我們用 Tephra’s 為 HBase 提供傳輸支持。
生產者和消費者具有操作獨立性。每個生產者通過 Hbase puts 批處理執行入隊操作,消費者通過執行 Hbase Scans 執行出隊操作。生產者和消費者的數量之間沒有關聯,他們可以分離。
此隊列存在一個消費者組的概念。一個消費者組,是由相同的關鍵字劃分的消費者集合,這樣,每個發布到隊列的事件,就會由此消費者組中的消費者去消費。使用 消費者組,可以通過不同的關鍵字劃分同一個隊列,同時,也可以通過數據的操作性特點來拓展。按照上面訪問日志分析的例子,生產者和消費者組可能看起來像這 樣:
對于Log Parser,這里有兩個生產者在運行,它們并發的向隊列寫數據。在消費者這邊,這里存在兩個消費者組。 Unique User Counter組有兩個消費者,使用UserID作為劃分(隊列的)關鍵字。Page View Counter組則有三個消費者,使用 PageID 作為劃分(隊列的)關鍵字。
隊列行值格式
當一個事件通過一個生產者被發布出去,一個或多個消費者組合將收到消息,我們把事件寫入 HBase 表的一個或多個行上,那么這條記錄就被設計成適用于每個消費者組。事件的有效負荷和元數據被存儲在獨立的列上,那么行的值就是下面這樣的格式:
兩 個有趣的部分是行的值是分區 ID 和整個 ID。分區 ID 通過限定行值前綴再提供給消費者。消費者只被允許讀數據,并在出隊的時候使用前綴掃描。分區 ID 由兩部分組成:一個消費者組 ID 和一個消費者 ID。生產者計算出每個消費者組的分區 ID,并通過入隊寫到那些行。
行關鍵字中的入口 ID(Entry ID)包含了事務信息。它由 Tephra 觸發的生產者事務寫指針和單向增長的計數器組成。這個計數器由本地的生產者生成,同時,針對事件,計數器需要讓行關鍵字唯一,因為生產者可以在同一個事務中將多個事件加入隊列。
出隊列的時候,計數器會使用事務寫指針來決定,隊列入口是否已經提交,以及是否可以消費了。事務寫指針和計數器的組合,使得行關鍵字總是唯一的。這讓生產者可以獨立的操作,而不會有寫沖突。
為了生成分區 ID(Partition ID),生產者需要知道大小和每個消費者組的分區關鍵字。當應用程序啟動,以及組大小發生任何變化的時候,消費者組信息都會被記錄下來。
改變生產者和消費者
增加或減少生產者是很直接的,因為每個生產者都是獨立操作的。增加或減少生產者進程就可以滿足這個要求。然而,當消費者組的大小需要改變的時候,就需要協調來正確更新消費者組的信息。可以用下面的圖表來概括所需的步驟:
由于暫停和恢復是由 Apache ZooKeeper 來協調的,同時它們也是并行執行的,所以它們是兩個非常快速的操作。例如,之前我們提到的 Web 訪問日志分析應用程序,改變消費者組信息的過程可能看起來像這樣:
基于這個隊列的設計,入隊列和出隊列的性能,與單獨的批量 HBase Puts 和 HBase Scans 不相上下,這樣也帶來與 Tephra 服務器進行通訊的開銷。通過在同一個業務處理中將多個事件批量處理,可以大大降低這個開銷。
最后,為了避免“熱點聚焦(hotspotting)“,我們基于簇的大小提前分割了 HBase 表,同時,在行關鍵字(row key)上采用 加鹽(salting) 的方式來更好的分配寫。否則,由于是單調的增加業務處理寫指針,行關鍵字就會是連續的。
性能值
我們在小型的 10 節點的 HBase 集群上已經測試過性能,結果令人印象深刻。使用 1K 字節負載,以 500 個事件為一個批次大小,我們完成了生產和消費 100K 個事件/秒的吞吐量,其中運行了 3 個生產者和 10 個消費者。我們也觀察到當我們增加消費者和消費者的時候,吞吐量線性增加:例如,當我們將生產者和消費者數量加倍的時候,吞吐量增加到 200K 個事件/秒。
在 HBase 的幫助下,結合最佳實踐,我們成功的創建了一個線性可伸縮的,分布式事務隊列系統。同時,在 CDAP 中使用這個系統提供實時流處理框架:動態可伸縮,強一致性,以及一次交付的傳輸保證。