JMS 實例講解

jopen 10年前發布 | 23K 次閱讀 JMS 消息系統

1. JMS基本概念
     JMS(Java Message Service) 即Java消息服務。它提供標準的產生、發送、接收消息的接口簡化企業應用的開發。它支持兩種消息通信模型:點到點(point-to-point)(P2P)模型和發布/訂閱(Pub/Sub)模型。P2P 模型規定了一個消息只能有一個接收者;Pub/Sub 模型允許一個消息可以有多個接收者。
    對于點到點模型,消息生產者產生一個消息后,把這個消息發送到一個Queue(隊列)中,然后消息接收者再從這個Queue中讀取,一旦這個消息被一個接收者讀取之后,它就在這個Queue中消失了,所以一個消息只能被一個接收者消費。
    與點到點模型不同,發布/訂閱模型中,消息生產者產生一個消息后,把這個消息發送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個消息到達這個Topic之后,所有消息接收者都會收到這個消息。
     

簡單的講,點到點模型和發布/訂閱模型的區別就是前者是一對一,后者是一對多。

2. 幾個重要概念
   Destination:消息發送的目的地,也就是前面說的Queue和Topic。創建好一個消息之后,只需要把這個消息發送到目的地,消息的發送者就可以繼續做自己的事情,而不用等待消息被處理完成。至于這個消息什么時候,會被哪個消費者消費,完全取決于消息的接受者。
  Message:從字面上就可以看出是被發送的消息。它有下面幾種類型:
        StreamMessage:Java 數據流消息,用標準流操作來順序的填充和讀取。
        MapMessage:一個Map類型的消息;名稱為 string 類型,而值為 Java 的基本類型。
        TextMessage:普通字符串消息,包含一個String。
        ObjectMessage:對象消息,包含一個可序列化的Java 對象
        BytesMessage:二進制數組消息,包含一個byte[]。
        XMLMessage: 一個XML類型的消息。
    最常用的是TextMessage和ObjectMessage。
   Session:與JMS提供者所建立的會話,通過Session我們才可以創建一個Message。
   Connection:與JMS提供者建立的一個連接。可以從這個連接創建一個會話,即Session。
   ConnectionFactory:那如何創建一個Connection呢?這就需要下面講到的ConnectionFactory了。通過這個工廠類就可以得到一個與JMS提供者的連接,即Conection。
   Producer:消息的生產者,要發送一個消息,必須通過這個生產者來發送。
   MessageConsumer:與生產者相對應,這是消息的消費者或接收者,通過它來接收一個消息。
    前面多次提到JMS提供者,因為JMS給我們提供的只是一系列接口,當我們使用一個JMS的時候,還是需要一個第三方的提供者,它的作用就是真正管理這些Connection,Session,Topic和Queue等。

    通過下面這個簡圖可以看出上面這些概念的關系。

ConnectionFactory---->Connection--->Session--->Message
Destination + Session------------------------------------>Producer
Destination + Session------------------------------------>MessageConsumer

    那么可能有人會問: ConnectionFactory和Destination 從哪兒得到?
    這就和JMS提供者有關了. 如果在一個JavaEE環境中, 可以通過JNDI查找得到, 如果在一個非JavaEE環境中, 那只能通過JMS提供者提供給我們的接口得到了.

 

前一講簡單的介紹了一下JMS的基本概念, 這一講結合一個例子讓大家深入理解前一講的基本概念. 首先需要做的是選擇一個JMS提供者, 如果在JavaEE環境中可以不用考慮這些. 我們選擇ActiveMQ, 官方地址: http://activemq.apache.org/. 網上有很多介紹ActiveMQ的文檔, 所以在這里就不介紹了.

按照上一講的這個簡圖,

ConnectionFactory---->Connection--->Session--->Message
Destination + Session------------------------------------>Producer
Destination + Session------------------------------------>MessageConsumer

首先需要得到ConnectionFactoy和Destination,這里創建一個一對一的Queue作為Destination。
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Queue queue = new ActiveMQQueue("testQueue");

然后又ConnectionFactory創建一個Connection, 再啟動這個Connection:
Connection connection = factory.createConnection();
connection.start();

接下來需要由Connection創建一個Session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    現在暫且不用管參數的含義, 以后會詳細講到.

下面就可以創建Message了,這里創建一個TextMessage。
Message message = session.createTextMessage("Hello JMS!");

要想把剛才創建的消息發送出去,需要由Session和Destination創建一個消息生產者:
MessageProducer producer = session.createProducer(queue);

下面就可以發送剛才創建的消息了:
producer.send(message);

消息發送完成之后,我們需要創建一個消息消費者來接收這個消息:
MessageConsumer comsumer = session.createConsumer(queue);
Message recvMessage = comsumer.receive();

消息消費者接收到這個消息之后,就可以得到它的內容:
System.out.println(((TextMessage)recvMessage).getText());

至此,一個簡單的JMS例子就完成了。下面是全部:

注:需要導入相關的jar包,如果使用maven管理項目,則需要在pom.xml中加入如下依賴:

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>

之后創建java文件:

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class MessageSendAndReceive {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = 
                new ActiveMQConnectionFactory("vm://localhost");
   
        Connection connection = factory.createConnection();
        connection.start();
       
        Queue queue = new ActiveMQQueue("testQueue");
       
        final Session session = 
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Message message = session.createTextMessage("Hello JMS!");
       
        MessageProducer producer = session.createProducer(queue);
        producer.send(message);
   
        System.out.println("Send Message Completed!");
       
        MessageConsumer comsumer = session.createConsumer(queue);
        Message recvMessage = comsumer.receive();
        System.out.println(((TextMessage)recvMessage).getText());
    }
}

消息的消費者接收消息可以采用兩種方式:

1、consumer.receive() 或 consumer.receive(int timeout);
2、注冊一個MessageListener。

采用第一種方式,消息的接收者會一直等待下去,直到有消息到達,或者超時。后一種方式會注冊一個監聽器,當有消息到達的時候,會回調它的onMessage()方法。下面舉例說明:

MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
    @Override
    public void onMessage(Message m) {
        TextMessage textMsg = (TextMessage) m;
        try {
            System.out.println(textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }         
});

 

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!