activemq實例
來自: http://my.oschina.net/goudingcheng/blog/608872
ACTIVEMQ入門實例
1.下載版本版本:apache-activemq-5.4.1
2.解壓
運行apache-activemq-5.4.1\bin下的activemq
Failed to start ActiveMQ JMS Message Broker. Reason: java.io.EOFException: Chunk stream does not exist
若出現以上情況:就參考這個文章http://www.cnblogs.com/kaka/archive/2012/03/15/2398215.html在conf下的activemq。xml里面<broker
添加 schedulerSupport="false"
3.創建一個queen
package com.activemq.jmsApplication01;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.omg.CORBA.Context;
/*
* Sending a jms MESSAGE
* author paohaijiao
* date 2016/01/18
*
*
* */
public class MyMessageProducer {
static boolean useTransation = false;
static ConnectionFactory connectionFactory;
static Connection connection;
static Session session;
static Destination destination;
static MessageProducer producer;
static Message message;
public static void init() {
try {
/*
InitialContext ctx = new InitialContext();// 1.acquire a jms
// connection factory
connectionFactory = (ConnectionFactory) ctx
.lookup("ConnectionFactoryName");*/
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
connection = connectionFactory.createConnection();// 2 create a jms
// using the
// connection
// factory
connection.start();// 3.start connection
session = connection.createSession(useTransation,
Session.AUTO_ACKNOWLEDGE);// 4.create jms session from a
// connection
destination = session.createQueue("TEST.QUEUE");// 5,acquire jms
// destination
producer = session.createProducer(destination);// 6create jms
// producer,or
// create message
// and addr to a
// destination
// 7.create jms consumer or registera jms message listerner
message = session.createTextMessage("this is text");//
producer.send(message);// 8.send /recieve jms message
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
// 9.Release Resource
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
4.這時查看http://localhost:8161/admin/ 里面的queens就有記錄了
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數
package com.activemq.jmsApplication01;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyAsyncMessageConsumer implements MessageListener {
static boolean useTransation = false;
static ConnectionFactory connectionFactory;
static Connection connection;
static Session session;
static Destination destination;
static MessageConsumer consumer;
static Message message;
public void RecieveMessage() {
try {
// InitialContext ctx = new InitialContext();
//connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(useTransation,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST.QUEUE");
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.close();
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
System.out.println("Recieve message:" + message);
}
}
}若采用同步:代碼如下
package com.activemq.jmsApplication01;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MySyncMessageConsumer {
static boolean useTransation = false;
static ConnectionFactory connectionFactory;
static Connection connection;
static Session session;
static Destination destination;
static MessageConsumer consumer;
public void RecieveMessage() {
try {
/*InitialContext ctx = new InitialContext();
connectionFactory = (ConnectionFactory) ctx
.lookup("ConnectionFactoryName");*/
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(useTransation,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST.QUEUE");
consumer = session.createConsumer(destination);
System.out.println("init");
Message message = consumer.receive(100000);
while(true){
if (null ==message) {
System.out.println("收到消息" + ((TextMessage) message).getText());
} else{
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
connection.close();
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
因為針對消費比較快的消費者,我們使用同步(可以減少異步發送消息時產生的上下文切換),針對消費比較慢的消費者,我們使用異步。 同步發送消息的缺點是,對于生產者發送的消息,如果消費者消費的比較慢,那么生產者就會被阻塞。
默認配置是異步發送 (displayatchAsync=true),這種配置也保證了MQ的高性能。