JAVA中利用ActiveMQ收發消息
ActiveMQ不多說,下現是開啟線程進行消息監聽,符合條件則發送相應的消息到對方,兩者消息均采用隊列模式
SMSMQListener.java
/**
- */ package com.wxcm.sms;
import java.util.HashMap; import java.util.List; import java.util.Map;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired;
import com.wxcm.sms.service.ChannelService; import com.wxcm.sms.util.MQUtil; import com.wxcm.sms.vo.Channel; import com.wxcm.waf.Configur;
/**
@author D.H. Sue / public class SMSMQListener implements Runnable {
Logger logger = Logger.getLogger(SMSMQListener.class); @Autowired ChannelService channelService; @Autowired Configur configur; Connection connection; Session session;
public SMSMQListener() {
}
public void run() {
logger.info("--ActiveMQ Connect--"); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + configur.getProperties().get("sms.mq.url") + ")"); logger.info("--ActiveMQ Connect Success--"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("stream"); MessageConsumer consumer = session.createConsumer(destination); while (true) { consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { MapMessage mm = (MapMessage) message; try { logger.info("--Msg From GET_ALL_CHANNELS--"); if (mm.getString("type").equals("get_all_channels")) { sendChannelInfo2StreamServerI(mm.getString("queue_name"), Constants.SEND_CHANNEL_2_MQ_ADD); logger.info("--Send Channel Info Done--"); } } catch (JMSException e) { logger.info(e); } } }); } } catch (JMSException e) { logger.info(e); }
}
private void sendChannelInfo2StreamServerI(String streamServerI, String opType){
try { List<Channel> channelList = (List<Channel>) channelService.listChannels(); Map<String, String> msgMap = new HashMap<String, String>(); if (channelList!=null) { msgMap.put("count", channelList.size()+""); for (int i = 0; i < channelList.size(); i++) { msgMap.put("type_" + i, Constants.SEND_CHANNEL_2_MQ_ADD); msgMap.put("channel_num_" + i, channelList.get(i).getChannelnumber() + ""); msgMap.put("srcurl_" + i, channelList.get(i).getSrcurl()); } MQUtil mqUtil = new MQUtil(configur.getProperties().get("sms.mq.url").toString()); Connection connection = mqUtil.connectMQ(); mqUtil.sendMessage(connection, msgMap, streamServerI); mqUtil.disconnectMQ(connection); } } catch (Exception e) { e.printStackTrace(); }
}
public void init() throws Exception {
Thread thread = new Thread(this); thread.setName("listening"); thread.start();
}
public void destroy(){
try { if (session!=null) { session.close(); } } catch (Exception e) { logger.error(e); } try { if (connection!=null) { connection.stop(); connection.close(); } } catch (Exception e) { logger.error(e); }
} }</pre>下面是MQUtil.java工具類:
package com.wxcm.sms.util;
import java.util.Iterator; import java.util.List; import java.util.Map;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger;
/**
- ActiveMQ工具類
@author D.H. Sue / public class MQUtil { Logger logger = Logger.getLogger(MQUtil.class); private String url;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public MQUtil(String url){
this.url = url;
}
/**
- 連接到ActiveMQ服務器
- @param isProducer
- 是否為生產者1表示生產者,0表示消費者
- @return
不空表示連接成功,空表示連接失敗 */ public Connection connectMQ() { ConnectionFactory connectionFactory; Connection connection = null; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, url); try { connection = connectionFactory.createConnection(); connection.start(); } catch (Exception e) { logger.error(e); }
return connection; }
/**
- 斷開連接
- @param connection
ActiveMQ連接 */ public void disconnectMQ(Connection connection){ try { if (connection != null) { connection.stop(); connection.close(); } } catch (Exception e) { logger.error(e.getMessage()); } }
/**
- 將消息message以topicName為主題發送出去
- @param key
- 消息名
- @param message
- 消息體
- @param topicName
- 主題名稱
@throws Exception */ public void sendMessage(Connection connection, List<String> key, List<String> message, String topicName) { Session session = null; MessageProducer producer; try {
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); Iterator<String> keyIt = key.iterator(); Iterator<String> messageIt = message.iterator(); while (keyIt.hasNext() && messageIt.hasNext()) { mapMessage.setString(keyIt.next(), messageIt.next()); } producer.send(mapMessage); session.commit();
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
try { if (session != null) { session.close(); } } catch (JMSException e) { logger.error(e); }
} }
public void sendMessage(Connection connection, Map<String, String> msgMap, String streamServerI) { Session session; MessageProducer producer; try {
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(streamServerI); producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); for (Map.Entry<String, String> entity : msgMap.entrySet()) { mapMessage.setString(entity.getKey(), entity.getValue()); } producer.send(mapMessage); session.commit();
} catch (Exception e) {
logger.error(e);
} }
}</pre>需要注意一點的是,使用線程來監聽MQ消息時,一定要記得斷開會話與鏈接,否則會遺留多個消費者,從而干擾程序的正確運行,因為是啟動線程,因此需要在配置文件中配置線程的啟動方法與銷毀方法:
<bean id="SMSMQListener" class="com.wxcm.sms.SMSMQListener" scope="singleton" lazy-init="false" init-method="init" destroy-method="destroy"/>