ActiveMQ 使用示例
ActiveMQ 的使用非常簡單,和JDBC一樣,有標準的步驟:
1. 創建連接工廠
2. 創建連接
3. 創建會話
4. 創建目的地
5. 創建生產者或消費者
6. 生產或消費消息
7. 關閉生產或消費者、關閉會話、關閉連接
一個生產者例子如下:
class ActiveMQProducer implements Runnable { public void run() { try { // 創建連接工廠 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 創建JMS連接實例,并啟動連接 Connection connection = connectionFactory.createConnection(); connection.start(); // 創建Session對象,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建目標,可以是 Queue 或 Topic Destination destination = session.createQueue("ling.wcaccepted"); // 創建生成者 MessageProducer producer = session.createProducer(destination); // 設置消息不需持久化。默認消息需要持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 創建文本消息 TextMessage message = session.createTextMessage("Hello World!"); // 發送消息。non-persistent 默認異步發送;persistent 默認同步發送 producer.send(message); // 關閉會話和連接 producer.close(); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } }
一個消費者例子如下:
class ActiveMQConsumer implements Runnable { public void run() { try { // 創建連接工廠 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://115.29.145.245:61616"); // 創建JMS連接實例,并啟動連接 Connection connection = connectionFactory.createConnection(); connection.start(); // 創建Session對象,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建目標,可以是 Queue 或 Topic Destination destination = session.createQueue("ling.wcaccepted"); // 創建消費者 MessageConsumer consumer = session.createConsumer(destination); // 獲取消息 System.out.println(consumer.receive()); // 關閉會話和連接 consumer.close(); session.close(); connection.close(); } catch(Exception e) { } } }
ActiveMQ 的 Session、MessageProducer 和 MessageConsumer 類是非線程安全的,不能在多線程中共享。
結合Spring使用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- ActiveMQ 連接池配置 --> <bean id="activemqPoolFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template, 默認開啟消息持久化 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:connectionFactory-ref="activemqPoolFactory"/> <!-- ActiveMQ 消費者方法配置,下面配置生成2個listener --> <jms:listener-container connection-factory="activemqPoolFactory"> <jms:listener destination="ling.activity" ref="activemqClient" method="lingActivity"/> <jms:listener destination="ling.delete" ref="activemqClient" method="lingDelete"/> </jms:listener-container> <!-- ActiveMQ 消費者工具類配置 --> <bean id="activemqClient" class="com.winhong.ling.utils.ActivemqClient"/> </beans>
ActiveMQ 消費者工具類代碼:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 消息隊列工具類。 * * @author ningjh * @version 1.0 * @since 2015/1/4 */ public class ActivemqClient { public final static String QUEUE_ACTIVITY = "ling.activity"; public final static String QUEUE_DELETE = "ling.delete"; @Autowired private JmsTemplate jmsTemplate; /** * 發送文本消息 * * @param destination 隊列名稱 * @param message 文本消息內容 */ public void sendMessage(String destination, final String message) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } /** * 消費者監聽器方法。監聽隊列 ling.activity * * @param message */ public void lingActivity(String message) { System.out.println("接收到消息-activity: " + message); } /** * 消費者監聽器方法。監聽隊列 ling.delete * * @param message */ public void lingDelete(String message) { System.out.println("接收到消息-delete: " + message); } }
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!