Java消息隊列-Spring整合ActiveMq

bym08j22 7年前發布 | 37K 次閱讀 ActiveMQ Spring Java開發

1、概述

首先和大家一起回顧一下Java 消息服務,在我之前的文章 《Java消息隊列-JMS概述》 中,我為大家分析了:

  1. 消息服務:一個中間件,用于解決兩個活多個程序之間的耦合,底層由Java 實現。
  2. 優勢:異步、可靠
  3. 消息模型:點對點,發布/訂閱
  4. JMS中的對象

然后在另一篇文章 《Java消息隊列-ActiveMq實戰》 中,和大家一起從0到1的開啟了一個ActiveMq 的項目,在項目開發的過程中,我們對ActiveMq有了一定的了解:

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
  3. 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
  4. 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通過JDBC和journal提供高速的消息持久化
  7. 從設計上保證了高性能的集群,客戶端-服務器,點對點
  8. 支持Ajax
  9. 支持與Axis的整合
  10. 可以很容易得調用內嵌JMS provider,進行測試

在接下來的這篇文章中,我會和大家一起來整合Spring 和ActiveMq,這篇博文,我們基于Spring+JMS+ActiveMQ+Tomcat,實現了Point-To-Point的異步隊列消息和PUB/SUB(發布/訂閱)模型,簡單實例,不包含任何業務。

2、目錄結構

2.1 項目目錄

IDE選擇了IDEA(建議大家使用),為了避免下載jar 的各種麻煩,底層使用maven搭建了一個項目,整合了Spring 和ActiveMq

2.2 pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0 ;
  <modelVersion>4.0.0</modelVersion>
  <groupId>Crawl-Page</groupId>
  <artifactId>Crawl-Page</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Crawl-Page Maven Webapp</name>
  <url>http://maven.apache.org</url&gt;
  <!-- 版本管理 -->
  <properties>
    <springframework>4.1.8.RELEASE</springframework>
  </properties>

<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency>

<!-- JSP相關 --> <dependency> <groupId>jstl</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <scope>provided</scope> <version>2.5</version> </dependency>

<!-- spring -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>${springframework}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>${springframework}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-tx</artifactId>
  <version>${springframework}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>${springframework}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${springframework}</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
</dependency>

<!-- activemq -->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.12.1</version>
</dependency>

<!-- 自用jar包,可以忽略-->
<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
</dependency>

</dependencies>

<build> <finalName>Crawl-Page</finalName> <plugins> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <configuration> <port>8080</port> <path>/</path> </configuration> </plugin> </plugins> </build>

</project>

View Code</code></pre>

因為這里pom.xml 文件有點長,就不展開了。

我們可以看到其實依賴也就幾個,1、Spring 核心依賴 2、ActiveMq core和pool(這里如果同學們選擇導入jar,可以直接導入我們上一篇博客中說道的那個activemq-all 這個jar包)3、java servlet 相關依賴

這里面我們選擇的ActiveMq pool 的依賴版本會和之后的dtd 有關系,需要版本對應,所以同學們等下配置activemq 文件的時候,需要注意dtd 版本選擇

2.3 web.xml

web.xml 也大同小異,指定Spring 配置文件,springMvc 命名,編碼格式

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee
          

<display-name>Archetype Created Web Application</display-name>

<!-- 加載spring的配置文件,例如hibernate、jms等集成 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath:applicationContext*.xml; </param-value> </context-param>

<listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener>

<servlet> <servlet-name>springMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring-mvc.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>springMVC</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping>

<!-- 處理編碼格式 --> <filter> <filter-name>characterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>

</web-app> </code></pre>

2.4 SpringMvc 和applicationContext.xml

這里面的SpringMVC沒什么特別,有需要的同學可以參考一下:


<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
        http://www.springframework.org/schema/tx
        ;

<!-- 啟用MVC注解 -->
<mvc:annotation-driven />
<!-- 指定Sping組件掃描的基本包路徑 -->
<context:component-scan base-package="com.Jayce" >
    <!-- 這里只掃描Controller,不可重復加載Service -->
    <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>

<!-- JSP視圖解析器-->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
    <property name="prefix" value="/WEB-INF/views/" />
    <property name="suffix" value=".jsp" />
    <!--  定義其解析視圖的order順序為1 -->
    <property name="order" value="1" />
</bean>

</beans>

View Code</code></pre>

applicationContext.xml 主要使用來裝載Bean,我們項目中并沒有什么特別的Java Bean,因此只用來指出包掃描路徑:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        ;

<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<!-- 配置掃描路徑 -->
<context:component-scan base-package="com.Jayce">
    <!-- 只掃描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加載 -->
   <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>

</beans>

View Code</code></pre>

2.5 applicationContext-ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        

<context:component-scan base-package="com.Jayce" />
<mvc:annotation-driven />

<amq:connectionFactory id="amqConnectionFactory"
                       brokerURL="tcp://192.168.148.128:61616"
                       userName="admin"
                       password="admin" />

<!-- 配置JMS連接工長 -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
    <property name="sessionCacheSize" value="100" />
</bean>

<!-- 定義消息隊列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 設置消息隊列的名字 -->
    <constructor-arg>
        <value>Jaycekon</value>
    </constructor-arg>
</bean>

<!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="demoQueueDestination" />
    <property name="receiveTimeout" value="10000" />
    <!-- true是topic,false是queue,默認是false,此處顯示寫出false -->
    <property name="pubSubDomain" value="false" />
</bean>


<!-- 配置消息隊列監聽者(Queue) -->
<bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

<!-- 顯示注入消息監聽容器(Queue),配置連接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
<bean id="queueListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="demoQueueDestination" />
    <property name="messageListener" ref="queueMessageListener" />
</bean>

</beans> </code></pre>

這里和大家講解一下這個配置文件,如果大家能夠從上述配置文件中看懂,可以跳過。同學們也可以在 ActiveMQ官網 中的查看。

1、ActiveMq 中的DTD,我們在聲明相關配置之前,我們需要先導入ActiveMq 中的DTD,不然Spring 并不理解我們的標簽是什么意思。

http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

我們在pom.xml 文件中有配置了activemq 的版本依賴我們這里的版本,需要和依賴的版本一樣,不然是找不到相關的dtd

2、 amq:connectionFactory: 很直白的一個配置項,用于配置我們鏈接工廠的地址和用戶名密碼,這里需要注意的是選擇 tcp 連接而不是http連接

3、 jmsTemplate: 比較重要的一個配置,這里指定了連接工廠,默認消息發送目的地,還有連接時長,發布消息的方式

3、項目結構

3.1 ProducerService

package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;

import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session;

/**

  • Created by Administrator on 2017/1/5. */ @Service public class ProducerService {

    @Resource(name="jmsTemplate") private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination,final String msg){

     System.out.println(Thread.currentThread().getName()+" 向隊列"+destination.toString()+"發送消息---------------------->"+msg);
     jmsTemplate.send(destination, new MessageCreator() {
         public Message createMessage(Session session) throws JMSException {
             return session.createTextMessage(msg);
         }
     });
    

    }

    public void sendMessage(final String msg){

     String destination = jmsTemplate.getDefaultDestinationName();
     System.out.println(Thread.currentThread().getName()+" 向隊列"+destination+"發送消息---------------------->"+msg);
     jmsTemplate.send(new MessageCreator() {
         public Message createMessage(Session session) throws JMSException {
             return session.createTextMessage(msg);
         }
     });
    

    } } </code></pre>

    將消息生產者做成一個服務,當我們需要發送消息的時候,只需要調用ProducerService實例中的sendMessage 方法就可以向默認目的發送一個消息。

    這里提供了兩個發送方式,一個是發送到默認的目的地,一個是根據目的地發送消息。

    有興趣的同學可以和我上一篇文章《ActiveMq實戰》中ActiveMq 發送消息的方式對比一下,可以發現一些不同。

    3.2 ConsumerService

    package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service;

import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage;

/**

  • Created by Administrator on 2017/1/5. */ @Service public class ConsumerService { @Resource(name="jmsTemplate") private JmsTemplate jmsTemplate;

    public TextMessage receive(Destination destination){

     TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
     try{
         System.out.println("從隊列" + destination.toString() + "收到了消息:\t"
                 + textMessage.getText());
     } catch (JMSException e) {
         e.printStackTrace();
     }
     return textMessage;
    

    } } </code></pre>

    因為我們項目中并沒有什么業務,所以的話對消息的處理也就是打印輸出。我們只需要調用jmsTemplate中的 receive 方法,就可以從里面獲取到一條消息。

    再和我們上一篇博客對比一下,上一篇博客中,我們接受到信息之后需要手動確認事務,這樣ActiveMQ中才會確定這條消息已經被正確讀取了。而整合了Spring之后,事務將由Spring 來管理。

    3.3 MessageController

    package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService; import com.Jayce.Service.ProducerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.TextMessage;

/**

  • Created by Administrator on 2017/1/5. */ @Controller public class MessageController { private Logger logger = LoggerFactory.getLogger(MessageController.class); @Resource(name = "demoQueueDestination") private Destination destination;

    //隊列消息生產者 @Resource(name = "producerService") private ProducerService producer;

    //隊列消息消費者 @Resource(name = "consumerService") private ConsumerService consumer;

    @RequestMapping(value = "/SendMessage", method = RequestMethod.POST) @ResponseBody public void send(String msg) {

     logger.info(Thread.currentThread().getName()+"------------send to jms Start");
     producer.sendMessage(msg);
     logger.info(Thread.currentThread().getName()+"------------send to jms End");
    

    }

    @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET) @ResponseBody public Object receive(){

     logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
     TextMessage tm = consumer.receive(destination);
     logger.info(Thread.currentThread().getName()+"------------receive from jms End");
     return tm;
    

    }

} </code></pre>

控制層里面需要注入我們的生產者和消費者(實際開發中,生產者和消費者肯定不會在同一個項目中的,不然就消息服務這個東西就沒有意義了)。

現在服務層和控制層都好了,接下來我們就進行一個簡單的測試

4、項目測試

4.1 啟動ActiveMq

先確定你的ActiveMQ服務已經開啟。

4.2 啟動項目

項目使用了Tomcat 插件,避免了本地再下載Tomcat的麻煩,有需要的同學可以使用一下。

<plugins>
      <plugin>
        <groupId>org.apache.tomcat.maven</groupId>
        <artifactId>tomcat7-maven-plugin</artifactId>
        <configuration>
          <port>8080</port>
          <path>/</path>
        </configuration>
      </plugin>
</plugins>

4.3 發送消息

這里用了Chrome 的一個插件 PostMan 有興趣的同學可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同學去弄頁面!

我們發送了一個post 請求之后,看一下服務器的效果:

我們可以看到,已經向隊列發送了一條消息。我們看一下ActiveMq現在的狀態:

我們可以看到,一條消息已經成功發送到了ActiveMq中。

4.4 接收消息

使用get請求訪問服務器后臺:

服務的輸出:

ActiveMq服務器狀態:

我們可以看到,消費者已經消費了一條信息,并且沒有斷開與ActiveMq之間的鏈接。

4.5 監聽器

在實際項目中,我們很少會自己手動去獲取消息,如果需要手動去獲取消息,那就沒有必要使用到ActiveMq了,可以用一個Redis 就足夠了。

不能手動去獲取消息,那么我們就可以選擇使用一個監聽器來監聽是否有消息到達,這樣子可以很快的完成對消息的處理。

4.5.1 applicationContext-ActiveMQ.xml 配置

在上面的配置文件中,我們已經默認的添加了這段監聽器的配置文件,如果同學們不想使用這個監聽器,可以直接注釋掉。

    <!-- 配置消息隊列監聽者(Queue) -->
    <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

<!-- 顯示注入消息監聽容器(Queue),配置連接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
<bean id="queueListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="demoQueueDestination" />
    <property name="messageListener" ref="queueMessageListener" />
</bean>

</code></pre>

4.5.2 MessageListener

我們需要創建一個類實現MessageListener 接口:

package com.Jayce.Filter;

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;

/**

  • Created by Administrator on 2017/1/5. */ public class QueueMessageListener implements MessageListener { public void onMessage(Message message) {
     TextMessage tm = (TextMessage) message;
     try {
         System.out.println("QueueMessageListener監聽到了文本消息:\t"
                 + tm.getText());
         //do something ...
     } catch (JMSException e) {
         e.printStackTrace();
     }
    
    } } </code></pre>

    實現接口的onMessage 方法,我們將需要的業務操作在里面解決,這樣子,就完成了我們生產者-中間件-消費者,這樣一個解耦的操作了。

    4.5.3 測試

    和上面一樣,使用postMan 發送post請求,我們可以看到控制臺里面,消息馬上就能打印出來:

    再看看ActiveMQ服務器的狀態:

    我們可以看到,使用監聽器的效果,和手動接收消息的效果是一樣的。

    這樣子一整個項目下來,我們已經成功的整合了Spring和ActiveMQ。

    4.6 壓力測試

    這里其實也算不上什么壓力測試,在配置pom.xml文件的時候,大家有看到一個 commons-httpclient 的依賴,接下來我們使用httpClient 不停的想服務器發送消息,看一下服務器解決消息的速度如何:

    package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.PostMethod; import org.junit.Test;

import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger;

/**

  • Created by Administrator on 2017/1/5. */ public class Client {

    @Test public void test() {

     HttpClient httpClient = new HttpClient();
     new Thread(new Sender(httpClient)).start();
    
    

    }

}

class Sender implements Runnable { public static AtomicInteger count = new AtomicInteger(0); HttpClient httpClient;

public Sender(HttpClient client) {
    httpClient = client;
}

public void run() {
        try {
            System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
            PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
            post.addParameter("msg", "Hello world!");
            httpClient.executeMethod(post);
            System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

</code></pre>

這里面用了HttpClient 來向服務器發送Post 請求,然后計數輸出,有興趣的同學可以自己測試一下,可以多開幾個線程,這里只開了一個線程。

 

來自:http://www.cnblogs.com/jaycekon/p/ActiveMq.html

 

 本文由用戶 bym08j22 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!