activemq存儲
來自: http://my.oschina.net/goudingcheng/blog/606994
activemq消息存儲
1.1activemq存儲有兩種方式
1.1.1持久化
即使發送者和接受者不是同時在線或者消息中心在發送者發送消息后宕機了,在消息 中心重新啟動后仍然可以將消息發送出去,如果把這種持久化和
ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。消息持久性的原理很簡單,就是在發送者將消息發送出去后,消息中心首先將消息
存儲到本地數據文件、內存數據庫或者遠程數據庫等,然后試圖將消息發送 給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。
1.1.1非持久化。非持久化典型的運用在發送通知,或者實時的數據
1.2activemq是怎么存儲消息的
queens和toptic是采用不同的存儲方式
采取先進先出模式,同一時間,消息只會發送給某一個消費者,只有當該消息被消費并告知已收到時,它才能在代理的存儲中被刪除。
對于持久性訂閱來說,每一個消費者都會獲取消息的拷貝。為了節約空間,代理的存儲介質中只存儲了一份消息,存儲介質的持久訂閱對象為其以后的被存儲的消息維護了一個指針,
消費者消費時,從存儲介質中復制一個消息。消息被所有訂閱者獲取后才能刪除。
1.3kahadb消息存儲
   Kahadb是activemq從版本5.4之后的默認消息存儲引擎。他是基于文件(意味著第三方數據庫并不是先決條件),事務的消息存儲引擎,這個消息存儲能夠使
activemq下載和使用非常的快,kahadb使用事務log作為他的索引(所有destination僅僅有一個索引文件),在生產環境中支持10000個active的連接,每個連接都有單獨的queen
    消息存儲機制是消息中間件最重要的核心部件和性能提升點。一直想對它做一個完整分析,這次趁有時間對kahadb做一個較完整分析。
Kahadb是基于B-tree算法的,具體原理fusesource給了個原理說明(http://fusesource.com/docs/broker/5.4/persistence/KahaDB-Overview.html),下面我們從代碼實現角度進行一個較深入的分析。下面所有的分析都是基于activeMQ 5.4.3版本源碼,該版本里的kahadb版本是V3,且是基于queue進行的源碼分析,topic的實現雖然有不少差異,但整體可參考queue的。
1.3.1配置Kahadb
package com.activemq.store;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
public class EmbeddedBrokerUsingAMQStoreExample {
    public BrokerService createBrokerService() throws Exception{
        
        BrokerService broker=new BrokerService();
        File dataFileDir=new File("target/amq-in-action/kahadb");
        KahaDBStore kaha=new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*100);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(true);
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
        
    }
}
在activemq里配置<persistenceAdapter>
<broker brokerName="broker" persistent="true" useShutdownHook="false">
....
<persistenceAdapter>
<kahaDB directory="active-data" journalMaxFileLength="16mb">
</persistenceAdapter>
.....
</broker>
2kahaDBm 消息存儲目錄結構
2.1.1db-*.log:
存放完整的每條消息(包括事務、目的地、id、優先級、具體內容等)和producerSequenceIdTracker(用來驗證每個消息生成者發送的消息是否重復的數據結構)。它隨著消息數量的增多,如每32M一個文件,
文件名按照數字進行編號,如db-1.log、db-2.log、db-3.log …
2.1.2db.data:
通過存放多個Btree數據結構來保存各類重要信息,下面一一進行介紹:
 Metadata類的destinations:用來保存該broker上有哪些Queue或隊列
 StoredDestination類的orderIndex屬性中的
defaultPriorityIndex、lowPriorityIndex、highPriorityIndex,這3個btree是為消息優先級排序而設計的(應該是版本5.4引入的,唉,有時候一個功能的引入帶來的代價可能比較大)。它們的主要作用
是為AbstractStoreCursor類的doFillBatch方法服務的,也就是常說的消息指針(message cursors)。當消息指針需要從磁盤文件中裝載一批消息的時候會使用這3個btree實例(kahadb版本小于2的不支
持lowPriorityIndex、highPriorityIndex)StoredDestination類的locationIndex:該btree的主要作用包括:
. 系統重啟進行恢復操作的時候,要移除掉不在db-*.log文件里的消息;
. 在系統進行定時checkpointUpdate時使用
2.1.3archive directory
StoredDestination類的messageIdIndex:該btree的主要作用是消息確認acknowledge操作時,通過消息ID在messageIdIndex中刪除對應的記錄,并依據返回的值刪除orderIndex和locationIndex中的記
錄上面這些就是kahadb中最主用的btree實例。
2.1.4db.redo:用來恢復Btree索引
Storage for Queue: 消息存儲是按照First in, First out(FIFO)的順序存儲的,One message is dispatched to a  singleconsumer at a time. 只有當消息被消費或被確認時,它才會被從消息存
儲中刪除。For durable sunscribers to a topic: 每個Consumer獲取一個消息的Copy。為了節省存儲空間,每個消息只有一個Copy被Broker存儲。存儲維護Durable Subscriber的指針,該指針指向下一
個存儲的消息,并且分發該消息的Copy給它的Consumer. 因為在這種情況下,每個消息都有多個潛在消費者,所以消息只有當被成功的傳遞給每個注冊的Durabel Subscriber后,才會被刪除KahaDB消息存儲
聯合使用快速的事務處理:Journal以及數據日志文件,該日志文件是消息ID的索引,并且 在內存中緩存消息AMQ消息存儲,類似于KahaDB。它使用了journal事務來確保穩定的持久、恢復、高性能的索引。
這種存儲方式主要用于大數據(消息)量的存儲。對每個索引文件,有兩個單獨的文件;一個用于每個Destination。如果每個Broker有上千個Queue,此時使用AMQ消息存儲是不合適的,并且恢復數據也很慢,
因為所有的索引文件需要rebuid,需要Broker掃描所有的Data logs去再次構建索引。
2.2下表是KahaDB的配置選項:
 KahaDB Properties
property name                      default value                                     Comments
directory                          activemq-data                     the path to the directory to use to store the message store data and log files 
IndexDirectory                                                        If set, configures where the KahaDB index files will be stored. If not set, the index 
                                                                      files are stored in the directory specified by the 'directory' attribute.
                                                                       Available as of ActiveMQ 5.10
indexWriteBatchSize                1000                               number of indexes written in a batch
indexCacheSize                     10000                              number of index pages cached in memory
enableIndexWriteAsync              false                              if set, will asynchronously write indexes
journalMaxFileLength               32mb                               a hint to set the maximum size of the message data logs
enableJournalDiskSyncs             true                               ensure every journal write is followed by a disk sync (JMS durability requirement)
cleanupInterval                    30000                              time (ms) before checking for a discarding/moving message data logs that are no longer used
checkpointInterval                 5000                               time (ms) before checkpointing the journal
ignoreMissingJournalfiles          false                              If enabled, will ignore a missing message log file
checkForCorruptJournalFiles        false                              If enabled, will check for corrupted Journal files on startup and try and recover them
checksumJournalFiles               false truev5.9                     create a checksum for a journal file - to enable checking for corrupted journals
archiveDataLogs                    false                              If enabled, will move a message data log to the archive directory instead of deleting it.
directoryArchive                   null                               Define the directory to move data logs to when they all the messages they contain have been consumed.
maxAsyncJobs                       10000                              the maximum number of asynchronous messages that will be queued awaiting storage (should be the same
                                                                      as the number of concurrent MessageProducers)
concurrentStoreAndDispatchTopics   false                              enable the dispatching of Topic messages to interested clients to happen concurrently with message 
                                                                      storage (Warning: Enabling this property is not recommended)
concurrentStoreAndDispatchQueues   true                                  enable the dispatching of Queue messages to interested clients to happen concurrently with message storage
archiveCorruptedIndex              false                              If enabled, corrupted indexes found at startup will be archived (not deleted)
preallocationStrategy              sparse_file                        (as of 5.12.0) This setting configures how the broker will try to preallocate the journal files when a new journal file is needed. The default allocation strategy sets the file length, but does not populate it with any data. The 'os_kernel_copy' strategy delegates the preallocation to the Operating System. The 'zeros' strategy configures ActiveMQ to do the preallocation by writing 0x00 to all of the positions in the journal file.
2.3amq消息存儲
amq是一種結合事務用來可持久化高性能的索引他能夠使存儲消息采用最好的方式,但是由于每個索引都采用2個獨立的文件,這里每個destination都有一個索引,在擁有成千上萬個queen的情況下不應該采用amq
并且如果amq關閉的不徹底的話,恢復將會變得特別慢,,這是因為amq要重建索引, AMQ消息存儲,類似于KahaDB。它使用了journal事務來確保穩定的持久、恢復、高性能的索引。這種存
儲方式主要用于大數據(消息)量的存儲。對每個索引文件,有兩個單獨的文件;一個用于每個Destination。如果每個Broker有上千個Queue,此時使用AMQ消息存儲是不合適的,并且恢復數據也很慢,因為所有的索引文
件需要rebuid,需要Broker掃描所有的Data logs去再次構建索引。
2.3.1amq消息內部
1.data log作為消息日志
2.緩存(cache)
3.the reference store
the amq消息存儲目錄結構
1.a lock file:確保僅僅一個broker能訪問數據
2.a temp-storeage directory用來存儲非持久化消息
3.the kr-store 這個目錄結構被amq消息存儲引用(the data &&state directory)
4.日志目錄
5備份目錄
配置amq消息存儲
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <amqPersistenceAdapter directory="target/Broker2-data/activemq-data"
                syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100"
                maxFileLength="10mb" />
        </persistenceAdapter>
    </broker>
</beans>
5.4jdbc消息存儲
JDBC消息存儲由三個表組成,,兩個表用來保存消息,第三個用來鎖表,確保只有一個activemq能夠訪問數據庫
Activemq_msgs sqltable
colunmn                default type                      desc
id                     integer                           the sequeence id used to retrieve the message 
container              varchar(250)                      the destination of the message 
msgid_prod             varchar(250)                      the id of the message producer
msgid_seq              integer                           the producer sequeence numer for the message , this together with the msgid_prod is equivalent to the jms messageid
expiration             bigint                            the time in milliseconds when the message will expire
msg                    blob                              the serialized message itself
topics 和queen被分解和存儲在Activemq_msgs
這里有兩張獨立的表用來保存持久的消息訂閱和id for 持久化消息訂閱者收到的上一條消息activemq
activemq_acks sql table
column name                default type                  desc
container                  varchar(250)                  the destination of the message 
sub_dest                   varchar(250)                  the destination of the durable subscriber
client_id                  varchar(250)                  the client of the durable subscriber
sub_name                   varchar(250)                  the subscriber name of the durable subscriber
selector                   varchar(250)                  the selector of the durable subscriber
last_acked_id              integer                       the sequeence id of last message receive by this subcriber
對于持久化消息訂閱者來說last_acked_id sequeence 被用來指向Activemq_msgs并且使他們 能夠很容易從發ctivemq_msgs select出來
activemq——lock表
column name                default type                  desc'
id                         integer                       a unique id for the lock
broker name                varchar(250)                   the name of the active broker that has the lock 
這章表用來確保只有一個activemqbroker 實例在一段時間內能夠接入數據庫,如果一個activemq broker 不能抓住database鎖,, 代理就不能初始化, 并且等到lock free(或者shutdown)
配置jdbc message store
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
        </persistenceAdapter>
    </broker>
    
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="admin"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean> 
</beans>
5.4.4利用jdbc消息存儲active mq日志(改善journal日志)
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory
            journalLogFiles="4"
            journalLogFileSize="32768"
            useJournal="true"
            useQuikJournal="true"
            dataSource="derby-ds"
            dataDirectory="activemq-data" />
            
        </persistenceFactory>
    </broker>
    
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean> 
</beans>
5.5內存消息存儲
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
        <transportConnector uri="tcp://localhost:61635">
    </transportConnectors>
    </broker>
    
</beans>
public BrokerService createEmbeddedBroker() throws Exception{
        
        BrokerService broker=new BrokerService();
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
        
    }
    
    public void createRetroactiveConsumer() throws Exception {
        ConnectionFactory fac = new ActiveMQConnectionFactory();
        Connection connection = fac.createConnection();
        connection.start();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Topic topic = session
                .createTopic("TEST.TOPIC?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);
    }