ActiveMQ 使用示例
ActiveMQ 的使用非常簡單,和JDBC一樣,有標準的步驟:
1. 創建連接工廠
2. 創建連接
3. 創建會話
4. 創建目的地
5. 創建生產者或消費者
6. 生產或消費消息
7. 關閉生產或消費者、關閉會話、關閉連接
一個生產者例子如下:
class ActiveMQProducer implements Runnable {
public void run() {
try {
// 創建連接工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 創建JMS連接實例,并啟動連接
Connection connection = connectionFactory.createConnection();
connection.start();
// 創建Session對象,不開啟事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目標,可以是 Queue 或 Topic
Destination destination = session.createQueue("ling.wcaccepted");
// 創建生成者
MessageProducer producer = session.createProducer(destination);
// 設置消息不需持久化。默認消息需要持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 創建文本消息
TextMessage message = session.createTextMessage("Hello World!");
// 發送消息。non-persistent 默認異步發送;persistent 默認同步發送
producer.send(message);
// 關閉會話和連接
producer.close();
session.close();
connection.close();
} catch(Exception e) {
e.printStackTrace();
}
}
}
一個消費者例子如下:
class ActiveMQConsumer implements Runnable {
public void run() {
try {
// 創建連接工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://115.29.145.245:61616");
// 創建JMS連接實例,并啟動連接
Connection connection = connectionFactory.createConnection();
connection.start();
// 創建Session對象,不開啟事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目標,可以是 Queue 或 Topic
Destination destination = session.createQueue("ling.wcaccepted");
// 創建消費者
MessageConsumer consumer = session.createConsumer(destination);
// 獲取消息
System.out.println(consumer.receive());
// 關閉會話和連接
consumer.close();
session.close();
connection.close();
} catch(Exception e) {
}
}
}
ActiveMQ 的 Session、MessageProducer 和 MessageConsumer 類是非線程安全的,不能在多線程中共享。
結合Spring使用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- ActiveMQ 連接池配置 --> <bean id="activemqPoolFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template, 默認開啟消息持久化 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:connectionFactory-ref="activemqPoolFactory"/> <!-- ActiveMQ 消費者方法配置,下面配置生成2個listener --> <jms:listener-container connection-factory="activemqPoolFactory"> <jms:listener destination="ling.activity" ref="activemqClient" method="lingActivity"/> <jms:listener destination="ling.delete" ref="activemqClient" method="lingDelete"/> </jms:listener-container> <!-- ActiveMQ 消費者工具類配置 --> <bean id="activemqClient" class="com.winhong.ling.utils.ActivemqClient"/> </beans>
ActiveMQ 消費者工具類代碼:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
* 消息隊列工具類。
*
* @author ningjh
* @version 1.0
* @since 2015/1/4
*/
public class ActivemqClient {
public final static String QUEUE_ACTIVITY = "ling.activity";
public final static String QUEUE_DELETE = "ling.delete";
@Autowired
private JmsTemplate jmsTemplate;
/**
* 發送文本消息
*
* @param destination 隊列名稱
* @param message 文本消息內容
*/
public void sendMessage(String destination, final String message) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
/**
* 消費者監聽器方法。監聽隊列 ling.activity
*
* @param message
*/
public void lingActivity(String message) {
System.out.println("接收到消息-activity: " + message);
}
/**
* 消費者監聽器方法。監聽隊列 ling.delete
*
* @param message
*/
public void lingDelete(String message) {
System.out.println("接收到消息-delete: " + message);
}
} 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!