Java并發編程之NIO簡明教程
在傳統的架構中,對于客戶端的每一次請求,服務器都會創建一個新的線程或者利用線程池復用去處理用戶的一個請求,然后返回給用戶結果,這樣做在高并發的情況下會存在非常嚴重的性能問題:對于用戶的每一次請求都創建一個新的線程是需要一定內存的,同時線程之間頻繁的上下文切換也是一個很大的開銷。
p.s: 本文涉及的完整實例代碼都可以在我的 GitHub 上面下載。
什么是Selector
NIO的核心就是Selector,讀懂了Selector就理解了異步機制的實現原理,下面先來簡單的介紹一下什么是Selector。現在對于客戶端的每一次請求到來時我們不再立即創建一個線程進行處理,相反以epool為例子當一個事件準備就緒之后通過回調機制將描述符加入到阻塞隊列中,下面只需要通過遍歷阻塞隊列對相應的事件進行處理就行了,通過這種回調機制整個過程都不需要對于每一個請求都去創建一個線程去單獨處理。上面的解釋還是有些抽象,下面我會通過具體的代碼實例來解釋,在這之前我們先來了解一下NIO中兩個基礎概念Buffer和Channel。
如果大家對于多路IO復用比如select/epool完全沒有陌生的話,建議先讀一下我的這篇Linux下的五種IO模型 :-)
Buffer
以ByteBuffer為例子,我們可以通過ByteBuffer.allocate(n)來分配n個字節的緩沖區,對于緩沖區有四個重要的屬性:
- capacity,緩沖區的容量,也就是我們上面指定的n。
- position,當前指針指向的位置。
- mark,前一個位置,這里我們下面再解釋。
- limit,最大能讀取或者寫入的位置。
如上圖所示,Buffer實際上也是分為兩種,一種用于寫數據,一種用于讀取數據。
put
通過直接閱讀ByteBuffer源碼可以清晰看出put方法是把一個byte變量x放到緩沖區中去,同時position加1:
publicByteBufferput(bytex){
hb[ix(nextPutIndex())] = x;
return this;
}
final int nextPutIndex(){
if (position >= limit)
throw new BufferOverflowException();
return position++;
}
get
get方法是從緩沖區中讀取一個字節,同時position加一:
public byte get(){
return hb[ix(nextGetIndex())];
}
final int nextGetIndex(){
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
flip
如果我們想將buffer從寫數據的情況變成讀數據的情況,可以直接使用flip方法:
public finalBufferflip(){
limit = position;
position = 0;
mark = -1;
return this;
}
mark和reset
mark是記住當前的位置用的,也就是保存position的值:
public finalBuffermark(){
mark = position;
return this;
}
如果我們在對緩沖區讀寫之前就調用了mark方法,那么以后當position位置變化之后,想回到之前的位置可以調用reset會將mark的值重新賦給position:
public finalBufferreset(){
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
Channel
利用NIO,當我們讀取數據的時候,會先從buffer加載到channel,而寫入數據的時候,會先入到channel然后通過channel轉移到buffer中去。channel給我們提供了兩個方法:通過 channel.read(buffer) 可以將channel中的數據寫入到buffer中,而通過 channel.write(buffer) 則可以將buffer中的數據寫入到到channel中。
Channel的話分為四種:
- FileChannel從文件中讀寫數據。
- DatagramChannel以UDP的形式從網絡中讀寫數據。
- SocketChannel以TCP的形式從網絡中讀寫數據。
- ServerSocketChannel允許你監聽TCP連接。
因為今天我們的重點是Selector,所以來看一下SocketChannel的用法。在下面的代碼利用SocketChannel模擬了一個簡單的server-client程序
WebServer 的代碼如下,和傳統的sock程序并沒有太多的差異,只是我們引入了buffer和channel的概念:
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 5000));
SocketChannel socketChannel = ssc.accept();
ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer);
readBuffer.flip();
while (readBuffer.hasRemaining()) {
System.out.println((char)readBuffer.get());
}
socketChannel.close();
ssc.close();
WebClient 的代碼如下:
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 5000));
ByteBuffer writeBuffer = ByteBuffer.allocate(128);
writeBuffer.put("hello world".getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
socketChannel.close();
Scatter / Gather
在上面的client程序中,我們也可以同時將多個buffer中的數據放入到一個數組后然后統一放入到channel后傳遞給服務器:
ByteBuffer buffer1 = ByteBuffer.allocate(128);
ByteBuffer buffer2 = ByteBuffer.allocate(16);
buffer1.put("hello ".getBytes());
buffer2.put("world".getBytes());
buffer1.flip();
buffer2.flip();
ByteBuffer[] bufferArray = {buffer1, buffer2};
socketChannel.write(bufferArray);
Selector
通過使用selector,我們可以通過一個線程來同時管理多個channel,省去了創建線程以及線程之間進行上下文切換的開銷。
創建一個selector
通過調用selector類的靜態方法open我們就可以創建一個selector對象:
Selector selector = Selector.open();
注冊channel
為了保證selector能夠監聽多個channel,我們需要將channel注冊到selector當中:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
我們可以監聽四種事件:
- SelectionKey.OP_CONNECT:當客戶端的嘗試連接到服務器
- SelectionKey.OP_ACCEPT:當服務器接受來自客戶端的請求
- SelectionKey.OP_READ:當服務器可以從channel中讀取數據
- SelectionKey.OP_WRITE:當服務器可以向channel中寫入數據
對SelectorKey調用 channel 方法可以得到key對應的channel:
Channel channel = key.channel();
而key自身感興趣的監聽事件也可以通過 interestOps 來獲得:
int interestSet = selectionKey.interestOps();
對selector調用 selectedKeys() 方法我們可以得到注冊的所有key:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
實戰
服務器的代碼如下:
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 5000));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuff = ByteBuffer.allocate(128);
ByteBuffer writeBuff = ByteBuffer.allocate(128);
writeBuff.put("received".getBytes());
writeBuff.flip(); // make buffer ready for reading
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuff.clear(); // make buffer ready for writing
socketChannel.read(readBuff);
readBuff.flip(); // make buffer ready for reading
System.out.println(new String(readBuff.array()));
key.interestOps(SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
writeBuff.rewind(); // sets the position back to 0
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuff);
key.interestOps(SelectionKey.OP_READ);
}
}
}
客戶端程序的代碼如下,各位讀者可以同時在終端下面多開幾個程序來同時模擬多個請求,而對于多個客戶端的程序我們的服務器始終只用一個線程來處理多個請求。一個很常見的應用場景就是多個用戶同時往服務器上傳文件,對于每一個上傳請求我們不在單獨去創建一個線程去處理,同時利用Executor/Future我們也可以不用阻塞在IO操作中而是立即返回用戶結果。
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 5000));
ByteBuffer writeBuffer = ByteBuffer.allocate(32);
ByteBuffer readBuffer = ByteBuffer.allocate(32);
writeBuffer.put("hello".getBytes());
writeBuffer.flip(); // make buffer ready for reading
while (true) {
writeBuffer.rewind(); // sets the position back to 0
socketChannel.write(writeBuffer); // hello
readBuffer.clear(); // make buffer ready for writing
socketChannel.read(readBuffer); // recieved
}
來自:https://www.ziwenxie.site/2017/08/22/java-nio/