ActiveMQ發消息和收消息

jopen 11年前發布 | 141K 次閱讀 ActiveMQ 消息系統

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。下面詳細的解釋常用類的作用

ConnectionFactory 接口(連接工廠) 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。
Connection 接口(連接) 連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。
Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過 JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者) 由會話創建的對象,用于接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者) 由會話創建的對象,用于發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。
Message 接口(消息) 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另一個應用程序。一個消息有三個主要部分: 消息頭(必須):包含用于識別和為消息尋找路由的操作設置。 一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。 一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 消息接口非常靈活,并提供了許多方式來定制消息的內容。
Session 接口(會話) 表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事 務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允 許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。

JMS的消息模式有1.點對點的消息模式(Point to Point Messaging)

2.發布訂閱模式(publish – subscribe Mode)

這里基于點對點的消息模式進行ActiveMQ發消息和收消息過程的分析,請看模型圖:

ActiveMQ發消息和收消息

點對點的消息發送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder(客戶端A) 發送消息,receive(客戶端B)接收消息。具體點就是客戶端A發送Message Queue ,而 客戶端B從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在 任何時刻發送信息到Queue,而不需要知道接收客戶端是不是在運行 

請看下面發消息和收消息的例子

package com.activemq.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class ActiveMqTest {

 private static String queueName = "activemq_queue_";


 public static void main(String[] args) {
 Receiver receiver=new Receiver();
 Sender sender =new Sender();
 try {
 sender.send();
 receiver.receive();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }


 static class Receiver {
 public static void receive() throws Exception {
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 Connection connection = connectionFactory.createConnection();
 connection.start();
 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);
 MessageConsumer consumer = session.createConsumer(destination);
 //第一種情況
 int i = 0;
 while (i < 3) {
 i++;
 TextMessage message = (TextMessage) consumer.receive();
 session.commit();
 // TODO something....
 System.out
 .println("收到消息:" +message.getText());
 }
 session.close();
 connection.close();
 //----------------第一種情況結束----------------------
 //第二種方式
//          consumer.setMessageListener(new MessageListener() {
//              public void onMessage(Message arg0) {
//                  if(arg0 instanceof TextMessage){
//                      try {
//                          System.out.println("arg0="+((TextMessage)arg0).getText());
//                      } catch (JMSException e) {
//                          e.printStackTrace();
//                      }
//                  }
//              }
//          });
 //第三種情況
//           while (true) {
//            Message msg = consumer.receive(1000);
//            TextMessage message = (TextMessage) msg;
//            if (null != message) { 
//               System.out.println("收到消息:" + message.getText());
//            } 
//        }
 }
 }


 static class Sender {
 public static void send() throws Exception {
 ConnectionFactory connectionFactory = null;
 connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, //null
                ActiveMQConnection.DEFAULT_PASSWORD, //null
                "tcp://localhost:61616");


 Connection connection = connectionFactory.createConnection();
 connection.start();


 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);


 MessageProducer producer = session.createProducer(destination);
 for (int i = 0; i < 3; i++) {
 TextMessage message = session.createTextMessage("count"+new Date().getTime());
 Thread.sleep(1000);
 // 通過消息生產者發出消息
 System.out.println("發送消息"+i+new Date());
 producer.send(message);
 }
 session.commit();
 session.close();
 connection.close();
 }
 }
}
Sender主要的作用是發送消息,Receiver主要的作用是接受消息,并且顯示一下接收消息的內容,這里詳細的解釋接受消息的方法:

(1)第一種方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者會一直等待下去,直到有消息到達,或者超時。

其實第一種方法和第三種方法接受原理一樣,區別是第一種知道要接受消息的條數,接受完消息,手動關系連接。而第三種不知道要接受多少條數據,所以使用while (true) 死循環直接在接受消息

(2)第二種方法:消息消費者注冊一個MessageListener當有消息到達的時候,會回調它的onMessage()方法。

這里需要注意的是,你注冊完成MessageListener,千萬不要關閉連接session.close();和connection.close();因為你剛剛注冊完成監聽器,就把連接關閉,就不會受到消息,所以監聽器中也不會有處理。(這個問題可把我整哭了,搞了半天,才弄明白)

請看ActiveMQ 頁面上顯示隊列的信息

ActiveMQ發消息和收消息name是隊列名稱

Number Of Pending Messages  是隊列中有多少個消息等待出隊列

Number Of Consumers  是隊列中有多少個消費者

Messages Enqueued  隊列共有多少個信息

Messages Dequeued  是隊列中已經出列多少個消息

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!