消息中間件-Activemq之Master-Slaver
什么叫中間件?
中間件為軟件應用提供了操作系統所提供的服務之外的服務,可以把中間件描述為"軟件膠水"。中間件不是操作系統的一部分,不是數據庫操作系統,也不是應用軟件的一部分,而是能夠讓軟件開發者方便的處理通信、輸入和輸出,能夠專注自己應用的部分。
消息中間件解決了應用之間的消息傳遞、解耦、異步的問題。
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
一般中間件都提供了橫向擴展和縱向擴展,橫向擴展就是我們經常說的負載均衡,縱向擴展提供了Master-Slaver;
負載均衡:提供負載均衡的中間件都對外提供服務
Master-Slaver:同時只有一個中間件對外提供服務,當Master出現掛機等問題,Slaver會自動接管
看一個整合的簡圖:

當然activemq也提供了以上2中方式,分別是:Master-Slave和Broker Cluster
準備:
jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3
先來看看Master-Slave模式
1).Shared File System Master Slave
基于ActiveMQ的默認數據庫kahaDB完成的,kahaDB的底層是文件系統。這種方式的集群,Slave的個數沒有限制,哪個ActiveMQ實例先獲取共享文件的鎖,那個實例就是Master,其它的ActiveMQ實例就是Slave,當當前的Master失效,其它的Slave就會去競爭共享文件鎖,誰競爭到了誰就是Master
本次測試在同一臺機器上:
首先更改配置conf/activemq,做如下修改:
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="D:/kahaDB"/>
</persistenceAdapter>將activemq拷貝3份,分別:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分別啟動activemq命令,啟動的日志分別是:
表示當前進程是Master

表示當前進程沒有獲取到鎖,作為Slaver
測試:
下面的例子中分別提供了Producer(Sender類)和Consumer(Receiver類)
我們首先用Producer發送消息給activemq,然后停止Master,然后再用Consumer接受消息,測試結果是可以接受到數據的。
2).JDBC Master Slave
JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,只是把共享文件系統換成了共享數據庫。
修改配置文件conf/activemq
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<!--<kahaDB directory="D:/kahaDB"/>-->
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>添加數據源: <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>注:這里使用的是mysql,所以需要mysql驅動程序: mysql-connector-java-5.1.18,講jar包放入lib文件夾下面,驅動版本不對,會出現如下錯誤: Database lock driver override not found for : [mysql_connect ...
分別拷貝到其他幾個文件夾下,分別啟動,啟動成功后我們可以看到數據庫中多了幾張表:

測試方式同上;
官網手冊:http://activemq.apache.org/jdbc-master-slave.html
3).Replicated LevelDB Store
這種方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master
修改配置文件conf/activemq:
<!--<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
<kahaDB directory="D:/kahaDB"/>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>-->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="127.0.0.1:2181"
hostname="127.0.0.1"
sync="local_disk"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>首先啟動zookeeper,這里沒有做集群處理,默認端口是2181,然后分別啟動activemq, 啟動之后報錯:"activemq LevelDB IOException handler"。
原因:版本5.10.0存在的依賴沖突。
解決方案:
(1)移除lib目錄中的pax-url-aether-1.5.2.jar包
(2)注釋掉配置文件中的日志配置;
<!-- Allows accessing the server log
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
--> 測試方式同上;
提供java版的例子:
public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("發送的消息"
+ i);
System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
producer.send(message);
}
}
}public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}