ActiveMQ –JMS開源框架入門介紹
介紹基本的JMS概念與開源的JMS框架ActiveMQ應用,內容涵蓋一下幾點:
1. 基本的JMS概念
2. JMS的消息模式
3. 介紹ActiveMQ
4. 一個基于ActiveMQ的JMS例子程序
一:JMS基本概念
1. JMS的目標
為企業級的應用提供一種智能的消息系統,JMS定義了一整套的企業級的消息概念與工具,
盡可能最小化的Java語言概念去構建最大化企業消息應用。統一已經存在的企業級消息系
統功能。
2. JMS提供者
JMS提供者是指那些完全完成JMS功能與管理功能的JMS消息廠商,理論上JMS提供者完成
JMS消息產品必須是100%的純Java語言實現,可以運行在跨平臺的架構與操作系統上,當前
一些JMS廠商包括IBM,Oracle, JBoss社區 (JBoss Community), Apache 社區(ApacheCommunity)。
3. JMS應用程序, 一個完整的JMS應用應該實現以下功能:
JMS 客戶端 – Java語言開發的接受與發送消息的程序
非JMS客戶端 – 基于消息系統的本地API實現而不是JMS
消息 – 應用程序用來相互交流信息的載體
被管理對象–預先配置的JMS對象,JMS管理員創建,被客戶端運用。如鏈接工廠,主題等
JMS提供者–完成JMS功能與管理功能的消息系統
二:JMS的消息模式
1. 點對點的消息模式(Point to Point Messaging)

下面的JMS對象在點對點消息模式中是必須的:
a. 隊列(Queue) – 一個提供者命名的隊列對象,客戶端將會使用這個命名的隊列對象
b. 隊列鏈接工廠(QueueConnectionFactory) – 客戶端使用隊列鏈接工廠創建鏈接隊列
ConnectionQueue來取得與JMS點對點消息提供者的鏈接。
c. 鏈接隊列(ConnectionQueue) – 一個活動的鏈接隊列存在在客戶端與點對點消息提供者之
間,客戶用它創建一個或者多個JMS隊列會話(QueueSession)
d. 隊列會話(QueueSession) – 用來創建隊列消息的發送者與接受者(QueueSenderand
QueueReceiver)
e. 消息發送者(QueueSender 或者MessageProducer)– 發送消息到已經聲明的隊列
f. 消息接受者(QueueReceiver或者MessageConsumer) – 接受已經被發送到指定隊列的消息
2. 發布訂閱模式(publish – subscribe Mode)

a. 主題Topic(Destination) – 一個提供者命名的主題對象,客戶端將會使用這個命名的主題對象
b. 主題鏈接工廠(TopciConnectionFactory) – 客戶端使用主題鏈接工廠創建鏈接主題
ConnectionTopic來取得與JMS消息Pub/Sub提供者的鏈接。
c. 鏈接主題(ConnectionTopic) – 一個活動的鏈接主題存在發布者與訂閱者之間
d. 會話(TopicSession) – 用來創建主題消息的發布者與訂閱者 (TopicPublisher and
TopicSubscribers)
e. 消息發送者MessageProducer) – 發送消息到已經聲明的主題
f. 消息接受者(MessageConsumer) – 接受已經被發送到指定主題的消息
三:介紹ActiveMQ
ActiveMQ是apache社區完成的JMS開源消息組件,客戶端支持多種語言調用,包括Java,C++, C#,
Perl, Python等。支持Spring配置集成等。更多信息訪問這里:
http://activemq.apache.org/index.html
四:基于ActiveMQ的Publish/subscribe模式Demo程序
消息Broker,JMSprovider
import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* refer to http://activemq.apache.org/jndi-support.html
* http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
* @author gloomyfish
*
*/
public class PureJMSProducer {
private static final Log LOG = LogFactory.getLog(PureJMSProducer.class);
private PureJMSProducer() {
}
/**
* @param args the destination name to send to and optionally, the number of
* messages to send
*/
public static void main(String[] args) {
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
BrokerService broker = null;
final int numMsgs = 10;
/*
* Create a JNDI API InitialContext object
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
LOG.info("Could not create JNDI API context: " + e.toString());
System.exit(1);
}
// create external TCP broker
try {
broker = BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616"));
broker.start();
} catch (URISyntaxException e) {
LOG.info("Could not create broker: " + e.toString());
} catch (Exception e) {
LOG.info("Could not create broker: " + e.toString());
}
// try {
//
// }
/*
* Look up connection factory and destination.
*/
try {
connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");
destination = (Destination)jndiContext.lookup("MyTopic");
} catch (NamingException e) {
LOG.info("JNDI API lookup failed: " + e);
System.exit(1);
}
/*
* Create connection. Create session from connection; false means
* session is not transacted. Create sender and text message. Send
* messages, varying text slightly. Send end-of-messages message.
* Finally, close connection.
*/
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
Thread.sleep(3000);
for (int i = 0; i < numMsgs; i++) {
message.setText("This is message " + (i + 1));
LOG.info("Sending message: " + message.getText());
producer.send(message);
Thread.sleep(3000);
}
/*
* Send a non-text control message indicating end of messages.
*/
producer.send(session.createMessage());
} catch (JMSException e) {
LOG.info("Exception occurred: " + e);
} catch (InterruptedException e) {
LOG.info("Exception occurred: " + e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
}
}
}
// stop the TCP broker
try {
broker.stop();
} catch (Exception e) {
LOG.info("stop the broker failed: " + e);
}
}
}客戶端: import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQClient {
public static void main(String[] args) throws IOException {
// -- http://dlc.sun.com/pdf//816-5904-10/816-5904-10.pdf
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://locahost");
Connection connection = factory.createConnection();
connection.start();
// create message topic
//Topic topic= new ActiveMQTopic("MyTopic");
InitialContext jndiContext=new InitialContext();
Topic topic=(Topic)jndiContext.lookup("MyTopic");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// register message consumer
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(30000);
session.close();
connection.stop();
} catch(Exception e) {
e.printStackTrace();
}
}
} 項目配置,Jar依賴:

依賴的三個Jar分別為:
- activemq-all.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- xbean-spring.jar
轉自:http://blog.csdn.net/jia20003/article/details/7601176