基于NIO的消息路由的實現(一) 前言

fg68 9年前發布 | 10K 次閱讀 NIO

一、前言:

已經很久沒有碰編碼了,大概有9年的時間,日新月異的框架和新東西讓我眼花繚亂。之前一直在做web相關的應用。由于項目不大,分布式開發在我編碼的那個年代里沒有做過,后來走上管理崗位才接觸到,僅限于溝通交流和方案的策劃,并沒有真正的做過。如今我有了一點時間和精力,決定自己學習一下,先從簡單的消息通訊開始吧。

好,背景完畢!下面說說我想做的東西,我想做一個基于NIO的消息路由,而并不基于目前已有的各種優秀框架(mina,netty等等),這么做的初衷也許跟我個人的習慣有關,我總是覺得如果不明白原理,即使再好的框架當遭遇問題的時候,我也會無從下手,如果我懂得了原理,再選用其他的框架,也會更得心應手。所以才沒有使用現今那些優秀的框架,或許是我的一點點偏見吧。

我的代碼已經發布在 http://git.oschina.net/java616

目已經完成根據客戶端的標識進行消息的異步轉發,仍會持續的迭代和增加。有興趣的可以下載回去,如果我有做的不好或者不對的地方,敬請指出。

二、一些概念和例程

NIO是啥我就不說了,我們來看一下我理解的NIO工作流程,如圖:

基于NIO的消息路由的實現(一) 前言

上圖為我所理解的NIO的工作過程,如果存在問題,請批評斧正。概括一下我的理解:

  • SocketChannel:為NIO工作過程中,數據傳輸的通道,客戶端與服務端的每次交互都是通過此通道進行的;

  • Selector(多路復用器):會監控其注冊的通道上面的任何事件,獲得SelectionKey,事件分為OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(這是SelectionKey的四個屬性),OP_ACCEPT應該為服務端接收到客戶端連接時的一種狀態,我在客戶端并沒有用到此狀態;OP_CONNECT則為客戶端已經連接上服務端的一種狀態,我在服務端并沒有使用這個狀態;

  • Buffer:我的應用中,我一直使用ByteBuffer,此類是整個NIO通訊的關鍵,必須理解才能進行通訊的開發,否則可能產生問題;所有的通訊內容都需要在此類中寫入和讀出;


如果想做nio相關的應用,那么一些概念上的東西是不可回避的,在這里推薦:http://www.iteye.com/magazines/132-Java-NIO 。

下面三段代碼,分別完成了服務的創建、服務對事件的監聽以及客戶端對事件的監聽(不可直接拷貝使用,有一些變量沒有聲明,如有興趣,可以去下載我的源碼)。

  • 服務的創建

//打開一個serversocket通道,ServerSocketChannel是一個監控是否有新連接進入的通道。
serverSocketChannel = ServerSocketChannel.open();
//將這個serversokect通道設置為非阻塞模式
serverSocketChannel.configureBlocking(false);
//綁定serversokect的ip和端口
serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));
//打開選擇器
selector = Selector.open();
//將此通道注冊給選擇器selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 服務對事件的監聽

                //監聽事件key
                selector.select(2000);
                //迭代一組事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定義一個socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有網絡事件被觸發,事件類型為:" + key.interestOps());
                    //刪除Iterator中的當前key,避免重復處理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //從客戶端送來的key中獲取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到連接才會繼續
                        socketChannel = serverSocketChannel.accept();
                        //將此socket通道設置為非阻塞模式
                        socketChannel.configureBlocking(false);
                        //將此通道注冊到selector,并等待接收客戶端的讀入數據
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //獲取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理緩沖區,便于使用
                        byteBuffer.clear();
                        //將channel中的字節流讀入緩沖區
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //處理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//如果當前包存在非法拋出異常,那么不再進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時注釋
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }

  • 客戶端對事件的監聽

            while (true) {
                try {

                    selector.select(3000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    for (int i = 0; keys.hasNext(); i++) {

                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isConnectable()) {
                            socketChannel = (SocketChannel) key.channel();
                            if (socketChannel.isConnectionPending()) {
                                if (socketChannel.finishConnect()){
                                    Client.IS_CONNECT =true;
                                    logger.info("-------成功連接服務端!-------");
                                }

                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            //獲取事件key中的channel
                            socketChannel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);
                            //清理緩沖區,便于使用
                            byteBuffer.clear();
                            //將channel中的字節流讀入緩沖區
                            String readStr = "";
                            int count = socketChannel.read(byteBuffer);
                            //務必要把buffer的position重置為0
                            byteBuffer.flip();

                            handlePacket(byteBuffer, count);
//                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isWritable()) {
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }

                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }

            }

三、我要做的是個啥?

根據我個人對NIO的理解,我的初步想法是要實現一個這樣的東西,如圖:

基于NIO的消息路由的實現(一) 前言

但在我的不斷深入開發中,發現上面的圖中很多不成熟的內容,作為一個完整的消息通訊的服務,必須包含如下的內容:

1、對接入連接的管理;

2、對連接身份的確認;

3、對異常關閉連接的回收;

4、根據身份對消息的轉發;

5、鏈路的維持;

6、自動重連;

7、消息的異步處理;

8、消息的響應機制;

9、粘包和斷包的處理;

9、配置體系;

10、通訊層與業務層的分離;

………………

網上很多的NIO實例都是可以運行的,但并不能滿足我的工作需要,以上的那些肯定還有沒有考慮全的東西,隨著我一點點的開發會逐漸的浮出水面。

在未來的文章中,我會逐步把我自己制定的通訊協議,各個模塊的結構,以及代碼貼出來,希望大家能夠互相學習,互相幫助。(待續)

來自:http://my.oschina.net/u/2397619/blog/493486

 本文由用戶 fg68 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!