ActiveMQ的插件開發介紹
ActiveMQ是一個流行的開源MQ,我們也大規模應用在網站的方方面面,每天處理上億消息,取得了較好效果。ActiveMQ有一個很好很強大的插 件體系,提供了很強的擴展能力,ActiveMQ本身就是使用這一套插件體系實現了很多擴展功能,包括他的權限管理,日志管理,事務等模塊都是作為一個插 件集成的,我們自己也在消息路由、補償式事務方面使用了它的插件功能,確實非常方便。
在ActiveMQ中,Broker代表一個運行的MQ節點,ActiveMQ的插件實際上是基于Broker的一個Filter鏈,整個設計類似于 servlet的Filter結構,所有的Plugin構成一個鏈式結構,每個插件實際上都是一個"Interceptor",類結構圖如下:

其中Broker接口封裝了一個AMQ節點的方方面面的方法,包括連接管理、session管理、消息的發送和接收以及其它的一些功 能,BrokerFilter實現這個接口,并提供了鏈式結構支持,可以攔截所有Broker方法的實現并傳遞結果給鏈式結構的下一個,形成了一個完整 的"職責鏈"模式,具體層次關系如下,其中,"System Plugin"是指AMQ內部使用Plugin機制實現的一些系統功能,用戶不能定制,"AMQ Plugin"指的是ActiveMQ已經實現好了,可以在配置文件中自由選擇的一些插件,例如簡單的安全插件,JAAS安全插件和DLQ插件等等,用戶 插件就是指用戶自己實現的amq插件,需要用戶把相關jar包放入到amq的啟動classpath中,并在配置文件中進行配置才能正確加載的插件。

在上面這個層次結構中,最下面的RegionBroker是核心組件,在其之上的都是Broker的插件,繼承之于BrokerFilter,和Broker保持接口兼容但是擴展Broker的功能。
下面舉一個簡單的例子,具體說明一下AMQ的插件是如何工作的。
我們在使用AMQ的過程中發現,在測試環境維護方面有很大的麻煩,具體表現在很多同學在測試項目的時候往往只關注自己項目牽涉的隊列,不會去消費其 他"不相關"的隊列,這樣導致的一個問題就是ActiveMQ經常發生大量數據阻塞,導致測試環境不可用,影響相關項目的測試工作。為了避免這個問題,我 們假定在測試環境可以定義以下一些限制條件:
1、 所有隊列堆積消息不超過1000條,超過之后立即清除。
2、 消息超過1個小時沒有消費,就直接過期。
我們可以編寫一個簡單的amq插件來完成這兩個限制條件:
首先,編寫一個插件安裝類:
package com.alibaba.napoli.plugins;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageControlBrokerPlugin implements BrokerPlugin {
private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageControlBrokerPlugin");
return new MessageControlBroker(broker);
}
}
其次,編寫真正的插件實現:
package com.alibaba.napoli.plugins;
import java.io.IOException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 開發環境管理插件,符合兩個條件進行消息清理:<br>
* 1 消息累積超過1000條
* 2 消息超過1個小時無人消費
* @author guolin.zhuanggl
*
*/
public class MessageControlBroker extends BrokerFilter {
public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
private static final long DEFAULT_EXPIRATION = 3600*1000;
private static final long DEFAULT_PURGE_COUNT = 1000;
public MessageControlBroker(Broker next) {
super(next);
}
@Override
public void messageExpired(ConnectionContext context,
MessageReference message) {
Message msg = null;
try {
msg = message.getMessage();
} catch (IOException e) {
log.error("failed to fetch content: ",e);
}
purgeMessage(msg);
// TODO Auto-generated method stub
super.messageExpired(context, message);
}
/**
* 清除隊列中的所有消息
*/
private void purgeMessage(Message message){
Destination r = message.getRegionDestination();
if(r instanceof Queue){
try {
//如果累積消息超過1000個,清除隊列消息
if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
((Queue) r).purge();
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed to purge queue "+r.getName(),e);
}
}
}
/**
* 當消息發送時,全部設置過期時間1個小時,測試環境專用!!!
*/
@Override
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
long oldExp = messageSend.getExpiration();
messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
purgeMessage(messageSend);
super.send(producerExchange, messageSend);
}
}
然后,將這兩個類打包為myplugin.jar,并放在activemq啟動目錄下的lib目錄下
最后,在activemq.xml文件中增加一個簡單的spring配置項:
<bean xmlns="http://www.springframework.org/schema/beans"
id="purgePlugin"
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
然后,重啟activemq,就會發現這個插件已經被加載。