基于總線的消息服務(BBMS)的設計與實現
前言
異步事件的通知機制在比較有規模的軟件設計中必然會有涉及,比如GUI程序中的事件監聽器,應用程序模塊之間的通信,以及分布式應用中的消息機制等。如果使用語言原生的對象注冊通信,則耦合度不可避免的會增大,也就是說,當時間發生時,A要通知B,則A必須知道B的存在。耦合度的增大在一定程度上必然會影響靈活性。所以,另一種模式就是今天要說的總線模式(BUS Based),即所有的監聽器將自己掛在總線上,這些監聽器互相之間是無法直接通信的,它們可以向總線上push消息,或者從總線上得到消息,從而實現相互間的通信,當然,這種模式會在性能上有一定的額外開銷。
BBMS的主頁在google code上:http://code.google.com/p/bbms/
總線機制
bbms的客戶端程序通過將自己注冊在BUS Server上來等待異步事件,這個過程可以是本地的,也可以是遠程的。本地的BUS可以作為GUI框架中的事件分發者 (dispatcher).JMS(Java Message Service)提供企業級的軟件模塊之間的通信機制,可以使得多個不同的應用集成為一個大型的應用。通過使用BBMS的遠程接口,同樣可以達到這樣的效果。
BBMS的API
- /**
- *
- * @author juntao.qiu
- *
- */
- public class Test{
- public static void main(String[] args) throws RemoteException{
- /*
- * create a notifiable entry, declare that it's care of
- * TIMEOUT, CLOSE, and READY event.
- */
- Configuration config = new RMIServerConfiguration(null, 0);
- CommonNotifiableEntry entry1 =
- new CommonNotifiableEntry(config, "client1",
- MessageTypes.MESSAGE_TIMEOUT |
- MessageTypes.MESSAGE_CLOSE |
- MessageTypes.MESSAGE_READY);
- /*
- * create another notifiable entry, declare that it's care of
- * OPEN, CLOSE, and TIMEOUT event.
- */
- CommonNotifiableEntry entry2 =
- new CommonNotifiableEntry(config, "client2",
- MessageTypes.MESSAGE_OPEN |
- MessageTypes.MESSAGE_CLOSE |
- MessageTypes.MESSAGE_TIMEOUT);
- // register them to the remote Message BUS to listener events
- entry1.register();
- entry2.register();
- // new a message, of type MESSAGE_OPEN.
- Message msg = new CommonMessage(
- entry1.getId(),
- entry2.getId(),
- MessageTypes.MESSAGE_OPEN,
- "busying now");
- // deliver it to entry2, which is from entry1
- entry1.post(msg);
- // create a message, of type MESSAGE_CLICKED, the entry2
- // does not handle this type, it'll not be deliver to entry2
- Message msgCannotBeReceived = new CommonMessage(
- entry1.getId(),
- entry2.getId(),
- MessageTypes.MESSAGE_CLICKED,
- "cliked evnet");
- entry1.post(msgCannotBeReceived);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // re use the message object to send another message entry
- msg.setSource(entry2.getId());
- msg.setTarget(entry1.getId());
- msg.setType(MessageTypes.MESSAGE_READY);
- msg.setBody("okay now");
- entry2.post(msg);
- // unregister self when all works are done or
- // don't want to listen any more
- entry1.unregister();
- entry2.unregister();
- }
- }
API的設計,最好可以做到簡單,易用。BBMS也盡力要做到這一點,每一個notifiable(可別通知的)的對象,可以將自己注冊到BUS上,當消息抵達時,BUS管理器會調用這個對象上的update方法,進行通知。
- This is client2, get message from : client1, it said that : busying now
- This is client1, get message from : client2, it said that : okay now
這個是MS運行的一個簡單流程圖。
BUS的實現
BUS接口的定義,可以向BUS上注冊一個notifiableEntry(可被通知的對象),或者卸載這個對象,同時,可以向BUS中post一條消息。
- package bbms.framework;
- /**
- * @author juntao.qiu
- */
- public interface Bus extends java.rmi.Remote{
- /**
- * mount an notifiable entry on bus
- * @param entry
- */
- public void mount(NotifiableEntry entry) throws java.rmi.RemoteException;
- /**
- * unmount the notifiable entry on bus
- * @param entry
- */
- public void unmount(NotifiableEntry entry) throws java.rmi.RemoteException;
- /**
- * post a new message to Message Bus
- * @param message
- */
- public void post(Message message) throws java.rmi.RemoteException;
- }
BUS的實現比較有意思,其中維護兩個鏈表,一個是監聽器鏈表,一個是消息鏈表,掛載在總線上的實體向BUS發送一條消息,這個過程會立即返回。因為發送消息的過程可能由于網絡原因或其他原因而延遲,而消息的發送者沒有必要等待消息的傳遞,所以BUS中有一個主動線程,這個線程在BUS中放入新的消息時被喚醒,并對監聽器鏈表進行遍歷,將消息分發出去。由于BUS是一個服務級的程序,所以這個主動線程被設計成為一個daemon線程,除非顯式的退出或者出錯,否則BUS將會一直運行。
- /**
- *
- * @author juntao.qiu
- * worker thread, dispatch message to appropriate listener
- *
- */
- private class Daemon implements Runnable{
- private boolean loop = true;
- public void run(){
- while(loop){
- if(messages.size() == 0){
- synchronized(messages){
- try {messages.wait();}
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- processIncomingMessage();
- }
- }
- }
- BUS中的內部工作者線程。它被作為一個Daemon線程:
- private MessageBus() throws RemoteException{
- listeners = new LinkedList<NotifiableEntry>();
- messages = new LinkedList<Message>();
- Daemon daemon = new Daemon();
- daemonThread = new Thread(daemon);
- daemonThread.setPriority(Thread.NORM_PRIORITY + 3);
- daemonThread.setDaemon(true);
- daemonThread.start();
- while(!daemonThread.isAlive());
- }
- 消息的定義
- public interface Message{
- public int getType();
- public void setType(int type);
- public String getTarget();
- public void setTarget(String target);
- public String getSource();
- public void setSource(String source);
- public Object getBody();
- public void setBody(Object body);
- }
為了更通用起見,消息體部分可以包含任何對象。消息類型參考了windows的消息機制,可以將消息進行復合:
- /*
- * 0x8000 = 1000 0000 0000 0000
- * 0x4000 = 0100 0000 0000 0000
- * 0x2000 = 0010 0000 0000 0000
- * 0x1000 = 0001 0000 0000 0000
- *
- * it's very useful when you want to combine some messages
- * together, and the user can simply determine what exactly
- * what you want. Refer the implementation of MessageBus.java
- * for more details.
- */
- public static final int MESSAGE_TIMEOUT = 0x8000;
- public static final int MESSAGE_CLICKED = 0x4000;
- public static final int MESSAGE_CLOSE = 0x2000;
- public static final int MESSAGE_OPEN = 0x1000;
- public static final int MESSAGE_READY = 0x0800;
- public static final int MESSAGE_BUSY = 0x0400;
- public static final int MESSAGE_WAIT = 0x0200;
- public static final int MESSAGE_OKAY = 0x0100;
總結
BBMS如果進行適當的擴展,可以完全實現JMS規范中涉及到的所有主題,如訂閱模式(BBMS現在的實現中只有PTP模式,及點對點的模式,發送消息和接受消息的實體都必須同時在線)。BBMS主要面向的是輕量級的消息傳遞,比如GUI,分布式的GUI等。如果有興趣,可以到BBMS 的頁面上看一看:http://code.google.com/p/bbms/
來源:http://www.cnblogs.com/abruzzi/archive/2009/07/25/1531068.html,作者:abruzzi