ActiveMQ簡單介紹+簡單實例
1. JMS基本概念 與點到點模型不同,發布/訂閱模型中,消息生產者產生一個消息后,把這個消息發送到一個Topic中,這個Topic可以同時有多個接收者在監聽,當一個消息到達這個Topic之后,所有消息接收者都會收到這個消息。 2.編程的結構 2.1消息產生者向JMS發送消息的步驟 3.ActiveMQ的下載 下載地址:http://activemq.apache.org/download.html 并且有的Eclipse沒有自帶jms.jar,需要去下載 4.發送端代碼:
</span> JMS(Java Message Service) 即Java消息服務。它提供標準的產生、發送、接收消息的接口簡化 企業 應用的開發。它支持兩種消息通信模型:點到點(point-to-point)(P2P)模型和發布/訂閱(Pub/Sub)模型。P2P 模型規定了一個消息只能有一個接收者;Pub/Sub 模型允許一個消息可以有多個接收者。
對于點到點模型,消息生產者產生一個消息后,把這個消息發送到一個Queue(隊列)中,然后消息接收者再從這個Queue中讀取,一旦這個消息被一個接收者讀取之后,它就在這個Queue中消失了,所以一個消息只能被一個接收者消費。
(1)創建連接使用的工廠類JMS ConnectionFactory
(2)使用管理對象JMS ConnectionFactory建立連接Connection
(3)使用連接Connection 建立會話Session
(4)使用會話Session和管理對象Destination創建消息生產者MessageSender
(5)使用消息生產者MessageSender發送消息
2.2消息消費者從JMS接受消息的步驟
(1)創建連接使用的工廠類JMS ConnectionFactory
(2)使用管理對象JMS ConnectionFactory建立連接Connection
(3)使用連接Connection 建立會話Session
(4)使用會話Session和管理對象Destination創建消息消費者MessageReceiver
(5)使用消息消費者MessageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver
消息消費者必須實現了MessageListener接口,需要定義onMessage事件方法。
解壓縮到本地,然后啟動/bin/activemq.bat
package com.xkey.JMS;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsSender {
private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;
JmsSender(){
}
public void init(){
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
try{
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE.booleanValue(),
Session.AUTO_ACKNOWLEDGE);
//Queue
destination = session.createQueue("xkey");
producer = session.createProducer(destination);
//Topic
/**
* Topic topic = session.createTopic("xkey.Topic");
* producer = session.createProducer(topic);
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session,producer);
session.commit();
}catch(Exception e){
e.printStackTrace();
}finally{
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void sendMessage(Session session,MessageProducer producer) throws JMSException{
for (int i = 1; i <= 5; i ++) {
TextMessage message = session.createTextMessage("First ActiveMQ Test:::: " + i);
// 發送消息
System.out.println("Sender:" + "First ActiveMQ Test::: " + i);
producer.send(message);
}
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
JmsSender jms = new JmsSender();
jms.init();
}
}</pre>
package com.xkey.JMS;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class JmsReceiver {
private ConnectionFactory connectionFactory = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; private Destination destination = null; public JmsReceiver(){ } public void init(){ connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616"); try{ connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE.booleanValue(), Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("xkey"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message msg) { // TODO Auto-generated method stub TextMessage message = (TextMessage)msg; try{ System.out.println("Receiver " + message.getText()); }catch(Exception e){ e.printStackTrace(); } } }); /**while (true) { TextMessage message = (TextMessage) consumer.receive(1000); if (null != message) { System.out.println("Receiver " + message.getText()); } else { break; } } */ }catch(Exception e){ e.printStackTrace(); }finally{ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub JmsReceiver jms = new JmsReceiver(); jms.init(); }
}</pre>
6、JMS+ActiveMQ+Spring+TomCat可以用來實現企業級的消息隊列,這樣能使服務請求端和服務操作端實現低耦合。不知道怎么實現分布式,也就是多個請求,每個請求用不同的服務器去相響應。
Spring + JMS + ActiveMQ實現簡單的消息隊列(監聽器異步實現)
連接: http://blog.csdn.net/acceptedxukai/article/details/7775746
在Tomcat 6.0下用JNDI連接IBM MQ 6.0的配置方法(一)
連接: http://blog.sina.com.cn/s/blog_60dadc490100ecy6.html
http://www.javawind.net/help/html/spring_ref_2.0/html/jms.html