java NIO詳解
前言
我們在寫java程序的時候,為了進行優化,把全部的精力用在了處理效率上,但是對IO的關注卻很少。這也可能是由以前java早期時JVM在解釋字節碼時速度慢,運行速率大大低于本地編譯代碼,因此以前往往忽視了IO的優化。
但是現在JVM在運行時優化已前進了一大步,現在的java應用程序更多的是受IO的束縛,也就是將時間花在等待數據傳輸上。現在有了NIO,就可以減少IO的等待時間,從而提升IO的效率。
java NIO的思維導圖:
javaNIO
JVM利弊
JVM 是把雙刃劍。它提供了統一的操作環境,讓 Java 程序員不用再為操作系統環境的區別而煩惱。與特定平臺相關的細枝末節大都被隱藏了起來,因而代碼寫得又快又容易。但是隱藏操作系統的技術細節也意味著某些個性鮮明、功能強大的特性被擋在了門外。
怎么辦呢?如果您是程序員,可以使用 Java 本地接口( JNI )編寫本地代碼,直接使用操作系統特性。這樣的話,不同的操作系統的局限性就體現出來了。為了解決這一問題,java.nio 軟件包提供了新的抽象。具體地說,就是 Channel 和 Selector類。它們提供了使用 I/O 服務的通用 API,JDK 1.4 以前的版本是無法使用這些服務的。天下還是沒有免費的午餐:您無法使用每一種操作系統的每一種特性,但是這些新類還是提供了強大的新框架,涵蓋了當今商業操作系統普遍提供的高效 I/O 特性。不僅如此,java.nio.channels.spi還提供了新的服務提供接口(SPI),允許接入新型通道和選擇器,同時又不違反規范的一致性。
隨著 NIO 的面世,Java 已經為嚴肅的商業、娛樂、科研和學術應用做好了準備。在這些領域,高性能 I/O 是必不可少的。
NIO原理
NIO與IO的區別
首先來講一下傳統的IO和NIO的區別,傳統的IO又稱BIO,即阻塞式IO,NIO就是非阻塞IO了。還有一種 AIO 就是異步IO,這里不加闡述了。
Java IO的各種流是阻塞的。這意味著,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據完全寫入。該線程在此期間不能再干任何事情了。 Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,但是它僅能得到目前可用的數據,如果目前沒有數據可用時,就什么都不會獲取。而不是保持線程阻塞,所以直至數據變的可以讀取之前,該線程可以繼續做其他的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。 線程通常將非阻塞IO的空閑時間用于在其它通道上執行IO操作,所以一個單獨的線程現在可以管理多個輸入和輸出通道(channel)。
緩沖區Buffer
一個Buffer對象是固定數量的數據的容器。其作用是一個存儲器,或者分段運輸區,在這里數據可被存儲并在之后用于檢索。盡管緩沖區作用于它們存儲的原始數據類型,但緩沖區十分傾向于處理字節。非字節緩沖區可以在后臺執行從字節或到字節的轉換,這取決于緩沖區是如何創建的。
緩沖區的工作與通道緊密聯系。通道是 I/O 傳輸發生時通過的入口,而緩沖區是這些數據傳輸的來源或目標。對于離開緩沖區的傳輸,您想傳遞出去的數據被置于一個緩沖區,被傳送到通道。對于傳回緩沖區的傳輸,一個通道將數據放置在您所提供的緩沖區中。這種在協同對象(通常是您所寫的對象以及一到多個 Channel 對象)之間進行的緩沖區數據傳遞是高效數據處理的關鍵。
以下是一個新創建的ByteBuffer:
ByteBuffer
位置被設為 0,而且容量和上界被設為 10,剛好經過緩沖區能夠容納的最后一個字節。標記最初未定義。容量是固定的,但另外的三個屬性可以在使用緩沖區時改變。
其中的四個屬性的含義分別如下:
- 容量(Capacity):緩沖區能夠容納的數據元素的最大數量。這一個容量在緩沖區創建時被設定,并且永遠不能改變。
- 上界(Limit):緩沖區的第一個不能被讀或寫的元素。或者說,緩沖區中現存元素的計數。
- 位置(Position):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。
- 標記(Mark):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。
Buffer的常見方法如下所示:
- flip(): 寫模式轉換成讀模式
- rewind():將 position 重置為 0 ,一般用于重復讀。
- clear() :清空 buffer ,準備再次被寫入 (position 變成 0 , limit 變成 capacity) 。
- compact(): 將未讀取的數據拷貝到 buffer 的頭部位。
- mark(): reset():mark 可以標記一個位置, reset 可以重置到該位置。
- Buffer 常見類型: ByteBuffer 、 MappedByteBuffer 、 CharBuffer 、 DoubleBuffer 、 FloatBuffer 、 IntBuffer 、 LongBuffer 、 ShortBuffer 。
通道Channel
通道(Channel)是 java.nio 的第二個主要創新。它們既不是一個擴展也不是一項增強,而是全新、極好的 Java I/O 示例,提供與 I/O 服務的直接連接。Channel 用于在字節緩沖區和位于通道另一側的實體(通常是一個文件或套接字)之間有效地傳輸數據。
通道是一種途徑,借助該途徑,可以用最小的總開銷來訪問操作系統本身的 I/O 服務。緩沖區則是通道內部用來發送和接收數據的端點。通道channel充當連接I/O服務的導管,入下圖所示
channel
通道特性
通道可以是單向或者雙向的。一個 channel 類可能實現定義read( )方法的 ReadableByteChannel 接口,而另一個 channel 類也許實現 WritableByteChannel 接口以提供 write( )方法。實現這兩種接口其中之一的類都是單向的,只能在一個方向上傳輸數據。如果一個類同時實現這兩個接口,那么它是雙向的,可以雙向傳輸數據。
每一個 file 或 socket 通道都實現全部三個接口。從類定義的角度而言,這意味著全部 file 和 socket 通道對象都是雙向的。這對于 sockets 不是問題,因為它們一直都是雙向的,不過對于 files 卻是個問題了。我們知道,一個文件可以在不同的時候以不同的權限打開。從 FileInputStream 對象的getChannel( )方法獲取的 FileChannel 對象是只讀的,不過從接口聲明的角度來看卻是雙向的,因為FileChannel 實現 ByteChannel 接口。在這樣一個通道上調用 write( )方法將拋出未經檢查的NonWritableChannelException 異常,因為 FileInputStream 對象總是以 read-only 的權限打開文件。
通道會連接一個特定 I/O 服務且通道實例(channel instance)的性能受它所連接的 I/O 服務的特征限制,記住這很重要。一個連接到只讀文件的 Channel 實例不能進行寫操作,即使該實例所屬的類可能有 write( )方法。基于此,程序員需要知道通道是如何打開的,避免試圖嘗試一個底層 I/O服務不允許的操作。
通道可以以阻塞(blocking)或非阻塞(nonblocking)模式運行。非阻塞模式的通道永遠不會讓調用的線程休眠。請求的操作要么立即完成,要么返回一個結果表明未進行任何操作。只有面向流的(stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。
選擇器Selector
選擇器提供選擇執行已經就緒的任務的能力,這使得多元I/O成為可能,就緒選擇和多元執行使得單線程能夠有效率的同時管理多個I/O通道(channels),簡單言之就是selector充當一個監視者,您需要將之前創建的一個或多個可選擇的通道注冊到選擇器對象中。一個表示通道和選擇器的鍵將會被返回。選擇鍵會記住您關心的通道。它們也會追蹤對應的通道是否已經就緒當您調用一個選擇器對象的 select( )方法時,相關的鍵會被更新,用來檢查所有被注冊到該選擇器的通道。您可以獲取一個鍵的集合,從而找到當時已經就緒的通道。通過遍歷這些鍵,您可以選擇出每個從上次您調用 select( )開始直到現在,已經就緒的通道。
傳統的socket監控
傳統的監控多個 socket 的 Java 解決方案是為每個 socket 創建一個線程并使得線程可以在 read( )調用中阻塞,直到數據可用。這事實上將每個被阻塞的線程當作了 socket 監控器,并將 Java 虛擬機的線程調度當作了通知機制。這兩者本來都不是為了這種目的而設計的。程序員和 Java 虛擬機都為管理所有這些線程的復雜性和性能損耗付出了代價,這在線程數量的增長時表現得更為突出。
選擇器屬性
- 選擇器(Selector)
選擇器類管理著一個被注冊的通道集合的信息和它們的就緒狀態。通道是和選擇器一起被注冊的,并且使用選擇器來更新通道的就緒狀態。當這么做的時候,可以選擇將被激發的線程掛起,直到有就緒的的通道。 - 可選擇通道(SelectableChannel)
SelectableChannel 可以被注冊到 Selector 對象上,同時可以指定對那個選擇器而言,那種操作是感興趣的。一個通道可以被注冊到多個選擇器上,但對每個選擇器而言只能被注冊一次。 - 選擇鍵(SelectionKey)
選擇鍵封裝了特定的通道與特定的選擇器的注冊關系。選擇鍵對象被SelectableChannel.register( ) 返回并提供一個表示這種注冊關系的標記。選擇鍵包含了兩個比特集(以整數的形式進行編碼),指示了該注冊關系所關心的通道操作,以及通道已經準備好的操作。
下圖體現了就緒選擇注冊和Selector的關系
Selector
一個單獨的通道對象可以被注冊到多個選擇器上。可以調用 isRegistered( )方法來檢查一個通道是否被注冊到任何一個選擇器上。這個方法沒有提供關于通道被注冊到哪個選擇器上的信息,而只能知道它至少被注冊到了一個選擇器上。此外,在一個鍵被取消之后,直到通道被注銷為止,可能有時間上的延遲。這個方法只是一個提示,而不是確切的答案。
鍵對象
鍵對象表示了一種特定的注冊關系。當應該終結這種關系的時候,可以調用 SelectionKey對象的 cancel( )方法。可以通過調用 isValid( )方法來檢查它是否仍然表示一種有效的關系。當鍵被取消時,它將被放在相關的選擇器的已取消的鍵的集合里。注冊不會立即被取消,但鍵會立即失效。當再次調用 select( )方法時(或者一個正在進行的 select()調用結束時),已取消的鍵的集合中的被取消的鍵將被清理掉,并且相應的注銷也將完成。通道會被注銷,而新的SelectionKey 將被返回。
SelectionKey 類定義了四個便于使用的布爾方法來為您測試這些比特值:isReadable( ),isWritable( ),isConnectable( ), 和 isAcceptable( )。每一個方法都與使用特定掩碼來測試 readyOps( )方法的結果的效果相同。例如:
if (key.isWritable( ))
等價于:
if ((key.readyOps( ) & SelectionKey.OP_WRITE) != 0)
這四個方法在任意一個 SelectionKey 對象上都能安全地調用。不能在一個通道上注冊一個它不支持的操作,這種操作也永遠不會出現在 ready 集合中。調用一個不支持的操作將總是返回 false,因為這種操作在該通道上永遠不會準備好。
停止選擇過程
有三種方式可以喚醒在select()方法中睡眠的線程。
- 調用wakeup()
調用 Selector 對象的 wakeup( )方法將使得選擇器上的第一個還沒有返回的選擇操作立即返回。如果當前沒有在進行中的選擇,那么下一次對 select( )方法的一種形式的調用將立即返回。后續的選擇操作將正常進行。在選擇操作之間多次調用 wakeup( )方法與調用它一次沒有什么不同。有時這種延遲的喚醒行為并不是您想要的。您可能只想喚醒一個睡眠中的線程,而使得后續的選擇繼續正常地進行。您可以通過在調用 wakeup( )方法后調用 selectNow( )方法來繞過這個問題。盡管如此,如果您將您的代碼構造為合理地關注于返回值和執行選擇集合,那么即使下一個 select( )方法的調用在沒有通道就緒時就立即返回,也應該不會有什么不同。不管怎么說,您應該為可能發生的事件做好準備。 - 調用 close( )
如果選擇器的 close( )方法被調用,那么任何一個在選擇操作中阻塞的線程都將被喚醒,就像wakeup( )方法被調用了一樣。與選擇器相關的通道將被注銷,而鍵將被取消。 - 調用 interrupt( )
如果睡眠中的線程的 interrupt( )方法被調用,它的返回狀態將被設置。如果被喚醒的線程之后將試圖在通道上執行 I/O 操作,通道將立即關閉,然后線程將捕捉到一個異常。這是由于在第三章中已經探討過的通道的中斷語義。使用 wakeup( )方法將會優雅地將一個在 select( )方法中睡眠的線程喚醒。如果您想讓一個睡眠的線程在直接中斷之后繼續執行,需要執行一些步驟來清理中斷狀態
簡單的NIO服務器
下面是一個簡單的NIO服務器的例子,使用select()來為多個通道提供服務。
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import javax.swing.text.html.HTMLDocument.Iterator;
/**
- Simple echo-back server which listens for incoming stream connections and
- echoes back whatever it reads. A single Selector object is used to listen to
- the server socket (to accept new connections) and all the active socket
- channels.
@author zale (zalezone.cn)
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;
public static void main(String[] argv) throws Exception
{
new SelectSockets().go(argv);
}
public void go(String[] argv) throws Exception
{
int port = PORT_NUMBER;
if (argv.length > 0)
{ // 覆蓋默認的監聽端口
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();// 打開一個未綁定的serversocketchannel
ServerSocket serverSocket = serverChannel.socket();// 得到一個ServerSocket去和它綁定
Selector selector = Selector.open();// 創建一個Selector供下面使用
serverSocket.bind(new InetSocketAddress(port));//設置server channel將會監聽的端口
serverChannel.configureBlocking(false);//設置非阻塞模式
serverChannel.register(selector, SelectionKey.OP_ACCEPT);//將ServerSocketChannel注冊到Selector
while (true)
{
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
if (n == 0)
{
continue; // nothing to do
}
java.util.Iterator<SelectionKey> it = selector.selectedKeys().iterator();// Get an iterator over the set of selected keys
//在被選擇的set中遍歷全部的key
while (it.hasNext())
{
SelectionKey key = (SelectionKey) it.next();
// 判斷是否是一個連接到來
if (key.isAcceptable())
{
ServerSocketChannel server =(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel,SelectionKey.OP_READ);//注冊讀事件
sayHello(channel);//對連接進行處理
}
//判斷這個channel上是否有數據要讀
if (key.isReadable())
{
readDataFromSocket(key);
}
//從selected set中移除這個key,因為它已經被處理過了
it.remove();
}
}
}
// ----------------------------------------------------------
/**
- Register the given channel with the given selector for the given
- operations of interest
*/
protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception
{
if (channel == null)
{
return; // 可能會發生
}
// 設置通道為非阻塞
channel.configureBlocking(false);
// 將通道注冊到選擇器上
channel.register(selector, ops);
}
// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent acccess.
//對所有的通道使用相同的緩沖區。單線程為所有的通道進行服務,所以并發訪問沒有風險
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
/**
- Sample data handler method for a channel with data ready to read.
- 對于一個準備讀入數據的通道的簡單的數據處理方法
@param key
*
A SelectionKey object associated with a channel determined by
the selector to be ready for reading. If the channel returns
an EOF condition, it is closed here, which automatically
invalidates the associated key. The selector will then
de-register the channel on the next select call.
一個選擇器決定了和通道關聯的SelectionKey object是準備讀狀態。如果通道返回EOF,通道將被關閉。
并且會自動使相關的key失效,選擇器然后會在下一次的select call時取消掉通道的注冊
*/
protected void readDataFromSocket(SelectionKey key) throws Exception
{
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // 清空Buffer
// Loop while data is available; channel is nonblocking
//當可以讀到數據時一直循環,通道為非阻塞
while ((count = socketChannel.read(buffer)) > 0)
{
buffer.flip(); // 將緩沖區置為可讀
// Send the data; don't assume it goes all at once
//發送數據,不要期望能一次將數據發送完
while (buffer.hasRemaining())
{
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
//這里的循環是無意義的,具體按實際情況而定
buffer.clear(); // Empty buffer
}
if (count < 0)
{
// Close channel on EOF, invalidates the key
//讀取結束后關閉通道,使key失效
socketChannel.close();
}
}
// ----------------------------------------------------------
/**
- Spew a greeting to the incoming client connection.
*
- @param channel
The newly connected SocketChannel to say hello to./
private void sayHello(SocketChannel channel) throws Exception
{
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}</code></pre>
原理解釋
上面這個例子實現了一個簡單的服務器,它創建了 ServerSocketChannel 和 Selector 對象,并將通道注冊到選擇器上。我們不在注冊的鍵中保存服務器 socket 的引用,因為它永遠不會被注銷。這個無限循環在最上面先調用了 select( ),這可能會無限期地阻塞。當選擇結束時,就遍歷選擇鍵并檢查已經就緒的通道。
如果一個鍵指示與它相關的通道已經準備好執行一個 accecpt( )操作,我們就通過鍵獲取關聯的通道,并將它轉換為 SeverSocketChannel 對象。我們都知道這么做是安全的,因為只有ServerSocketChannel 支持 OP_ACCEPT 操作。我們也知道我們的代碼只把對一個單一的ServerSocketChannel 對象的 OP_ACCEPT 操作進行了注冊。通過對服務器 socket 通道的引用,我 們調用了它 的 accept( )方法 ,來獲取剛到達 的 socket 的句 柄。返回的 對象的類型 是
SocketChannel,也是一個可選擇的通道類型。這時,與創建一個新線程來從新的連接中讀取數據不同,我們只是簡單地將 socket 同多注冊到選擇器上。我們通過傳入 OP_READ 標記,告訴選擇器我們關心新的 socket 通道什么時候可以準備好讀取數據。
如果鍵指示通道還沒有準備好執行 accept( ),我們就檢查它是否準備好執行 read( )。任何一個這么指示的 socket 通道一定是之前 ServerSocketChannel 創建的 SocketChannel 對象之一,并且被注冊為只對讀操作感興趣。對于每個有數據需要讀取的 socket 通道,我們調用一個公共的方法來讀取并處理這個帶有數據的 socket。需要注意的是這個公共方法需要準備好以非阻塞的方式處理 socket 上的不完整的數據。它需要迅速地返回,以其他帶有后續輸入的通道能夠及時地得到處理。例 4-1 中只是簡單地對數據進行響應,將數據寫回 socket,傳回給發送者。
在循環的底部,我們通過調用 Iterator(迭代器)對象的 remove()方法,將鍵從已選擇的鍵的集合中移除。鍵可以直接從 selectKeys()返回的 Set 中移除,但同時需要用 Iterator 來檢查集合,您需要使用迭代器的 remove()方法來避免破壞迭代器內部的狀態。
并發性
選擇器對象是線程安全的,但它們包含的鍵集合不是。通過 keys( )和 selectKeys( )返回的鍵的集合是 Selector 對象內部的私有的 Set 對象集合的直接引用。這些集合可能在任意時間被改變。已注冊的鍵的集合是只讀的。如果您試圖修改它,那么您得到的獎品將是一個java.lang.UnsupportedOperationException,但是當您在觀察它們的時候,它們可能發生了改變的話,您仍然會遇到麻煩。Iterator 對象是快速失敗的(fail-fast):如果底層的 Set 被改變了,它們將會拋出 java.util.ConcurrentModificationException,因此如果您期望在多個線程間共享選擇器和/或鍵,請對此做好準備。您可以直接修改選擇鍵,但請注意您這么做時可能會徹底破壞另一個線程的 Iterator。
如果在多個線程并發地訪問一個選擇器的鍵的集合的時候存在任何問題,您可以采取一些步驟來合理地同步訪問。在執行選擇操作時,選擇器在 Selector 對象上進行同步,然后是已注冊的鍵的集合,最后是已選擇的鍵的集合,按照這樣的順序。已取消的鍵的集合也在選擇過程的的第 1步和第 3 步之間保持同步(當與已取消的鍵的集合相關的通道被注銷時)。
在多線程的場景中,如果您需要對任何一個鍵的集合進行更改,不管是直接更改還是其他操作帶來的副作用,您都需要首先以相同的順序,在同一對象上進行同步。鎖的過程是非常重要的。如果競爭的線程沒有以相同的順序請求鎖,就將會有死鎖的潛在隱患。如果您可以確保否其他線程不會同時訪問選擇器,那么就不必要進行同步了。
Selector 類的 close( )方法與 slect( )方法的同步方式是一樣的,因此也有一直阻塞的可能性。在選擇過程還在進行的過程中,所有對 close( )的調用都會被阻塞,直到選擇過程結束,或者執行選擇的線程進入睡眠。在后面的情況下,執行選擇的線程將會在執行關閉的線程獲得鎖時立即被喚醒,并關閉選擇器。
選擇過程的可擴展性
對于單CPU的系統用一個線程來為多個通道提供服務可能是個好主意,但是對于多個CPU的系統來說就可能不能使其他CPU高效發揮作用。
一個比較好的優化策略是對所有的可選擇通道使用一個選擇器,并將對就緒通道的服務委托給其他線程。根據部署的條件,線程池的大小是可以調整的(或者它自己進行動態的調整)。
另外,有些通道要求比其他通道有更高的響應速度,可以通過使用兩個選擇器來解決:一個為命令連接服務,另一個為普通連接服務。與將所有準備好的通道放到同一個線程池的做法不同,通道可以根據功能由不同的工作線程來處理。它們可能可以是日志線程池,命令/控制線程池,狀態請求線程池,等等。
服務線程池服務器示例
這個例子是上一個簡單服務器的一般性的選擇循環的擴展。它覆寫了 readDataFromSocket( )方法,并使用線程池來為準備好數據用于讀取的通道提供服務。與在主線程中同步地讀取數據不同,這個版本的實現將 SelectionKey 對象傳遞給為其服務的工作線程。
使用線程池來為通道提供服務
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
/**
- Specialization of the SelectSockets class which uses a thread pool to service
- channels. The thread pool is an ad-hoc implementation quicky lashed togther
- in a few hours for demonstration purposes. It's definitely not production
- quality.
*
- @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSocketsThreadPool extends SelectSockets
{
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool(MAX_THREADS);
// -------------------------------------------------------------
public static void main(String[] argv) throws Exception
{
new SelectSocketsThreadPool().go(argv);
}
// -------------------------------------------------------------
/**
- Sample data handler method for a channel with data ready to read. This
- method is invoked from(被調用) the go( ) method in the parent class. This handler
- delegates(委托) to a worker thread in a thread pool to service the channel,
- then returns immediately.
*
- @param key
A SelectionKey object representing a channel determined by the
selector to be ready for reading. If the channel returns an
EOF condition, it is closed here, which automatically
invalidates the associated key. The selector will then
de-register the channel on the next select call./
protected void readDataFromSocket(SelectionKey key) throws Exception
{
WorkerThread worker = pool.getWorker();
if (worker == null)
{
// No threads available. Do nothing. The selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return;
}
// Invoking this wakes up the worker thread, then returns
worker.serviceChannel(key);
}
// ---------------------------------------------------------------
/**
- A very simple thread pool class. The pool size is set at construction
- time and remains fixed. Threads are cycled through a FIFO idle queue.
*/
private class ThreadPool
{
List idle = new LinkedList();
ThreadPool(int poolSize)
{
// Fill up the pool with worker threads
for (int i = 0; i < poolSize; i++)
{
WorkerThread thread = new WorkerThread(this);
// Set thread name for debugging. Start it.
thread.setName("Worker" + (i + 1));
thread.start();
idle.add(thread);
}
}
/**
- Find an idle worker thread, if any. Could return null.
*/
WorkerThread getWorker()
{
WorkerThread worker = null;
synchronized (idle)
{
if (idle.size() > 0)
{
worker = (WorkerThread) idle.remove(0);
}
}
return (worker);
}
/**
- Called by the worker thread to return itself to the idle pool.
*/
void returnWorker(WorkerThread worker)
{
synchronized (idle)
{
idle.add(worker);
}
}
}
/**
- A worker thread class which can drain(排空) channels and echo-back(回顯) the input.
- Each instance is constructed with a reference(參考) to the owning thread pool
- object. When started, the thread loops forever waiting to be awakened to
- service the channel associated with a SelectionKey object. The worker is
- tasked by calling its serviceChannel( ) method with a SelectionKey
- object. The serviceChannel( ) method stores the key reference in the
- thread object then calls notify( ) to wake it up. When the channel has
- been drained, the worker thread returns itself to its parent pool.
*/
private class WorkerThread extends Thread
{
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread(ThreadPool pool)
{
this.pool = pool;
}
// Loop forever waiting for work to do
public synchronized void run()
{
System.out.println(this.getName() + " is ready");
while (true)
{
try
{
// Sleep and release object lock
//休眠并且釋放掉對象鎖
this.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
// Clear interrupt status
this.interrupted();
}
if (key == null)
{
continue; // just in case
}
System.out.println(this.getName() + " has been awakened");
try
{
drainChannel(key);
}
catch (Exception e)
{
System.out.println("Caught '" + e + "' closing channel");
// Close channel and nudge selector
try
{
key.channel().close();
}
catch (IOException ex)
{
ex.printStackTrace();
}
key.selector().wakeup();
}
key = null;
// Done. Ready for more. Return to pool
this.pool.returnWorker(this);
}
}
/**
- Called to initiate a unit of work by this worker thread on the
- provided SelectionKey object. This method is synchronized, as is the
- run( ) method, so only one key can be serviced at a given time.
- Before waking the worker thread, and before returning to the main
- selection loop, this key's interest set is updated to remove OP_READ.
- This will cause the selector to ignore read-readiness for this
- channel while the worker thread is servicing it.
- 通過一個被提供SelectionKey對象的工作線程來初始化一個工作集合,這個方法是同步的,所以
- 里面的run方法只有一個key能被服務在同一個時間,在喚醒工作線程和返回到主循環之前,這個key的
- 感興趣的集合被更新來刪除OP_READ,這將會引起工作線程在提供服務的時候選擇器會忽略讀就緒的通道
*/
synchronized void serviceChannel(SelectionKey key)
{
this.key = key;
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this.notify(); // Awaken the thread
}
/**
- The actual code which drains the channel associated with the given
- key. This method assumes the key has been modified prior to
- invocation to turn off selection interest in OP_READ. When this
- method completes it re-enables OP_READ and calls wakeup( ) on the
- selector so the selector will resume watching this channel.
*/
void drainChannel(SelectionKey key) throws Exception
{
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // 清空buffer
// Loop while data is available; channel is nonblocking
while ((count = channel.read(buffer)) > 0)
{
buffer.flip(); // make buffer readable
// Send the data; may not go all at once
while (buffer.hasRemaining())
{
channel.write(buffer);
}
// WARNING: the above loop is evil.
// See comments in superclass.
buffer.clear(); // Empty buffer
}
if (count < 0)
{
// Close channel on EOF; invalidates the key
channel.close();
return;
}
// Resume interest in OP_READ
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
// Cycle the selector so this key is active again
key.selector().wakeup();
}
}
}</code></pre>
原理解釋
由于執行選擇過程的線程將重新循環并幾乎立即再次調用 select( ),鍵的 interest 集合將被修改,并將 interest(感興趣的操作)從讀取就緒(read-rreadiness)狀態中移除。這將防止選擇器重復地調用 readDataFromSocket( )(因為通道仍然會準備好讀取數據,直到工作線程從它那里讀取數據)。當工作線程結束為通道提供的服務時,它將再次更新鍵的 ready 集合,來將 interest 重新放到讀取就緒集合中。它也會在選擇器上顯式地調用 wakeup( )。如果主線程在 select( )中被阻塞,這將使它繼續執行。這個選擇循環會再次執行一個輪回(可能什么也沒做)并帶著被更新的鍵重新進入select( )。
總結
對于java NIO的常見框架有Mina,Netty等,關于Mina和Netty到底哪個框架比較好,因為還未深入進行研究,
所以也不敢下定論,但個人還是傾向Netty框架吧。下一步準備好好研究一下Netty框架。
來自:http://www.importnew.com/22623.html