ActiveMQ持久化方式
消息持久性對于可靠消息傳遞來說應該是一種比較好的方法,有了消息持久化,即使發送者和接受者不是同時在線或者消息中心在發送者發送消息后宕機了,在消息中心重新啟動后仍然可以將消息發送出去,如果把這種持久化和ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。
消息持久性的原理很簡單,就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,然后試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。
1、AMQ
AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32M,如果一條消息的大小超過了32M,那么這個值必須設置大一點。當一個存儲文件中的消息已經全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用于ActiveMQ5.3之前的版本。默認配置如下:
<persistenceAdapter> <amqPersistenceAdapter directory="activemq-data"maxFileLength="32mb"/> </persistenceAdapter>
屬性如下:
屬性名稱 |
默認值 |
描述 |
directory |
activemq-data |
消息文件和日志的存儲目錄 |
useNIO |
true |
使用NIO協議存儲消息 |
syncOnWrite |
false |
同步寫到磁盤,這個選項對性能影響非常大 |
maxFileLength |
32Mb |
一個消息文件的大小 |
persistentIndex |
true |
消息索引的持久化,如果為false,那么索引保存在內存中 |
maxCheckpointMessageAddSize |
4kb |
一個事務允許的最大消息量 |
cleanupInterval |
30000 |
清除操作周期,單位ms |
indexBinSize |
1024 |
索引文件緩存頁面數,缺省為1024,當amq擴充或者縮減存儲時,會鎖定整個broker,導致一定時間的阻塞,所以這個值應該調整到比較大,但是代碼中實現會動態伸縮,調整效果并不理想。 |
indexKeySize |
96 |
索引key的大小,key是消息ID |
indexPageSize |
16kb |
索引的頁大小 |
directoryArchive |
archive |
存儲被歸檔的消息文件目錄 |
archiveDataLogs |
false |
當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
2、KahaDB
KahaDB是基于文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復的時間比AMQ短,從5.4版本之后KahaDB做為默認的持久化方式。默認配置如下:
<persistenceAdapter> <kahaDB directory="activemq-data"journalMaxFileLength="32mb"/> </persistenceAdapter>
KahaDB的屬性如下:
屬性名稱 |
默認值 |
描述 |
directory |
activemq-data |
消息文件和日志的存儲目錄 |
indexWriteBatchSize |
1000 |
一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中 |
indexCacheSize |
10000 |
內存中,索引的頁大小 |
enableIndexWriteAsync |
false |
索引是否異步寫到消息文件中 |
journalMaxFileLength |
32mb |
一個消息文件的大小 |
enableJournalDiskSyncs |
true |
是否講非事務的消息同步寫入到磁盤 |
cleanupInterval |
30000 |
清除操作周期,單位ms |
checkpointInterval |
5000 |
索引寫入到消息文件的周期,單位ms |
ignoreMissingJournalfiles |
false |
忽略丟失的消息文件,false,當丟失了消息文件,啟動異常 |
checkForCorruptJournalFiles |
false |
檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復 |
checksumJournalFiles |
false |
產生一個checksum,以便能夠檢測journal文件是否損壞。 |
5.4版本之后有效的屬性: |
|
|
archiveDataLogs |
false |
當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
directoryArchive |
null |
存儲被歸檔的消息文件目錄 |
databaseLockedWaitDelay |
10000 |
在使用負載時,等待獲得文件鎖的延遲時間,單位ms |
maxAsyncJobs |
10000 |
同個生產者產生等待寫入的異步消息最大量 |
concurrentStoreAndDispatchTopics |
false |
當寫入消息的時候,是否轉發主題消息 |
concurrentStoreAndDispatchQueues |
true |
當寫入消息的時候,是否轉發隊列消息 |
5.6版本之后有效的屬性: |
|
|
archiveCorruptedIndex |
false |
是否歸檔錯誤的索引 |
每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標隊列提交給filteredKahaDB,那么意味著對所有的隊列有效。如果一個隊列沒有對應的適配器,那么將會拋出一個異常。配置如下:
<persistenceAdapter> <mKahaDBdirectory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- match all queues --> <filteredKahaDBqueue=">"> <persistenceAdapter> <kahaDBjournalMaxFileLength="32mb"/> </persistenceAdapter> </filteredKahaDB> <!-- match all destinations --> <filteredKahaDB> <persistenceAdapter> <kahaDBenableJournalDiskSyncs="false"/> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
如果filteredKahaDB的perDestination屬性設置為true,那么匹配的目標隊列將會得到自己對應的KahaDB實例。配置如下:
<persistenceAdapter> <mKahaDBdirectory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- kahaDB per destinations --> <filteredKahaDB perDestination="true"> <persistenceAdapter> <kahaDBjournalMaxFileLength="32mb" /> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
3、JDBC
可以將消息存儲到數據庫中,例如:Mysql、SQL Server、Oracle、DB2。
配置JDBC適配器:
<persistenceAdapter> <jdbcPersistenceAdapterdataSource="#mysql-ds" createTablesOnStartup="false" /> </persistenceAdapter>
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之后改成false。
Mysql持久化bean: <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="poolPreparedStatements" value="true"/> </bean> SQL Server持久化bean: <bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close"> <property name="serverName" value="SERVERNAME"/> <property name="portNumber" value="PORTNUMBER"/> <property name="databaseName" value="DATABASENAME"/> <property name="user" value="USER"/> <property name="password" value="PASSWORD"/> </bean> Oracle持久化bean: <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> DB2持久化bean: <bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/> <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
4、LevelDB
這種文件系統是從ActiveMQ5.8之后引進的,它和KahaDB非常相似,也是基于文件的本地數據庫儲存形式,但是它提供比KahaDB更快的持久性。與KahaDB不同的是,它不是使用傳統的B-樹來實現對日志數據的提前寫,而是使用基于索引的LevelDB。
默認配置如下:
<persistenceAdapter> <levelDBdirectory="activemq-data"/> </persistenceAdapter>
屬性如下:
屬性名稱 |
默認值 |
描述 |
directory |
"LevelDB" |
數據文件的存儲目錄 |
readThreads |
10 |
系統允許的并發讀線程數量 |
sync |
true |
同步寫到磁盤 |
logSize |
104857600 (100 MB) |
日志文件大小的最大值 |
logWriteBufferSize |
4194304 (4 MB) |
日志數據寫入文件系統的最大緩存值 |
verifyChecksums |
false |
是否對從文件系統中讀取的數據進行校驗 |
paranoidChecks |
false |
盡快對系統內部發生的存儲錯誤進行標記 |
indexFactory |
org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory |
在創建LevelDB索引時使用 |
indexMaxOpenFiles |
1000 |
可供索引使用的打開文件的數量 |
indexBlockRestartInterval |
16 |
Number keys between restart points for delta encoding of keys. |
indexWriteBufferSize |
6291456 (6 MB) |
內存中索引數據的最大值 |
indexBlockSize |
4096 (4 K) |
每個數據塊的索引數據大小 |
indexCacheSize |
268435456 (256 MB) |
使用緩存索引塊允許的最大內存 |
indexCompression |
snappy |
適用于索引塊的壓縮類型 |
logCompression |
none |
適用于日志記錄的壓縮類型 |
5、 下面詳細介紹一下如何將消息持久化到Mysql數據庫中
? 需要將mysql的驅動包放置到ActiveMQ的lib目錄下
? 修改activeMQ的配置文件:
<persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"createTablesOnStartup="false"/> </persistenceAdapter>
在配置文件中的broker節點外增加:
<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <propertyname="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url"value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username"value="root"/> <property name="password" value="root"/> <property name="maxActive"value="200"/> <propertyname="poolPreparedStatements" value="true"/> </bean>
從配置中可以看出數據庫的名稱是activemq,需要手動在MySql中建立這個數據庫。
然后重新啟動activeMQ,會發現activemq多了三張表:
1:activemq_acks
2:activemq_lock
3:activemq_msgs
? 點到點類型
Sender類:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 2000; public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS客戶端到JMS Provider的連接 Connection connection = null; // Session:一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // MessageProducer:消息發送者 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try{ // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //獲取session,FirstQueue是一個服務器的queue destination = session.createQueue("FirstQueue"); // 得到消息生成者【發送者】 producer = session.createProducer(destination); //設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //構造消息 sendMessage(session, producer); //session.commit(); connection.close(); } catch(Exception e){ e.printStackTrace(); }finally{ if(null != connection){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generatedcatch block e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer producer)throws Exception{ for(int i=1; i<=SEND_NUMBER; i++){ TextMessage message = session.createTextMessage("ActiveMQ發送消息"+i); System.out.println("發送消息:ActiveMQ發送的消息"+i); producer.send(message); } } }
Receiver類:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS客戶端到JMS Provider的連接 Connection connection = null; // Session:一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = newActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //得到連接對象 connection =connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建Queue destination = session.createQueue("FirstQueue"); consumer =session.createConsumer(destination); while(true){ //設置接收者接收消息的時間,為了便于測試,這里定為100s TextMessagemessage = (TextMessage)consumer.receive(100000); if(null != message){ System.out.println("收到消息" +message.getText()); }else break; } }catch(Exception e){ e.printStackTrace(); }finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
測試:
測試一:
A、 先運行Sender類,待運行完畢后,運行Receiver類
B、 在此過程中activemq數據庫的activemq_msgs表中沒有數據
C、 再次運行Receiver,消費不到任何信息
測試二:
A、 先運行Sender類
B、 重啟電腦
C、 運行Receiver類,無任何信息被消費
測試三:
A、 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、 先運行Sender類,待運行完畢后,運行Receiver類
C、 在此過程中activemq數據庫的activemq_msgs表中有數據生成,運行完Receiver類后,數據清除
測試四:
A、 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、 運行Sender類
C、 重啟電腦
D、 運行Receiver類,有消息被消費
結論:
通過以上測試,可以發現,在P2P類型中當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在內存中,而當DeliveryMode設置為PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。而且P2P中消息一旦被Consumer消費就從broker中刪除。
? 發布/訂閱類型
Sender類:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 100; public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS客戶端到JMS Provider的連接 Connection connection = null; // Session:一個發送或接收消息的線程 Session session; // MessageProducer:消息發送者 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try{ //得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MQ_test"); // 得到消息生成者【發送者】 producer = session.createProducer(topic); //設置持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //構造消息 sendMessage(session, producer); //session.commit(); connection.close(); } catch(Exception e){ e.printStackTrace(); }finally{ if(null != connection){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generatedcatch block e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer producer)throws Exception{ for(int i=1; i<=SEND_NUMBER; i++){ TextMessage message = session.createTextMessage("ActiveMQ發送消息"+i); System.out.println("發送消息:ActiveMQ發送的消息"+i); producer.send(message); } } }
Receiver類:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS客戶端到JMS Provider的連接 Connection connection = null; // Session:一個發送或接收消息的線程 Session session; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = newActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠得到連接對象 connection =connectionFactory.createConnection(); connection.setClientID("clientID001"); // 啟動 connection.start(); // 獲取操作連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 獲取session Topic topic = session.createTopic("MQ_test"); // 得到消息生成者【發送者】 consumer = session.createDurableSubscriber(topic, "MQ_sub"); while(true){ //設置接收者接收消息的時間,為了便于測試,這里誰定為100s TextMessagemessage = (TextMessage)consumer.receive(100000); if(null != message){ System.out.println("收到消息" +message.getText()); }else break; } }catch(Exception e){ e.printStackTrace(); }finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
測試:
測試一:
A、先啟動Sender類
B、再啟動Receiver類
C、結果無任何記錄被訂閱
測試二:
A、先啟動Receiver類,讓Receiver在相關主題上進行訂閱
B、停止Receiver類,再啟動Sender類
C、待Sender類運行完成后,再啟動Receiver類
D、結果發現相應主題的信息被訂閱
?
來自:http://blog.csdn.net/xyw_eliot/article/details/9128219