JAVA中利用ActiveMQ收發消息

dh_sue 10年前發布 | 59K 次閱讀 ActiveMQ Java

ActiveMQ不多說,下現是開啟線程進行消息監聽,符合條件則發送相應的消息到對方,兩者消息均采用隊列模式
SMSMQListener.java

/**

  • */ package com.wxcm.sms;

import java.util.HashMap; import java.util.List; import java.util.Map;

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired;

import com.wxcm.sms.service.ChannelService; import com.wxcm.sms.util.MQUtil; import com.wxcm.sms.vo.Channel; import com.wxcm.waf.Configur;

/**

  • @author D.H. Sue / public class SMSMQListener implements Runnable {

    Logger logger = Logger.getLogger(SMSMQListener.class); @Autowired ChannelService channelService; @Autowired Configur configur; Connection connection; Session session;

    public SMSMQListener() {

    }

    public void run() {

     logger.info("--ActiveMQ Connect--");
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + configur.getProperties().get("sms.mq.url") + ")");
     logger.info("--ActiveMQ Connect Success--");
     try {
         connection = connectionFactory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = session.createQueue("stream");
         MessageConsumer consumer = session.createConsumer(destination);
         while (true) {
             consumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
                     MapMessage mm = (MapMessage) message;
                     try {
                         logger.info("--Msg From GET_ALL_CHANNELS--");
                         if (mm.getString("type").equals("get_all_channels")) {
                             sendChannelInfo2StreamServerI(mm.getString("queue_name"), Constants.SEND_CHANNEL_2_MQ_ADD);
                             logger.info("--Send Channel Info Done--");
                         }
                     } catch (JMSException e) {
                         logger.info(e);
                     }
                 }
             });
         }
     } catch (JMSException e) {
         logger.info(e);
     }
    
    

    }

    private void sendChannelInfo2StreamServerI(String streamServerI, String opType){

     try {
         List<Channel> channelList = (List<Channel>) channelService.listChannels();
         Map<String, String> msgMap = new HashMap<String, String>();
         if (channelList!=null) {
             msgMap.put("count", channelList.size()+"");
             for (int i = 0; i < channelList.size(); i++) {
                 msgMap.put("type_" + i, Constants.SEND_CHANNEL_2_MQ_ADD);
                 msgMap.put("channel_num_" + i, channelList.get(i).getChannelnumber() + "");
                 msgMap.put("srcurl_" + i, channelList.get(i).getSrcurl());
             }
             MQUtil mqUtil = new MQUtil(configur.getProperties().get("sms.mq.url").toString());
             Connection connection = mqUtil.connectMQ();
             mqUtil.sendMessage(connection, msgMap, streamServerI);
             mqUtil.disconnectMQ(connection);
         }
     } catch (Exception e) {
         e.printStackTrace();
     }
    

    }

    public void init() throws Exception {

     Thread thread = new Thread(this);
     thread.setName("listening");
     thread.start();
    

    }

    public void destroy(){

     try {
         if (session!=null) {
             session.close();
         }
     } catch (Exception e) {
         logger.error(e);
     }
    
     try {
         if (connection!=null) {
             connection.stop();
             connection.close();
         }
     } catch (Exception e) {
         logger.error(e);
     }
    

    } }</pre>下面是MQUtil.java工具類:

    package com.wxcm.sms.util;

import java.util.Iterator; import java.util.List; import java.util.Map;

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger;

/**

  • ActiveMQ工具類
  • @author D.H. Sue / public class MQUtil { Logger logger = Logger.getLogger(MQUtil.class); private String url;

    public String getUrl() {

     return url;
    

    }

    public void setUrl(String url) {

     this.url = url;
    

    }

    public MQUtil(String url){

     this.url = url;
    

    }

    /**

    • 連接到ActiveMQ服務器
    • @param isProducer
    • 是否為生產者1表示生產者,0表示消費者
    • @return
    • 不空表示連接成功,空表示連接失敗 */ public Connection connectMQ() { ConnectionFactory connectionFactory; Connection connection = null; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, url); try { connection = connectionFactory.createConnection(); connection.start(); } catch (Exception e) { logger.error(e); }

      return connection; }

      /**

    • 斷開連接
    • @param connection
    • ActiveMQ連接 */ public void disconnectMQ(Connection connection){ try { if (connection != null) { connection.stop(); connection.close(); } } catch (Exception e) { logger.error(e.getMessage()); } }

      /**

    • 將消息message以topicName為主題發送出去
    • @param key
    • 消息名
    • @param message
    • 消息體
    • @param topicName
    • 主題名稱
    • @throws Exception */ public void sendMessage(Connection connection, List<String> key, List<String> message, String topicName) { Session session = null; MessageProducer producer; try {

       session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
       Topic topic = session.createTopic(topicName);
       producer = session.createProducer(topic);
       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
       MapMessage mapMessage = session.createMapMessage();
       Iterator<String> keyIt = key.iterator();
       Iterator<String> messageIt = message.iterator();
       while (keyIt.hasNext() && messageIt.hasNext()) {
           mapMessage.setString(keyIt.next(), messageIt.next());
       }
       producer.send(mapMessage);
       session.commit();
      

      } catch (Exception e) {

       logger.error(e.getMessage());
      

      } finally {

       try {
           if (session != null) {
               session.close();
           }
       } catch (JMSException e) {
           logger.error(e);
       }
      

      } }

      public void sendMessage(Connection connection, Map<String, String> msgMap, String streamServerI) { Session session; MessageProducer producer; try {

       session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
       Queue queue = session.createQueue(streamServerI);
       producer = session.createProducer(queue);
       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
       MapMessage mapMessage = session.createMapMessage();
       for (Map.Entry<String, String> entity : msgMap.entrySet()) {
           mapMessage.setString(entity.getKey(), entity.getValue());
       }
       producer.send(mapMessage);
       session.commit();
      

      } catch (Exception e) {

       logger.error(e);
      

      } }

}</pre>需要注意一點的是,使用線程來監聽MQ消息時,一定要記得斷開會話與鏈接,否則會遺留多個消費者,從而干擾程序的正確運行,因為是啟動線程,因此需要在配置文件中配置線程的啟動方法與銷毀方法:

<bean id="SMSMQListener" class="com.wxcm.sms.SMSMQListener" scope="singleton" lazy-init="false" init-method="init" destroy-method="destroy"/>

 本文由用戶 dh_sue 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!