ActiveMQ發消息和收消息
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。下面詳細的解釋常用類的作用
ConnectionFactory 接口(連接工廠) 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。
Connection 接口(連接) 連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。
Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過 JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者) 由會話創建的對象,用于接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者) 由會話創建的對象,用于發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。
Message 接口(消息) 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另一個應用程序。一個消息有三個主要部分: 消息頭(必須):包含用于識別和為消息尋找路由的操作設置。 一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。 一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 消息接口非常靈活,并提供了許多方式來定制消息的內容。
Session 接口(會話) 表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事 務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允 許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。
JMS的消息模式有1.點對點的消息模式(Point to Point Messaging)
2.發布訂閱模式(publish – subscribe Mode)
這里基于點對點的消息模式進行ActiveMQ發消息和收消息過程的分析,請看模型圖:
點對點的消息發送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder(客戶端A) 發送消息,receive(客戶端B)接收消息。具體點就是客戶端A發送Message Queue ,而 客戶端B從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在 任何時刻發送信息到Queue,而不需要知道接收客戶端是不是在運行 。
請看下面發消息和收消息的例子
package com.activemq.queue; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ActiveMqTest { private static String queueName = "activemq_queue_"; public static void main(String[] args) { Receiver receiver=new Receiver(); Sender sender =new Sender(); try { sender.send(); receiver.receive(); } catch (Exception e) { e.printStackTrace(); } } static class Receiver { public static void receive() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(destination); //第一種情況 int i = 0; while (i < 3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); // TODO something.... System.out .println("收到消息:" +message.getText()); } session.close(); connection.close(); //----------------第一種情況結束---------------------- //第二種方式 // consumer.setMessageListener(new MessageListener() { // public void onMessage(Message arg0) { // if(arg0 instanceof TextMessage){ // try { // System.out.println("arg0="+((TextMessage)arg0).getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // } // }); //第三種情況 // while (true) { // Message msg = consumer.receive(1000); // TextMessage message = (TextMessage) msg; // if (null != message) { // System.out.println("收到消息:" + message.getText()); // } // } } } static class Sender { public static void send() throws Exception { ConnectionFactory connectionFactory = null; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, //null ActiveMQConnection.DEFAULT_PASSWORD, //null "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("count"+new Date().getTime()); Thread.sleep(1000); // 通過消息生產者發出消息 System.out.println("發送消息"+i+new Date()); producer.send(message); } session.commit(); session.close(); connection.close(); } } }Sender主要的作用是發送消息,Receiver主要的作用是接受消息,并且顯示一下接收消息的內容,這里詳細的解釋接受消息的方法:
(1)第一種方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者會一直等待下去,直到有消息到達,或者超時。
其實第一種方法和第三種方法接受原理一樣,區別是第一種知道要接受消息的條數,接受完消息,手動關系連接。而第三種不知道要接受多少條數據,所以使用while (true) 死循環直接在接受消息。
(2)第二種方法:消息消費者注冊一個MessageListener,當有消息到達的時候,會回調它的onMessage()方法。
這里需要注意的是,你注冊完成MessageListener,千萬不要關閉連接session.close();和connection.close();因為你剛剛注冊完成監聽器,就把連接關閉,就不會受到消息,所以監聽器中也不會有處理。(這個問題可把我整哭了,搞了半天,才弄明白)
請看ActiveMQ 頁面上顯示隊列的信息
name是隊列名稱
Number Of Pending Messages 是隊列中有多少個消息等待出隊列
Number Of Consumers 是隊列中有多少個消費者
Messages Enqueued 是隊列共有多少個信息
Messages Dequeued 是隊列中已經出列多少個消息