ActiveMQ發消息和收消息
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。下面詳細的解釋常用類的作用
ConnectionFactory 接口(連接工廠) 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。
Connection 接口(連接) 連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。
Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過 JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者) 由會話創建的對象,用于接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者) 由會話創建的對象,用于發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。
Message 接口(消息) 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另一個應用程序。一個消息有三個主要部分: 消息頭(必須):包含用于識別和為消息尋找路由的操作設置。 一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。 一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 消息接口非常靈活,并提供了許多方式來定制消息的內容。
Session 接口(會話) 表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事 務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允 許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。
JMS的消息模式有1.點對點的消息模式(Point to Point Messaging)
2.發布訂閱模式(publish – subscribe Mode)
這里基于點對點的消息模式進行ActiveMQ發消息和收消息過程的分析,請看模型圖:
點對點的消息發送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder(客戶端A) 發送消息,receive(客戶端B)接收消息。具體點就是客戶端A發送Message Queue ,而 客戶端B從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在 任何時刻發送信息到Queue,而不需要知道接收客戶端是不是在運行 。
請看下面發消息和收消息的例子
package com.activemq.queue;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMqTest {
private static String queueName = "activemq_queue_";
public static void main(String[] args) {
Receiver receiver=new Receiver();
Sender sender =new Sender();
try {
sender.send();
receiver.receive();
} catch (Exception e) {
e.printStackTrace();
}
}
static class Receiver {
public static void receive() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
//第一種情況
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
// TODO something....
System.out
.println("收到消息:" +message.getText());
}
session.close();
connection.close();
//----------------第一種情況結束----------------------
//第二種方式
// consumer.setMessageListener(new MessageListener() {
// public void onMessage(Message arg0) {
// if(arg0 instanceof TextMessage){
// try {
// System.out.println("arg0="+((TextMessage)arg0).getText());
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// }
// });
//第三種情況
// while (true) {
// Message msg = consumer.receive(1000);
// TextMessage message = (TextMessage) msg;
// if (null != message) {
// System.out.println("收到消息:" + message.getText());
// }
// }
}
}
static class Sender {
public static void send() throws Exception {
ConnectionFactory connectionFactory = null;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, //null
ActiveMQConnection.DEFAULT_PASSWORD, //null
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("count"+new Date().getTime());
Thread.sleep(1000);
// 通過消息生產者發出消息
System.out.println("發送消息"+i+new Date());
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
}Sender主要的作用是發送消息,Receiver主要的作用是接受消息,并且顯示一下接收消息的內容,這里詳細的解釋接受消息的方法:
(1)第一種方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者會一直等待下去,直到有消息到達,或者超時。
其實第一種方法和第三種方法接受原理一樣,區別是第一種知道要接受消息的條數,接受完消息,手動關系連接。而第三種不知道要接受多少條數據,所以使用while (true) 死循環直接在接受消息。
(2)第二種方法:消息消費者注冊一個MessageListener,當有消息到達的時候,會回調它的onMessage()方法。
這里需要注意的是,你注冊完成MessageListener,千萬不要關閉連接session.close();和connection.close();因為你剛剛注冊完成監聽器,就把連接關閉,就不會受到消息,所以監聽器中也不會有處理。(這個問題可把我整哭了,搞了半天,才弄明白)
請看ActiveMQ 頁面上顯示隊列的信息
name是隊列名稱
Number Of Pending Messages 是隊列中有多少個消息等待出隊列
Number Of Consumers 是隊列中有多少個消費者
Messages Enqueued 是隊列共有多少個信息
Messages Dequeued 是隊列中已經出列多少個消息