使用spring + ActiveMQ 總結
Spring 整合JMS 基于ActiveMQ 實現消息的發送接收
看了網上很多文件,最后總結出了自己需要的。
一、下載并安裝ActiveMQ
首先我們到apache官網上下載activeMQ(http://activemq.apache.org/download.html),進行解壓后運行其bin目錄下面的activemq.bat文件啟動activeMQ。
二、Spring中加入ActiveMQ的配置
首先將相關的jar拷貝到項目的lib文件下
配置之前先看一下相關目錄以便于理解
下面開始配置
<!-- ActiveMQ 連接工廠 -->
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="tcp://192.168.1.79:61616" /> -->
<property name="brokerURL" value="${mqUrl}" />
</bean>
<!-- Spring Caching連接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="connectinFactory"></property>
<!-- Session緩存數量 -->
<property name="sessionCacheSize" value="10"></property>
</bean>
<!-- 配置消息發送目的地方式 -->
<!-- Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 -->
<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="q.notify"></constructor-arg>
</bean>
<!-- 目的地:Topic主題 :放入一個消息,所有訂閱者都會收到 -->
<!--這個是主題目的地,一對多的-->
<bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="t.notify"></constructor-arg>
</bean>
<!-- Spring JMS Template 配置JMS模版 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
</bean>
<!-- 使用Spring JmsTemplate 的消息生產者 -->
<bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate"></property>
<property name="notifyQueue" ref="notifyQueue"></property>
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate"></property>
<property name="notifyTopic" ref="notifyTopic"></property>
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<!-- 消息消費者 一般使用spring的MDP異步接收Queue模式 -->
<!-- 消息監聽容器 -->
<bean id="queueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectinFactory"></property>
<property name="destination" ref="notifyQueue"></property>
<property name="messageListener" ref="queueMessageListener"></property>
</bean>
<!-- 消息監聽容器 -->
<bean id="topicContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectinFactory"></property>
<property name="destination" ref="notifyTopic"></property>
<property name="messageListener" ref="topicMessageListener"></property>
<!-- 發布訂閱模式 -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- 異步接收消息處理類 -->
<bean id="queueMessageListener" class="com.common.jms.QueueMessageListener">
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<bean id="topicMessageListener" class="com.common.jms.TopicMessageListener">
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<bean id="messageConverter" class="com.common.jms.NotifyMessageConverter">
</bean>
下面展示一下Sender
public class Sender {
private static ServletContext servletContext;
private static WebApplicationContext ctx;
/**
* 發送點對點信息
* @param noticeInfo
*/
public static void setQueueSender(){
servletContext = ServletActionContext.getServletContext();
ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer"));
PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();
(下面先展示PhoneNoticeInfo 然后是 QueueMessageProducer )
noticeInfo.setNoticeContent("Hello Word");
noticeInfo.setNoticeTitle("hello Word");
noticeInfo.setReceiver("hello");
noticeInfo.setReceiverPhone("1111111");
notifyMessageProducer.sendQueue(noticeInfo);
}
public static ServletContext getServletContext() {
return servletContext;
}
public static void setServletContext(ServletContext servletContext) {
Sender.servletContext = servletContext;
}
public static WebApplicationContext getCtx() {
return ctx;
}
public static void setCtx(WebApplicationContext ctx) {
Sender.ctx = ctx;
}
}
PhoneNoticeInfo
public class PhoneNoticeInfo implements Serializable {
/** 消息標題 */
public String noticeTitle;
/** 消息內容 */
public String noticeContent;
/** 接收者 */
public String receiver;
/** 接收手機號 */
public String receiverPhone;
public String getNoticeTitle() {
return noticeTitle;
}
public void setNoticeTitle(String noticeTitle) {
this.noticeTitle = noticeTitle;
}
public String getNoticeContent() {
return noticeContent;
}
public void setNoticeContent(String noticeContent) {
this.noticeContent = noticeContent;
}
public String getReceiver() {
return receiver;
}
public void setReceiver(String receiver) {
this.receiver = receiver;
}
public String getReceiverPhone() {
return receiverPhone;
}
public void setReceiverPhone(String receiverPhone) {
this.receiverPhone = receiverPhone;
}
}
QueueMessageProducer
/**
* 消息生產者服務類
*/
public class QueueMessageProducer {
private JmsTemplate jmsTemplate;
private Destination notifyQueue;
private NotifyMessageConverter messageConverter;
public void sendQueue(PhoneNoticeInfo noticeInfo){
sendMessage(noticeInfo);
}
private void sendMessage(PhoneNoticeInfo noticeInfo) {
// TODO Auto-generated method stub
jmsTemplate.setMessageConverter(messageConverter);
jmsTemplate.setPubSubDomain(false);
jmsTemplate.convertAndSend(notifyQueue,noticeInfo);
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getNotifyQueue() {
return notifyQueue;
}
public void setNotifyQueue(Destination notifyQueue) {
this.notifyQueue = notifyQueue;
}
public NotifyMessageConverter getMessageConverter() {
return messageConverter;
}
public void setMessageConverter(NotifyMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}
NotifyMessageConverter
/**
* 消息轉換
*/
public class NotifyMessageConverter implements MessageConverter {
private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);
@Override
/**
* 轉換接收到的消息為NoticeInfo對象
*/
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
// TODO Auto-generated method stub
if (logger.isDebugEnabled()) {
logger.debug("Receive JMS message :"+message);
}
if (message instanceof ObjectMessage) {
ObjectMessage oMsg = (ObjectMessage)message;
if (oMsg instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)oMsg;
try {
PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)aMsg.getObject();
return noticeInfo;
} catch (Exception e) {
// TODO: handle exception
logger.error("Message:${} is not a instance of NoticeInfo."+message.toString());
throw new JMSException("Message:"+message.toString()+"is not a instance of NoticeInfo."+message.toString());
}
}else{
logger.error("Message:${} is not a instance of ActiveMQObjectMessage."+message.toString());
throw new JMSException("Message:"+message.toString()+"is not a instance of ActiveMQObjectMessage."+message.toString());
}
}else {
logger.error("Message:${} is not a instance of ObjectMessage."+message.toString());
throw new JMSException("Message:"+message.toString()+"is not a instance of ObjectMessage."+message.toString());
}
}
@Override
/**
* 轉換NoticeInfo對象到消息
*/
public Message toMessage(Object obj, Session session) throws JMSException,
MessageConversionException {
// TODO Auto-generated method stub
if (logger.isDebugEnabled()) {
logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
}
if (obj instanceof PhoneNoticeInfo) {
ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage();
msg.setObject((PhoneNoticeInfo)obj);
return msg;
}else {
logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
}
return null;
}
}
QueueMessageListener
public class QueueMessageListener implements MessageListener {
private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
private NotifyMessageConverter messageConverter;
/**
* 接收消息
*/
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
ObjectMessage objectMessage = (ObjectMessage)message;
PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)messageConverter.fromMessage(objectMessage);
System.out.println("queue收到消息"+noticeInfo.getNoticeContent());
System.out.println("model:"+objectMessage.getJMSDeliveryMode());
System.out.println("destination:"+objectMessage.getJMSDestination());
System.out.println("type:"+objectMessage.getJMSType());
System.out.println("messageId:"+objectMessage.getJMSMessageID());
System.out.println("time:"+objectMessage.getJMSTimestamp());
System.out.println("expiredTime:"+objectMessage.getJMSExpiration());
System.out.println("priority:"+objectMessage.getJMSPriority());
} catch (Exception e) {
// TODO: handle exception
logger.error("處理信息時發生異常",e);
}
}
public NotifyMessageConverter getMessageConverter() {
return messageConverter;
}
public void setMessageConverter(NotifyMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}
來自:http://my.oschina.net/u/1411333/blog/318543