Java NIO服務器實例
我一直想學習如何用Java寫一個非阻塞IO服務器,但無法從網上找到一個滿足要求的服務器。我找到了這個示例,但仍然沒能解決我的問題。還可以選擇Apache MINA框架。但我的要求相對簡單,MINA對我來說還稍微有點復雜。所以在MINA和一些教程(參見這篇和這篇)的幫助下,我自己寫了一個非阻塞IO服務器。
我的代碼可以從這里下載。這只是個示例代碼,如果需要可以隨意修改它。這個示例由一個抽象的非阻塞服務器和一個配對的阻塞客戶端組成。需要創建一個具體的實現來使用它們——可以通過測試用例來查看這個樣例是如何工作的。兩者都被設計為在自己的線程中運行(因此實現了Runnable接口),而且是單線程的——后面會有更多的并發選項。當客戶端僅連接到單一服務器時是阻塞的,并且僅在自己的線程中運行。客服端還需要等待服務器端的返回信息,所以將客戶端設計為非阻塞是沒有意義的。本服務器只處理標準的TCP連接。如果使用的是UDP、SSL或別的協議的話,需要自己添加實現。
在寫個示例的代碼時,我學到了一些東西。除了調用標準的API來打開和管理連接以外,還掌握了selection keys的不同使用方式、消息處理技巧和線程問題,這些都是十分有用的。
打開和管理一個連接的基本方式在網絡上十分常用,而且在下面的示例代碼段中也有出現(只有代碼片段——可以從代碼下載中取得完整版本)。從打開一個 Selector 開始(一種網絡信道多路復用器 multiplexor)。Selector通過selectionkey來表示每一個信道,然后打開一個指定端口的套接字節服務器。將selector、SelectionKey.OP_ACCEPT作為參數在socket服務器上注冊,任何接入連接在selector上都是有效的。下面的代碼一直在循環等待selector的事件。當事件發生時,如果是一個連接請求,套 字節服務器會接受連接并注冊鏈接發出的消息(通過OP_READ 注冊)。如果它是一個信息(key.isreadable()),處理信息的代碼尚未實現。下面的代碼也很脆弱,任何錯誤都會導致服務器停止工作。
Selector selector = null;
ServerSocketChannel server = null;
try {
selector = Selector.open();
server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
if (key.isConnectable()) {
((SocketChannel)key.channel()).finishConnect();
}
if (key.isAcceptable()) {
// accept connection
SocketChannel client = server.accept();
client.configureBlocking(false);
client.socket().setTcpNoDelay(true);
client.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// ...read messages...
}
}
}
} catch (Throwable e) {
throw new RuntimeException("Server failure: "+e.getMessage());
} finally {
try {
selector.close();
server.socket().close();
server.close();
stopped();
} catch (Exception e) {
// do nothing - server failed
}
} 值得注意的是,一個selection key不代表一個套接字。相反,他們是selector注冊的信道。因此,一個來自客戶端的連接事件(OP_ACCEPT 事件)將使用與客戶端發送消息(OP_READ事件)不同的key通知。這意味著,來自同一個客戶端不同類型的事件將會用不同的key。不要試圖對這些key進行比較。這樣做的好處是,不同的事件 可以用不同的selector注冊(這樣做的原因是線程的——下面會詳細說明)。
當讀取一條信息時,有很多的情況需要考慮。當讀取連接的結果數據時,這個信息可能是不完整的(剩余的數據要晚些才能獲得),也可能包含不止一條消息。因此, 必須考慮消息結尾是如何表示的。讀取數據時要將數據放入緩沖和然后拆分為有效的信息。標識消息結尾通常有以下幾種方式:
- 固定的消息大小。
- 將消息的長度作為消息的前綴。
- 用一個特殊的符號來標識消息的結束。
我的代碼使用了第二種方式。每種方式都會以2個字節開始,用來存儲消息體的字節數(因此消息長度被限制為65535字節以內)。因為數據也是使用ByteBuffers來讀取的,所以了解一下如何使用它們會很有幫助(可以出這里的API鏈接入手)。下面的代碼會讀取數據并將結果傳給readmessage方法。在readMessage方法中這些數據被拆分成獨立的消息。請注意readbuffer的用法。默認緩沖區應盡可小,但也不要設置過小。這樣會造成消息大小經常大于緩沖區。緩沖區越小,處理的速度就越快。但是,如果接收到的消息大小超過緩沖區,那么必須重新緩沖區設置來處理消息。
private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException {
ByteBuffer readBuffer = readBuffers.get(key);
if (readBuffer==null) {
readBuffer = ByteBuffer.allocate(defaultBufferSize);
readBuffers.put(key, readBuffer);
}
if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) {
throw new IOException("Read on closed key");
}
readBuffer.flip();
List<ByteBuffer> result = new ArrayList<ByteBuffer>();
ByteBuffer msg = readMessage(key, readBuffer);
while (msg!=null) {
result.add(msg);
msg = readMessage(key, readBuffer);
}
return result;
} 下面的代碼用來將緩存數據轉化為消息。
private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) {
int bytesToRead;
if (readBuffer.remaining()>messageLength.byteLength()) { // must have at least enough bytes to read the size of the message
byte[] lengthBytes = new byte[messageLength.byteLength()];
readBuffer.get(lengthBytes);
bytesToRead = (int)messageLength.bytesToLength(lengthBytes);
if ((readBuffer.limit()-readBuffer.position())<bytesToRead) {
// Not enough data - prepare for writing again
if (readBuffer.limit()==readBuffer.capacity()) {
// message may be longer than buffer => resize buffer to message size
int oldCapacity = readBuffer.capacity();
ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength());
readBuffer.position(0);
tmp.put(readBuffer);
readBuffer = tmp;
readBuffer.position(oldCapacity);
readBuffer.limit(readBuffer.capacity());
readBuffers.put(key, readBuffer);
return null;
} else {
// rest for writing
readBuffer.position(readBuffer.limit());
readBuffer.limit(readBuffer.capacity());
return null;
}
}
} else {
// Not enough data - prepare for writing again
readBuffer.position(readBuffer.limit());
readBuffer.limit(readBuffer.capacity());
return null;
}
byte[] resultMessage = new byte[bytesToRead];
readBuffer.get(resultMessage, 0, bytesToRead);
// remove read message from buffer
int remaining = readBuffer.remaining();
readBuffer.limit(readBuffer.capacity());
readBuffer.compact();
readBuffer.position(0);
readBuffer.limit(remaining);
return ByteBuffer.wrap(resultMessage);
} 示例中的代碼是單線程的——所有的連接都是由同一個線程處理。也可以使用多線程。盡管在某一時刻只有一個線程可以工作(也就是說,不可能有2個線程都在在執行讀操作),但是讀寫操作可以由不同的線程通過獨立的key來完成。同樣的,在某一時刻只有一個線程可以使用selector。雖然單線程代碼就能滿足我的需要,但是有很多的方法可以并發處理。下面我分別描述使用線程池數據讀事件、使用單一selector和線程處理OP_ACCEPT事件。
- 用一個selector來對應多個客戶端連接。收到accept事件時,會創建一個新的selector并在這個新的selector上注冊讀事件。新創建的selector用來監聽和處理讀事件,這個任務是在線程池中執行的。由于不能確定selector對資源占用的影響,所以不知道這種做法的擴展性如何。
- 每個線程都啟用一個selector,在創建執行線程時通過負載均衡的方式分配一個selector。將客戶端分配給對應的selector,每個線程都在自己的selector中處理讀事件,這是MINA的處理方式。這樣處理問題是如何均衡線程的處理(MINA使用了輪叫round-robin調度算法)——如果不小心,結果會導致是有的線程非常繁忙有的線程處于空閑狀態。
- 所有的事件都在同一個selector上處理,同步時需要小心處理。當傳遞key給某一個線程準備讀取時,要保證這個key沒有正準備被其他的線程所讀取,直到當前的操作結束。
在我想到最好的解決方式之前,selector處理的工作會非常繁重。
我會將如何處理并發這個問題留給感興趣的讀者。祝讀者們在編碼過程中一切順利,我的例子可以在這里下載。
2011年12月22日更新:有讀者來信指出來原始的測試用例中有bug,有些測試用例中使用的是將字節轉換為字節流 InputStreamReader。如果使用了非8位的字符集,那么測試用戶將由于消息長度而失敗(發生在轉意消息頭部時),我已更新了示例中的測試用例修正該問題。
原文鏈接: cordinc 翻譯: ImportNew.com - 一直在路上
譯文鏈接: http://www.importnew.com/13602.html