Java消息服務概要
一、消息中間件的作用
消息中間件,Message-Oriented Middleware,簡稱MOM。采用消息中間件的作用一般有兩點:一是解耦,二是異步(起到削峰填谷的作用)
二、JMS的基本規范
1、消息傳送模型
-
點對點模型(基于隊列的模式,如果有多個消費者,那么這些消費者輪流消費消息,達到負載均衡)
-
發布訂閱模型基于topic的發布/訂閱模型較為流行,通常以此模型為主,外加點對點模型實現生產者/消費者的負載均衡。
2、消息接收模型
-
推技術:消息服務器主動推送消息到消息消費者(JMS的Pub/Sub采用的是推技術)
-
拉技術:消息消費者定時去消息服務器取消息(JMS接收者能夠推送或拉取消息,取決于它是否使用異步onMessage回調或者是同步receive方法)
同步receive方式就是拉的方式,消費者主動去消息服務器取消息,異步的listener方式為推的方式。
同步receive的實例:
// Create the sender and send the message QueueSender qSender = qSession.createSender(requestQ); qSender.send(msg); // Wait to see if the loan request was accepted or declined String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter); TextMessage tmsg = (TextMessage)qReceiver.receive(30000); if (tmsg == null) { System.out.println("Lender not responding"); } else { System.out.println("Loan request was " + tmsg.getText()); }
3、基本概念
基本的概念:消息生產者、消息消費者、消息、目的地/topic/路由
(1)JMS1.X的api規范
(2)JMS2的api規范
4、消息格式
一條消息可分為 3 部分:消息頭(Head)、屬性(Attribute)和有效負載(Payload,即消息體)。
(1)消息頭
這部分通常是結構化的數據,提供了和消息有關的元數據。
-
消息唯一標識
-
目的地
-
傳送方式(持久/非持久)
-
消息接收時間
-
消息失效時間
-
消息優先級
-
是否重發等
(2)消息屬性
用于JMS的高級功能特性,以及存儲一些應用特有的屬性或者JMS廠商提供的額外功能屬性。
-
A、高級功能:groupId/groupSeq,消息的分組聚合
-
B、應用自己添加的屬性:比如username等
-
C、特殊廠商的額外功能屬性
(3)消息體
一般可以有結構化的和非結構化的,JMS里頭定義的比較詳細,有Text、Object、Bytes、Stream、Map等格式。但是一般大多byte[]格式,自己可以選擇自定義的序列化方式。
三、消息分發
1、消息選擇器
在目的地上使用消息選擇器,利用消息屬性和消息頭(無法使用消息體內的數據)作為條件表達式的準則,消息在目的地(隊列/主題)分發給消費者之前就過濾好。(rabbitmq有基于routing key的表達式過濾方式,來選擇接收哪幾個topic的消息)
2、消費者控制還是生產者控制
(1)MessageFilter,消費者控制了消息過濾,并決定它要接受什么消息
-
消息在目的地分發給消費者之前過濾。
-
使用MessageFitler的優點是,具有更強的可伸縮性,假設增加了一個CustType為PLATINUM級別,那么只要增加一個相應的消息消費者就可以。
(2)Multiple Destination,多目的方式,生產者控制消息分發
-
消息在發送給目的之前過濾,分發給不同目的地不同的消息
-
如果是使用Multiple Destination方式,就是增加一個隊列來保存PLATINUM,同時還得增加一個類來監聽這個新的隊列。
-
MultipleDestination的好處是生產者控制消息過濾分類,不容易出錯,但是可擴展性稍差,MessageFilter是可擴展性好,但是容易出錯。
3、未能過濾掉的消息如何處理
1)發布訂閱模型:
這些消息不會傳送給該訂閱者,不論是持久訂閱還是非持久訂閱
2)點對點模型:
-
未被消費者選擇的所有消息,對該消費者都是不可見的。
-
確認由發送者或發布者生產的所有消息各自都有和它們相關聯的有效期,默認情況是永不過期,那么意味著如果一條消息被過濾掉,沒有傳送給消費者,它將在隊列中永久駐留,可以通過設置生存時間來解決。
-
有些廠商提出了死信隊列(Dead Letter Queue,DLQ),或停用消息隊列(Dead Message Queue,DMQ)的概念,來處理那些被認為無法傳送的消息。
最簡單的情況,消息傳送系統將所有無法傳送的消息放入DMQ,而應用程序負責監測它的內容。也可以支持管理型事件,能夠在消息放入DMQ時通知應用程序。
對于rabbitmq,有一個dead letter的機制(當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange,這個Exchange就是DLX)。
四、消息可靠性基礎
1、消息保存轉發機制
-
保證傳送機制(guaranteed delivery),確保即便發生了具備故障,預定消費者最終也會接收到這條消息。
-
當消費者出故障時,將消息保存到持久化介質中,等待消費者恢復之后,從持久介質取出消息,轉發給消費者
2、DeliveryMode
public interface DeliveryMode { /** This is the lowest-overhead delivery mode because it does not require * that the message be logged to stable storage. The level of JMS provider * failure that causes a <CODE>NON_PERSISTENT</CODE> message to be lost is * not defined. * * <P>A JMS provider must deliver a <CODE>NON_PERSISTENT</CODE> message * with an * at-most-once guarantee. This means that it may lose the message, but it * must not deliver it twice. */ static final int NON_PERSISTENT = 1; /** This delivery mode instructs the JMS provider to log the message to stable * storage as part of the client's send operation. Only a hard media * failure should cause a <CODE>PERSISTENT</CODE> message to be lost. */ static final int PERSISTENT = 2; }
Message默認是PERSISTENT模式。
這個地方是JMS里頭容易讓人混亂的地方,其本質上,還是利用消息頭的deliveryMode屬性來標記的。
可以有兩個地方可以設置:
(1)對于消息生產者來說
是發送消息的時候,設置是否持久化;
msg.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
默認就是持久化模式的。
(2)對于消息消費者來說
是在接收消息之前設置傳送模式。比如對于topic提供durableSubscriber的方法。
public interface TopicSession extends javax.jms.Session { javax.jms.Topic createTopic(java.lang.String s) throws javax.jms.JMSException; javax.jms.TopicSubscriber createSubscriber(javax.jms.Topic topic) throws javax.jms.JMSException; javax.jms.TopicSubscriber createSubscriber(javax.jms.Topic topic, java.lang.String s, boolean b) throws javax.jms.JMSException; javax.jms.TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, java.lang.String s) throws javax.jms.JMSException; javax.jms.TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, java.lang.String s, java.lang.String s1, boolean b) throws javax.jms.JMSException; javax.jms.TopicPublisher createPublisher(javax.jms.Topic topic) throws javax.jms.JMSException; javax.jms.TemporaryTopic createTemporaryTopic() throws javax.jms.JMSException; void unsubscribe(java.lang.String s) throws javax.jms.JMSException; }
3、丟失重連
當客戶端和服務器之間的網絡連接丟失時,JMS提供者必須盡可能重新建立連接。如果該JMS提供者無法重新自動連接,在客戶端調用某種能夠引起網絡流量的方法時,提供者必須拋出一個異常,向客戶端通知這個情況。JMS提供了ExceptionListener接口,用于捕獲丟失的連接(可以在捕獲時重連),并向客戶端通知這個情況。與MessageListener不同,MessageListener是與會話綁定在一起的。
五、消息確認機制
一般是自動確定、手動確定。
1、消息生產者發送消息時
TopicPublisher.publish和QueueSender.send方法是同步的,這些方法負責發送消息,同時進行阻塞,直到從消息服務器接收到一個確認為止。一旦接收到一個確認,執行線程就會恢復并返回方法,認為消息發送成功。底層確認對客戶端編程模型來說是不可見的。如果在這個操作期間發生了一個故障情況,就會拋出一個異常,同時認為該消息未被傳送(注意重新發送指的是消費服務器到消費者的重新發送)。
2、消息消費者接收消息時
如果會話是AUTO_ACKNOWLEDGE模式,當每個消費者獲得消息時,JMS提供者的客戶端運行時環境必須自動向服務器發送確認信息。如果服務器沒有接收到這個確認信息,它就會認為該消息未被傳送,并可能會試圖重新傳送。
3、消息服務器收發消息時
(1)服務器接收到生產者的消息,發送確認給生產者
確認消息從服務器發送到生產者,意味著服務器已經接收到該消息,并已經承擔了傳送它的責任。從JMS服務器的角度來看,發送到生產者的確認并未和消息傳送直接關聯。邏輯上,它們是兩個獨立的步驟。
-
A、對于持久消息來說,服務器將消息寫入磁盤,然后在再通知生產者該消息已經被接收。
-
B、對于非持久消息,意味著服務器能夠在接收到消息后立刻通知發送者,并將該消息存入內存。如果該消息的主題沒有訂閱者,根據廠商的不同,也可能會將該消息拋棄。
(2)消息消費者接收時確認
-
A、對于持久訂閱者來說一直到消息服務器接收到所有的消息預定接收者的確認時,消息服務器才會認為該消息已經完成傳送。要獲得這些信息,必須對每個消費者都非常了解:哪一個客戶端已經接收到每條消息,哪一個還沒有接收到。一旦消息服務器將消息傳送給所有的已知訂閱者,并已分別從訂閱者那里接收到確認,就會將這條消息從持久存儲器中刪除。如果持久訂閱,而訂閱者當前并未連接,那么消息服務器會將該消息保存起來,直到該訂閱者變成可用狀態或消息到期為止。甚至對于非持久消息來說,也是如此。也就是說,如果消息消費者設定為持久訂閱,則不管消息生產者設定消息是否是持久的,當消費者不在時,總是保存,恢復時轉發。
-
B、對于非持久性消息來說在消息服務器已經向發送者確認消息后,以及消息服務器有機會代表未連接的持久訂閱者將消息寫入磁盤之前,二者之間可能會有一個時間窗,如果JMS在這個時間窗內出現故障,該消息就可能丟失。
-
C、如果是使用持久消息時一個提供者可能會出現故障,但是會優雅恢復正常。由于消息保存在持久存儲器中,它們并沒有丟失,在提供者再次啟動時,它們又會傳送給消費者。如果是點對點隊列,它們能夠保證被傳送出去,如果是發布訂閱發送,只有消費者的訂閱為持久性時,才能被保證傳送出去,非持久訂閱的傳送行為因提供者不同而不同。
六、消息的事務機制
1、生產者提交消息的事務
-
生產者在commit之前,JMS提供者不會開始向它的消費者傳送消息,即使它已經從發送者那里接收到所有的消息。
-
發送消息時,如果在發送消息的方法正常完成后沒有調用commit方法,JMS提供者會從隊列中刪除這些消息,而這些消息并沒有傳送給消費者。
2、消費者消費消息的事務
-
消息會盡可能快地傳送給接收者,但是它們一直由JMS提供者保存,直到接收者在會話對象上發布commit為止,如果發生了故障或調用了rollback,提供會試圖重新傳送這些消息,這種情況下,這些消息會設置為重新傳送標記。
-
接收消息時,如果在接收消息方法正常完成后沒有調用commit方法,消息就會被標記為未被傳送,JMS提供者會將這些消息重新傳送給消費者,并將JMSRedelivered標記為true,表示此前曾試圖處理過這些消息。