使用spring + ActiveMQ 總結

jopen 10年前發布 | 49K 次閱讀 ActiveMQ 消息系統

Spring 整合JMS 基于ActiveMQ 實現消息的發送接收

看了網上很多文件,最后總結出了自己需要的。

一、下載并安裝ActiveMQ

首先我們到apache官網上下載activeMQ(http://activemq.apache.org/download.html),進行解壓后運行其bin目錄下面的activemq.bat文件啟動activeMQ。

二、Spring中加入ActiveMQ的配置

首先將相關的jar拷貝到項目的lib文件下

使用spring + ActiveMQ 總結

配置之前先看一下相關目錄以便于理解

使用spring + ActiveMQ 總結

下面開始配置

<!-- ActiveMQ 連接工廠 -->
 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
 <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- <property name="brokerURL" value="tcp://192.168.1.79:61616" /> -->
  <property name="brokerURL" value="${mqUrl}" />
 </bean>
 <!-- Spring Caching連接工廠 -->
 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 
 <bean id="cachingConnectionFactory"
  class="org.springframework.jms.connection.CachingConnectionFactory">
  <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 
  <property name="targetConnectionFactory" ref="connectinFactory"></property>
  <!-- Session緩存數量 -->
  <property name="sessionCacheSize" value="10"></property>
 </bean>

 <!-- 配置消息發送目的地方式 -->
 <!-- Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 -->

 <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="q.notify"></constructor-arg>
 </bean>
 <!-- 目的地:Topic主題 :放入一個消息,所有訂閱者都會收到 -->
 <!--這個是主題目的地,一對多的--> 
 <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg value="t.notify"></constructor-arg>
 </bean>
 <!-- Spring JMS Template 配置JMS模版 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
 </bean>
 <!-- 使用Spring JmsTemplate 的消息生產者 -->
 <bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
  <property name="jmsTemplate" ref="jmsTemplate"></property>
  <property name="notifyQueue" ref="notifyQueue"></property>
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer">
  <property name="jmsTemplate" ref="jmsTemplate"></property>
  <property name="notifyTopic" ref="notifyTopic"></property>
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <!-- 消息消費者 一般使用spring的MDP異步接收Queue模式 -->
 <!-- 消息監聽容器 -->
 <bean id="queueContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectinFactory"></property>
  <property name="destination" ref="notifyQueue"></property>
  <property name="messageListener" ref="queueMessageListener"></property>
 </bean>
 <!-- 消息監聽容器 -->
 <bean id="topicContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectinFactory"></property>
  <property name="destination" ref="notifyTopic"></property>
  <property name="messageListener" ref="topicMessageListener"></property>
  <!-- 發布訂閱模式 -->
  <property name="pubSubDomain" value="true" />

 </bean>
 <!-- 異步接收消息處理類 -->
 <bean id="queueMessageListener" class="com.common.jms.QueueMessageListener">
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="topicMessageListener" class="com.common.jms.TopicMessageListener">
  <property name="messageConverter" ref="messageConverter"></property>
 </bean>
 <bean id="messageConverter" class="com.common.jms.NotifyMessageConverter">
 </bean>

下面展示一下Sender

public class Sender {
 private static ServletContext servletContext;
 private static WebApplicationContext ctx; 
 /**
  * 發送點對點信息
  * @param noticeInfo
  */
 public static void setQueueSender(){ 
  servletContext = ServletActionContext.getServletContext();
  ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
   QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer"));
   PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();

(下面先展示PhoneNoticeInfo 然后是 QueueMessageProducer
   noticeInfo.setNoticeContent("Hello Word");
   noticeInfo.setNoticeTitle("hello Word");
   noticeInfo.setReceiver("hello");
   noticeInfo.setReceiverPhone("1111111");
   notifyMessageProducer.sendQueue(noticeInfo);
  }

public static ServletContext getServletContext() {
  return servletContext;
 }
 public static void setServletContext(ServletContext servletContext) {
  Sender.servletContext = servletContext;
 }
 public static WebApplicationContext getCtx() {
  return ctx;
 }
 public static void setCtx(WebApplicationContext ctx) {
  Sender.ctx = ctx;
 } 
}

 PhoneNoticeInfo

public class PhoneNoticeInfo implements Serializable {
 /** 消息標題 */
 public String noticeTitle;
 /** 消息內容 */
 public String noticeContent;
 /** 接收者 */
 public String receiver;
 /** 接收手機號 */
 public String receiverPhone;
 public String getNoticeTitle() {
  return noticeTitle;
 }
 public void setNoticeTitle(String noticeTitle) {
  this.noticeTitle = noticeTitle;
 }
 public String getNoticeContent() {
  return noticeContent;
 }
 public void setNoticeContent(String noticeContent) {
  this.noticeContent = noticeContent;
 }
 public String getReceiver() {
  return receiver;
 }
 public void setReceiver(String receiver) {
  this.receiver = receiver;
 }

 public String getReceiverPhone() {
  return receiverPhone;
 }
 public void setReceiverPhone(String receiverPhone) {
  this.receiverPhone = receiverPhone;
 }
 
}

QueueMessageProducer

/**
 * 消息生產者服務類
 */
public class QueueMessageProducer {
 private JmsTemplate jmsTemplate;
 private Destination notifyQueue;
 private NotifyMessageConverter messageConverter;
 public void sendQueue(PhoneNoticeInfo noticeInfo){
  sendMessage(noticeInfo);
 }
 private void sendMessage(PhoneNoticeInfo noticeInfo) {
  // TODO Auto-generated method stub
  jmsTemplate.setMessageConverter(messageConverter);
  jmsTemplate.setPubSubDomain(false);
  jmsTemplate.convertAndSend(notifyQueue,noticeInfo);
 }
 public JmsTemplate getJmsTemplate() {
  return jmsTemplate;
 }
 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
 public Destination getNotifyQueue() {
  return notifyQueue;
 }
 public void setNotifyQueue(Destination notifyQueue) {
  this.notifyQueue = notifyQueue;
 }
 public NotifyMessageConverter getMessageConverter() {
  return messageConverter;
 }
 public void setMessageConverter(NotifyMessageConverter messageConverter) {
  this.messageConverter = messageConverter;
 }
}

 

NotifyMessageConverter

/**
 * 消息轉換
 */
public class NotifyMessageConverter implements MessageConverter {
 private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);
 @Override
 /**
  * 轉換接收到的消息為NoticeInfo對象
  */
 public Object fromMessage(Message message) throws JMSException,
   MessageConversionException {
  // TODO Auto-generated method stub
  if (logger.isDebugEnabled()) {
   logger.debug("Receive JMS message :"+message);
  }
  if (message instanceof ObjectMessage) {
   ObjectMessage oMsg = (ObjectMessage)message;
   if (oMsg instanceof ActiveMQObjectMessage) {
    ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)oMsg;
    try {
     PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)aMsg.getObject();
     return noticeInfo;
    } catch (Exception e) {
     // TODO: handle exception
     logger.error("Message:${} is not a instance of NoticeInfo."+message.toString());
     throw new JMSException("Message:"+message.toString()+"is not a instance of NoticeInfo."+message.toString());
    }
   }else{
    logger.error("Message:${} is not a instance of ActiveMQObjectMessage."+message.toString());
    throw new JMSException("Message:"+message.toString()+"is not a instance of ActiveMQObjectMessage."+message.toString());
   }
  }else {
   logger.error("Message:${} is not a instance of ObjectMessage."+message.toString());
   throw new JMSException("Message:"+message.toString()+"is not a instance of ObjectMessage."+message.toString());
  }
 }

 @Override
 /**
  * 轉換NoticeInfo對象到消息
  */
 public Message toMessage(Object obj, Session session) throws JMSException,
   MessageConversionException {
  // TODO Auto-generated method stub
  if (logger.isDebugEnabled()) {
   logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
  }
  if (obj instanceof PhoneNoticeInfo) {
   ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage();
   msg.setObject((PhoneNoticeInfo)obj);
   return msg;
  }else {
   logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
  }
  return null;
 }

}

 

QueueMessageListener

public class QueueMessageListener implements MessageListener {
 private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
 private NotifyMessageConverter messageConverter;
 
 /**
  * 接收消息
  */
 @Override
 public void onMessage(Message message) {
  // TODO Auto-generated method stub
  try {
   ObjectMessage objectMessage = (ObjectMessage)message;
   PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)messageConverter.fromMessage(objectMessage);
   System.out.println("queue收到消息"+noticeInfo.getNoticeContent());
   System.out.println("model:"+objectMessage.getJMSDeliveryMode()); 
   System.out.println("destination:"+objectMessage.getJMSDestination()); 
   System.out.println("type:"+objectMessage.getJMSType()); 
   System.out.println("messageId:"+objectMessage.getJMSMessageID()); 
   System.out.println("time:"+objectMessage.getJMSTimestamp()); 
   System.out.println("expiredTime:"+objectMessage.getJMSExpiration()); 
   System.out.println("priority:"+objectMessage.getJMSPriority()); 

  } catch (Exception e) {
   // TODO: handle exception
   logger.error("處理信息時發生異常",e);
  }
 }
 public NotifyMessageConverter getMessageConverter() {
  return messageConverter;
 }
 public void setMessageConverter(NotifyMessageConverter messageConverter) {
  this.messageConverter = messageConverter;
 }

}

 

來自:http://my.oschina.net/u/1411333/blog/318543

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!