Java NIO服務器實例
我一直想學習如何用Java寫一個非阻塞IO服務器,但無法從網上找到一個滿足要求的服務器。我找到了這個示例,但仍然沒能解決我的問題。還可以選擇Apache MINA框架。但我的要求相對簡單,MINA對我來說還稍微有點復雜。所以在MINA和一些教程(參見這篇和這篇)的幫助下,我自己寫了一個非阻塞IO服務器。
我的代碼可以從這里下載。這只是個示例代碼,如果需要可以隨意修改它。這個示例由一個抽象的非阻塞服務器和一個配對的阻塞客戶端組成。需要創建一個具體的實現來使用它們——可以通過測試用例來查看這個樣例是如何工作的。兩者都被設計為在自己的線程中運行(因此實現了Runnable接口),而且是單線程的——后面會有更多的并發選項。當客戶端僅連接到單一服務器時是阻塞的,并且僅在自己的線程中運行。客服端還需要等待服務器端的返回信息,所以將客戶端設計為非阻塞是沒有意義的。本服務器只處理標準的TCP連接。如果使用的是UDP、SSL或別的協議的話,需要自己添加實現。
在寫個示例的代碼時,我學到了一些東西。除了調用標準的API來打開和管理連接以外,還掌握了selection keys的不同使用方式、消息處理技巧和線程問題,這些都是十分有用的。
打開和管理一個連接的基本方式在網絡上十分常用,而且在下面的示例代碼段中也有出現(只有代碼片段——可以從代碼下載中取得完整版本)。從打開一個 Selector 開始(一種網絡信道多路復用器 multiplexor)。Selector通過selectionkey來表示每一個信道,然后打開一個指定端口的套接字節服務器。將selector、SelectionKey.OP_ACCEPT作為參數在socket服務器上注冊,任何接入連接在selector上都是有效的。下面的代碼一直在循環等待selector的事件。當事件發生時,如果是一個連接請求,套 字節服務器會接受連接并注冊鏈接發出的消息(通過OP_READ 注冊)。如果它是一個信息(key.isreadable()),處理信息的代碼尚未實現。下面的代碼也很脆弱,任何錯誤都會導致服務器停止工作。
Selector selector = null; ServerSocketChannel server = null; try { selector = Selector.open(); server = ServerSocketChannel.open(); server.socket().bind(new InetSocketAddress(port)); server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); if (key.isConnectable()) { ((SocketChannel)key.channel()).finishConnect(); } if (key.isAcceptable()) { // accept connection SocketChannel client = server.accept(); client.configureBlocking(false); client.socket().setTcpNoDelay(true); client.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // ...read messages... } } } } catch (Throwable e) { throw new RuntimeException("Server failure: "+e.getMessage()); } finally { try { selector.close(); server.socket().close(); server.close(); stopped(); } catch (Exception e) { // do nothing - server failed } }
值得注意的是,一個selection key不代表一個套接字。相反,他們是selector注冊的信道。因此,一個來自客戶端的連接事件(OP_ACCEPT 事件)將使用與客戶端發送消息(OP_READ事件)不同的key通知。這意味著,來自同一個客戶端不同類型的事件將會用不同的key。不要試圖對這些key進行比較。這樣做的好處是,不同的事件 可以用不同的selector注冊(這樣做的原因是線程的——下面會詳細說明)。
當讀取一條信息時,有很多的情況需要考慮。當讀取連接的結果數據時,這個信息可能是不完整的(剩余的數據要晚些才能獲得),也可能包含不止一條消息。因此, 必須考慮消息結尾是如何表示的。讀取數據時要將數據放入緩沖和然后拆分為有效的信息。標識消息結尾通常有以下幾種方式:
- 固定的消息大小。
- 將消息的長度作為消息的前綴。
- 用一個特殊的符號來標識消息的結束。
我的代碼使用了第二種方式。每種方式都會以2個字節開始,用來存儲消息體的字節數(因此消息長度被限制為65535字節以內)。因為數據也是使用ByteBuffers來讀取的,所以了解一下如何使用它們會很有幫助(可以出這里的API鏈接入手)。下面的代碼會讀取數據并將結果傳給readmessage方法。在readMessage方法中這些數據被拆分成獨立的消息。請注意readbuffer的用法。默認緩沖區應盡可小,但也不要設置過小。這樣會造成消息大小經常大于緩沖區。緩沖區越小,處理的速度就越快。但是,如果接收到的消息大小超過緩沖區,那么必須重新緩沖區設置來處理消息。
private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException { ByteBuffer readBuffer = readBuffers.get(key); if (readBuffer==null) { readBuffer = ByteBuffer.allocate(defaultBufferSize); readBuffers.put(key, readBuffer); } if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) { throw new IOException("Read on closed key"); } readBuffer.flip(); List<ByteBuffer> result = new ArrayList<ByteBuffer>(); ByteBuffer msg = readMessage(key, readBuffer); while (msg!=null) { result.add(msg); msg = readMessage(key, readBuffer); } return result; }
下面的代碼用來將緩存數據轉化為消息。
private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) { int bytesToRead; if (readBuffer.remaining()>messageLength.byteLength()) { // must have at least enough bytes to read the size of the message byte[] lengthBytes = new byte[messageLength.byteLength()]; readBuffer.get(lengthBytes); bytesToRead = (int)messageLength.bytesToLength(lengthBytes); if ((readBuffer.limit()-readBuffer.position())<bytesToRead) { // Not enough data - prepare for writing again if (readBuffer.limit()==readBuffer.capacity()) { // message may be longer than buffer => resize buffer to message size int oldCapacity = readBuffer.capacity(); ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength()); readBuffer.position(0); tmp.put(readBuffer); readBuffer = tmp; readBuffer.position(oldCapacity); readBuffer.limit(readBuffer.capacity()); readBuffers.put(key, readBuffer); return null; } else { // rest for writing readBuffer.position(readBuffer.limit()); readBuffer.limit(readBuffer.capacity()); return null; } } } else { // Not enough data - prepare for writing again readBuffer.position(readBuffer.limit()); readBuffer.limit(readBuffer.capacity()); return null; } byte[] resultMessage = new byte[bytesToRead]; readBuffer.get(resultMessage, 0, bytesToRead); // remove read message from buffer int remaining = readBuffer.remaining(); readBuffer.limit(readBuffer.capacity()); readBuffer.compact(); readBuffer.position(0); readBuffer.limit(remaining); return ByteBuffer.wrap(resultMessage); }
示例中的代碼是單線程的——所有的連接都是由同一個線程處理。也可以使用多線程。盡管在某一時刻只有一個線程可以工作(也就是說,不可能有2個線程都在在執行讀操作),但是讀寫操作可以由不同的線程通過獨立的key來完成。同樣的,在某一時刻只有一個線程可以使用selector。雖然單線程代碼就能滿足我的需要,但是有很多的方法可以并發處理。下面我分別描述使用線程池數據讀事件、使用單一selector和線程處理OP_ACCEPT事件。
- 用一個selector來對應多個客戶端連接。收到accept事件時,會創建一個新的selector并在這個新的selector上注冊讀事件。新創建的selector用來監聽和處理讀事件,這個任務是在線程池中執行的。由于不能確定selector對資源占用的影響,所以不知道這種做法的擴展性如何。
- 每個線程都啟用一個selector,在創建執行線程時通過負載均衡的方式分配一個selector。將客戶端分配給對應的selector,每個線程都在自己的selector中處理讀事件,這是MINA的處理方式。這樣處理問題是如何均衡線程的處理(MINA使用了輪叫round-robin調度算法)——如果不小心,結果會導致是有的線程非常繁忙有的線程處于空閑狀態。
- 所有的事件都在同一個selector上處理,同步時需要小心處理。當傳遞key給某一個線程準備讀取時,要保證這個key沒有正準備被其他的線程所讀取,直到當前的操作結束。
在我想到最好的解決方式之前,selector處理的工作會非常繁重。
我會將如何處理并發這個問題留給感興趣的讀者。祝讀者們在編碼過程中一切順利,我的例子可以在這里下載。
2011年12月22日更新:有讀者來信指出來原始的測試用例中有bug,有些測試用例中使用的是將字節轉換為字節流 InputStreamReader。如果使用了非8位的字符集,那么測試用戶將由于消息長度而失敗(發生在轉意消息頭部時),我已更新了示例中的測試用例修正該問題。
原文鏈接: cordinc 翻譯: ImportNew.com - 一直在路上
譯文鏈接: http://www.importnew.com/13602.html