Java NIO基本使用
NIO是Java提供的非阻塞I/O API.
非阻塞的意義在于可以使用一個線程對大量的數據連接進行處理,非常適用于"短數據長連接"的應用場景,例如即時通訊軟件.
在一個阻塞C/S系統中,服務器要為每一個客戶連接開啟一個線程阻塞等待客戶端發送的消息.若使用非阻塞技術,服務器可以使用一個線程對連接進行輪 詢,無須阻塞等待.這大大減少了內存資源的浪費,也避免了服務器在客戶線程中不斷切換帶來的CPU消耗,服務器對CPU的有效使用率大大提高.
其核心概念包括Channel,Selector,SelectionKey,Buffer.
Channel是I/O通道,可以向其注冊Selector,應用成功可以通過select操作獲取當前通道已經準備好的可以無阻塞執行的操作.這由SelectionKey表示.
SelectionKey的常量字段SelectionKey.OP_***分別對應Channel的幾種操作例如connect(),accept(),read(),write().
select操作后得到SelectionKey.OP_WRITE或者READ即可在Channel上面無阻塞調用read和write方 法,Channel的讀寫操作均需要通過Buffer進行.即讀是講數據從通道中讀入Buffer然后做進一步處理.寫需要先將數據寫入Buffer然后 通道接收Buffer.
下面是一個使用NIO的基本C/S示例.該示例只為顯示如何使用基本的API而存在,其代碼的健壯性,合理性都不具參考價值.
這個示例,實現一個簡單的C/S,客戶端想服務器端發送消息,服務器將收到的消息打印到控制臺.現實的應用中需要定義發送數據使用的協議,以幫助服 務器解析消息.本示例只是無差別的使用默認編碼將收到的字節轉換字符并打印.通過改變初始分配的ByteBuffer的容量,可以看到打印消息的變化.容 量越小,對一條消息的處理次數就越多,容量大就可以在更少的循環次數內讀完整個消息.所以真是的應用場景,要考慮適當的緩存大小以提高效率.
首先是Server
package hadix.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * User: hAdIx
 * Date: 11-11-2
 * Time: 上午11:26
 */
public class Server {
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(8);//調整緩存的大小可以看到打印輸出的變化
    private Map<SocketChannel, byte[]> clientMessage = new ConcurrentHashMap<>();
    public void start() throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("localhost", 8001));
        selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.currentThread().isInterrupted()) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                }
                keyIterator.remove();
            }
        }
    }
    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();
        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            clientMessage.remove(socketChannel);
            return;
        }
        byte[] bytes = clientMessage.get(socketChannel);
        if (bytes == null) {
            bytes = new byte[0];
        }
        if (numRead > 0) {
            byte[] newBytes = new byte[bytes.length + numRead];
            System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
            System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead);
            clientMessage.put(socketChannel, newBytes);
            System.out.println(new String(newBytes));
        } else {
            String message = new String(bytes);
            System.out.println(message);
        }
    }
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = ssc.accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("a new client connected");
    }
    public static void main(String[] args) throws IOException {
        System.out.println("server started...");
        new Server().start();
    }
}然后是Clientpackage hadix.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * User: hAdIx
 * Date: 11-11-2
 * Time: 上午11:26
 */
public class Client {
    public void start() throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.connect(new InetSocketAddress("localhost", 8001));
        Selector selector = Selector.open();
        sc.register(selector, SelectionKey.OP_CONNECT);
        Scanner scanner = new Scanner(System.in);
        while (true) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            System.out.println("keys=" + keys.size());
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                if (key.isConnectable()) {
                    sc.finishConnect();
                    sc.register(selector, SelectionKey.OP_WRITE);
                    System.out.println("server connected...");
                    break;
                } else if (key.isWritable()) {
                    System.out.println("please input message");
                    String message = scanner.nextLine();
                    ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
                    sc.write(writeBuffer);
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        new Client().start();
    }
}
此外有一個代碼寫得更好的例子,非常值得參考.http://rox-xmlrpc.sourceforge.net/niotut/index.html
這個例子里面的客戶端將消息發送給服務器,服務器收到后立即寫回給客戶端.例子中代碼雖然也沒有做有意義的處理,但是其結構比較合理,值得以此為基礎進行現實應用的擴展開發.
