基于NIO的消息路由的實現(一) 前言
一、前言:
已經很久沒有碰編碼了,大概有9年的時間,日新月異的框架和新東西讓我眼花繚亂。之前一直在做web相關的應用。由于項目不大,分布式開發在我編碼的那個年代里沒有做過,后來走上管理崗位才接觸到,僅限于溝通交流和方案的策劃,并沒有真正的做過。如今我有了一點時間和精力,決定自己學習一下,先從簡單的消息通訊開始吧。
好,背景完畢!下面說說我想做的東西,我想做一個基于NIO的消息路由,而并不基于目前已有的各種優秀框架(mina,netty等等),這么做的初衷也許跟我個人的習慣有關,我總是覺得如果不明白原理,即使再好的框架當遭遇問題的時候,我也會無從下手,如果我懂得了原理,再選用其他的框架,也會更得心應手。所以才沒有使用現今那些優秀的框架,或許是我的一點點偏見吧。
我的代碼已經發布在 http://git.oschina.net/java616
目已經完成根據客戶端的標識進行消息的異步轉發,仍會持續的迭代和增加。有興趣的可以下載回去,如果我有做的不好或者不對的地方,敬請指出。
二、一些概念和例程
NIO是啥我就不說了,我們來看一下我理解的NIO工作流程,如圖:
上圖為我所理解的NIO的工作過程,如果存在問題,請批評斧正。概括一下我的理解:
-
SocketChannel:為NIO工作過程中,數據傳輸的通道,客戶端與服務端的每次交互都是通過此通道進行的;
-
Selector(多路復用器):會監控其注冊的通道上面的任何事件,獲得SelectionKey,事件分為OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(這是SelectionKey的四個屬性),OP_ACCEPT應該為服務端接收到客戶端連接時的一種狀態,我在客戶端并沒有用到此狀態;OP_CONNECT則為客戶端已經連接上服務端的一種狀態,我在服務端并沒有使用這個狀態;
-
Buffer:我的應用中,我一直使用ByteBuffer,此類是整個NIO通訊的關鍵,必須理解才能進行通訊的開發,否則可能產生問題;所有的通訊內容都需要在此類中寫入和讀出;
如果想做nio相關的應用,那么一些概念上的東西是不可回避的,在這里推薦:http://www.iteye.com/magazines/132-Java-NIO 。
下面三段代碼,分別完成了服務的創建、服務對事件的監聽以及客戶端對事件的監聽(不可直接拷貝使用,有一些變量沒有聲明,如有興趣,可以去下載我的源碼)。
-
服務的創建
//打開一個serversocket通道,ServerSocketChannel是一個監控是否有新連接進入的通道。 serverSocketChannel = ServerSocketChannel.open(); //將這個serversokect通道設置為非阻塞模式 serverSocketChannel.configureBlocking(false); //綁定serversokect的ip和端口 serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort())); //打開選擇器 selector = Selector.open(); //將此通道注冊給選擇器selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-
服務對事件的監聽
//監聽事件key
selector.select(2000);
//迭代一組事件key
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
//定義一個socket通道
SocketChannel socketChannel = null;
int count = 0;
SelectionKey key = keys.next();
// Logs.info("有網絡事件被觸發,事件類型為:" + key.interestOps());
//刪除Iterator中的當前key,避免重復處理
keys.remove();
if (!key.isValid()) {
continue;
} else if (key.isAcceptable()) {
//從客戶端送來的key中獲取ServerSocket通道
serverSocketChannel = (ServerSocketChannel) key.channel();
//接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到連接才會繼續
socketChannel = serverSocketChannel.accept();
//將此socket通道設置為非阻塞模式
socketChannel.configureBlocking(false);
//將此通道注冊到selector,并等待接收客戶端的讀入數據
socketChannel.register(selector, SelectionKey.OP_READ);
allocToken(socketChannel);
} else if (key.isReadable()) {
//獲取事件key中的channel
socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
//清理緩沖區,便于使用
byteBuffer.clear();
//將channel中的字節流讀入緩沖區
count = socketChannel.read(byteBuffer);
byteBuffer.flip();
//處理粘包
if (count > 0) {
try {
handlePacket(socketChannel, byteBuffer);
} catch (Exception e) {
e.printStackTrace();
// continue;//如果當前包存在非法拋出異常,那么不再進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時注釋
}
} else if (count == 0) {
continue;
} else {
socketChannel.close();
}
} else if (key.isWritable()) {
((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
}
}
-
客戶端對事件的監聽
while (true) {
try {
selector.select(3000);
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
for (int i = 0; keys.hasNext(); i++) {
SelectionKey key = keys.next();
keys.remove();
if (key.isConnectable()) {
socketChannel = (SocketChannel) key.channel();
if (socketChannel.isConnectionPending()) {
if (socketChannel.finishConnect()){
Client.IS_CONNECT =true;
logger.info("-------成功連接服務端!-------");
}
}
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
//獲取事件key中的channel
socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);
//清理緩沖區,便于使用
byteBuffer.clear();
//將channel中的字節流讀入緩沖區
String readStr = "";
int count = socketChannel.read(byteBuffer);
//務必要把buffer的position重置為0
byteBuffer.flip();
handlePacket(byteBuffer, count);
// socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isWritable()) {
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
} catch (IOException e) {
e.printStackTrace();
continue;
}
} 三、我要做的是個啥?
根據我個人對NIO的理解,我的初步想法是要實現一個這樣的東西,如圖:
但在我的不斷深入開發中,發現上面的圖中很多不成熟的內容,作為一個完整的消息通訊的服務,必須包含如下的內容:
1、對接入連接的管理;
2、對連接身份的確認;
3、對異常關閉連接的回收;
4、根據身份對消息的轉發;
5、鏈路的維持;
6、自動重連;
7、消息的異步處理;
8、消息的響應機制;
9、粘包和斷包的處理;
9、配置體系;
10、通訊層與業務層的分離;
………………
網上很多的NIO實例都是可以運行的,但并不能滿足我的工作需要,以上的那些肯定還有沒有考慮全的東西,隨著我一點點的開發會逐漸的浮出水面。
在未來的文章中,我會逐步把我自己制定的通訊協議,各個模塊的結構,以及代碼貼出來,希望大家能夠互相學習,互相幫助。(待續)