基于總線模式的消息服務
前言
一直以來,都對異步事件很感興趣,比如一個應用在運行一個耗時的過程時,最好的方式是提交這個耗時的過程給一個專門的工作線程,然后立即返回到主線程上,進行其他的任務,而工作線程完成耗時任務后,異步的通知主線程,這個過程本身是很有意思的。傳統的事件-監聽器模型可以較好的解決這個問題,不過事件和監聽器兩者的耦合往往略顯緊密,所以需要另一種實現,使得這兩者的耦合盡量小,那樣模塊可以比較通用。
總線模式
前幾天跟同事討論了下Swing中的消息機制,同事給我講了下總線模式的消息機制,感覺很有意思,于是周末就自己實現了下。具體的思路是這樣的:
- 系統中存在一個消息服務(Message Service),即總線
- 監聽器對象,通過實現一個可被通知的對象的接口,將自己注冊在消息服務上
- 可被通知的對象可以向消息總線上post消息,就這個對象而言,它對其他注冊在總線上的對象是一無所知的
- 消息服務進行消息的調度和轉發,將消息(事件)發送給指定的對象,從而傳遞這個異步事件
這個思路最大的好處是,事件被抽象成消息(Message),具有統一的格式,便于傳遞。掛在總線上的監聽器互相不知道對方的存在,監聽器可以指定自己感興趣的消息類型,消息可以是廣播的形式,也可以是點對點的。(后來參看了下JMS,其中有pub/sub的模式(即訂閱模式),不過,對于異步消息的傳遞來說,這個可以不必實現)
消息服務
消息服務可以將一大堆分布在不同物理機上的應用整合起來,進行通信,可以將一些小的應用整合為一個大的,可用的應用系統。
用一個例子來說吧:
public class Test{
public static void main(String[] args) throws RemoteException{
/*
* 創建一個可被通知的對象(監聽器), 這個監聽器關注這樣幾個事件
* TIMEOUT, CLOSE, and READY
*/
Configuration config = new RMIServerConfiguration(null, 0);
CommonNotifiableEntry entry1 =
new CommonNotifiableEntry(config, "client1",
MessageTypes.MESSAGE_TIMEOUT |
MessageTypes.MESSAGE_CLOSE |
MessageTypes.MESSAGE_READY);
/*
* 創建另一個監聽器, 這個監聽器關注這樣幾個事件
* OPEN, CLOSE, and TIMEOUT.
*/
CommonNotifiableEntry entry2 =
new CommonNotifiableEntry(config, "client2",
MessageTypes.MESSAGE_OPEN |
MessageTypes.MESSAGE_CLOSE |
MessageTypes.MESSAGE_TIMEOUT);
// 將監聽器掛在BUS上
entry1.register();
entry2.register();
// 創建一個新的消息, MESSAGE_OPEN類型.
Message msg = new CommonMessage(
entry1.getId(),
entry2.getId(),
MessageTypes.MESSAGE_OPEN,
"busying now");
// 傳遞給entry2
entry1.post(msg);
// 創建一個MESSAGE_CLICKED類型的消息, entry2
// 不關注這個類型的消息,所以此消息不會被傳遞
Message msgCannotBeReceived = new CommonMessage(
entry1.getId(),
entry2.getId(),
MessageTypes.MESSAGE_CLICKED,
"cliked evnet");
entry1.post(msgCannotBeReceived);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// re use the message object to send another message entry
msg.setSource(entry2.getId());
msg.setTarget(entry1.getId());
msg.setType(MessageTypes.MESSAGE_READY);
msg.setBody("okay now");
entry2.post(msg);
// 卸載這些監聽器,當程序退出,或者
// 或者監聽器不在關注事件發生的時候
entry1.unregister();
entry2.unregister();
}
}
當前,這個系統可以支持遠程的消息傳遞(通過java的RMI機制),不過對于尋址方面還沒有做進一步的處理,有時間再來完善吧。
消息服務的實現
下面我把消息服務的主要實現部分貼出來分析一下:
/**
*
* @author Abruzzi
*
*/
public class MessageBus extends UnicastRemoteObject implements Bus{
private static MessageBus instance;
private List<NotifiableEntry> listeners;
private List<Message> messages;
private Thread daemonThread = null;
public static MessageBus getInstance() throws RemoteException{
if(instance == null){
instance = new MessageBus();
}
return instance;
}
private MessageBus() throws RemoteException{
listeners = new LinkedList<NotifiableEntry>();
messages = new LinkedList<Message>();
Daemon daemon = new Daemon();
daemonThread = new Thread(daemon);
daemonThread.setPriority(Thread.NORM_PRIORITY + 3);
daemonThread.setDaemon(true);
daemonThread.start();
while(!daemonThread.isAlive());
}
/**
* mount notifiable object to listener list
*/
public void mount(NotifiableEntry entry) throws RemoteException{
synchronized(listeners){
listeners.add(entry);
listeners.notifyAll();
}
}
/**
* unmount the special notifiable object from listener
*/
public void unmount(NotifiableEntry entry) throws RemoteException{
synchronized(listeners){
listeners.remove(entry);
listeners.notifyAll();
}
}
/**
* post a new message into the bus
* @param message
*/
public void post(Message message) throws RemoteException{
synchronized(messages){
messages.add(message);
messages.notifyAll();
}
}
/**
*
* @author Abruzzi
* worker thread, dispatch message to appropriate listener
*
*/
private class Daemon implements Runnable{
private boolean loop = true;
public void run(){
while(loop){
if(messages.size() == 0){
synchronized(messages){
try {messages.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
processIncomingMessage();
}
}
}
/**
* process the incoming message, remove the first message from
* queue, and then check all listeners to see whether should
* deliver the message to or not.
*/
private void processIncomingMessage(){
Message msg;
synchronized(messages){
msg = messages.remove(0);
}
String target = null;
int type = 0;
int mask = 0;
try {
target = msg.getTarget();
type = msg.getType();
if(target == MessageTypes.SENDTOALL){
for(NotifiableEntry entry : listeners){
mask = entry.getSense();
if((mask & type) == type){entry.update(msg);}
}
}else{
for(NotifiableEntry entry : listeners){
mask = entry.getSense();
if(entry.getId().equals(target) && (mask & type) == type){
entry.update(msg);
}
}
}
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
消息總線是一個RMI對象,其中mount(), unmout(), post()等方法可以被遠程調用。MessageBus維護兩個列表,一個消息列表,一個監聽器列表。當消息被post到總線上后,post會立即返回,然后工作線程啟動,取出消息并將其分發到合適的監聽器上。
可能,對同步的處理上考慮不夠周全,下來再繼續修改。
P.S.我將這個項目托管在google code上了,叫BBMS(Bus Based Message Service),感興趣的可以去看看:http://code.google.com/p/bbms/。