ActiveMQ基本介紹

jopen 10年前發布 | 21K 次閱讀 ActiveMQ 消息系統

1、ActiveMQ服務器工作模型

      通過ActiveMQ消息服務交換消息。消息生產者將消息發送至消息服務,消息消費者則從消息服務接收這些消息。這些消息傳送操作是使用一組實現 ActiveMQ應用編程接口 (API) 的對象來執行的。

      ActiveMQ客戶端使用 ConnectionFactory 對象創建一個連接,向消息服務發送消息以及從消息服務接收消息均是通過此連接來進行。Connection 是客戶端與消息服務的活動連接。創建連接時,將分配通信資源以及驗證客戶端。這是一個相當重要的對象,大多數客戶端均使用一個連接來進行所有的消息傳送。  連接用于創建會話。Session 是一個用于生成和使用消息的單線程上下文。它用于創建發送的生產者和接收消息的消費者,并為所發送的消息定義發送順序。會話通過大量確認選項或通過事務來支持可靠傳送。  客戶端使用 MessageProducer 向指定的物理目標(在 API 中表示為目標身份對象)發送消息。生產者可指定一個默認傳送模式(持久性消息與非持久性消息)、優先級和有效期值,以控制生產者向物理目標發送的所有消息。 

同樣,客戶端使用 MessageConsumer 對象從指定的物理目標(在 API 中表示為目標對象)接收消息。消費者可使用消息選擇器,借助它,消息服務可以只向消費者發送與選擇標準匹配的那些消息。  消費者可以支持同步或異步消息接收。異步使用可通過向消費者注冊 MessageListener 來實現。當會話線程調用 MessageListener 對象的 onMessage 方法時,客戶端將使用消息。

 

2、ActiveMQ消息傳送模型

ActiveMQ 支持兩種截然不同的消息傳送模型:PTP(即點對點模型)和Pub/Sub(即發布 /訂閱模型),分別稱作:PTP Domain 和Pub/Sub Domain。 

 

PTP(使用Queue 即隊列目標) 消息從一個生產者傳送至一個消費者。在此傳送模型中,目標是一個隊列。消息首先被傳送至隊列目標,然后根據隊列傳送策略,從該隊列將消息傳送至向此隊列進行注冊的某一個消費者,一次只傳送一條消息。可以向隊列目標發送消息的生產者的數量沒有限制,但每條消息只能發送至、并由一個消費者成功使用。如果沒有已經向隊列目標注冊的消費者,隊列將保留它收到的消息,并在某個消費者向該隊列進行注冊時將消息傳送給該消費者。 

 

Pub/Sub(使用 Topic即主題目標) 消息從一個生產者傳送至任意數量的消費者。在此傳送模型中,目標是一個主題。消息首先被傳送至主題目標,然后傳送至所有已訂閱此主題的活動消費者。可以向主題目標發送消息的生產者的數量沒有限制,并且每個消息可以發送至任意數量的訂閱消費者。主題目標也支持持久訂閱的概念。持久訂閱表示消費者已向主題目標進行注冊,但在消息傳送時此消費者可以處于非活動狀態。當此消費者再次處于活動狀態時,它將接收此信息。如果沒有已經向主題目標注冊的消費者,主題不保留其接收到的消息,除非有非活動消費者注冊了持久訂閱。

 

3、ActiveMQ消息選擇器

ActiveMQ 提供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基于這些屬性的選擇標準來表明對消息是否感興趣。這就簡化了客戶端的工作,并避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增加了一些額外開銷。 消息選擇器是用于MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),并確定是否將實際消費該消息。消息選擇器是一些字符串,它們基于某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創建的一部分。

 

4、ActiveMQ消息簽收

在不帶事務的 Session 中,一條消息何時和如何被簽收取決于Session的設置。 

1.Session.AUTO_ACKNOWLEDGE 

當客戶端從 receive 或 onMessage成功返回時,Session 自動簽收客戶端的這條消息的收條。在AUTO_ACKNOWLEDGE的 Session 中,同步接收 receive是上述三個階段的一個例外,在這種情況下,收條和簽收緊隨在處理消息之后發生。 

2.Session.CLIENT_ACKNOWLEDGE 

    客戶端通過調用消息的 acknowledge 方法簽收消息。在這種情況下,簽收發生在 Session 層面:簽收一個已消費的消息會自動地簽收這個 Session 所有已消費消息的收條。 

3.Session.DUPS_OK_ACKNOWLEDGE 

   此選項指示 Session 不必確保對傳送消息的簽收。它可能引起消息的重復,但是降低了 Session 的開銷,所以只有客戶端能容忍重復的消息,才可使用(如果ActiveMQ 再次傳送同一消息,那么消息頭中的JMSRedelivered 將被設置為 true)。客戶端成功接收一條消息的標志是這條消息被簽收。成功接收一條消息一般包括如下三個階段: 

1.客戶端接收消息; 

2.客戶端處理消息; 

3.消息被簽收。簽收可以由 ActiveMQ發起,也可以由客戶端發起,取決于 Session 簽收模式的設置。 在帶事務的 Session 中,簽收自動發生在事務提交時。如果事務回滾,所有已經接收的消息將會被再次傳送。 

 

5、ActiveMQ消息傳送模式

ActiveMQ 支持兩種消息傳送模式:PERSISTENT 和 NON_PERSISTENT 兩種。 

1.PERSISTENT(持久性消息) 

        這是 ActiveMQ 的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對于這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。這意味著在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由于某種原因導致失敗,它可以恢復此消息并將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。 

2.NON_PERSISTENT(非持久性消息) 

    保證這些消息最多被傳送一次。對于這些消息,可靠性并非主要的考慮因素。此模式并不要求持久性的數據存儲,也不保證消息服務由于某種原因導致失敗后消息不會丟失。 

有兩種方法指定傳送模式: 

1.使用 setDeliveryMode 方法,這樣所有的消息都采用此傳送模式; 

2.使用 send 方法為每一條消息設置傳送模式。

 

6、ActiveMQ優先級設置

通常,可以確保將單個會話向目標發送的所有消息按其發送順序傳送至消費者。然而,如果為這些消息分配了不同的優先級,消息傳送系統將首先嘗試傳送優先級較高的消息。 

有兩種方法設置消息的優先級: 

1.使用 setDeliveryMode 方法,這樣所有的消息都采用此傳送模式; 

2.使用 send 方法為每一條消息設置傳送模式; 

消息優先級從 0-9 十個級別,0-4 是普通消息,5-9 是加急消息。如果不指定優先級,則默認為 4。JMS 不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要先于普通消息到達。

 

7、ActiveMQ消息過期設置

允許消息過期 。默認情況下,消息永不會過期。如果消息在特定周期內失去意義,那么可以設置過期時間。 

有兩種方法設置消息的過期時間,時間單位為毫秒: 

1.使用 setTimeToLive 方法為所有的消息設置過期時間; 

2.使用 send 方法為每一條消息設置過期時間。

消息過期時間,send 方法中的 timeToLive 值加上發送時刻的 GMT 時間值。如果 timeToLive 值等于零,則 JMSExpiration 被設為零, 表示該消息永不過期。如果發送后,在消息過期時間之后消息還沒有被發送到目的地,則該消息被清除。 

 

8、ActiveMQ持久訂閱設置

通過為發布者設置 PERSISTENT傳送模式,為訂閱者時使用持久訂閱,這樣可以保證 Pub/Sub 程序接收所有發布的消息。 

消息訂閱分為非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處于激活狀態,也就是和 ActiveMQ 保持連接狀態才能收到發送到某個主題的消息,而當客戶端處于離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。持久訂閱時,客戶端向 ActiveMQ 注冊一個識別自己身份的 ID,當這個客戶端處于離線時,ActiveMQ會為這個 ID 保存所有發送到主題的消息,當客戶端再次連接到ActiveMQ 時, 會根據自己的 ID 得到所有當自己處于離線時發送到主題的消息。持久訂閱會增加開銷,同一時間在持久訂閱中只有一個激活的用戶。 建立持久訂閱的步驟: 

1. 為連接設置一個客戶 ID; 

2. 為訂閱的主題指定一個訂閱名稱; 

上述組合必須唯一。 

 

 

9、ActiveMQ異步發送消息

ActiveMQ支持生產者以同步或異步模式發送消息。使用不同的模式對 send方法的 反應時間有巨大的影響,反映時間是衡量ActiveMQ 吞吐量的重要因素,使用異步發送

可以提高系統的性能。 在默認大多數情況下,AcitveMQ是以異步模式發送消息。例外的情況:在沒有使用事務的情況下,生產者以 PERSISTENT傳送模式發送消息。在這種情況下,send方法都是同步的,并且一直阻塞直到 ActiveMQ發回確認消息:消息已經存儲在持久性數據存儲中。這種確認機制保證消息不會丟失,但會造成生產者阻塞從而影響反應時間。 

高性能的程序一般都能容忍在故障情況下丟失少量數據。如果編寫這樣的程序,可以通過使用異步發送來提高吞吐量(甚至在使用PERSISTENT 傳送模式的情況下)。

 

10、ActiveMQ消費者特性

(1)消費者異步分派 

在 ActiveMQ4 中,支持 ActiveMQ 以同步或異步模式向消費者分派消息。這樣的意義:可以以異步模式向處理消息慢的消費者分配消息;以同步模式向處理消息快的消費

者分配消息。 ActiveMQ默認以同步模式分派消息(setDispatchAsync(false)或TEST.QUEUE?consumer.dispatchAsync=false),這樣的設置可以提高性能。但是對于處理消息慢的消費者,需要以異步模式分派。 

 

(2)消費者優先級

在 ActveMQ 分布式環境中,在有消費者存在的情況下,如果更希望ActveMQ 發送消給消費者而不是其他的 ActveMQ 到ActveMQ 的傳送,可以如下設置: TEST.QUEUE?consumer.prority=10

 

(3)獨占消費者

ActiveMQ維護隊列消息的順序并順序把消息分派給消費者。但是如果建立了多個Session 和 MessageConsumer,那么同一時刻多個線程同時從一個隊列中接收消息時就并

不能保證處理時有序。 有時候有序處理消息是非常重要的。ActiveMQ4 支持獨占的消費。ActiveMQ 挑選一個 MessageConsumer, 并把一個隊列中所有消息按順序分派給它。 如果消費者發生故障,那么 ActiveMQ 將自動故障轉移并選擇另一個消費者。可以如下設置: TEST.QUEUE?consumer.exclusive=true

 

11、ActiveMQ消息預取機制

ActiveMQ的目標之一就是高性能的數據傳送,所以 ActiveMQ 使用“預取限制”來 控制有多少消息能及時的傳送給任何地方的消費者。 一旦預取數量達到限制,那么就不會有消息被分派給這個消費者直到它發回簽收消息(用來標識所有的消息已經被處理)。 可以為每個消費者指定消息預取。如果有大量的消息并且希望更高的性能,那么可以為這個消費者增大預取值。如果有少量的消息并且每條消息的處理都要花費很長的時間,那么可以設置預取值為 1,這樣同一時間,ActiveMQ 只會為這個消費者分派一條消息。如:TEST.QUEUE?consumer.prefetchSize=10

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!