ActiveMQ的使用

jopen 9年前發布 | 36K 次閱讀 ActiveMQ 消息系統

ActiveMQ 是Apache出品的開源消息總線。完全支持JMS1.1規范

首先我們要了解一下JMS

JMS簡介

Java消息服務(Java Message Service,JMS)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

Java消息服務的規范包括兩種消息模式,點對點和發布者/訂閱者。許多提供商支持這一通用框架因此,程序員可以在他們的分布式軟件中實現面向消息的操作,這些操作將具有不同面向消息中間件產品的可移植性。

Java消息服務支持同步和異步的消息處理,在某些場景下,異步消息是必要的;在其他場景下,異步消息比同步消息操作更加便利。

Java消息服務支持面向事件的方法接收消息,事件驅動的程序設計現在被廣泛認為是一種富有成效的程序設計范例

在應用系統開發時,Java消息服務可以推遲選擇面對消息中間件產品,也可以在不同的面對消息中間件切換。

JMS元素:

     JMS由以下7個元素組成

1.JMS提供者(JMS provider)

連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。

2. JMS客戶(JMS client)

生產或消費消息的基于Java的應用程序或對象。

3. JMS生產者(JMS producer/publisher)

創建并發送消息的JMS客戶。

4. JMS消費者(JMS consumer/subscriber)

接收消息的JMS客戶。

5. JMS消息(JMS message)

包括可以在JMS客戶之間傳遞的數據的對象

6. JMS隊列(JMS queue)

一個容納那些被發送的等待閱讀的消息的區域。隊列暗示,這些消息將按照順序發送。一旦一個消息被閱讀,該消息將被從隊列中移走。

7. JMS主題(JMS topic)

一種支持發送消息給多個訂閱者的機制。

JMS消息傳輸方式:

         Java消息服務應用程序結構支持兩種模型:

點對點或隊列模型(Point-to-point model)

一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列。這種模式被概括為:

·         只有一個消費者將獲得消息

·         生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。

·         每一個成功處理的消息都由接收者簽收

發布者/訂閱者模型(Publish/subscribe model)

支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:

·         多個消費者可以獲得消息

·         在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。

使用Java語言,JMS提供了將應用與提供數據的傳輸層相分離的方式。同一組Java可以通過JNDI中關于提供者的信息,連接不同的JMS提供者。這一組類首先使用一個連接工廠以連接到隊列或主題,然后發送或發布消息。在接收端,客戶接收或訂閱這些消息。

JMS應用程序接口

Java消息服務的API在javax.jms(J2EE)包中提供。

ConnectionFactory 接口(連接工廠)

用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。

Connection 接口(連接)

連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。

Destination 接口(目標)

目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。

MessageConsumer 接口(消息消費者)

由會話創建的對象,用于接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。

MessageProducer 接口(消息生產者)

由會話創建的對象,用于發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。

Message 接口(消息)

是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另一個應用程序。一個消息有三個主要部分:

1.   消息頭(必須):包含用于識別和為消息尋找路由的操作設置。

2.   一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。

3.   一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。

消息接口非常靈活,并提供了許多方式來定制消息的內容。

Session 接口(會話)

表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。

在早期版本的 JMS 中,必須使用的 pub/sub 和點對點編程模型具有不同的類層次結構。保留這些類層次結構是為了支持與早期版本的 JMS API 的向后兼容,但鼓勵客戶端開發人員使用通用接口。

                     PTP 與 Pub/Sub 接口之間的關系

JMS 通用 PTP 域 Pub/Sub 域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

下面對這些 JMS 概念進行簡單定義。有關更多信息,請參閱 JMS 規范的 PTP 和 Pub/Sub 章節。

  • ConnectionFactory - 客戶端用于創建 Connection 的管理對象

  • Connection - 到 JMS 提供者的活動連接

  • Destination - 封裝消息目的地標識的管理對象

  • Session - 發送和接收消息的單線程上下文

  • MessageProducer - Session 創建的對象,用于將消息發送到目的地

  • MessageConsumer - Session 創建的對象,用于接收發送到目的地的消息

使用ActiveMQ

部署ActiveMQ

       首先我們可以到官方下載頁去下載最新的ActiveMQ的部署程序(http://activemq.apache.org/download-archives.html)

將程序解壓后如果已經配置了java環境可以直接運行安裝目錄/bin下的activemq.bat來啟動activemq程序

activemq使用了jetty服務器來進行管理,可以在conf/jetty.xml文件中對其配置,conf/activemq.xml文件中對activemq進行配置

在Java項目中使用activemq

         在java工程中導入ActiveMQ需要的包

         需要如下包:

·         activemq-core.jar

·         activeio-core.jar

·         *kahadb.jar (if you wish to use persistence,如果要使用持久化需要此jar包)

·         slf4j-api.jar

·         J2EE Jars

  • geronimo-spec-jms.jar

  • geronimo-spec-jta.jar

  • geronimo-spec-j2ee-management.jar

也可以使用默認的activemq-all.jar,下載地址http://mvnrepository.com/artifact/org.apache.activemq/activemq-all)

生產消息過程

// Create a ConnectionFactory,創建連接工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 
// Create a Connection,創建連接
Connection connection = connectionFactory.createConnection();
connection.start();//打開連接
 
// Create a Session//創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//指定ACK_Mode簽收確認模式為自動確認
 
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");//創建消息目標(點對點模型隊列)
//Destination destination = session.createTopic("TEST.FOO");//創建消息目標(訂閱主題)
// Create a MessageProducer from the Session to the Topic or Queue,創建消息生產者
MessageProducer producer = session.createProducer(destination);//創建消息生產者
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//指定傳輸模式-非持久性消息
 
// Create a messages,創建消息
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);//創建文本消息
 
// Tell the producer to send the message
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);//發送消息
 
// Clean up
session.close();//關閉會話
connection.close();//關閉連接

消費消息過程

示例代碼:

 
// Create a ConnectionFactory,創建連接工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 
// Create a Connection,創建連接
Connection connection = connectionFactory.createConnection();
connection.start();//打開連接
 
connection.setExceptionListener(this);//指定連接使用的異常監聽器
 
// Create a Session,創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //指定ACK_Mode簽收確認模式為自動確認
 
 
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");//創建消息目標(點對點模型隊列)
//Destination destination = session.createTopic("TEST.FOO");//創建消息目標(訂閱主題)
 
// Create a MessageConsumer from the Session to the Topic or Queue//創建消息消費者
MessageConsumer consumer = session.createConsumer(destination);
 
// Wait for a message
Message message = consumer.receive(1000);//接收1000毫秒內到達的消息,如果沒有收到此方法將阻塞等待直到指定超時時間
 
if (message instanceof TextMessage) {//判斷消息類型是否為文本消息
    TextMessage textMessage = (TextMessage) message;
    String text = textMessage.getText();
    System.out.println("Received: " + text);
} else {
    System.out.println("Received: " + message);
}
 
consumer.close();//關閉消費者
session.close();//關閉會話
connection.close();//關閉連接

消息消費者有兩種消息接收方式

1.  上面示例中的調用receive方法,使用阻塞的方式獲取消息

2.  使用MessageConsumer對象的setMessageListener方法設置監聽的方式,設置監聽后如果關閉Consumer、session或connection將終止監聽

JMS消息類型

    JMS API 定義了五種消息正文類型:

  • Stream(流)- StreamMessage 對象的消息正文包含 Java 編程語言原始值流(“Java 基本類型”)。按順序填充和讀取。

  • Map(映射)- MapMessage 對象的消息正文包含一組名稱-值對,其中名稱是 String 對象,值是 Java 基本類型。可以根據名稱按順序或隨機訪問這些條目。條目的順序是不確定的。

  • Text(文本)- TextMessage 對象的消息正文包含 java.lang.String 對象。此消息類型可用于傳輸純文本消息和 XML 消息。

  • Object(對象)- ObjectMessage 對象的消息正文包含 Serializable Java 對象。

  • Byte(字節)- BytesMessage 對象的消息正文包含未解釋的字節流。此消息類型可以按字面意義編碼正文,以匹配現有的消息格式。在大多數情況下,可以使用更易用的其他正文類型。盡管 JMS API 允許將消息屬性用于字節消息,但一般不使用它們,因為包含屬性可能會影響格式。

ACK_MODE確認模式

確認模式用來指示會話如何確認收到消息,在使用Connection的createSession方法時需要指定此選項,在JMS的Session接口中包含下面四個選項

· AUTO_ACKNOWLEDGE = 1    自動確認

當會話從對 receive 的調用成功返回時,或在會話已調用的用于處理消息的消息偵聽器成功返回時,會話會自動確認客戶端的消息接收。


·  CLIENT_ACKNOWLEDGE = 2    客戶端手動確認

通過此確認模式,客戶端通過調用消息的 acknowledge 方法確認已使用的消息。 確認已使用的消息將確認該會話已使用的所有消息。

·  DUPS_OK_ACKNOWLEDGE = 3    自動批量確認

此確認模式指示會話延遲確認消息的傳送。這可能在 JMS 提供者失敗的情況下導致傳送某些重復消息,因此只有能允許重復消息的使用方才應使用此模式。使用此模式可以通過最大限度地減少會話為防止重復所做的工作,從而減少會話開銷。

·  SESSION_TRANSACTED = 0    事務提交并確認

如果會話是事務的則使用此模式,忽略設置的其他模式值

在事務開啟之后,和session.commit()之前,所有消費的消息,要么全部正常確認,要么全部redelivery。這種嚴謹性,通常在基于GROUP(消息分組)或者其他場景下特別適合。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!