ActiveMQ簡單介紹+簡單實例

jopen 10年前發布 | 104K 次閱讀 ActiveMQ 消息系統

1. JMS基本概念
</span>      JMS(Java Message Service) 即Java消息服務。它提供標準的產生、發送、接收消息的接口簡化 企業 應用的開發。它支持兩種消息通信模型:點到點(point-to-point)(P2P)模型和發布/訂閱(Pub/Sub)模型。P2P 模型規定了一個消息只能有一個接收者;Pub/Sub 模型允許一個消息可以有多個接收者。
    對于點到點模型,消息生產者產生一個消息后,把這個消息發送到一個Queue(隊列)中,然后消息接收者再從這個Queue中讀取,一旦這個消息被一個接收者讀取之后,它就在這個Queue中消失了,所以一個消息只能被一個接收者消費。

    與點到點模型不同,發布/訂閱模型中,消息生產者產生一個消息后,把這個消息發送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個消息到達這個Topic之后,所有消息接收者都會收到這個消息。

2.編程的結構

2.1消息產生者向JMS發送消息的步驟 
(1)創建連接使用的工廠類JMS ConnectionFactory 
(2)使用管理對象JMS ConnectionFactory建立連接Connection 
(3)使用連接Connection 建立會話Session 
(4)使用會話Session和管理對象Destination創建消息生產者MessageSender 
(5)使用消息生產者MessageSender發送消息 
2.2消息消費者從JMS接受消息的步驟 
(1)創建連接使用的工廠類JMS ConnectionFactory 
(2)使用管理對象JMS ConnectionFactory建立連接Connection 
(3)使用連接Connection 建立會話Session 
(4)使用會話Session和管理對象Destination創建消息消費者MessageReceiver 
(5)使用消息消費者MessageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver 
消息消費者必須實現了MessageListener接口,需要定義onMessage事件方法。

3.ActiveMQ的下載

下載地址:http://activemq.apache.org/download.html
解壓縮到本地,然后啟動/bin/activemq.bat

并且有的Eclipse沒有自帶jms.jar,需要去下載

4.發送端代碼:

package com.xkey.JMS;

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsSender {

private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;


JmsSender(){

}

public void init(){
    connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");

    try{

        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.TRUE.booleanValue(),  
                Session.AUTO_ACKNOWLEDGE);
        //Queue
        destination = session.createQueue("xkey");
        producer = session.createProducer(destination);
        //Topic
        /**
         * Topic topic = session.createTopic("xkey.Topic");
         * producer = session.createProducer(topic);
        */
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        sendMessage(session,producer);
        session.commit();

    }catch(Exception e){
        e.printStackTrace();
    }finally{
        try {
            connection.close();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

private void sendMessage(Session session,MessageProducer producer) throws JMSException{
    for (int i = 1; i <= 5; i ++) {  
        TextMessage message = session.createTextMessage("First ActiveMQ Test:::: " + i);  
        // 發送消息
        System.out.println("Sender:" + "First ActiveMQ Test::: " + i);  
        producer.send(message);  
    }  
}

/**
 * @param args
 */
public static void main(String[] args) {
    // TODO Auto-generated method stub
    JmsSender jms = new JmsSender();
    jms.init();
}

}</pre>



5.接收端代碼

package com.xkey.JMS;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver {

private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private Destination destination = null;

public JmsReceiver(){

}

public void init(){
    connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
    try{
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.TRUE.booleanValue(),  
                Session.AUTO_ACKNOWLEDGE); 
        destination = session.createQueue("xkey");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener(){

            @Override
            public void onMessage(Message msg) {
                // TODO Auto-generated method stub
                TextMessage message = (TextMessage)msg;
                try{
                    System.out.println("Receiver " + message.getText());  
                }catch(Exception e){
                    e.printStackTrace();
                }
            }

        });
        /**while (true) {  
            TextMessage message = (TextMessage) consumer.receive(1000);  
            if (null != message) {  
                System.out.println("Receiver " + message.getText());  
            } else {  
                break;  
            }  
        }  */
    }catch(Exception e){
        e.printStackTrace();
    }finally{
        try {
            connection.close();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
/**
 * @param args
 */
public static void main(String[] args) {
    // TODO Auto-generated method stub
    JmsReceiver jms = new JmsReceiver();
    jms.init();
}

}</pre>



6、JMS+ActiveMQ+Spring+TomCat可以用來實現企業級的消息隊列,這樣能使服務請求端和服務操作端實現低耦合。不知道怎么實現分布式,也就是多個請求,每個請求用不同的服務器去相響應。

Spring + JMS + ActiveMQ實現簡單的消息隊列(監聽器異步實現)

連接:   http://blog.csdn.net/acceptedxukai/article/details/7775746

在Tomcat 6.0下用JNDI連接IBM MQ 6.0的配置方法(一)

連接:  http://blog.sina.com.cn/s/blog_60dadc490100ecy6.html

http://www.javawind.net/help/html/spring_ref_2.0/html/jms.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!