Spring下ActiveMQ實戰

jopen 9年前發布 | 23K 次閱讀 ActiveMQ 消息系統

MessageQueue是分布式的系統里經常要用到的組件,一般來說,當需要把消息跨網段、跨集群的分發出去,就可以用這個。一些典型的示例就是:

1、集群A中的消息需要發送給多個機器共享;

2、集群A中消息需要主動推送,但彼此的網絡不是互通的(如集群A只有過HA才能被外界訪問);

 Spring下ActiveMQ實戰

    當然上面的幾個點,除了用MQ還有其它實現方式,但是MQ無疑是非常適合用來做這些事的。眾多MQ中,ActiveMQ是比較有名氣也很穩定的,它發送消 息的成本非常廉價,支持Queue與Topic兩種消息機制。本文主要就是講如何在Spring環境下配置此MQ:

 

1、場景假設

    現有機器兩臺Server、Worker需要進行異步通信,另有一臺ActiveMQ機器,關于MQ的配置信息存放在Zookeeper中,Zookeeper的節點有:

      - /mq/activemq/ip:mq的機器ip

      -/mq/activemq/port:這是mq的機器端口

 

2、Server的Spring XML配置

    Server主要的工作就是接受Worker消息,并發送消息給Worker。主要是定義了連接MQ的連接池接受Worker消息的隊列worker,發送消息給Worker的隊列server:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/jms ;

<!-- ActiveMQ連接池 -->
<bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL">
                <bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />
            </property>
            <property name="closeTimeout" value="60000" />
            <!-- <property name="userName" value="admin" /> -->
            <!-- <property name="password" value="admin" /> -->
            <!-- <property name="optimizeAcknowledge" value="true" /> -->
            <property name="optimizedAckScheduledAckInterval" value="10000" />
        </bean>
    </property>
</bean>


<!-- Worker任務消息 -->
<bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="worker_topic" />
</bean>
<!-- 任務監聽容器 -->
<bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="conFactory" />
    <property name="destination" ref="taskWorkerTopic" />
    <property name="messageListener">
        <bean class="lekko.mq.task.TaskWorkerListener" />
    </property>
    <property name="pubSubDomain" value="true" />
</bean>


<!-- Server任務消息 -->
<bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="server_topic" />
</bean>    
<!-- 任務消息發送模板 -->
<bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />

</beans></pre>
  一段一段地分析,ActiveMQ連接池這里,定義了連接的bean為 “conFactory”,其中 broberURL屬性是通過后臺Java代碼的靜態方法來設置的,方便線上環境通過Java代碼動態地切換,稍后會介紹這塊代碼,你現在需要知道的是, 它實際上返回的就是一個字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后臺來管理連接信息,直接改成 “<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。

    接下來,便是Worker消息隊列的定義,這里定義為“taskWorkerTopic”,類型是 org.apache.activemq.command.ActiveMQTopic,(訂閱模式)它表示一個消息可以被多個機器收到并處理,其它的還 有org.apache.activemq.command.ActiveMQQueue,(點對點模式)表示一個消息只能被一臺機器收到,當收到后消息 就出隊列了,其它機器無法處理。它們都有一個構造參數constructor-arg,指定了消息隊列的名稱,一個MQ中一個消息隊列的名字是唯一的。

    Worker的消息隊列定義好了之后,就是接受Worker的里消息了,這里定義了“taskWorkerContainer”,其屬性分別定義了連接池、目標隊列、消息處理器(我們自己的Java類,后面再講),參數pubSubDomain用于指定是使用訂閱模式還是使用點對點模式,如果是ActiveMQTopic則要設置為true,默認是false。

    好了,Server現在已經可以通過自己定義的“lekko.mq.task.TaskWorkerListener”類接受并處理taskWorkerTopic的消息了。

    如法炮制,定義一個專門用于往Worker里發消息的隊列“taskServerTopic”,并定義發送消息的模板“taskServerTemplate”備用。

 

3、Server端的接收類與發送類

    lekko.mq.task.TaskWorkerListener便是一個接收類示例:

package lekko.mq.task;

import javax.jms.Message; import javax.jms.MessageListener;

import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; import lekko.mq.model.MessageModel;

/**

  • Task消息監聽類
  • @author lekko */ @Service public class TaskWorkerListener implements MessageListener {

    private Logger _logger = Logger.getLogger(TaskWorkerListener.class);

    @Override public void onMessage(Message message) {

     if (message instanceof ActiveMQObjectMessage) {
         ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
         try {
             onMessage((MessageModel) aMsg.getObject());
         } catch (Exception e) {
             _logger.warn("Message:${} is not a instance of MessageModel.", e);
         }
     } else {
         _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
     }
    

    }

    /**

    • 處理消息
    • @param message 自定義消息實體 */ public void onMessage(MessageModel message) { ... }

}</pre>這里給大家演示的并不是最基礎的知識,處理的消息是一個自定義的類 “lekko.mq.model.MessageModel”,這個類怎么寫可以隨便整,反正就是一些你要傳遞的數據字段,但是記得要實現Serializable接口。如果你需要傳遞的僅僅是純字符串,那么直接在代碼的23行片,把message.toString()即可。這個類通過前面XML配置會處理來自 “worker_topic”隊列中的消息。

    

    再就是發送類,實際上就是把前面的taskServiceTemplate拿來用就行了:

package lekko.mq.task;

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import lekko.mq.model.MessageModel;

/**

  • 服務器任務消息分發
  • @author lekko */ @Service public class TaskServerSender {

    @Autowired @Qualifier("taskServerTemplate") private JmsTemplate jmsTemplate;

    /**

    • 發送消息 */ public void sendMessage(MessageModel msg) { jmsTemplate.convertAndSend(msg); }

}</pre>
  把這個類TaskServerSender注入到任意需要用到的地方,調用sendMessage方法即可。它會往前面定義的 “server_topic”中塞消息,等Worker來取。

 

4、關于Zookeeper配置MQ連接信息

    Worker端的配置我這里不再闡述,因為它跟在Server端的配置太相像,區別就在于Server端是從worker_topic中取消息,往 server_topic中寫消息;而Worker端的代碼則是反過來,往worker_topic中寫消息,從server_topic中取消息。

    那么如何使用Java代碼來控制ActiveMQ的配置消息呢:

package lekko.mq.util;

import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;

/**

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!