基于總線模式的消息服務

rocgege 14年前發布 | 5K 次閱讀

前言

一直以來,都對異步事件很感興趣,比如一個應用在運行一個耗時的過程時,最好的方式是提交這個耗時的過程給一個專門的工作線程,然后立即返回到主線程上,進行其他的任務,而工作線程完成耗時任務后,異步的通知主線程,這個過程本身是很有意思的。傳統的事件-監聽器模型可以較好的解決這個問題,不過事件監聽器兩者的耦合往往略顯緊密,所以需要另一種實現,使得這兩者的耦合盡量小,那樣模塊可以比較通用。

總線模式

前幾天跟同事討論了下Swing中的消息機制,同事給我講了下總線模式的消息機制,感覺很有意思,于是周末就自己實現了下。具體的思路是這樣的:

  • 系統中存在一個消息服務(Message Service),即總線
  • 監聽器對象,通過實現一個可被通知的對象的接口,將自己注冊在消息服務上
  • 可被通知的對象可以向消息總線上post消息,就這個對象而言,它對其他注冊在總線上的對象是一無所知的
  • 消息服務進行消息的調度和轉發,將消息(事件)發送給指定的對象,從而傳遞這個異步事件

這個思路最大的好處是,事件被抽象成消息(Message),具有統一的格式,便于傳遞。掛在總線上的監聽器互相不知道對方的存在,監聽器可以指定自己感興趣的消息類型,消息可以是廣播的形式,也可以是點對點的。(后來參看了下JMS,其中有pub/sub的模式(即訂閱模式),不過,對于異步消息的傳遞來說,這個可以不必實現)

消息服務

消息服務可以將一大堆分布在不同物理機上的應用整合起來,進行通信,可以將一些小的應用整合為一個大的,可用的應用系統

用一個例子來說吧:

public class Test{
    public static void main(String[] args) throws RemoteException{
        /*
         * 創建一個可被通知的對象(監聽器), 這個監聽器關注這樣幾個事件
         * TIMEOUT, CLOSE, and READY
         */
        Configuration config = new RMIServerConfiguration(null, 0);
        CommonNotifiableEntry entry1 = 
            new CommonNotifiableEntry(config, "client1", 
                MessageTypes.MESSAGE_TIMEOUT | 
                MessageTypes.MESSAGE_CLOSE | 
                MessageTypes.MESSAGE_READY);

        /*
         * 創建另一個監聽器, 這個監聽器關注這樣幾個事件
         * OPEN, CLOSE, and TIMEOUT.
         */
        CommonNotifiableEntry entry2 = 
            new CommonNotifiableEntry(config, "client2", 
                MessageTypes.MESSAGE_OPEN | 
                MessageTypes.MESSAGE_CLOSE | 
                MessageTypes.MESSAGE_TIMEOUT);

        // 將監聽器掛在BUS上
        entry1.register();
        entry2.register();

        // 創建一個新的消息, MESSAGE_OPEN類型.
        Message msg = new CommonMessage(
                entry1.getId(),
                entry2.getId(),
                MessageTypes.MESSAGE_OPEN,
                "busying now");

        // 傳遞給entry2
        entry1.post(msg);

        // 創建一個MESSAGE_CLICKED類型的消息, entry2
        // 不關注這個類型的消息,所以此消息不會被傳遞
        Message msgCannotBeReceived = new CommonMessage(
                entry1.getId(),
                entry2.getId(),
                MessageTypes.MESSAGE_CLICKED,
                "cliked evnet");
        entry1.post(msgCannotBeReceived);

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // re use the message object to send another message entry
        msg.setSource(entry2.getId());
        msg.setTarget(entry1.getId());
        msg.setType(MessageTypes.MESSAGE_READY);
        msg.setBody("okay now");
        entry2.post(msg);

        // 卸載這些監聽器,當程序退出,或者
        // 或者監聽器不在關注事件發生的時候
        entry1.unregister();
        entry2.unregister();
    }
}


 

當前,這個系統可以支持遠程的消息傳遞(通過java的RMI機制),不過對于尋址方面還沒有做進一步的處理,有時間再來完善吧。

消息服務的實現

下面我把消息服務的主要實現部分貼出來分析一下:

/**
 * 
 * @author Abruzzi
 *
 */
public class MessageBus extends UnicastRemoteObject implements Bus{
    private static MessageBus instance;
    private List<NotifiableEntry> listeners;
    private List<Message> messages;
    private Thread daemonThread = null;

    public static MessageBus getInstance() throws RemoteException{
        if(instance == null){
            instance = new MessageBus();
        }
        return instance;
    }

    private MessageBus() throws RemoteException{
        listeners = new LinkedList<NotifiableEntry>();
        messages = new LinkedList<Message>();
        Daemon daemon = new Daemon();
        daemonThread = new Thread(daemon);
        daemonThread.setPriority(Thread.NORM_PRIORITY + 3);
        daemonThread.setDaemon(true);
        daemonThread.start();

        while(!daemonThread.isAlive());
    }

    /**
     * mount notifiable object to listener list
     */
    public void mount(NotifiableEntry entry) throws RemoteException{
        synchronized(listeners){
            listeners.add(entry);
            listeners.notifyAll();
        }
    }

    /**
     * unmount the special notifiable object from listener
     */
    public void unmount(NotifiableEntry entry) throws RemoteException{
        synchronized(listeners){
            listeners.remove(entry);
            listeners.notifyAll();
        }
    }

    /**
     * post a new message into the bus
     * @param message
     */
    public void post(Message message) throws RemoteException{
        synchronized(messages){
            messages.add(message);
            messages.notifyAll();
        }
    }

    /**
     * 
     * @author Abruzzi
     * worker thread, dispatch message to appropriate listener
     *
     */
    private class Daemon implements Runnable{
        private boolean loop = true;
        public void run(){
            while(loop){
                if(messages.size() == 0){
                    synchronized(messages){
                        try {messages.wait();} 
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                processIncomingMessage();
            }
        }
    }

    /**
     * process the incoming message, remove the first message from
     * queue, and then check all listeners to see whether should 
     * deliver the message to or not.
     */
    private void processIncomingMessage(){
        Message msg;
        synchronized(messages){
            msg = messages.remove(0);
        }
        String target = null;
        int type = 0;
        int mask = 0;
        try {
            target = msg.getTarget();
            type = msg.getType();
            if(target == MessageTypes.SENDTOALL){
                for(NotifiableEntry entry : listeners){
                    mask = entry.getSense();
                    if((mask & type) == type){entry.update(msg);}
                }
            }else{
                for(NotifiableEntry entry : listeners){
                    mask = entry.getSense();
                    if(entry.getId().equals(target) && (mask & type) == type){
                        entry.update(msg);
                    }
                }
            }
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

}


消息總線是一個RMI對象,其中mount(), unmout(), post()等方法可以被遠程調用。MessageBus維護兩個列表,一個消息列表,一個監聽器列表。當消息被post到總線上后,post會立即返回,然后工作線程啟動,取出消息并將其分發到合適的監聽器上。

可能,對同步的處理上考慮不夠周全,下來再繼續修改。

P.S.我將這個項目托管在google code上了,叫BBMS(Bus Based Message Service),感興趣的可以去看看:http://code.google.com/p/bbms/

 

 本文由用戶 rocgege 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!