Java NIO服務器實例

jopen 10年前發布 | 13K 次閱讀 NIO 網絡工具包

我一直想學習如何用Java寫一個非阻塞IO服務器,但無法從網上找到一個滿足要求的服務器。我找到了這個示例,但仍然沒能解決我的問題。還可以選擇Apache MINA框架。但我的要求相對簡單,MINA對我來說還稍微有點復雜。所以在MINA和一些教程(參見這篇這篇)的幫助下,我自己寫了一個非阻塞IO服務器。

我的代碼可以從這里下載。這只是個示例代碼,如果需要可以隨意修改它。這個示例由一個抽象的非阻塞服務器和一個配對的阻塞客戶端組成。需要創建一個具體的實現來使用它們——可以通過測試用例來查看這個樣例是如何工作的。兩者都被設計為在自己的線程中運行(因此實現了Runnable接口),而且是單線程的——后面會有更多的并發選項。當客戶端僅連接到單一服務器時是阻塞的,并且僅在自己的線程中運行。客服端還需要等待服務器端的返回信息,所以將客戶端設計為非阻塞是沒有意義的。本服務器只處理標準的TCP連接。如果使用的是UDP、SSL或別的協議的話,需要自己添加實現。

在寫個示例的代碼時,我學到了一些東西。除了調用標準的API來打開和管理連接以外,還掌握了selection keys的不同使用方式、消息處理技巧和線程問題,這些都是十分有用的。

打開和管理一個連接的基本方式在網絡上十分常用,而且在下面的示例代碼段中也有出現(只有代碼片段——可以從代碼下載中取得完整版本)。從打開一個 Selector 開始(一種網絡信道多路復用器 multiplexor)。Selector通過selectionkey來表示每一個信道,然后打開一個指定端口的套接字節服務器。將selector、SelectionKey.OP_ACCEPT作為參數在socket服務器上注冊,任何接入連接在selector上都是有效的。下面的代碼一直在循環等待selector的事件。當事件發生時,如果是一個連接請求,套 字節服務器會接受連接并注冊鏈接發出的消息(通過OP_READ 注冊)。如果它是一個信息(key.isreadable()),處理信息的代碼尚未實現。下面的代碼也很脆弱,任何錯誤都會導致服務器停止工作。

Selector selector = null;
ServerSocketChannel server = null;
try { 
    selector = Selector.open(); 
    server = ServerSocketChannel.open(); 
    server.socket().bind(new InetSocketAddress(port)); 
    server.configureBlocking(false); 
    server.register(selector, SelectionKey.OP_ACCEPT); 
    while (true) {
        selector.select();
        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { 
            SelectionKey key = i.next(); 
            i.remove(); 
            if (key.isConnectable()) { 
                ((SocketChannel)key.channel()).finishConnect(); 
            } 
            if (key.isAcceptable()) { 
                // accept connection 
                SocketChannel client = server.accept(); 
                client.configureBlocking(false); 
                client.socket().setTcpNoDelay(true); 
                client.register(selector, SelectionKey.OP_READ);
            } 
            if (key.isReadable()) { 
                // ...read messages...
            } 
        }
    }           
} catch (Throwable e) { 
    throw new RuntimeException("Server failure: "+e.getMessage());
} finally {
    try {
        selector.close();
        server.socket().close();
        server.close();
        stopped();
    } catch (Exception e) {
        // do nothing - server failed
    }
}

值得注意的是,一個selection key不代表一個套接字。相反,他們是selector注冊的信道。因此,一個來自客戶端的連接事件(OP_ACCEPT 事件)將使用與客戶端發送消息(OP_READ事件)不同的key通知。這意味著,來自同一個客戶端不同類型的事件將會用不同的key。不要試圖對這些key進行比較。這樣做的好處是,不同的事件 可以用不同的selector注冊(這樣做的原因是線程的——下面會詳細說明)。

當讀取一條信息時,有很多的情況需要考慮。當讀取連接的結果數據時,這個信息可能是不完整的(剩余的數據要晚些才能獲得),也可能包含不止一條消息。因此, 必須考慮消息結尾是如何表示的。讀取數據時要將數據放入緩沖和然后拆分為有效的信息。標識消息結尾通常有以下幾種方式:

  1. 固定的消息大小。
  2. 將消息的長度作為消息的前綴。
  3. 用一個特殊的符號來標識消息的結束。

我的代碼使用了第二種方式。每種方式都會以2個字節開始,用來存儲消息體的字節數(因此消息長度被限制為65535字節以內)。因為數據也是使用ByteBuffers來讀取的,所以了解一下如何使用它們會很有幫助(可以出這里的API鏈接入手)。下面的代碼會讀取數據并將結果傳給readmessage方法。在readMessage方法中這些數據被拆分成獨立的消息。請注意readbuffer的用法。默認緩沖區應盡可小,但也不要設置過小。這樣會造成消息大小經常大于緩沖區。緩沖區越小,處理的速度就越快。但是,如果接收到的消息大小超過緩沖區,那么必須重新緩沖區設置來處理消息。

private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException { 
    ByteBuffer readBuffer = readBuffers.get(key); 
    if (readBuffer==null) {
        readBuffer = ByteBuffer.allocate(defaultBufferSize); 
        readBuffers.put(key, readBuffer); 
    }
    if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) {
        throw new IOException("Read on closed key");
    }

    readBuffer.flip(); 
    List<ByteBuffer> result = new ArrayList<ByteBuffer>();

    ByteBuffer msg = readMessage(key, readBuffer);
    while (msg!=null) {
        result.add(msg);
        msg = readMessage(key, readBuffer);
    }

    return result;
}

下面的代碼用來將緩存數據轉化為消息。

private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) {
    int bytesToRead; 
    if (readBuffer.remaining()>messageLength.byteLength()) { // must have at least enough bytes to read the size of the message  
        byte[] lengthBytes = new byte[messageLength.byteLength()];
        readBuffer.get(lengthBytes);
        bytesToRead = (int)messageLength.bytesToLength(lengthBytes);
        if ((readBuffer.limit()-readBuffer.position())<bytesToRead) { 
            // Not enough data - prepare for writing again 
            if (readBuffer.limit()==readBuffer.capacity()) {
                // message may be longer than buffer => resize buffer to message size
                int oldCapacity = readBuffer.capacity();
                ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength());
                readBuffer.position(0);
                tmp.put(readBuffer);
                readBuffer = tmp;                   
                readBuffer.position(oldCapacity); 
                readBuffer.limit(readBuffer.capacity()); 
                readBuffers.put(key, readBuffer); 
                return null;
            } else {
                // rest for writing
                readBuffer.position(readBuffer.limit()); 
                readBuffer.limit(readBuffer.capacity()); 
                return null; 
            }
        } 
    } else { 
        // Not enough data - prepare for writing again 
        readBuffer.position(readBuffer.limit()); 
        readBuffer.limit(readBuffer.capacity()); 
        return null; 
    } 
    byte[] resultMessage = new byte[bytesToRead];
    readBuffer.get(resultMessage, 0, bytesToRead); 
    // remove read message from buffer
    int remaining = readBuffer.remaining();
    readBuffer.limit(readBuffer.capacity());
    readBuffer.compact();
    readBuffer.position(0);
    readBuffer.limit(remaining);
    return ByteBuffer.wrap(resultMessage);
}

示例中的代碼是單線程的——所有的連接都是由同一個線程處理。也可以使用多線程。盡管在某一時刻只有一個線程可以工作(也就是說,不可能有2個線程都在在執行讀操作),但是讀寫操作可以由不同的線程通過獨立的key來完成。同樣的,在某一時刻只有一個線程可以使用selector。雖然單線程代碼就能滿足我的需要,但是有很多的方法可以并發處理。下面我分別描述使用線程池數據讀事件、使用單一selector和線程處理OP_ACCEPT事件。

  1. 用一個selector來對應多個客戶端連接。收到accept事件時,會創建一個新的selector并在這個新的selector上注冊讀事件。新創建的selector用來監聽和處理讀事件,這個任務是在線程池中執行的。由于不能確定selector對資源占用的影響,所以不知道這種做法的擴展性如何。
  2. 每個線程都啟用一個selector,在創建執行線程時通過負載均衡的方式分配一個selector。將客戶端分配給對應的selector,每個線程都在自己的selector中處理讀事件,這是MINA的處理方式。這樣處理問題是如何均衡線程的處理(MINA使用了輪叫round-robin調度算法)——如果不小心,結果會導致是有的線程非常繁忙有的線程處于空閑狀態。
  3. 所有的事件都在同一個selector上處理,同步時需要小心處理。當傳遞key給某一個線程準備讀取時,要保證這個key沒有正準備被其他的線程所讀取,直到當前的操作結束。
    在我想到最好的解決方式之前,selector處理的工作會非常繁重。

我會將如何處理并發這個問題留給感興趣的讀者。祝讀者們在編碼過程中一切順利,我的例子可以在這里下載。

2011年12月22日更新:有讀者來信指出來原始的測試用例中有bug,有些測試用例中使用的是將字節轉換為字節流 InputStreamReader。如果使用了非8位的字符集,那么測試用戶將由于消息長度而失敗(發生在轉意消息頭部時),我已更新了示例中的測試用例修正該問題。

原文鏈接: cordinc 翻譯: ImportNew.com - 一直在路上
譯文鏈接: http://www.importnew.com/13602.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!