activemq實例

kongna 10年前發布 | 21K 次閱讀 消息系統

來自: http://my.oschina.net/goudingcheng/blog/608872


ACTIVEMQ入門實例
1.下載版本版本:apache-activemq-5.4.1
2.解壓
運行apache-activemq-5.4.1\bin下的activemq
Failed to start ActiveMQ JMS Message Broker. Reason: java.io.EOFException: Chunk stream does not exist
若出現以上情況:就參考這個文章http://www.cnblogs.com/kaka/archive/2012/03/15/2398215.html在conf下的activemq。xml里面<broker
添加 schedulerSupport="false"
3.創建一個queen
package com.activemq.jmsApplication01;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.omg.CORBA.Context;
/*
 * Sending a jms MESSAGE
 * author paohaijiao
 * date 2016/01/18
 *  
 *
 * */
public class MyMessageProducer {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageProducer producer;
    static Message message;

    public static void init() {

        try {
            /*
            
            InitialContext ctx = new InitialContext();// 1.acquire a jms
                                                        // connection factory
            connectionFactory = (ConnectionFactory) ctx
                    .lookup("ConnectionFactoryName");*/
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
            connection = connectionFactory.createConnection();// 2 create a jms
                                                                // using the
                                                                // connection
                                                                // factory
            connection.start();// 3.start connection
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);// 4.create jms session from a
                                                // connection
            destination = session.createQueue("TEST.QUEUE");// 5,acquire jms
                                                            // destination
            producer = session.createProducer(destination);// 6create jms
                                                            // producer,or
                                                            // create message
                                                            // and addr to a
                                                            // destination
            // 7.create jms consumer or registera jms message listerner
            message = session.createTextMessage("this is text");//
            producer.send(message);// 8.send /recieve jms message
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                // 9.Release Resource
                producer.close();
                session.close();
                connection.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }
}
4.這時查看http://localhost:8161/admin/ 里面的queens就有記錄了

Name Number Of Pending Messages   Number Of Consumers   Messages Enqueued   Messages Dequeued   Views   Operations  
example.A 0 1 0 0 BrowseActive Consumers
       
Send To    Purge    Delete
TEST.QUEUE 1 0 1 0 BrowseActive Consumers
       
Send To    Purge    Delete

 

Number Of Consumers  消費者 這個是消費者端的消費者數量

Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息  進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息  可以理解為是消費這消費掉的數

package com.activemq.jmsApplication01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyAsyncMessageConsumer implements MessageListener {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;
    static Message message;

    public void RecieveMessage() {
        try {
        //    InitialContext ctx = new InitialContext();

            //connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
            
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
             
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);

        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            try {
                connection.close();
                session.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block                                                                               
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            System.out.println("Recieve message:" + message);

        }

    }

}若采用同步:代碼如下

package com.activemq.jmsApplication01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MySyncMessageConsumer {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;


    public void RecieveMessage() {

        try {
            /*InitialContext ctx = new InitialContext();
            connectionFactory = (ConnectionFactory) ctx
                    .lookup("ConnectionFactoryName");*/
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE");
            consumer = session.createConsumer(destination);
       
                System.out.println("init");

                
                Message message = consumer.receive(100000);
                while(true){
                    if (null ==message) {
                       System.out.println("收到消息" + ((TextMessage) message).getText());
                    } else{
                        continue;
                    }
               
                }
            

        } catch (Exception e) {

            e.printStackTrace();
        }finally{
            try {
                connection.close();
                session.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }

    }
}

Name Number Of Pending Messages   Number Of Consumers   Messages Enqueued   Messages Dequeued   Views   Operations  
example.A 0 1 0 0 BrowseActive Consumers
       
Send To    Purge    Delete
TEST.QUEUE 0 1 1 1 BrowseActive Consumers
       
Send To    Purge    Delete

因為針對消費比較快的消費者,我們使用同步(可以減少異步發送消息時產生的上下文切換),針對消費比較慢的消費者,我們使用異步。 同步發送消息的缺點是,對于生產者發送的消息,如果消費者消費的比較慢,那么生產者就會被阻塞。

默認配置是異步發送 (displayatchAsync=true),這種配置也保證了MQ的高性能。

 

 本文由用戶 kongna 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!