Java NIO Socket通信
一 套接字通道
1. 阻塞式套接字通道
與Socket和ServerSocket對應,NIO提供了SocketChannel和ServerSocketChannel對應,這兩種通道同時支持一般的阻塞模式和更高效的非阻塞模式。
客戶端通過SocketChannel.open()方法打開一個Socket通道,如果此時提供了SocketAddress參數,則會自動開始連接,否則需要主動調用connect()方法連接,創建連接后,可以像一般的Channel一樣的用Buffer進行讀寫,這都是阻塞模式的。
服務器端通過ServerSocketChannel.open()創建,并使用bind()方法綁定到一個監聽地址上,最后調用accept()方法阻塞等待客戶端連接。當客戶端連接后會返回一個SocketChannel以實現與客戶端的讀寫交互。
總的來說,阻塞模式即是net包I/O的翻版,只是采用Channel和Buffer實現而已。
2.多路復用套接字通道(Selector實現的非阻塞式IO)
套接字通道多路復用的思想是創建一個Selector,將多個通道對它進行注冊,當套接字有關注的事件發生時,可以選出這個通道進行操作。
服務器端的代碼如下,相關說明就帶在注釋里了:
// 創建一個選擇器,可用close()關閉,isOpen()表示是否處于打開狀態,他不隸屬于當前線程 Selector selector = Selector.open(); // 創建ServerSocketChannel,并把它綁定到指定端口上 ServerSocketChannel server = ServerSocketChannel.open(); server.bind(new InetSocketAddress("127.0.0.1", 7777)); // 設置為非阻塞模式, 這個非常重要 server.configureBlocking(false); // 在選擇器里面注冊關注這個服務器套接字通道的accept事件 // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel server.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 測試等待事件發生,分為直接返回的selectNow()和阻塞等待的select(),另外也可加一個參數表示阻塞超時 // 停止阻塞的方法有兩種: 中斷線程和selector.wakeup(),有事件發生時,會自動的wakeup() // 方法返回為select出的事件數(參見后面的注釋有說明這個值為什么可能為0). // 另外務必注意一個問題是,當selector被select()阻塞時,其他的線程調用同一個selector的register也會被阻塞到select返回為止 // select操作會把發生關注事件的Key加入到selectionKeys中(只管加不管減) if (selector.select() == 0) { // continue; } // 獲取發生了關注時間的Key集合,每個SelectionKey對應了注冊的一個通道 Set<SelectionKey> keys = selector.selectedKeys(); // 多說一句selector.keys()返回所有的SelectionKey(包括沒有發生事件的) for (SelectionKey key : keys) { // OP_ACCEPT 這個只有ServerSocketChannel才有可能觸發 if (key.isAcceptable()) { // 得到與客戶端的套接字通道 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); // 同樣設置為非阻塞模式 channel.configureBlocking(false); // 同樣將于客戶端的通道在selector上注冊,OP_READ對應可讀事件(對方有寫入數據),可以通過key獲取關聯的選擇器 channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } // OP_READ 有數據可讀 if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); // 得到附件,就是上面SocketChannel進行register的時候的第三個參數,可為隨意Object ByteBuffer buffer = (ByteBuffer) key.attachment(); // 讀數據 這里就簡單寫一下,實際上應該還是循環讀取到讀不出來為止的 channel.read(buffer); // 改變自身關注事件,可以用位或操作|組合時間 key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } // OP_WRITE 可寫狀態 這個狀態通常總是觸發的,所以只在需要寫操作時才進行關注 if (key.isWritable()) { // 寫數據掠過,可以自建buffer,也可用附件對象(看情況),注意buffer寫入后需要flip // ...... // 寫完就吧寫狀態關注去掉,否則會一直觸發寫事件 key.interestOps(SelectionKey.OP_READ); } // 由于select操作只管對selectedKeys進行添加,所以key處理后我們需要從里面把key去掉 keys.remove(key); } }這里需要著重說明一下select操作做了什么(根據現象推的,具體好像沒有找到這個的文檔說明),他每次檢查keys里面每個Key對應的通道的狀態,如果有關注狀態時,就決定返回,這時會同時將Key對象加入到selectedKeys中,并返回selectedKeys本次變化的對象數(原本就在selectedKeys中的對象是不計的),由于一個Key對應一個通道(可能同時處于多個狀態,所以注意上面的if語句我都沒有寫else),所以select返回0也是有可能的。另外OP_WRITE和OP_CONNET這兩個狀態是不能長期關注的,只在有需要的時候監聽,處理完必須馬上去掉。如果沒有發現有任何關注狀態,select會一直阻塞到有狀態變化或者超時什么的。
SelectionKey的其他幾個方法,attach(Object)為key設置附件,并返回之前的附件;interestOps()和readyOps()返回關注狀態和當前狀態;cancel()為取消注冊;isValid()表示key是否有效(在key取消注冊,通道關閉,選擇器關閉這三個事情發生之前,key均為有效的,但不包括對方關閉通道,所以讀寫應注意異常)。
還有一個狀態上面沒有使用,OP_CONNECT這個主要是用于客戶端,對應的key的方法是isConnectable()表示已經創建好了連接。
非阻塞實現的客戶端如下:
Selector selector = Selector.open(); // 創建一個套接字通道,注意這里必須使用無參形式 SocketChannel channel = SocketChannel.open(); // 設置為非阻塞模式,這個方法必須在實際連接之前調用(所以open的時候不能提供服務器地址,否則會自動連接) channel.configureBlocking(false); // 連接服務器,由于是非阻塞模式,這個方法會發起連接請求,并直接返回false(阻塞模式是一直等到鏈接成功并返回是否成功) channel.connect(new InetSocketAddress("127.0.0.1", 7777)); // 注冊關聯鏈接狀態 channel.register(selector, SelectionKey.OP_CONNECT); while (true) { // 前略 和服務器端的類似 // ... // 獲取發生了關注時間的Key集合,每個SelectionKey對應了注冊的一個通道 Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { // OP_CONNECT 兩種情況,鏈接成功或失敗這個方法都會返回true if (key.isConnectable()) { // 由于非阻塞模式,connect只管發起連接請求,finishConnect()方法會阻塞到鏈接結束并返回是否成功 // 另外還有一個isConnectionPending()返回的是是否處于正在連接狀態(還在三次握手中) if (channel.finishConnect()) { // 鏈接成功了可以做一些自己的處理,略 // ... // 處理完后必須吧OP_CONNECT關注去掉,改為關注OP_READ key.interestOps(SelectionKey.OP_READ); } } // 后略 和服務器端的類似 // ... } }
雖然例子是這樣的,不過服務器和客戶端可以自己單方面選擇是否采用非阻塞模式,用阻塞模式的客戶端連接非阻塞模式的服務器端是OK的。
二 NIO2的異步IO通道
以下API是由Java7提供。老版本無法使用。
異步IO通道的實現有兩種實現方式,一是在阻塞模式的原方法(主要指的是read和write,具體可以查看API文檔)上傳于一個CompletionHandler實例以實現回調,另外也可以令其返回一個Future實例(Java5新增同步工具包java.util.concurrent中的API),然后再適當的時候通過其get方法來獲取返回的結果。異步文件I/O通道為AsynchronousFileChannel,而異步套接字通道為AsynchronousServerSocketChannel,分別對應其各自的原始通道。
異步I/O需要與一個AsynchronousChannelGroup對象關聯,他實質上就是一個用于I/O的線程池。AsynchronousChannelGroup對象可以通過其自身靜態方法的withThreadPool(),withCachedThreadPool(),withFixedThreadPool()提供一個線程池來創建(線程池也是Java5新增同步工具包java.util.concurrent中的API)。在異步通道創建open()時,可將這個對象傳入進行關聯。如果沒有提供這個對象的話,就默認使用系統分組。但是需要注意的是系統分組的線程池是個守護線程池,JVM是可能在沒有讀寫完成前正常結束的。AsynchronousChannelGroup在使用完后需要shutdowm(),這方面和線程池的關閉是類似的。