基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

jopen 10年前發布 | 25K 次閱讀 RabbitMQ 消息系統

RabbitMQ的是一個復雜的野獸。 

它靈活,強大,但也很難完全把控和掌握。 

許多不同的使用情況和使用模式都可以建立在這個強大的軟件之上,但在第一次嘗試為一個特定的解決方案編寫代碼時,差錯和設計錯誤也是司空見慣的事情。 

在本文中,我將討論   Palermo 的設計和實現,它是一個實現了可以將RabbitMQ用作底層排隊機制構建的那些可能的使用模式其中之一:批處理作業處理系統。 

通過批處理作業處理系統,我們引用了一種機制來由 作業程序從不同隊列中提取作業的自動執行。客戶可以排入新的作業到這些隊列,它們最終將被傳遞給將執行作業的程序。如果在執行一個任務失敗了,作業程序線 程將會把失敗的作業放入一個特殊的隊列,它可以重新執行,或者檢查出來進行錯誤調試。

創建Palermo的靈感主要來自于Resque ,它是一個作業處理系統,由Github使用Ruby和Redis創建.

對于系統的使用者,Resque讓這些變成可能:定義擁有不同名字的隊列;作業排隊時,在系統中通過某個輸入參數匹配Ruby類名;在不同的機器中 啟動工作者進程--它們將處理作業,實例化Ruby類,并使用提供的輸入參數來執行作業.如果作業執行失敗,作業將被路由到一個特殊的"失敗"作業隊列. 在這個隊列中的作業會重新執行或被刪除.Resque工作者和底層的操作系統很好的集成在一起,它可以像系統操作者控制作業執行的方式,來處理即將到來的 信號.它也提供了一個web接口,可以監控作業和工作者的狀態,同時也可以執行某種動作,像作業的重新排隊或隊列中作業的清理.

從一個開發者的角度來講,Resque的主要優勢是整個系統的使用超級簡單.定義和排隊作業只需要幾行Ruby代碼即可,同時系統以一種持續的和魯棒的方式運行.

Palermo的目標是針對JVM語言,創建一個簡單易用的作業處理系統,像Resque一樣的健壯,并使用RabbitMQ作為底層的隊列技術而不是Redis.

從一個正式的角度來講,一個作業處理系統的隊列技術可以定義為一個元組空間(tuple space),一種連續關聯的內存,一些進程排隊作業使用如下的方式編寫元組:

write(queue_name, job_type, input_argument)

工作者進程從內存刪除元組,一次一個,使用謂詞匹配隊列名:

read(queue_name, ?, ?)

一個僅有的,必須以元組空間,加入到作業處理隊列技術模型中的限制是,來自分布式內存中的讀函數必須遵從先進先出(FIFO)的語 義.RabbitMQ隊列可以看作是這種類型的,包含FIFO語義的內存.隊列名作為第一個參數傳遞到"讀"謂詞,用來提取存儲在內存中的下一個匹配的元 組.

基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

為了獲取想要讀取的元組空間語義,我們需要處理RabbitMQ內部的一些設置,并配置如下的選項:

- 隊列的持久性
– 消息的持久性
– 服務的質量
– 消息確認

由于要使用可持續的內存,我們需要為RabbitMQ所管理的隊列和消息增加可持續性.為了達到這個效果,在RabbitMQ之中,隊列需要聲明 為"durable",消息需要聲明為"persistent".通過這種方式,即便RabbitMQ broker崩潰了,對于已經創建的隊列信息和未決的消息將在重啟時恢復.

當超過一個工作者連接到同一個隊列時,RabbitMQ會使用round robin的方式,在所有的可用工作者中分發消息.這里主要的問題在于,只要消息到達隊列,RabbitMQ就將發送一個消息到下一個工作者,而不管這個 工作者是否正在處理一個不同的消息.如果一個作業需要很長的時間來處理,到來的消息將堆疊在這個繁忙工作者的本地緩存中,而其他的工作者將處于閑置狀態. 我們可以使用RabbitMQ的兩個特性來處理這種情況:消息確認和服務質量/預取數量.

首先,工作者采用顯式確認的方式處理完消息后可以通知RabbitMQ.消息只有確認后才會從隊列刪除.如果工作者停止運行而沒有確認消息,RabbitMQ會自動將消息重新排隊.

同時,服務質量(qos)配置可以告知RabbitMQ,發送給特定工作者的最大未確認消息數.如果設置了這個值,即從預取數量到1,只有最后一條消息被工作者進程確認,其他消息才能從隊列發送出去.
通過這種方式,可以實現在工作者之間合理的分配作業.

基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

我們仍然需要找到一種方式實現寫內存的操 作.RabbitMQ不同于其他隊列排隊系統的一個重要的方面,是強烈的訂閱者和消費者分離的考量.在RabbitMQ體系結構中,這點是通過隊列和訂閱 者交換的介紹來實現的.訂閱者不知道消息的最終目的地,他們只知道一個交換的名字和路由關鍵字,這個關鍵字可以看作是發送消息的地址.

針對不同的交換類型,RabbitMQ支持不同的語義,這將影響一個帶有地址的消息匹配一個交換的方式.它將傳遞到實際的信箱(隊列),消費者將從這里接收消息.

在我們的例子中,寫方法的第一個參數是隊列名稱。 隊列名稱也是工作者預測下一個內存中待處理的任務時使用的參數。

Palermo中該方法下一個參數是一種直接交換類型。 在這種交換類型中,路由關鍵字必須和傳輸消息的隊列名稱匹配。

如果一條消息被發往 RabbitMQ 進行交換,并且沒有隊列連接到交換,那么這條消息會被丟棄或者被發往相關的備用交換。 為了避免這種情況, Palermo中 一個發布者每次把一個新的任務加入到隊列中時,在發送任務的數據之前,發布者會聲明匹配路由關鍵字的隊列。 通過這種方式,我們可以確定當沒有工作者在等待處理任務時消息也不會被丟棄。 在 RabbitMQ中,使用相同的參數重新聲明一個已存在的隊列是一個合法的操作,且沒有任何影響。

使用之前為RabbitMQ所描述的設置,我們可以建立工作處理系統的核心。然而,某些特性,像失敗事務的管理或者消息的序列化,它們不能被編址放到RabbitMQ的特性中。在Palermo中,這個特性已經被實現,并被作為一個附加的應用軟件層,這個層可以被分配作為一個Java庫。

首要的問題是如何處理失敗的事務。我們來看它是怎么(工作的),過去我們使用顯式的人工確認,在人工處理或超時時,RabbitMQ將會處理致命的 錯誤。這套機制也可以用于處理在事務處理邏輯中(產生的)任何類型的異常條件,除了我們期望的功能需求外,失敗信息必定會發送給一個特別的失敗事務隊列, 他們可以被查閱,移除和重入隊。這個功能已經在Palermo中被完成,被包裝為一個通用的Java事務用來處理try/catch塊,并通過管道將失敗信息添加到隊列中,并添加一些關于事務的錯誤信息,(例如)重試次數和在消息元數據的頭中的原始隊列一并發送消息給RabbitMQ。

序列化的問題,已經通過定義一個作業消息來陳述了,這個消息包含作業Java類的頭信息,作業序列化參數和序列化類型.有關參數形象化的一小部分內容,通過一個人眼可識別的字符串,也隨消息一起發送了. Palermo已經通過支持插件和去插件的方式,對系統進行不同的序列化,同時,包含一個基于JSON的序列化轉換器.但是,默認的序列化技術是使用JBoss Serialization.只有作業的參數被序列化,并發送給工作者,作業類的字節碼必須在工作者的class path中可用,以便正確執行作業.
Palermo工作者只是執行實際作業邏輯的一種通用方式,其中作業邏輯是封裝在作業類的定義之中.

Palermo的整個邏輯已經在Clojure編程語言中實現,但是,多虧Clojure Java inter-op特性,它可以在java代碼或其他任意基于JVM的語言中使用.由于Palermo工作者線程在JVM中運行,它們從底層的操作系統中被隔離出來.像Resque工作者那樣,集成在OS之中,很難實現,但是某種程度的集成,在可能的情況下,已經嘗試過,如,為關聯到Palermo工作者的RabbitMQ消費者的身份標識,使用進程標識符.一個命令行的接口也已經實現了,所以,新的工作者,通過腳本和使用者,可以很簡單的啟動.

解決方案的最后一個組件是Palermo 系統中運行的一個web接口。 這只是一個使用了Palermo 類庫自檢特性的簡單的web應用,它讓人們更容易理解Palermo 系統的運行機制。 這個接口是Resque web接口的復制,在管理交換、隊列、工作者方面,它可以作為RabbitMQ 通用接口的替代者。

通過Palermo 類庫,web接口提供的所有功能可以在任意java代碼中進行使用。

基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

基于 RabbitMQ 構建一個類似 Resque 的作業處理系統

考慮到所有之前我們能想到的, Palermo 只是薄薄的一層,它建立在一個特定的 RabbitMQ設置的頂部,并被封裝成一個可重用的作業處理使用模式的實現類庫。 相同的方法可能適用于不同的使用模式,這些模式使用 RabbitMQ作為底層引擎,這樣可以在寫入非特定應用程序的代碼、處理設置和由 RabbitMQ特定配置而引入其余的復雜性方面節省時間。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!