ActiveMQ簡述

lizengkui 8年前發布 | 20K 次閱讀 Java EE ActiveMQ 消息系統

 

概述

ActiveMQ是Apache所提供的一個開源的消息系統,完全采用 Java 來實現,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規范。JMS是一組Java應用程序接口,它提供消息的創建、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,類似于Java 數據庫 的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序能夠與不同廠商的消息組件很好地進行通信。

JMS支持兩種消息發送和接收模型。一種稱為P2P(Ponit to Point)模型,即采用點對點的方式發送消息。P2P模型是基于隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱為可能,P2P模型在點對點的情況下進行消息傳遞時采用。

另一種稱為Pub/Sub(Publish/Subscribe,即發布-訂閱)模型,發布-訂閱模型定義了如何向一個內容節點發布和訂閱消息,這個內容節點稱為topic(主題)。主題可以認為是消息傳遞的中介,消息發布這將消息發布到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發布者互相保持獨立,不需要進行接觸即可保證消息的傳遞,發布-訂閱模型在消息的一對多廣播時采用。

ActiveMQ的安裝

下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對linux系統進行闡述,當然ActiveMQ也有win版的,這里就不贅述了),可以去 官網 下載,也可以在下方留言區留下你的郵箱,博主會發給你的~

下載之后解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gz

ActiveMQ目錄內容有:

  • bin目錄包含ActiveMQ的啟動腳本
  • conf目錄包含ActiveMQ的所有配置文件
  • data目錄包含日志文件和持久性消息數據
  • example: ActiveMQ的示例
  • lib: ActiveMQ運行所需要的lib
  • webapps: ActiveMQ的web控制臺和一些相關的demo

運行命令: activemq start (在activemq/bin下運行)

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')

查看activemq是否運行命令: ps -aux | grep activemq

shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq

關閉命令: activemq stop

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre
  Heap sizes: current=63232k  free=62218k  max=932096k
    JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:
  [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED

ActiveMQ的默認服務端口為61616,這個可以在conf/activemq.xml配置文件中修改:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個jar包:activemq-all-5.13.2.jar,引入這個jar到你的項目中即可開始編寫案例代碼。

博主的activemq服務器地址為10.10.195.187,這個在下面代碼中會有體現。

按照JMS的規范,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來創建connection.在這個基礎之上我們再創建session, destination, producer和consumer。因此主要的幾個步驟如下:

  1. 獲得JMS connection factory. 通過我們提供特定環境的連接信息來構造factory。
  2. 利用factory構造JMS connection
  3. 啟動connection
  4. 通過connection創建JMS session.
  5. 指定JMS destination.
  6. 創建JMS producer或者創建JMS message并提供destination.
  7. 創建JMS consumer或注冊JMS message listener.
  8. 發送和接收JMS message.
  9. 關閉所有JMS資源,包括connection, session, producer, consumer等。

下面來看代碼舉例(P2P式)。通過Java實現的基于ActiveMQ的請求提交:

package com.zzh.activemq;

import java.io.Serializable;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class RequestSubmit
{
    //消息發送者
    private MessageProducer producer;
    //一個發送或者接受消息的線程
    private Session session;

    public void init() throws Exception
    {
        //ConnectionFactory連接工廠,JMS用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //Connection:JMS客戶端到JMS Provider的連接,從構造工廠中得到連接對象
        Connection connection = connectionFactory.createConnection();
        //啟動
        connection.start();
        //獲取連接操作
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destinatin = session.createQueue("RequestQueue");
        //得到消息生成(發送)者
        producer = session.createProducer(destinatin);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        ObjectMessage message = session.createObjectMessage(requestParam);
        producer.send(message);
        session.commit();
    }

    public static void main(String[] args) throws Exception{
        RequestSubmit submit = new RequestSubmit();
        submit.init();
        HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();
        requestParam.put("朱小廝", "zzh");
        submit.submit(requestParam);
    }
}

創建Session時有兩個非常重要的參數,第一個boolean類型的參數用來表示是否采用事務消息。如果是事務消息,對于的參數設置為true,此時消息的提交自動有comit處理,消息的回滾則自動由rollback處理。加入消息不是事務的,則對應的該參數設置為false,此時分為三種情況:

  • Session.AUTO_ACKNOWLEDGE表示Session會自動確認所接收到的消息。
  • Session.CLIENT_ACKNOWLEDGE表示由客戶端程序通過調用消息的確認方法來確認所接收到的消息。
  • Session.DUPS_OK_ACKNOWLEDGE使得Session將“懶惰”地確認消息,即不會立即確認消息,這樣有可能導致消息重復投遞。

提供Java實現的基于ActiveMQ的請求處理:

package com.zzh.activemq;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class RequestProcessor
{
    public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        System.out.println("requestHandler....."+requestParam.toString());
        for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet())
        {
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
    }

    public static void main(String[] args) throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("RequestQueue");
        //消息消費(接收)者
        MessageConsumer consumer = session.createConsumer(destination);

        RequestProcessor processor = new RequestProcessor();

        while(true)
        {
            ObjectMessage message = (ObjectMessage) consumer.receive(1000);
            if(null != message)
            {
                System.out.println(message);
                HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();
                processor.requestHandler(requestParam);
            }
            else
            {
                break;
            }
        }
    }
}

輸出結果:

ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{朱小廝=zzh}
朱小廝:zzh

可以通過頁面查看隊列的使用情況,在瀏覽器中輸入 http://10.10.195.187:8161/admin/queues.jsp ,用戶名和密碼都是:admin,看到以下頁面:

這個是在jetty服務器下跑的,可以修改conf/jetty.xml來修改相關jetty配置。

上面的例子是關于P2P模式的,不過有個不妥之處,就是沒有資源的釋放。下面舉一個Pub/Sub模式的。通過JMS創建ActiveMQ的topic,并給topic發送消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;

public class TopicRequest
{
    //消息發送者
    private MessageProducer producer;
    //一個發送或者接受消息的線程
    private Session session;
    //Connection:JMS客戶端到JMS Provider的連接
    private Connection connection;

    public void init() throws Exception
    {
        //ConnectionFactory連接工廠,JMS用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //從構造工廠中得到連接對象
        connection = connectionFactory.createConnection();
        //啟動
        connection.start();
        //獲取連接操作
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        producer = session.createProducer(topic);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void submit(String mess) throws Exception
    {
        TextMessage message = session.createTextMessage();
        message.setText(mess);
        producer.send(message);
    }

    public void close()
    {
        try
        {
            if(session != null)
                session.close();
            if(producer != null)
                producer.close();
            if(connection !=null )
                connection.close();
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception
    {
        TopicRequest topicRequest = new TopicRequest();
        topicRequest.init();
        topicRequest.submit("I'm first");
        topicRequest.close();
    }
}

消息發送到對應的topic后,需要將listener注冊到需要訂閱的topic上,以便能夠接收該topic的消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicReceive
{
    private MessageConsumer consumer;
    private Session session;

    public void init() throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message)
            {
                TextMessage tm = (TextMessage) message;
                System.out.println(tm);
                try
                {
                    System.out.println(tm.getText());
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws Exception
    {
        TopicReceive receive = new TopicReceive();
        receive.init();
    }
}

輸出結果:

ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first

參考文獻

1. 《大型分布式網站 架構 ——設計與實踐》陳康賢著。

2.  http://activemq.apache.org/

 

via: http://www.importnew.com/19690.html

 本文由用戶 lizengkui 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!