JMS 實例講解
1. JMS基本概念
JMS(Java Message Service) 即Java消息服務。它提供標準的產生、發送、接收消息的接口簡化企業應用的開發。它支持兩種消息通信模型:點到點(point-to-point)(P2P)模型和發布/訂閱(Pub/Sub)模型。P2P 模型規定了一個消息只能有一個接收者;Pub/Sub 模型允許一個消息可以有多個接收者。
對于點到點模型,消息生產者產生一個消息后,把這個消息發送到一個Queue(隊列)中,然后消息接收者再從這個Queue中讀取,一旦這個消息被一個接收者讀取之后,它就在這個Queue中消失了,所以一個消息只能被一個接收者消費。
與點到點模型不同,發布/訂閱模型中,消息生產者產生一個消息后,把這個消息發送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個消息到達這個Topic之后,所有消息接收者都會收到這個消息。
簡單的講,點到點模型和發布/訂閱模型的區別就是前者是一對一,后者是一對多。
2. 幾個重要概念
Destination:消息發送的目的地,也就是前面說的Queue和Topic。創建好一個消息之后,只需要把這個消息發送到目的地,消息的發送者就可以繼續做自己的事情,而不用等待消息被處理完成。至于這個消息什么時候,會被哪個消費者消費,完全取決于消息的接受者。
Message:從字面上就可以看出是被發送的消息。它有下面幾種類型:
StreamMessage:Java 數據流消息,用標準流操作來順序的填充和讀取。
MapMessage:一個Map類型的消息;名稱為 string 類型,而值為 Java 的基本類型。
TextMessage:普通字符串消息,包含一個String。
ObjectMessage:對象消息,包含一個可序列化的Java 對象
BytesMessage:二進制數組消息,包含一個byte[]。
XMLMessage: 一個XML類型的消息。
最常用的是TextMessage和ObjectMessage。
Session:與JMS提供者所建立的會話,通過Session我們才可以創建一個Message。
Connection:與JMS提供者建立的一個連接。可以從這個連接創建一個會話,即Session。
ConnectionFactory:那如何創建一個Connection呢?這就需要下面講到的ConnectionFactory了。通過這個工廠類就可以得到一個與JMS提供者的連接,即Conection。
Producer:消息的生產者,要發送一個消息,必須通過這個生產者來發送。
MessageConsumer:與生產者相對應,這是消息的消費者或接收者,通過它來接收一個消息。
前面多次提到JMS提供者,因為JMS給我們提供的只是一系列接口,當我們使用一個JMS的時候,還是需要一個第三方的提供者,它的作用就是真正管理這些Connection,Session,Topic和Queue等。
通過下面這個簡圖可以看出上面這些概念的關系。
ConnectionFactory---->Connection--->Session--->Message
Destination + Session------------------------------------>Producer
Destination + Session------------------------------------>MessageConsumer
那么可能有人會問: ConnectionFactory和Destination 從哪兒得到?
這就和JMS提供者有關了. 如果在一個JavaEE環境中, 可以通過JNDI查找得到, 如果在一個非JavaEE環境中, 那只能通過JMS提供者提供給我們的接口得到了.
前一講簡單的介紹了一下JMS的基本概念, 這一講結合一個例子讓大家深入理解前一講的基本概念. 首先需要做的是選擇一個JMS提供者, 如果在JavaEE環境中可以不用考慮這些. 我們選擇ActiveMQ, 官方地址: http://activemq.apache.org/. 網上有很多介紹ActiveMQ的文檔, 所以在這里就不介紹了.
按照上一講的這個簡圖,
ConnectionFactory---->Connection--->Session--->Message
Destination + Session------------------------------------>Producer
Destination + Session------------------------------------>MessageConsumer
首先需要得到ConnectionFactoy和Destination,這里創建一個一對一的Queue作為Destination。
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Queue queue = new ActiveMQQueue("testQueue");
然后又ConnectionFactory創建一個Connection, 再啟動這個Connection:
Connection connection = factory.createConnection();
connection.start();
接下來需要由Connection創建一個Session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
現在暫且不用管參數的含義, 以后會詳細講到.
下面就可以創建Message了,這里創建一個TextMessage。
Message message = session.createTextMessage("Hello JMS!");
要想把剛才創建的消息發送出去,需要由Session和Destination創建一個消息生產者:
MessageProducer producer = session.createProducer(queue);
下面就可以發送剛才創建的消息了:
producer.send(message);
消息發送完成之后,我們需要創建一個消息消費者來接收這個消息:
MessageConsumer comsumer = session.createConsumer(queue);
Message recvMessage = comsumer.receive();
消息消費者接收到這個消息之后,就可以得到它的內容:
System.out.println(((TextMessage)recvMessage).getText());
至此,一個簡單的JMS例子就完成了。下面是全部:
注:需要導入相關的jar包,如果使用maven管理項目,則需要在pom.xml中加入如下依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
之后創建java文件:
import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class MessageSendAndReceive { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = factory.createConnection(); connection.start(); Queue queue = new ActiveMQQueue("testQueue"); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Message message = session.createTextMessage("Hello JMS!"); MessageProducer producer = session.createProducer(queue); producer.send(message); System.out.println("Send Message Completed!"); MessageConsumer comsumer = session.createConsumer(queue); Message recvMessage = comsumer.receive(); System.out.println(((TextMessage)recvMessage).getText()); } }
消息的消費者接收消息可以采用兩種方式:
1、consumer.receive() 或 consumer.receive(int timeout);
2、注冊一個MessageListener。
采用第一種方式,消息的接收者會一直等待下去,直到有消息到達,或者超時。后一種方式會注冊一個監聽器,當有消息到達的時候,會回調它的onMessage()方法。下面舉例說明:
MessageConsumer comsumer = session.createConsumer(queue); comsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message m) { TextMessage textMsg = (TextMessage) m; try { System.out.println(textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } });