消息中間件-Activemq之Master-Slaver

jopen 8年前發布 | 15K 次閱讀 ActiveMQ 消息系統

什么叫中間件?
中間件為軟件應用提供了操作系統所提供的服務之外的服務,可以把中間件描述為"軟件膠水"。中間件不是操作系統的一部分,不是數據庫操作系統,也不是應用軟件的一部分,而是能夠讓軟件開發者方便的處理通信、輸入和輸出,能夠專注自己應用的部分。

消息中間件解決了應用之間的消息傳遞、解耦、異步的問題。
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

一般中間件都提供了橫向擴展和縱向擴展,橫向擴展就是我們經常說的負載均衡,縱向擴展提供了Master-Slaver;

負載均衡:提供負載均衡的中間件都對外提供服務
Master-Slaver:同時只有一個中間件對外提供服務,當Master出現掛機等問題,Slaver會自動接管

看一個整合的簡圖:

當然activemq也提供了以上2中方式,分別是:Master-Slave和Broker Cluster

準備:
jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3

先來看看Master-Slave模式

1).Shared File System Master Slave
基于ActiveMQ的默認數據庫kahaDB完成的,kahaDB的底層是文件系統。這種方式的集群,Slave的個數沒有限制,哪個ActiveMQ實例先獲取共享文件的鎖,那個實例就是Master,其它的ActiveMQ實例就是Slave,當當前的Master失效,其它的Slave就會去競爭共享文件鎖,誰競爭到了誰就是Master

本次測試在同一臺機器上:
首先更改配置conf/activemq,做如下修改:

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <kahaDB directory="D:/kahaDB"/>
 </persistenceAdapter>
將activemq拷貝3份,分別:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分別啟動activemq命令,啟動的日志分別是:

表示當前進程是Master


表示當前進程沒有獲取到鎖,作為Slaver

測試:

下面的例子中分別提供了Producer(Sender類)Consumer(Receiver類)
我們首先用Producer發送消息給activemq,然后停止Master,然后再用Consumer接受消息,測試結果是可以接受到數據的。

2).JDBC Master Slave
JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,只是把共享文件系統換成了共享數據庫。

修改配置文件conf/activemq

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <!--<kahaDB directory="D:/kahaDB"/>-->
        <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> 
  </persistenceAdapter>
添加數據源:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
注:這里使用的是mysql,所以需要mysql驅動程序: mysql-connector-java-5.1.18,講jar包放入lib文件夾下面,驅動版本不對,會出現如下錯誤: Database lock driver override not found for : [mysql_connect ...


分別拷貝到其他幾個文件夾下,分別啟動,啟動成功后我們可以看到數據庫中多了幾張表:


測試方式同上;
官網手冊:http://activemq.apache.org/jdbc-master-slave.html

3).Replicated LevelDB Store
這種方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master

修改配置文件conf/activemq:

<!--<persistenceAdapter>
         <kahaDB directory="${activemq.data}/kahadb"/>
         <kahaDB directory="D:/kahaDB"/>
         <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> 
 </persistenceAdapter>-->
 <persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/leveldb"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="127.0.0.1:2181"
        hostname="127.0.0.1"
        sync="local_disk"
        zkPath="/activemq/leveldb-stores"/>
 </persistenceAdapter>
首先啟動zookeeper,這里沒有做集群處理,默認端口是2181,然后分別啟動activemq,
啟動之后報錯:"activemq LevelDB IOException handler"。 

原因:版本5.10.0存在的依賴沖突。

解決方案:
(1)移除lib目錄中的pax-url-aether-1.5.2.jar包
(2)注釋掉配置文件中的日志配置; 

<!-- Allows accessing the server log
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>
-->

測試方式同上;

提供java版的例子:

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("FirstQueue");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("發送的消息"
                    + i);
            System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
            producer.send(message);
        }
    }
}
public class Receiver {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

來自: http://my.oschina.net/OutOfMemory/blog/596343

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!