Java阻塞IO與非阻塞IO
IO:
IO 是主存和外部設備 ( 硬盤、終端和網絡等 ) 拷貝數據的過程。 IO 是操作系統的底層功能實現,底層通過 I/O 指令進行完成。
阻塞與非阻塞:
一輛從 A 開往 B 的公共汽車上,路上有很多點可能會有人下車。司機不知道哪些點會有哪些人會下車,對于需要下車的人,如何處理更好?
- 司機過程中定時詢問每個乘客是否到達目的地,若有人說到了,那么司機停車,乘客下車。 ( 類似阻塞式 )
- 每個人告訴售票員自己的目的地,然后睡覺,司機只和售票員交互,到了某個點由售票員通知乘客下車。 ( 類似非阻塞 )
很顯然,每個人要到達某個目的地可以認為是一個線程,司機可以認為是 CPU 。在阻塞式里面,每個線程需要不斷的輪詢,上下文切換,以達到找到目的地的結果。而在非阻塞方式里,每個乘客 ( 線程 ) 都在睡覺 ( 休眠 ) ,只在真正外部環境準備好了才喚醒,這樣的喚醒肯定不會阻塞。
阻塞式I/O:(傳統的IO)
以網絡應用為例,在傳統IO方式(阻塞IO)中需要監聽一個ServerSocket,接受請求的連接為其提供服務(服務通常包括了處理請求并發送響應)下圖是服務器的生命周期圖,其中標有粗黑線條的部分表明會發生I/O阻塞。
此方式在遇到多請求時,只能等待前面的請求完成后才能處理新的請求,所以通常在Java中處理阻塞I/O要用到線程(大量的線程)。代碼如下
public class TCPServer {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(10000);
System.out.println("server start...");
while (true) {
Socket s = ss.accept();
new LogicThread(s);//開一個線程來處理請求,這里面調用InputStream.read()讀取請求信息
}
} catch (Exception e) {
e.printStackTrace();
}
}
} 可以分析創建服務器的每個具體步驟。首先創建ServerSocket
ServerSocket server=new ServerSocket(10000);
然后接受新的連接請求
Socket newConnection=server.accept();//阻塞
在LogicThread中處理請求
InputStream in = newConnection.getInputStream();
InputStreamReader reader = new InputStreamReader(in);
BufferedReader buffer = new BufferedReader(reader);
Request request = new Request();
while(!request.isComplete()) {
String line = buffer.readLine();//阻塞
request.addLine(line);
} 生命周期如下圖所示:
傳統IO方式(阻塞I/O)在調用InputStream.read()/buffer.readLine()方法時是阻塞的,它會一直等到數據到來或緩沖區已滿時或超時時才會返回,并且產生了大量String類型垃圾,盡管可以使用StringBuffer優化;同樣,在調用ServerSocket.accept()方法時,也會一直阻塞到有客戶端連接才會返回,每個客戶端連接過來后,服務端都會啟動一個線程去處理該客戶端的請求。并且多線程處理多個連接。每個線程擁有自己的棧空間并且占用一些 CPU 時間。每個線程遇到外部未準備好的時候,都會阻塞掉。阻塞的結果就是會帶來大量的進程上下文切換。且大部分進程上下文切換可能是無意義的。比如假設一個線程監聽某一個端口,一天只會有幾次請求進來,但是該 cpu 不得不為該線程不斷做上下文切換嘗試,大部分的切換以阻塞告終。
非阻塞式I/O(NIO):也可以說成“New I/O”
核心類:
1.Buffer 為所有的原始類型提供 (Buffer) 緩存支持。
2.Charset 字符集編碼解碼解決方案
3.Channel 一個新的原始 I/O 抽象,用于讀寫Buffer類型,通道可以認為是一種連接,可以是到特定設備,程序或者是網絡的連接。通道的類等級結構圖如下
圖中ReadableByteChannel和WritableByteChannel分別用于讀寫。
GatheringByteChannel可以從使用一次將多個Buffer中的數據寫入通道,相反的,ScatteringByteChannel則可以一次將數據從通道讀入多個Buffer中。你還可以設置通道使其為阻塞或非阻塞I/O操作服務。
為了使通道能夠同傳統I/O類相容,Channel類提供了靜態方法創建Stream或Reader
4.Selector
在過去的阻塞I/O中,我們一般知道什么時候可以向stream中讀或寫,因為方法調用直到stream準備好時返回。但是使用非阻塞通道,我們需要一些方法來知道什么時候通道準備好了。在NIO包中,設計Selector就是為了這個目的。SelectableChannel可以注冊特定的事件,而不是在事件發生時通知應用,通道跟蹤事件。然后,當應用調用Selector上的任意一個selection方法時,它查看注冊了的通道看是否有任何感興趣的事件發生。
下面是java NIO的工作原理:
- 由一個專門的線程來處理所有的 IO 事件,并負責分發。
- 事件驅動機制:事件到的時候觸發,而不是同步的去監視事件。
- 線程通訊:線程之間通過 wait,notify 等方式通訊。保證每次上下文切換都是有意義的。減少無謂的線程切換。
如下圖所示:
(注:每個線程的處理流程大概都是讀取數據、解碼、計算處理、編碼、發送響應。)
Java NIO的服務端只需啟動一個專門的線程來處理所有的 IO 事件。java NIO采用了雙向通道(channel)進行數據傳輸,而不是單向的流(stream),在通道上可以注冊我們感興趣的事件。一共有以下四種事件:
服務端接收客戶端連接事件 SelectionKey.OP_ACCEPT(16)
客戶端連接服務端事件 SelectionKey.OP_CONNECT(8)
讀事件 SelectionKey.OP_READ(1)
寫事件 SelectionKey.OP_WRITE(4)
服務端和客戶端各自維護一個管理通道的對象,我們稱之為selector,該對象能檢測一個或多個通道 (channel) 上的事件。我們以服務端為例,如果服務端的selector上注冊了讀事件,某時刻客戶端給服務端發送了一些數據,阻塞I/O這時會調用read()方法阻塞地讀取數據,而NIO的服務端會在selector中添加一個讀事件。服務端的處理線程會輪詢地訪問selector,如果訪問selector時發現有感興趣的事件到達,則處理這些事件,如果沒有感興趣的事件到達,則處理線程會一直阻塞直到感興趣的事件到達為止。
完整的非阻塞IO實例
/**
* NIO服務端
*/
public class NIOServer {
//通道管理器
private Selector selector;
/**
* 獲得一個ServerSocket通道,并對該通道做一些初始化的工作
* @param port 綁定的端口號
* @throws IOException
*/
public void initServer(int port) throws IOException {
// 獲得一個ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設置通道為非阻塞
serverChannel.configureBlocking(false);
// 將該通道對應的ServerSocket綁定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 獲得一個通道管理器
this.selector = Selector.open();
//將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后,
//當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
System.out.println("服務端啟動成功!");
// 輪詢訪問selector
while (true) {
//當注冊的事件到達時,方法返回;否則,該方法會一直阻塞
selector.select();
// 獲得selector中選中的項的迭代器,選中的項為注冊的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 刪除已選的key,以防重復處理
ite.remove();
// 客戶端請求連接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 獲得和客戶端連接的通道
SocketChannel channel = server.accept();
// 設置成非阻塞
channel.configureBlocking(false);
//在這里可以給客戶端發送信息哦
channel.write(ByteBuffer.wrap(new String("向客戶端發送了一條信息").getBytes()));
//在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權限。
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
/**
* 處理讀取客戶端發來的信息 的事件
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException{
// 服務器可讀取消息:得到事件發生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 創建讀取的緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服務端收到信息:"+msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);// 將消息回送給客戶端
}
/**
* 啟動服務端測試
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
server.listen();
}
}
/**
* NIO客戶端
*/
public class NIOClient {
//通道管理器
private Selector selector;
/**
* 獲得一個Socket通道,并對該通道做一些初始化的工作
* @param ip 連接的服務器的ip
* @param port 連接的服務器的端口號
* @throws IOException
*/
public void initClient(String ip,int port) throws IOException {
// 獲得一個Socket通道
SocketChannel channel = SocketChannel.open();
// 設置通道為非阻塞
channel.configureBlocking(false);
// 獲得一個通道管理器
this.selector = Selector.open();
// 客戶端連接服務器,其實方法執行并沒有實現連接,需要在listen()方法中調
//用channel.finishConnect();才能完成連接
channel.connect(new InetSocketAddress(ip,port));
//將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
// 輪詢訪問selector
while (true) {
selector.select();
// 獲得selector中選中的項的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 刪除已選的key,以防重復處理
ite.remove();
// 連接事件發生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在連接,則完成連接
if(channel.isConnectionPending()){
channel.finishConnect();
}
// 設置成非阻塞
channel.configureBlocking(false);
//在這里可以給服務端發送信息哦
channel.write(ByteBuffer.wrap(new String("向服務端發送了一條信息").getBytes()));
//在和服務端連接成功之后,為了可以接收到服務端的信息,需要給通道設置讀的權限。
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
/**
* 處理讀取服務端發來的信息 的事件
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException{
// 服務器可讀取消息:得到事件發生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 創建讀取的緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("客戶端收到信息:"+msg);
// ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
// channel.write(outBuffer);// 將消息回送給服務器端
}
/**
* 啟動客戶端測試
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOClient client = new NIOClient();
client.initClient("localhost",8000);
client.listen();
}
} 參考;
http://www.iteye.com/topic/834447
http://weixiaolu.iteye.com/blog/1479656
http://www.360doc.com/content/12/0604/15/9579107_215842144.shtml