Spring整合ActiveMQ

jopen 11年前發布 | 56K 次閱讀 ActiveMQ 消息系統

實現公交GPS定位,在地圖上動態顯示訂閱的公交車行車軌跡、軌跡回放等等一些功能。這就要用到消息推送服務中間件ActiveMQ。采用UDP的方式推送消息。

一.消息監聽
Spring提供了三種 AbstractMessageListenerContainer 的子類,每種各有其特點。


第一種:SimpleMessageListenerContainer
      這個消息偵聽容器是三種中最簡單的。它在啟動時創建固定數量的JMS session并在容器的整個生命周期中使用它們。這個類不能動態的適應運行時的要求或參與消息接收的事務處理。然而它對JMS提供者的要求也最低。它只需要簡單的JMS API。

第二種:DefaultMessageListenerContainer

      這個消息偵聽器使用的最多。和 SimpleMessageListenerContainer 相反,這個子類可以動態適應運行時侯的要求,也可以參與事務管理。每個收到的消息都注冊到一個XA事務中(如果使用 JtaTransactionManager 配置過),這樣就可以利用XA事務語義的優勢了。這個類在對JMS提供者的低要求和提供包括事務參于等的強大功能上取得了很好的平衡。

第三種:ServerSessionMessageListenerContainer
 

     這個監聽器容器利用JMS ServerSessionPool SPI動態管理JMS Session。 使用者各種消息監聽器可以獲得運行時動態調優功能,但是這也要求JMS提供者支持ServerSessionPool SPI。如果不需要運行時性能調整,請使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。 

二.自動將消息轉化為Java對象
        轉化器在很多組件中都是必不缺少的東西。Spring挺過MessageConverter接口提供了對消息轉換的支持。

三.代碼

      1.修改activeMQ  conf文件夾下activemq.xml配置文件,加入UDP傳輸方式

<!--     Licensed to the Apache Software Foundation (ASF) under one or more  
    contributor license agreements.  See the NOTICE file distributed with  
    this work for additional information regarding copyright ownership.  
    The ASF licenses this file to You under the Apache License, Version 2.0  
    (the "License"); you may not use this file except in compliance with  
    the License.  You may obtain a copy of the License at  

    http://www.apache.org/licenses/LICENSE-2.0  

    Unless required by applicable law or agreed to in writing, software  
    distributed under the License is distributed on an "AS IS" BASIS,  
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
    See the License for the specific language governing permissions and  
    limitations under the License.  

  -->   
- <!--  START SNIPPET: example  
  -->   
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">  
- <!--  Allows us to use system properties as variables in this configuration file  
  -->   
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
- <property name="locations">  
  <value>file:${activemq.conf}/credentials.properties</value>   
  </property>  
  </bean>  
- <!--  Allows log searching in hawtio console  
  -->   
  <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop" />   
- <!--         The <broker> element is used to configure the ActiveMQ broker. 

  -->   
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">  
- <destinationPolicy>  
- <policyMap>  
- <policyEntries>  
- <policyEntry topic=">">  
- <!--  The constantPendingMessageLimitStrategy is used to prevent  
                         slow topic consumers to block producers and affect other consumers  
                         by limiting the number of messages that are retained  
                         For more information, see:  

                         http://activemq.apache.org/slow-consumer-handling.html  


  -->   
- <pendingMessageLimitStrategy>  
  <constantPendingMessageLimitStrategy limit="1000" />   
  </pendingMessageLimitStrategy>  
  </policyEntry>  
  </policyEntries>  
  </policyMap>  
  </destinationPolicy>  
- <!--             The managementContext is used to configure how ActiveMQ is exposed in  
            JMX. By default, ActiveMQ uses the MBean server that is started by  
            the JVM. For more information, see:  

            http://activemq.apache.org/jmx.html  

  -->   
- <managementContext>  
  <managementContext createConnector="false" />   
  </managementContext>  
- <!--             Configure message persistence for the broker. The default persistence  
            mechanism is the KahaDB store (identified by the kahaDB tag).  
            For more information, see:  

            http://activemq.apache.org/persistence.html  

  -->   
- <persistenceAdapter>  
  <kahaDB directory="${activemq.data}/kahadb" />   
  </persistenceAdapter>  
- <!--             The systemUsage controls the maximum amount of space the broker will  
            use before disabling caching and/or slowing down producers. For more information, see:  
            http://activemq.apache.org/producer-flow-control.html  

  -->   
- <systemUsage>  
- <systemUsage>  
- <memoryUsage>  
  <memoryUsage percentOfJvmHeap="70" />   
  </memoryUsage>  
- <storeUsage>  
  <storeUsage limit="100 gb" />   
  </storeUsage>  
- <tempUsage>  
  <tempUsage limit="50 gb" />   
  </tempUsage>  
  </systemUsage>  
  </systemUsage>  
- <!--             The transport connectors expose ActiveMQ over a given protocol to  
            clients and other brokers. For more information, see:  

            http://activemq.apache.org/configuring-transports.html  

  -->   
- <transportConnectors>  
- <!--  DOS protection, limit concurrent connections to 1000 and frame size to 100MB  
  -->   
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <strong><span style="color:#cc33cc;"><transportConnector name="udp" uri="udp://0.0.0.0:8123" /> </span></strong>  
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  </transportConnectors>  
- <!--  destroy the spring context on shutdown to stop jetty  
  -->   
- <shutdownHooks>  
  <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />   
  </shutdownHooks>  
  </broker>  
- <!--         Enable web consoles, REST and Ajax APIs and demos  
        The web consoles requires by default login, you can disable this in the jetty.xml file  

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details  

  -->   
  <import resource="jetty.xml" />   
  </beans>  
- <!--  END SNIPPET: example  
  -->   
2. applicationContext.xml 

     

      <?xml version="1.0" encoding="UTF-8" ?>   
    - <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:flex="http://www.springframework.org/schema/flex" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">  
    - <!-- %%%%%%%%%%%%%%%%%%*********************消息處理 ACTIVEMQ***************************%%%%%%%%%%%%%  
      -->   
    - <!--  JMS TOPIC MODEL  
      -->   
    - <!--  TOPIC鏈接工廠  
      -->   
    - <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
      <property name="brokerURL" value="udp://localhost:8123" />   
    - <!--  UDP傳輸方式  
      -->   
    - <!--    <property name="brokerURL" value="tcp://localhost:61616" />   
      -->   
    - <!--  TCP傳輸方式  
      -->   
      <property name="useAsyncSend" value="true" />   
      </bean>  
    - <bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
      <property name="brokerURL" value="udp://localhost:8123" />   
    - <!--  UDP傳輸方式  
      -->   
    - <!--    <property name="brokerURL" value="tcp://localhost:61616" />   
      -->   
    - <!--  TCP傳輸方式  
      -->   
      </bean>  
    - <!--  定義主題  
      -->   
    - <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">  
      <constructor-arg value="normandy.topic" />   
      </bean>  
      <bean id="messageConvertForSys" class="com.tech.gps.util.MessageConvertForSys" />   
    - <!--  TOPIC send jms模板  
      -->   
    - <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
      <property name="connectionFactory" ref="topicSendConnectionFactory" />   
      <property name="defaultDestination" ref="myTopic" />   
      <property name="messageConverter" ref="messageConvertForSys" />   
    - <!--  發送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 
      -->   
      <property name="deliveryMode" value="1" />   
      <property name="pubSubDomain" value="true" />   
    - <!--  開啟訂閱模式  
      -->   
      </bean>  
    - <!--  消息發送方  
      -->   
    - <bean id="topicSender" class="com.tech.gps.util.MessageSender">  
      <property name="jmsTemplate" ref="jmsTemplate" />   
      </bean>  
    - <!--  消息接收方  
      -->   
      <bean id="topicReceiver" class="com.tech.gps.util.MessageReceiver" />   
    - <!--  主題消息監聽容器  
      -->   
    - <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
      <property name="connectionFactory" ref="topicListenConnectionFactory" />   
      <property name="pubSubDomain" value="true" />   
    - <!-- true 訂閱模式  
      -->   
      <property name="destination" ref="myTopic" />   
    - <!--  目的地 myTopic  
      -->   
      <property name="subscriptionDurable" value="true" />   
    - <!-- -這里是設置接收客戶端的ID,在持久化時,但這個客戶端不在線時,消息就存在數據庫里,知道被這個ID的客戶端消費掉 
      -->   
      <property name="clientId" value="clientId_1" />   
      <property name="messageListener" ref="topicReceiver" />   
      </bean>  
    - <!--  Servlet  
      -->   
    - <bean id="ControlServlet1" class="com.tech.gps.servlet.ControlServlet1">  
      <property name="topicSender" ref="topicSender" />   
      </bean>  
      </beans>  
    ?  
3. web.xml
      <?xml version="1.0" encoding="UTF-8" ?>   
    - <web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">  
    - <!--  加載spring配置文件applicationContext.xml  
      -->   
    - <listener>  
      <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>   
      </listener>  
    - <!--  指明spring配置文件在何處  
      -->   
    - <context-param>  
      <param-name>contextConfigLocation</param-name>   
      <param-value>classpath*:applicationContext.xml</param-value>   
      </context-param>  
    - <servlet>  
      <servlet-name>ControlServlet1</servlet-name>   
      <servlet-class>com.tech.gps.servlet.DelegatingServletProxy</servlet-class>   
      </servlet>  
    - <servlet-mapping>  
      <servlet-name>ControlServlet1</servlet-name>   
      <url-pattern>/ControlServlet1</url-pattern>   
      </servlet-mapping>  
    - <welcome-file-list>  
      <welcome-file>index11.jsp</welcome-file>   
      </welcome-file-list>  
      </web-app>  
    ?  
4. 消息發送
package com.tech.gps.util;  

import org.springframework.jms.core.JmsTemplate;  


public class MessageSender {    

    private JmsTemplate jmsTemplate;    

    public void sendMessage(String msg){    

        jmsTemplate.convertAndSend(msg);    
    }  

    public JmsTemplate getJmsTemplate() {  
        return jmsTemplate;  
    }  

    public void setJmsTemplate(JmsTemplate jmsTemplate) {  
        this.jmsTemplate = jmsTemplate;  
    }    

}    
5. 消息轉換
package com.tech.gps.util;  

import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.ObjectMessage;  
import javax.jms.Session;  
import javax.jms.TopicPublisher;  

import org.springframework.jms.support.converter.MessageConversionException;  
import org.springframework.jms.support.converter.MessageConverter;  

public class MessageConvertForSys implements MessageConverter {    

    public Message toMessage(Object object, Session session)    
            throws JMSException, MessageConversionException {    

        System.out.println("sendMessage:"+object.toString());    
        ObjectMessage objectMessage = session.createObjectMessage();    
        objectMessage.setStringProperty("key",object.toString());    

        return objectMessage;    
    }    

    public Object fromMessage(Message message) throws JMSException,    
            MessageConversionException {    


        ObjectMessage objectMessage = (ObjectMessage) message;    
        return objectMessage.getObjectProperty("key");    
    }  

}  
6. 消息接收
    package com.tech.gps.util;  

    import javax.jms.JMSException;  
    import javax.jms.Message;  
    import javax.jms.MessageListener;  
    import javax.jms.ObjectMessage;  

    public class MessageReceiver implements MessageListener {    

        public void onMessage(Message m) {    

            ObjectMessage om = (ObjectMessage) m;    
            try {    
                String key = om.getStringProperty("key");   
                System.out.println(" ");  
                System.out.println("receiveMessage:"+key);      

            } catch (JMSException e) {    
                e.printStackTrace();    
            }    
        }  


    }    
7. servlet控制器
    package com.tech.gps.servlet;  

    import java.io.IOException;  
    import java.io.PrintWriter;  

    import javax.servlet.ServletContext;  
    import javax.servlet.ServletException;  
    import javax.servlet.http.HttpServlet;  
    import javax.servlet.http.HttpServletRequest;  
    import javax.servlet.http.HttpServletResponse;  

    import org.springframework.context.ApplicationContext;  
    import org.springframework.web.context.WebApplicationContext;  
    import org.springframework.web.context.support.WebApplicationContextUtils;  

    import com.tech.gps.util.MessageSender;  


    public class ControlServlet1 extends HttpServlet {  

        private MessageSender topicSender;  


        public MessageSender getTopicSender() {  
            return topicSender;  
        }  

        public void setTopicSender(MessageSender topicSender) {  
            this.topicSender = topicSender;  
        }  

        public void init() throws ServletException {  

        }  

        public void doGet(HttpServletRequest request, HttpServletResponse response)  
                throws ServletException, IOException {  

                 doPost(request,response);  
        }  


        public void doPost(HttpServletRequest request, HttpServletResponse response)  
                throws ServletException, IOException {  

              request.setCharacterEncoding("utf-8");  

              for(int i =0;i<10;i++){  

                try {  
                     Thread.sleep(1000);  
                  } catch (InterruptedException e) {  

                      e.printStackTrace();  
                  }     

                  topicSender.sendMessage("坐標:118.36582,37.2569812");  

              }  

        }  

    }  
8.Spring整合Servlet
package com.tech.gps.servlet;  

import java.io.IOException;  

import javax.servlet.GenericServlet;  
import javax.servlet.Servlet;  
import javax.servlet.ServletException;  
import javax.servlet.ServletRequest;  
import javax.servlet.ServletResponse;  

import org.springframework.web.context.WebApplicationContext;  
import org.springframework.web.context.support.WebApplicationContextUtils;  

public class DelegatingServletProxy extends GenericServlet{  

     private String targetBean;    
     private Servlet proxy;   


    public void service(ServletRequest req, ServletResponse res)  
            throws ServletException, IOException {  

          proxy.service(req, res);    
    }    


    public void init() throws ServletException {    
        this.targetBean = getServletName();    
        getServletBean();    
        proxy.init(getServletConfig());    
    }    

    private void getServletBean() {    
        WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());    
        this.proxy = (Servlet) wac.getBean(targetBean);    
    }    
}    
9. 輸出

       sendMessage:坐標:128.36582,32.2569812
       receiveMessage:坐標:128.36582,32.2569812

來自:http://blog.csdn.net/allen_oscar/article/details/17066349

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!