java aio 編程
Java NIO (JSR 51)定義了Java new I/O API,提案2000年提出,2002年正式發布。 JDK 1.4起包含了相應的API實現。
JAVA NIO2 (JSR 203)定義了更多的 New I/O APIs, 提案2003提出,直到2011年才發布, 最終在JDK 7中才實現。
JSR 203除了提供更多的文件系統操作API(包括可插拔的自定義的文件系統), 還提供了對socket和文件的異步 I/O操作。 同時實現了JSR-51提案中的socket channel全部功能,包括對綁定, option配置的支持以及多播multicast的實現。
當前很多的項目還停留在JAVA NIO的實現上, 對JAVA AIO(asynchronous I/O)著墨不多。 本文整理了一些關于JAVA AIO的介紹,以及netty對AIO的支持。
以下內容只針對socket的I/O操作, 不涉及對文件的處理。
JDK AIO API
首先介紹以下I/O模型。
Unix定義了五種I/O模型, 下圖是五種I/O模型的比較。
- 阻塞I/O
- 非阻塞I/O
- I/O復用(select、poll、linux 2.6種改進的epoll)
- 信號驅動IO(SIGIO)
- 異步I/O(POSIX的aio_系列函數)
</ul>
- 同步I/O: 同步I/O操作導致請求進程阻塞,直至操作完成
- 異步I/O: 異步I/O操作不導致請求阻塞
所以Unix的前四種I/O模型都是同步I/O, 只有最后一種才是異步I/O。
</ul>
- AsynchronousSocketChannel
- Asynchronous connect
- Asynchronous read/write
- Asynchronous scatter/gather (multiple buffers)
- Read/write operations support timeout
- failed method invoked with timeout exception
- Implements NetworkChannel for binding, setting socket options, etc </ul> </li> </ul>
- Asynchronous read/write (connected)
- Asynchronous receive/send (unconnected)
- Implements NetworkChannel for binding, setting socket options, etc.
- Implements MulticastChannel </ul>
- CompletionHandler </ul>
- [Asynchronous I/O Tricks and Tips](http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf
- http://openjdk.java.net/projects/nio/resources/AsynchronousIo.html </ol> </div> 來自:http://colobu.com/2014/11/13/java-aio-introduction/
AsynchronousServerSocketChannel
還實現了Asynchronous acceptAsynchronousDatagramChannel
Java AIO 例子
異步channel API提供了兩種方式監控/控制異步操作(connect,accept, read,write等)。第一種方式是返回java.util.concurrent.Future對象, 檢查Future的狀態可以得到操作是否完成還是失敗,還是進行中, future.get阻塞當前進程。
第二種方式為操作提供一個回調參數java.nio.channels.CompletionHandler,這個回調類包含completed
,failed
兩個方法。
channel的每個I/O操作都為這兩種方式提供了相應的方法, 你可以根據自己的需要選擇合適的方式編程。下面以一個最簡單的Time服務的例子演示如何使用異步I/O。 客戶端連接到服務器后服務器就發送一個當前的時間字符串給客戶端。 客戶端毋須發送請求。 邏輯很簡單。
Server實現
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Server { private static Charset charset = Charset.forName("US-ASCII"); private static CharsetEncoder encoder = charset.newEncoder(); public static void main(String[] args) throws Exception { AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4)); AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("0.0.0.0", 8013)); server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { @Override public void completed(AsynchronousSocketChannel result, Void attachment) { server.accept(null, this); // 接受下一個連接 try { String now = new Date().toString(); ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n")); //result.write(buffer, null, new CompletionHandler<Integer,Void>(){...}); //callback or Future<Integer> f = result.write(buffer); f.get(); System.out.println("sent to client: " + now); result.close(); } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); } }); group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } }
這個例子使用了兩種方式。
accept
使用了回調的方式, 而發送數據使用了future
的方式。Client實現
public class Client { public static void main(String[] args) throws Exception { AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013)); future.get(); ByteBuffer buffer = ByteBuffer.allocate(100); client.read(buffer, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { System.out.println("client received: " + new String(buffer.array())); } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }); Thread.sleep(10000); } }
客戶端也使用了兩種方式,
connect
使用了future方式,而接收數據使用了回調的方式。Netty AIO
Netty也支持AIO并提供了相應的類:
AioEventLoopGroup
,AioCompletionHandler
,AioServerSocketChannel
,AioSocketChannel
,AioSocketChannelConfig
。
其它使用方法和NIO類似。參考
POSIX把I/O操作劃分成兩類:
傳統的Java BIO (blocking I/O)是Unix I/O模型中的第一種。
Java NIO中如果不使用select模式,而只把channel配置成nonblocking則是第二種模型。
Java NIO select實現的是一種多路復用I/O。 底層使用epoll或者相應的poll系統調用, 參看我以前整理的一篇文章: java 和netty epoll實現
第四種模型JDK應該是沒有實現。
Java NIO2增加了對第五種模型的支持,也就是AIO。
OpenJDK在不同平臺上的AIO實現
在不同的操作系統上,AIO由不同的技術實現。
通用實現可以查看這里。
Windows上是使用完成接口(IOCP)實現,可以參看WindowsAsynchronousServerSocketChannelImpl,
其它平臺上使用aio調用UnixAsynchronousServerSocketChannelImpl, UnixAsynchronousSocketChannelImpl, SolarisAsynchronousChannelProvider