基于NIO的消息路由的實現(一) 前言
一、前言:
已經很久沒有碰編碼了,大概有9年的時間,日新月異的框架和新東西讓我眼花繚亂。之前一直在做web相關的應用。由于項目不大,分布式開發在我編碼的那個年代里沒有做過,后來走上管理崗位才接觸到,僅限于溝通交流和方案的策劃,并沒有真正的做過。如今我有了一點時間和精力,決定自己學習一下,先從簡單的消息通訊開始吧。
好,背景完畢!下面說說我想做的東西,我想做一個基于NIO的消息路由,而并不基于目前已有的各種優秀框架(mina,netty等等),這么做的初衷也許跟我個人的習慣有關,我總是覺得如果不明白原理,即使再好的框架當遭遇問題的時候,我也會無從下手,如果我懂得了原理,再選用其他的框架,也會更得心應手。所以才沒有使用現今那些優秀的框架,或許是我的一點點偏見吧。
我的代碼已經發布在 http://git.oschina.net/java616
目已經完成根據客戶端的標識進行消息的異步轉發,仍會持續的迭代和增加。有興趣的可以下載回去,如果我有做的不好或者不對的地方,敬請指出。
二、一些概念和例程
NIO是啥我就不說了,我們來看一下我理解的NIO工作流程,如圖:
上圖為我所理解的NIO的工作過程,如果存在問題,請批評斧正。概括一下我的理解:
-
SocketChannel:為NIO工作過程中,數據傳輸的通道,客戶端與服務端的每次交互都是通過此通道進行的;
-
Selector(多路復用器):會監控其注冊的通道上面的任何事件,獲得SelectionKey,事件分為OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(這是SelectionKey的四個屬性),OP_ACCEPT應該為服務端接收到客戶端連接時的一種狀態,我在客戶端并沒有用到此狀態;OP_CONNECT則為客戶端已經連接上服務端的一種狀態,我在服務端并沒有使用這個狀態;
-
Buffer:我的應用中,我一直使用ByteBuffer,此類是整個NIO通訊的關鍵,必須理解才能進行通訊的開發,否則可能產生問題;所有的通訊內容都需要在此類中寫入和讀出;
如果想做nio相關的應用,那么一些概念上的東西是不可回避的,在這里推薦:http://www.iteye.com/magazines/132-Java-NIO 。
下面三段代碼,分別完成了服務的創建、服務對事件的監聽以及客戶端對事件的監聽(不可直接拷貝使用,有一些變量沒有聲明,如有興趣,可以去下載我的源碼)。
-
服務的創建
//打開一個serversocket通道,ServerSocketChannel是一個監控是否有新連接進入的通道。 serverSocketChannel = ServerSocketChannel.open(); //將這個serversokect通道設置為非阻塞模式 serverSocketChannel.configureBlocking(false); //綁定serversokect的ip和端口 serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort())); //打開選擇器 selector = Selector.open(); //將此通道注冊給選擇器selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-
服務對事件的監聽
//監聽事件key selector.select(2000); //迭代一組事件key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { //定義一個socket通道 SocketChannel socketChannel = null; int count = 0; SelectionKey key = keys.next(); // Logs.info("有網絡事件被觸發,事件類型為:" + key.interestOps()); //刪除Iterator中的當前key,避免重復處理 keys.remove(); if (!key.isValid()) { continue; } else if (key.isAcceptable()) { //從客戶端送來的key中獲取ServerSocket通道 serverSocketChannel = (ServerSocketChannel) key.channel(); //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到連接才會繼續 socketChannel = serverSocketChannel.accept(); //將此socket通道設置為非阻塞模式 socketChannel.configureBlocking(false); //將此通道注冊到selector,并等待接收客戶端的讀入數據 socketChannel.register(selector, SelectionKey.OP_READ); allocToken(socketChannel); } else if (key.isReadable()) { //獲取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock()); //清理緩沖區,便于使用 byteBuffer.clear(); //將channel中的字節流讀入緩沖區 count = socketChannel.read(byteBuffer); byteBuffer.flip(); //處理粘包 if (count > 0) { try { handlePacket(socketChannel, byteBuffer); } catch (Exception e) { e.printStackTrace(); // continue;//如果當前包存在非法拋出異常,那么不再進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時注釋 } } else if (count == 0) { continue; } else { socketChannel.close(); } } else if (key.isWritable()) { ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ); } }
-
客戶端對事件的監聽
while (true) { try { selector.select(3000); Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); for (int i = 0; keys.hasNext(); i++) { SelectionKey key = keys.next(); keys.remove(); if (key.isConnectable()) { socketChannel = (SocketChannel) key.channel(); if (socketChannel.isConnectionPending()) { if (socketChannel.finishConnect()){ Client.IS_CONNECT =true; logger.info("-------成功連接服務端!-------"); } } socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //獲取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK); //清理緩沖區,便于使用 byteBuffer.clear(); //將channel中的字節流讀入緩沖區 String readStr = ""; int count = socketChannel.read(byteBuffer); //務必要把buffer的position重置為0 byteBuffer.flip(); handlePacket(byteBuffer, count); // socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isWritable()) { socketChannel.register(selector, SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); continue; } }
三、我要做的是個啥?
根據我個人對NIO的理解,我的初步想法是要實現一個這樣的東西,如圖:
但在我的不斷深入開發中,發現上面的圖中很多不成熟的內容,作為一個完整的消息通訊的服務,必須包含如下的內容:
1、對接入連接的管理;
2、對連接身份的確認;
3、對異常關閉連接的回收;
4、根據身份對消息的轉發;
5、鏈路的維持;
6、自動重連;
7、消息的異步處理;
8、消息的響應機制;
9、粘包和斷包的處理;
9、配置體系;
10、通訊層與業務層的分離;
………………
網上很多的NIO實例都是可以運行的,但并不能滿足我的工作需要,以上的那些肯定還有沒有考慮全的東西,隨著我一點點的開發會逐漸的浮出水面。
在未來的文章中,我會逐步把我自己制定的通訊協議,各個模塊的結構,以及代碼貼出來,希望大家能夠互相學習,互相幫助。(待續)