SpringJMS+activeMQ實踐
Spring對JMS提供了很好的支持,可以通過JmsTemplate來方便地實現消息服務。本例通過 activeMQ服務器模擬了消息的發送與接收。需要注意的是,activeMQ的運行依賴jdk的環境,而且對jdk的版本也有要求,我用的是 jdk1.6+activeMQ5.4.3。
運行環境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3.
一定要注意activeMQ的版本與jdk的兼容性,最新的activeMQ版本估計要在jdk1.7以上才能運行。
先說一下activeMQ的安裝:
1、下載:http://activemq.apache.org/download.html 選擇合適的Windows版本
2、安裝
(1) 首先配置JAVA環境變量
JAVA_HOME=D:\Program Files\Java\jdk1.5.0
CLASSPAHT=.;%JAVA_HOME%\lib
PATH=%JAVA_HOME%\bin;
(2)直接解壓至任意目錄(例如:D:\apache-activemq-5.3.0)
3、啟動ActiveMQ服務器:直接運行\bin\win32\activemq.bat
當運行成功后,界面顯示: Started SelectChannelConnector@0.0.0.0:8161 即說明activemq啟動成功。
4、打開ActiveMQ消息管理后臺系統 http://localhost:8161/admin/
需要依賴的jar包有:spring.jar , activemq-all-5.4.3.jar , commons-logging-api-1.1.jar , commons-io-1.3.2.jar
好了,準備工作做完后,開始上代碼,
先看一下,我們最終的Spring配置文件applicationContext.xml的內容,如下所示:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:p="http://www.springframework.org/schema/p" xmlns:jee="http://www.springframework.org/schema/jee" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd"> <!-- jms 連接工廠 --> <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- 配置代理的地址,即配置activeMQ的連接URI, 讓jms工廠能夠連接到activeMQ服務器(將activeMQ暴露給客戶端使用, 負責客戶端與activeMQ之間的連接通信) --> <property name="brokerURL"> <value>tcp://localhost:61616</value><!-- 一種標準URI地址,意思是說標識一個本地的端口號位61616的TCP連接(其中,"61616"是activeMQ默認的連接端口號) --> </property> </bean> <!-- ActiveMQ連接器將這種簡單等級結構的URI模式稱為低等級的連接器(low-levelconnectors), 并為這些連接器實現了基本的網絡通信協議。低等級連接器URIs使用主題(scheme)標識底層使用的網絡協議, 使用路徑元素定位網絡資源服務(一般為主機名加上端口號),使用查詢元素用來確定連接器附加信息。 --> <!-- jms 連接池 --> <!-- <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> </bean> --> <!-- jms 模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> </bean> <!-- jms Topic --> <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"> <constructor-arg value="STOCKS.JAVA" /> </bean> <!-- jms Consumer --> <bean id="javaConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory" /> <property name="destination" ref="myTopic" /> <property name="messageListener" ref="myTextListener" /> </bean> <!-- 消息監聽器 --> <bean id="myTextListener" class="demo.TextListener"> </bean> <!-- 消息發布器 --> <bean id="springPublisher" class="demo.SpringPublisher"> <property name="template"> <ref local="jmsTemplate" /> </property> <property name="topic"> <ref local="myTopic" /> </property> </bean> </beans>
接下來,消息生成器代碼,實現spring的MessageCreator接口:
package demo;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.MessageCreator;
public class MyMessageCreator implements MessageCreator {
/**
* 消息序號
*/
private int msgNo;
public MyMessageCreator(int no) {
this.msgNo = no;
}
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMsg = session.createTextMessage();
textMsg.setText(new Date() + "第" + this.msgNo + "條消息發出");
return textMsg;
}
} 接下來,消息監聽器,實現javaEE的規范MessageListener接口即可,因為要注入到spring的DefaultMessageListenerContainer中。此監聽器通過監聽來自destination(在spring中配置)的消息,一旦有消息就打印出來:
package demo;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TextListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage msg = null;
try {
if (message instanceof TextMessage) {
msg = (TextMessage) message;
System.out.println("Reading message: " + msg.getText());
} else {
System.out.println("Message of wrong type: "
+ message.getClass().getName());
}
} catch (JMSException e) {
System.out.println("JMSException in onMessage(): " + e.toString());
} catch (Throwable t) {
System.out.println("Exception in onMessage():" + t.getMessage());
}
}
} 下面是消息發布器,通過spring的jms模板,即可輕松的獲得與activeMQ的連接與通信,從而獲得Connection和Destination,再通過JmsTemplate的send方法,即可發送消息到指定的destination(在spring中配置)中,以供客戶端接收:
package demo;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
public class SpringPublisher {
/**
* Jms模板
*/
private JmsTemplate template;
/**
* Topic
*/
private Destination topic;
public JmsTemplate getTemplate() {
return template;
}
public void setTemplate(JmsTemplate template) {
this.template = template;
}
public Destination getTopic() {
return topic;
}
public void setTopic(Destination topic) {
this.topic = topic;
}
/**
* Start
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
int messageCount = 10;
while ((--messageCount) > 0) {
sendMessage(messageCount);
Thread.sleep(1000);
}
}
/**
* 消息發送
*/
protected void sendMessage(int msgNO) {
this.template.send(this.topic, new MyMessageCreator(msgNO));
}
} 至此,基本的jms就已經搭建好了,很簡單吧,一個spring上下文配置,一個消息生成器,一個消息發布器,一個監聽器,搞定。接下來,編寫一個測試類,看運行結果(注意在運行測試類前,一定要先啟動activeMQ服務器):
package test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import demo.SpringPublisher;
public class SpringJmsTestMain {
/**
* @param args
*/
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "applicationContext.xml" });
SpringPublisher publisher = (SpringPublisher) context
.getBean("springPublisher");
publisher.start();
}
} 運行結果如下:
Reading message: Sun Jun 28 19:40:05 CST 2015第9條消息發出
Reading message: Sun Jun 28 19:40:06 CST 2015第8條消息發出
Reading message: Sun Jun 28 19:40:07 CST 2015第7條消息發出
Reading message: Sun Jun 28 19:40:08 CST 2015第6條消息發出
Reading message: Sun Jun 28 19:40:09 CST 2015第5條消息發出
Reading message: Sun Jun 28 19:40:10 CST 2015第4條消息發出
Reading message: Sun Jun 28 19:40:12 CST 2015第3條消息發出
Reading message: Sun Jun 28 19:40:13 CST 2015第2條消息發出
Reading message: Sun Jun 28 19:40:14 CST 2015第1條消息發出