Java NIO之【Scalable IO in Java】
看完了并發網的NIO教程,是否有種意猶未盡的感覺。正常情況下,答案應該是肯定的。那我們下面來看下Doug Lea大神寫的 Scalable IO in Java ,直接可以下載英文版pdf。這邊就當邊學習邊翻譯了。
網絡服務
大部分網路服務有著相同的體系:
- 讀取請求(Read request)
- 對請求進行解碼(Decode request)
- 處理業務邏輯(Process service)
- 對返回值進行編碼(Encode reply)
- 發送返回值(Send reply)
下面我們來看下傳統的設計模型:
其中,每一個handler有可能都要新起一個線程去執行。用偽代碼模擬如下:
public classServce{
publicstaticvoidmain(String[] args)throwsIOException{
ServerSocket ss = new ServerSocket(1234);
while (!Thread.interrupted()) {
new Thread(new Handler(ss.accept())).start();
}
}
static classHandlerimplementsRunnable{
final Socket socket;
Handler(Socket socket) {
this.socket = socket;
}
@Override
publicvoidrun(){
try {
byte[] input = new byte[1024];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException e) {
e.printStackTrace();
}
}
private byte[] process(byte[] cmd) {
return null;
}
}
}
從偽代碼可以看出傳統I/O模型的雛形,需要為每一個接收到的socket連接新建一個線程去執行具體的業務邏輯。
可擴展性的目標
首先,肯定是不滿意上面傳統的I/O設計模型,才有接下來的討論。無休止地新建線程去執行具體業務邏輯,最終無疑會拖垮整個系統。當然,也很容易想到,可以用線程池,但是這樣雖然可以限制線程數量,但是并發數也因此被限制了,所以并不是解決之道。那我們就來看下可擴展性I/O的目標是什么:
- 高負載情況下的優雅降級
- 硬件的升級能持續地給系統帶來性能提升
- 當然也包含可用性和性能的目標:低延遲、高負載等
分治法(Divide and Conquer)
分治法一般是解決可擴展性的最好的途徑。將處理流程分成一些小的任務,每一個任務都包含一個非阻塞操作。當任務準備好的時候去執行它。這里,一個I/O事件通常被作為觸發器。比如下面:
說實話,上面這一話配上這張圖,不是很能理解。被分成的小任務是整個handler,還是比如說read這樣一個操作。感覺是把handler拆成一個個小任務,再往下學吧,應該會越來越清晰。
java.nio提供如下基本的機制:
- 非阻塞的讀和寫
- 與感興趣的I/O事件相關聯的任務分配機制
事件驅動設計
一系列事件驅動設計使得無限可能。這種方式通常比其他方案更有效,原因如下:
- 占用資源少:不需要為每個客戶端開啟一個新線程
- 開銷少:減少上下文切換的開銷,減少鎖的使用
但是,通常也更難編碼,原因如下:
- 必須拆分成許多小的非阻塞單元,但是無法消除所有的阻塞動作,比如說GC、頁錯誤等
- 必須持續追蹤服務的邏輯狀態
Reactor模式
Reactor模式有如下幾個特征:
- Reactor通過調度相應的處理程序來相應I/O事件
- 處理程序執行非阻塞操作
- 通過綁定處理程序來管理事件。
我們先來看下單個線程版本的模型圖:
java.nio中的Channel、Buffer、Selector、SelectionKey類可以支持該模型。上圖如果第一眼不能很好地理解的話,先來看下代碼,涉及到兩個類。
public classReactorimplementsRunnable{
final Selector selector;
final ServerSocketChannel serverSocketChannel;
Reactor(int port) throws IOException {
// 初始化ServerSocketChannel,以非阻塞模式運行
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 初始化Selector
selector = Selector.open();
// 將ServerSocketChannel注冊到Selector上
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 在SelectionKey上綁定一個附屬對象Acceptor
selectionKey.attach(new Acceptor());
}
@Override
publicvoidrun(){
try {
while (!Thread.interrupted()) {
// 阻塞直至事件就緒
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
// 分發
dispatch((SelectionKey)(it.next()));
}
// 需要自己清除selectedKeys
selected.clear();
}
}catch (IOException ex) {
}
}
voiddispatch(SelectionKey k){
/**
* 獲取SelectionKey中的attachment,并執行該attachment的run()方法
* 拿第一個到達的socket連接來看,該attachment為一個Acceptor實例
*/
Runnable r = (Runnable)(k.attachment());
if (r != null) {
r.run();
}
}
classAcceptorimplementsRunnable{
publicvoidrun(){
try {
// 獲取新連接的SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// 具體的處理邏輯
new Handler(selector, socketChannel);
}
} catch (IOException ex) {
}
}
}
}
public classHandlerimplementsRunnable{
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
socket.configureBlocking(false);
// 繼續在Selector上注冊讀事件,此時attachment為當前Handler實例
sk = socket.register(sel, SelectionKey.OP_READ, this);
// 使選擇器上的第一個還沒有返回的選擇操作立即返回
sel.wakeup();
}
booleaninputIsComplete(){
return true;
}
booleanoutputIsComplete(){
return true;
}
voidprocess(){
System.out.println("Handle processing...");
}
@Override
publicvoidrun(){
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
voidread()throwsIOException{
System.out.println("Handle reading...");
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 在SelectionKey上注冊寫事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
voidsend()throwsIOException{
System.out.println("Handle writing...");
socket.write(output);
if (outputIsComplete()) {
//write完就結束了, 關閉select key
sk.cancel();
}
}
}
結合模型圖和代碼,直觀的感受是單個線程可以同時處理多個客戶端請求了。下面列舉下Reactor模式的一些概念:
- Reactor:負責響應I/O事件,當檢測到一個新的事件,將其發送給相應的Handler去處理
- Handler:負責處理非阻塞的行為,同時將handler與事件綁定
Reactor為單個線程,需要處理accept連接,同時發送請求到處理器中。由于只有單個線程,所以handler中的業務需要能夠快速處理完。當然,還能再改進,可以將具體的業務邏輯放到單獨的線程池中去跑,這兒就不實習了。同時,NIO暫時也就看到這里,主要是了解下相關知識,為下面學習Netty做個準備。
來自:http://bboyjing.github.io/2017/04/05/Java-NIO之【Scalable-IO-in-Java】/