ActiveMQ 使用說明
1介紹
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。
ActiveMQ最簡單的形式就是如下圖所示:
參與方有消息生產者、消息存儲轉發者、消費者
消息生產者:負責生產各種消息,發送到broker集群
消息存儲轉發者:broker集群,負責接收生產者生產的消息,向消費者提供消息
消費者:接收消息的一方
2 安裝
activemq安裝很簡單,解壓包至指定位置就可以,這里使用apache-activemq-5.10.0
Windows:apache-activemq-5.10.0-bin.zip
UNIX:apache-activemq-5.10.0-bin.tar.gz
3 啟動
Windows:%ACTIVE_MQ_HOME%bin\win64\activemq.bat
UNIX:nohup $ACTIVE_MQ_HOME/bin/activemq start < /tmp/smlog 2<&1 &
ActiveMQ默認使用的TCP連接端口是61616, 通過查看該端口的信息可以測試ActiveMQ是否成功啟動
檢查已經啟動
查看61616端口是否打開: netstat -an | grep 61616
web管理界面http://10.21.210.43:8161/admin/默認賬號密碼:admin/admin
4 queue和topic
1、JMS Queue 執行 load balancer語義:
一條消息僅能被一個consumer(消費者)收到。如果在message發送的時候沒有可用的consumer,那么它將被保存一直到能處理該 message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡。
點對點消息傳遞域的特點如下:
A.每個消息只能有一個消費者。
B.消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處于運行狀態,它都可以提取消息。
2、Topic 實現 publish和 subscribe 語義:
一條消息被publish時,它將發到所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。
發布/訂閱消息傳遞域的特點如下:
A.每個消息可以有多個消費者。
B.生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之后發布的消息。JMS 規范允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀態時發送的消息。
3、分別對應兩種消息模式:
Point-to-Point (點對點),Publisher/Subscriber Model (發布/訂閱者) 其中在 Publicher /Subscriber 模式下又有Nondurable subscription(非持久訂閱)和 durable subscription (持久化訂閱)2種消息處理方式(支持離線消息)。
A.在點對點消息傳遞域中,目的地被稱為隊列(queue);在發布/訂閱消息傳遞域中,目的地被成為主題(topic)。
B.非持久化訂閱:當consumer不在線時,broker(存儲、轉發消息部件)不會 為訂閱者保存消息
C.持久化訂閱:當consumer不在線時,broker(存儲、轉發消息部件)會為 訂閱者保存消息,當consumer上線后,broker會把所有緩存的消息發送給該consumer即訂閱者
4、非持久化傳輸和持久化傳輸
非持久化傳輸:當broker接收到provider生產的消息后,回復一個確認ACK,然后把消息保存在內存中,consumer從broker的內存中讀取消息,這種方式吞吐量較高,缺點是當某個broker掛掉時,消息會丟失。如果消息積壓過多,broker會把消息寫到臨時文件中,消費時從文件中讀取消息,這樣會降低吞吐量。Broker一旦重啟,這些臨時文件也會被刪除。
持久化傳輸:當broker接收到provider生產的消息后,回復一個確認ACK,然后把消息保存在存儲介質中,例如文件、MYSql、Oracle等。consumer消費消息時,從這些存儲介質中讀取,這種方式比較可靠,但是會降低吞吐量。
5 集群模式(amq的重要機制)
broker cluster + Networks of Brokers
讓broker知道其他broker,客戶端連接的一個broker掛了時,客戶端自動連接到其他broker
#更改默認的conf/activemq.xml
#這里使用了靜態發現,還有一種動態發現
"static:(tcp://10.21.210.43:61616,tcp://10.21.210.44:61616,tcp://10.21.210.45:61616)"/<
|
消費同一個隊列時,cosumer連接的broker掛掉,可以連接其他broker繼續consume
#生產者和消費者使用failover來連接broker cluster
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.21.210.43:61616,tcp://10.21.210.44:61616)");
|
往隊列produce時,客戶端會連上failover中的其中一臺傳輸數據,只有該broker掛掉,才會連failover中其他一臺broker.
當集群中所有broker都掛掉后,客戶端會阻塞等待,只要有broker能恢復過來,客戶端就能自動連接上。