消息中間件-Activemq之Master-Slaver
什么叫中間件?
中間件為軟件應用提供了操作系統所提供的服務之外的服務,可以把中間件描述為"軟件膠水"。中間件不是操作系統的一部分,不是數據庫操作系統,也不是應用軟件的一部分,而是能夠讓軟件開發者方便的處理通信、輸入和輸出,能夠專注自己應用的部分。
消息中間件解決了應用之間的消息傳遞、解耦、異步的問題。
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
一般中間件都提供了橫向擴展和縱向擴展,橫向擴展就是我們經常說的負載均衡,縱向擴展提供了Master-Slaver;
負載均衡:提供負載均衡的中間件都對外提供服務
Master-Slaver:同時只有一個中間件對外提供服務,當Master出現掛機等問題,Slaver會自動接管
看一個整合的簡圖:
當然activemq也提供了以上2中方式,分別是:Master-Slave和Broker Cluster
準備:
jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3
先來看看Master-Slave模式
1).Shared File System Master Slave
基于ActiveMQ的默認數據庫kahaDB完成的,kahaDB的底層是文件系統。這種方式的集群,Slave的個數沒有限制,哪個ActiveMQ實例先獲取共享文件的鎖,那個實例就是Master,其它的ActiveMQ實例就是Slave,當當前的Master失效,其它的Slave就會去競爭共享文件鎖,誰競爭到了誰就是Master
本次測試在同一臺機器上:
首先更改配置conf/activemq,做如下修改:
<persistenceAdapter> <!--<kahaDB directory="${activemq.data}/kahadb"/>--> <kahaDB directory="D:/kahaDB"/> </persistenceAdapter>將activemq拷貝3份,分別:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分別啟動activemq命令,啟動的日志分別是:

表示當前進程是Master
表示當前進程沒有獲取到鎖,作為Slaver
測試:
下面的例子中分別提供了Producer(Sender類)和Consumer(Receiver類)
我們首先用Producer發送消息給activemq,然后停止Master,然后再用Consumer接受消息,測試結果是可以接受到數據的。
2).JDBC Master Slave
JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,只是把共享文件系統換成了共享數據庫。
修改配置文件conf/activemq
<persistenceAdapter> <!--<kahaDB directory="${activemq.data}/kahadb"/>--> <!--<kahaDB directory="D:/kahaDB"/>--> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter>添加數據源:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean>注:這里使用的是mysql,所以需要mysql驅動程序: mysql-connector-java-5.1.18,講jar包放入lib文件夾下面,驅動版本不對,會出現如下錯誤: Database lock driver override not found for : [mysql_connect ...
分別拷貝到其他幾個文件夾下,分別啟動,啟動成功后我們可以看到數據庫中多了幾張表:
測試方式同上;
官網手冊:http://activemq.apache.org/jdbc-master-slave.html
3).Replicated LevelDB Store
這種方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master
修改配置文件conf/activemq:
<!--<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> <kahaDB directory="D:/kahaDB"/> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter>--> <persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="127.0.0.1:2181" hostname="127.0.0.1" sync="local_disk" zkPath="/activemq/leveldb-stores"/> </persistenceAdapter>首先啟動zookeeper,這里沒有做集群處理,默認端口是2181,然后分別啟動activemq,
啟動之后報錯:"activemq LevelDB IOException handler"。
原因:版本5.10.0存在的依賴沖突。
解決方案:
(1)移除lib目錄中的pax-url-aether-1.5.2.jar包
(2)注釋掉配置文件中的日志配置;
<!-- Allows accessing the server log <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean> -->
測試方式同上;
提供java版的例子:
public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("FirstQueue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("發送的消息" + i); System.out.println("發送消息:" + "ActiveMq 發送的消息" + i); producer.send(message); } } }
public class Receiver { public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }