基于總線的消息服務(BBMS)的設計與實現

openkk 13年前發布 | 4K 次閱讀

前言
異步事件的通知機制在比較有規模的軟件設計中必然會有涉及,比如GUI程序中的事件監聽器,應用程序模塊之間的通信,以及分布式應用中的消息機制等。如果使用語言原生的對象注冊通信,則耦合度不可避免的會增大,也就是說,當時間發生時,A要通知B,則A必須知道B的存在。耦合度的增大在一定程度上必然會影響靈活性。所以,另一種模式就是今天要說的總線模式(BUS Based),即所有的監聽器將自己掛在總線上,這些監聽器互相之間是無法直接通信的,它們可以向總線上push消息,或者從總線上得到消息,從而實現相互間的通信,當然,這種模式會在性能上有一定的額外開銷。

BBMS的主頁在google code上:http://code.google.com/p/bbms/

總線機制


bbms的客戶端程序通過將自己注冊在BUS Server上來等待異步事件,這個過程可以是本地的,也可以是遠程的。本地的BUS可以作為GUI框架中的事件分發者 (dispatcher).JMS(Java Message Service)提供企業級的軟件模塊之間的通信機制,可以使得多個不同的應用集成為一個大型的應用。通過使用BBMS的遠程接口,同樣可以達到這樣的效果。

BBMS的API

  1. /**  
  2.  *   
  3.  * @author juntao.qiu  
  4.  *  
  5.  */  
  6. public class Test{   
  7.     public static void main(String[] args) throws RemoteException{   
  8.         /*  
  9.          * create a notifiable entry, declare that it's care of  
  10.          * TIMEOUT, CLOSE, and READY event.  
  11.          */  
  12.         Configuration config = new RMIServerConfiguration(null0);   
  13.         CommonNotifiableEntry entry1 =    
  14.             new CommonNotifiableEntry(config, "client1",    
  15.                 MessageTypes.MESSAGE_TIMEOUT |    
  16.                 MessageTypes.MESSAGE_CLOSE |    
  17.                 MessageTypes.MESSAGE_READY);   
  18.            
  19.         /*  
  20.          * create another notifiable entry, declare that it's care of  
  21.          * OPEN, CLOSE, and TIMEOUT event.  
  22.          */  
  23.         CommonNotifiableEntry entry2 =    
  24.             new CommonNotifiableEntry(config, "client2",    
  25.                 MessageTypes.MESSAGE_OPEN |    
  26.                 MessageTypes.MESSAGE_CLOSE |    
  27.                 MessageTypes.MESSAGE_TIMEOUT);   
  28.            
  29.         // register them to the remote Message BUS to listener events   
  30.         entry1.register();   
  31.         entry2.register();   
  32.            
  33.         // new a message, of type MESSAGE_OPEN.   
  34.         Message msg = new CommonMessage(   
  35.                 entry1.getId(),   
  36.                 entry2.getId(),   
  37.                 MessageTypes.MESSAGE_OPEN,   
  38.                 "busying now");   
  39.            
  40.         // deliver it to entry2, which is from entry1   
  41.         entry1.post(msg);   
  42.            
  43.         // create a message, of type MESSAGE_CLICKED, the entry2   
  44.         // does not handle this type, it'll not be deliver to entry2   
  45.         Message msgCannotBeReceived = new CommonMessage(   
  46.                 entry1.getId(),   
  47.                 entry2.getId(),   
  48.                 MessageTypes.MESSAGE_CLICKED,   
  49.                 "cliked evnet");   
  50.         entry1.post(msgCannotBeReceived);   
  51.            
  52.         try {   
  53.             Thread.sleep(2000);   
  54.         } catch (InterruptedException e) {   
  55.             e.printStackTrace();   
  56.         }   
  57.            
  58.         // re use the message object to send another message entry   
  59.         msg.setSource(entry2.getId());   
  60.         msg.setTarget(entry1.getId());   
  61.         msg.setType(MessageTypes.MESSAGE_READY);   
  62.         msg.setBody("okay now");   
  63.         entry2.post(msg);   
  64.            
  65.         // unregister self when all works are done or    
  66.         // don't want to listen any more   
  67.         entry1.unregister();   
  68.         entry2.unregister();   
  69.     }   
  70. }   

API的設計,最好可以做到簡單,易用。BBMS也盡力要做到這一點,每一個notifiable(可別通知的)的對象,可以將自己注冊到BUS上,當消息抵達時,BUS管理器會調用這個對象上的update方法,進行通知。

  1. This is client2, get message from : client1, it said that : busying now   
  2. This is client1, get message from : client2, it said that : okay now   


這個是MS運行的一個簡單流程圖。

BUS的實現
BUS接口的定義,可以向BUS上注冊一個notifiableEntry(可被通知的對象),或者卸載這個對象,同時,可以向BUS中post一條消息。

  1. package bbms.framework;   
  2.   
  3. /**  
  4.  * @author juntao.qiu  
  5.  */  
  6. public interface Bus extends java.rmi.Remote{   
  7.     /**  
  8.      * mount an notifiable entry on bus  
  9.      * @param entry  
  10.      */  
  11.     public void mount(NotifiableEntry entry) throws java.rmi.RemoteException;   
  12.        
  13.     /**  
  14.      * unmount the notifiable entry on bus  
  15.      * @param entry  
  16.      */  
  17.     public void unmount(NotifiableEntry entry) throws java.rmi.RemoteException;   
  18.        
  19.     /**  
  20.      * post a new message to Message Bus  
  21.      * @param message  
  22.      */  
  23.     public void post(Message message) throws java.rmi.RemoteException;   
  24. }   
  25.   

BUS的實現比較有意思,其中維護兩個鏈表,一個是監聽器鏈表,一個是消息鏈表,掛載在總線上的實體向BUS發送一條消息,這個過程會立即返回。因為發送消息的過程可能由于網絡原因或其他原因而延遲,而消息的發送者沒有必要等待消息的傳遞,所以BUS中有一個主動線程,這個線程在BUS中放入新的消息時被喚醒,并對監聽器鏈表進行遍歷,將消息分發出去。由于BUS是一個服務級的程序,所以這個主動線程被設計成為一個daemon線程,除非顯式的退出或者出錯,否則BUS將會一直運行。

  1.     /**  
  2.      *   
  3.      * @author juntao.qiu  
  4.      * worker thread, dispatch message to appropriate listener  
  5.      *  
  6.      */  
  7.     private class Daemon implements Runnable{   
  8.         private boolean loop = true;   
  9.         public void run(){   
  10.             while(loop){   
  11.                 if(messages.size() == 0){   
  12.                     synchronized(messages){   
  13.                         try {messages.wait();}    
  14.                         catch (InterruptedException e) {   
  15.                             e.printStackTrace();   
  16.                         }   
  17.                     }   
  18.                 }   
  19.                 processIncomingMessage();   
  20.             }   
  21.         }   
  22.     }   
  23. BUS中的內部工作者線程。它被作為一個Daemon線程:   
  24.     private MessageBus() throws RemoteException{   
  25.         listeners = new LinkedList<NotifiableEntry>();   
  26.         messages = new LinkedList<Message>();   
  27.         Daemon daemon = new Daemon();   
  28.         daemonThread = new Thread(daemon);   
  29.         daemonThread.setPriority(Thread.NORM_PRIORITY + 3);   
  30.         daemonThread.setDaemon(true);   
  31.         daemonThread.start();   
  32.            
  33.         while(!daemonThread.isAlive());   
  34.     }   
  35. 消息的定義   
  36. public interface Message{   
  37.     public int getType();   
  38.     public void setType(int type);   
  39.        
  40.     public String getTarget();   
  41.     public void setTarget(String target);   
  42.        
  43.     public String getSource();   
  44.     public void setSource(String source);   
  45.        
  46.     public Object getBody();   
  47.     public void setBody(Object body);   
  48. }   

為了更通用起見,消息體部分可以包含任何對象。消息類型參考了windows的消息機制,可以將消息進行復合:

  1. /*   
  2.  * 0x8000 = 1000 0000 0000 0000  
  3.  * 0x4000 = 0100 0000 0000 0000  
  4.  * 0x2000 = 0010 0000 0000 0000  
  5.  * 0x1000 = 0001 0000 0000 0000  
  6.  *    
  7.  * it's very useful when you want to combine some messages   
  8.  * together, and the user can simply determine what exactly   
  9.  * what you want. Refer the implementation of MessageBus.java   
  10.  * for more details.   
  11.  */   
  12. public static final int MESSAGE_TIMEOUT = 0x8000;   
  13. public static final int MESSAGE_CLICKED = 0x4000;   
  14. public static final int MESSAGE_CLOSE = 0x2000;   
  15. public static final int MESSAGE_OPEN = 0x1000;   
  16.   
  17. public static final int MESSAGE_READY = 0x0800;   
  18. public static final int MESSAGE_BUSY = 0x0400;   
  19. public static final int MESSAGE_WAIT = 0x0200;   
  20. public static final int MESSAGE_OKAY = 0x0100;   

總結
BBMS如果進行適當的擴展,可以完全實現JMS規范中涉及到的所有主題,如訂閱模式(BBMS現在的實現中只有PTP模式,及點對點的模式,發送消息和接受消息的實體都必須同時在線)。BBMS主要面向的是輕量級的消息傳遞,比如GUI,分布式的GUI等。如果有興趣,可以到BBMS 的頁面上看一看:http://code.google.com/p/bbms/

來源:http://www.cnblogs.com/abruzzi/archive/2009/07/25/1531068.html,作者:abruzzi

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!