Java NIO 使用實例
在JDK1.4之前,Java OutputStream的write方法、InputStream的read方法和ServerSocket的accept()方法都是阻塞方法。從JDK1.4開始,Java引入了新的輸入輸出系統(New Input/Output,NIO),非阻塞是Java NIO實現的重要功能之一。
1、Buffer
緩衝區,用於傳輸數據,本質上是一個數組;從Channel讀數據和向Channel寫數據都只能通過Buffer進行。
2、Channel
通道,所有的IO流在NIO中都是從Channel開始的,數據可以從Channel讀到Buffer中,也可以從Buffer寫到Channel中,是Buffer對象的唯一接口。
3、Selector
選擇器,它能檢測一個或多個通道 (channel) 上的事件,并將事件分發出去。
使用一個 select 線程就能監聽多個通道上的事件,并基于事件驅動觸發相應的響應。而不需要為每個 channel 去分配一個線程。
4、SelectionKey
包含了事件的狀態信息和事件對應的通道的綁定。
使用NIO的步驟:
1、創建一個Selector實例
2、將該實例註冊到各種通道,指定每個通道上感興趣的IO操作
3、重複執行選擇器循環:
3.1、調用一種select()方法
3.2、獲取已選鍵集
3.3、對於已選鍵集中的每一個鍵:
3.3.1、將已選鍵從鍵集中移除
3.3.2、獲取信道,並從鍵中獲取附件
3.3.3、確定準備就緒的操作並執行;對於accept操作獲得的SocketChannel對象,需將信道設為非阻塞模式,並將其註冊到選擇器中
3.3.4、根據需要,修改鍵的興趣操作集
如下代碼:
<span style="font-size: small;"><span>NIOServer.java</span></span>
package org.hadoopinternal.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Single-threaded, selector-driven server listening on {@link #PORT}.
 *
 * <p>One {@link Selector} watches the listening channel for OP_ACCEPT and every
 * accepted connection for OP_READ / OP_WRITE. Each accepted connection is
 * wrapped in a {@link NIOServerConnection}, stored as the attachment of its
 * selection key, and driven from the select loop in {@link #main(String[])}.
 */
public class NIOServer {
    /** Maximum time, in milliseconds, a single select() call may block. */
    private static final int TIMEOUT = 300;
    /** TCP port the server binds to. */
    private static final int PORT = 12112;

    /**
     * Opens the selector and the listening channel, then runs the select loop
     * until a failure on the selector or the listening channel occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel listenChannel = null;
        try {
            selector = Selector.open();
            listenChannel = ServerSocketChannel.open();
            listenChannel.configureBlocking(false);
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // Nothing became ready within TIMEOUT ms: print a heartbeat dot.
                if (selector.select(TIMEOUT) == 0) {
                    System.out.print(".");
                    continue;
                }

                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // The selector never removes keys from the selected set itself.
                    iter.remove();

                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // Server socket channel has a pending connection request.
                        acceptConnection(selector, listenChannel);
                    } else {
                        serviceConnection(key);
                    }
                }
            }
        } catch (Exception e) {
            // Only selector / listening-socket failures reach this point;
            // per-connection I/O errors are handled where they occur.
            e.printStackTrace();
        } finally {
            // Release the listening socket and the selector on the way out.
            if (listenChannel != null) {
                try {
                    listenChannel.close();
                } catch (IOException ignored) {
                    // best effort: we are shutting down anyway
                }
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException ignored) {
                    // best effort: we are shutting down anyway
                }
            }
        }
    }

    /**
     * Accepts one pending connection, switches it to non-blocking mode and
     * registers it with the selector for OP_READ.
     *
     * @throws IOException if accept() itself fails (a listening-socket problem)
     */
    private static void acceptConnection(Selector selector, ServerSocketChannel listenChannel)
            throws IOException {
        SocketChannel channel = listenChannel.accept();
        if (channel == null) {
            // A non-blocking accept() returns null when no connection is pending.
            return;
        }
        try {
            channel.configureBlocking(false);
            SelectionKey connkey = channel.register(selector, SelectionKey.OP_READ);
            NIOServerConnection conn = new NIOServerConnection(connkey);
            connkey.attach(conn);
        } catch (IOException e) {
            // Setting up this one client failed: drop it, keep the server running.
            e.printStackTrace();
            try {
                channel.close();
            } catch (IOException ignored) {
                // the channel is being discarded
            }
        }
    }

    /**
     * Runs the read and/or write handler of the connection attached to
     * {@code key}. An I/O failure closes only that connection.
     */
    private static void serviceConnection(SelectionKey key) {
        NIOServerConnection conn = (NIOServerConnection) key.attachment();
        if (conn == null) {
            return;
        }
        try {
            if (key.isValid() && key.isReadable()) {
                conn.handleRead();
            }
            // handleRead() may have closed the channel, which cancels the key.
            if (key.isValid() && key.isWritable()) {
                conn.handleWrite();
            }
        } catch (IOException e) {
            e.printStackTrace();
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // the connection is being discarded
            }
        }
    }
}
<span style="font-size: small;"><span>NIOServerConnection.java: </span></span>
package org.hadoopinternal.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Per-connection state for {@code NIOServer}: the selection key, its socket
 * channel and one buffer. Bytes read from the channel are collected in the
 * buffer and written back to the same channel.
 */
public class NIOServerConnection {
    /** Capacity, in bytes, of the per-connection buffer. */
    private static final int BUFFSIZE = 1024;

    SelectionKey key;
    SocketChannel channel;
    ByteBuffer buffer;

    /**
     * @param key selection key of the connection; its channel must be a
     *            {@link SocketChannel}
     */
    public NIOServerConnection(SelectionKey key) {
        this.key = key;
        this.channel = (SocketChannel) key.channel();
        buffer = ByteBuffer.allocate(BUFFSIZE);
    }

    /**
     * Reads available bytes into the buffer. On end-of-stream the channel is
     * closed; otherwise write interest is added so the buffered bytes get
     * sent back.
     *
     * @throws IOException if reading from or closing the channel fails
     */
    public void handleRead() throws IOException {
        long bytesRead = channel.read(buffer);
        if (bytesRead == -1) {
            channel.close();
        } else {
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Writes buffered bytes to the channel. Once everything has been written,
     * write interest is dropped again; unwritten bytes are kept at the start
     * of the buffer by {@code compact()}.
     *
     * @throws IOException if writing to the channel fails
     */
    public void handleWrite() throws IOException {
        buffer.flip();      // switch the buffer from filling to draining
        channel.write(buffer);
        if (!buffer.hasRemaining()) {
            key.interestOps(SelectionKey.OP_READ);
        }
        buffer.compact();   // keep leftovers, switch back to filling
    }
}
如下代碼為Hadoop IPC Server中的Listener,它是一個標準的NIO應用:
/** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- //-tion (for idle connections) ran private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // Register accepts on the server socket with the selector. 
acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); } private class Reader implements Runnable { private volatile boolean adding = false; private Selector readSelector = null; Reader(Selector readSelector) { this.readSelector = readSelector; } public void run() { LOG.info("Starting SocketReader"); synchronized (this) { while (running) { SelectionKey key = null; try { readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } } /** * This gets reader into the state that waits for the new channel * to be registered with readSelector. If it was waiting in select() * the thread will be woken up, otherwise whenever select() is called * it will return even if there is nothing to read and wait * in while(adding) for finishAdd call */ public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); } public synchronized void finishAdd() { adding = false; this.notify(); } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time * for which the connection was idle. If 'force' is true then all * connections will be looked at for the cleanup. 
*/ private void cleanupConnections(boolean force) { if (force || numConnections > thresholdIdleConnections) { long currentTime = System.currentTimeMillis(); if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { return; } int start = 0; int end = numConnections - 1; if (!force) { start = rand.nextInt() % numConnections; end = rand.nextInt() % numConnections; int temp; if (end < start) { temp = start; start = end; end = temp; } } int i = start; int numNuked = 0; while (i <= end) { Connection c; synchronized (connectionList) { try { c = connectionList.get(i); } catch (Exception e) {return;} } if (c.timedOut(currentTime)) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); numNuked++; end--; c = null; if (!force && numNuked == maxConnectionsToNuke) break; } else i++; } lastCleanupRunTime = System.currentTimeMillis(); } } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { 
closeConnection(connectionList.remove(0)); } } } private void closeCurrentConnection(SelectionKey key, Throwable e) { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); c = null; } } } InetSocketAddress getAddress() { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } } void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c + ". 
Number of active connections: "+ numConnections); closeConnection(c); c = null; } else { c.setLastContact(System.currentTimeMillis()); } } synchronized void doStop() { if (selector != null) { selector.wakeup(); Thread.yield(); } if (acceptChannel != null) { try { acceptChannel.socket().close(); } catch (IOException e) { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } readPool.shutdown(); } // The method that will return the next reader to work with // Simplistic implementation of round robin for now Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } }