使用Java NIO編寫高性能的服務器

jopen 11年前發布 | 25K 次閱讀 Java NIO Java開發

從JDK 1.4開始,Java的標準庫中就包含了NIO,即所謂的“New IO”。其中最重要的功能就是提供了“非阻塞”的IO,當然也包括了Socket。非阻塞IO就是對select(Unix平臺下)以及WaitForMultipleObjects(Windows平臺)的封裝,提供了高性能、易伸縮的服務架構。

Server:

/**
 * 
 */
package nio.file;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;

/**
 * A single-threaded, selector-driven file server built on non-blocking NIO channels.
 *
 * <p>Protocol: a client connects and sends any message; the server prints that
 * message to standard output, streams the file named by {@link #filename} back in
 * chunks of at most {@link #BLOCK} bytes, and then closes the connection.
 *
 * <p>All channel handling happens on the thread that calls {@link #listen()};
 * the class performs no synchronization of its own.
 *
 * @author ztz
 *
 */
public class NIOServer {

    /** Capacity in bytes of the request buffer and of each per-client file buffer. */
    static int BLOCK=4096;

    /**
     * Per-connection transfer state: an open channel on the served file plus the
     * buffer holding the chunk currently being sent to that client.
     */
    public class HandleClient{
        protected FileChannel channel;
        protected ByteBuffer buffer;
        /** True while {@link #buffer} holds a chunk produced by {@link #readBlock()}. */
        private boolean pending;

        /**
         * Opens the file named by the enclosing server's {@code filename}.
         *
         * @throws IOException if the file cannot be opened
         */
        public HandleClient() throws IOException{
            this.channel = new FileInputStream(filename).getChannel();
            this.buffer = ByteBuffer.allocate(BLOCK);
        }

        /**
         * Discards whatever is left in the buffer and reads the next chunk of the file.
         *
         * @return the buffer, flipped and ready to be written, or {@code null} when
         *         the end of the file was reached or the read failed
         */
        public ByteBuffer readBlock(){
            pending = false;
            try {
                buffer.clear();
                int count = channel.read(buffer);
                buffer.flip();
                if (count <= 0) {
                    return null;
                }
            } catch (IOException e) {
                e.printStackTrace();
                // The buffer content is undefined after a failed read; never send it.
                return null;
            }
            pending = true;
            return buffer;
        }

        /**
         * Returns the bytes that still have to be sent: the unsent remainder of the
         * current chunk if the socket did not accept all of it, otherwise a fresh chunk.
         */
        private ByteBuffer nextBlock(){
            if (pending && buffer.hasRemaining()) {
                return buffer;
            }
            return readBlock();
        }

        /** Closes the file channel; a failure is reported on standard error. */
        public void close(){
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    protected Selector selector;
    /** Path of the file streamed to every client. */
    protected String filename = "XXX";
    /** Shared scratch buffer for incoming client messages. */
    protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
    /** Decoder (GB2312) used to print the client's message. */
    protected CharsetDecoder decoder;

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind
     * @throws IOException if the listening channel or the selector cannot be set up
     */
    public NIOServer(int port) throws IOException{
        selector = this.getSelector(port);
        Charset charset = Charset.forName("GB2312");
        decoder = charset.newDecoder();
    }

    /**
     * Opens a non-blocking server channel bound to {@code port} and registers it
     * for accept events with a newly opened selector.
     *
     * @param port TCP port to bind
     * @return the selector the server channel is registered with
     * @throws IOException if opening, binding or registering fails; anything opened
     *         so far is closed before the exception propagates
     */
    protected Selector getSelector(int port) throws IOException{
        ServerSocketChannel server = ServerSocketChannel.open();
        Selector sel = null;
        boolean registered = false;
        try {
            sel = Selector.open();
            server.socket().bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            server.register(sel, SelectionKey.OP_ACCEPT);
            registered = true;
            return sel;
        } finally {
            if (!registered) {
                if (sel != null) {
                    try {
                        sel.close();
                    } catch (IOException ignored) {
                        // The original failure is the one worth reporting.
                    }
                }
                try {
                    server.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Runs the select loop, dispatching each ready key to {@link #handleKey}.
     * An I/O failure on one client connection closes only that connection; the
     * method returns only if the selector itself fails.
     */
    public void listen(){
        try {
            for(;;){
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while(iter.hasNext()){
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        handleKey(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        dropClient(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Releases the file channel and socket of a failed client; the listening key is left alone. */
    private void dropClient(SelectionKey key){
        Object state = key.attachment();
        if (state instanceof HandleClient) {
            ((HandleClient) state).close();
        }
        if (key.channel() instanceof SocketChannel) {
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles one ready key: accepts a new connection, reads a client's message
     * and switches the connection to sending, or sends the next part of the file.
     *
     * @param key the selected key
     * @throws IOException if the underlying channel operation fails
     */
    protected void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // Accept a new connection.
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            if (channel == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                channel.close();
                throw e;
            }
        } else if (key.isReadable()) {
            // Read the client's message.
            SocketChannel channel = (SocketChannel) key.channel();
            // The buffer is shared by all clients, so reset it before every read;
            // otherwise the previous message leaves it with no free space.
            clientBuffer.clear();
            int count = channel.read(clientBuffer);
            if (count > 0) {
                clientBuffer.flip();
                CharBuffer charBuffer = decoder.decode(clientBuffer);
                System.out.println("Client>>"+charBuffer.toString());
                HandleClient handle = new HandleClient();
                try {
                    channel.register(selector, SelectionKey.OP_WRITE, handle);
                } catch (IOException e) {
                    handle.close();
                    throw e;
                }
            } else if (count < 0) {
                // End of stream: the client closed the connection.
                channel.close();
            }
            // count == 0 means nothing was available; wait for the next event.
        } else if (key.isWritable()) {
            // Send the next part of the file.
            SocketChannel channel = (SocketChannel) key.channel();
            HandleClient handle = (HandleClient) key.attachment();
            ByteBuffer block = (handle == null) ? null : handle.nextBlock();
            if (block != null) {
                // May write only part of the block; the rest is sent on the next event.
                channel.write(block);
            } else {
                if (handle != null) {
                    handle.close();
                }
                channel.close();
            }
        }
    }

    /**
     * Starts the server on port 12345 and serves until the process is killed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            int port = 12345;
            NIOServer server = new NIOServer(port);
            System.out.println("Listening on "+port);
            while(true){
                server.listen();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

Client:

/**
 * 
 */
package nio.file;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * File download client: starts {@link #SIZE} concurrent downloads against the
 * server at {@link #ip}. Each download connects, sends a greeting, reads until
 * the server closes the connection, and prints its progress to standard output.
 *
 * @author ztz
 *
 */
public class NIOClient {

    /** Number of concurrent downloads (and pool threads) started by {@link #main}. */
    static int SIZE = 100;
    /** Address of the server to download from. */
    static InetSocketAddress ip = new InetSocketAddress("localhost",12345);
    /**
     * GB2312 encoder. A {@code CharsetEncoder} is not safe for concurrent use, so
     * download tasks only read its charset and create their own encoder from it.
     */
    static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();

    /** One download: counts the bytes the server sends over a single connection. */
    static class DownLoad implements Runnable{

        int index;

        /**
         * @param index number identifying this download in the greeting and in the output
         */
        public DownLoad(int index){
            this.index = index;
        }

        /**
         * Performs the download. I/O errors are printed to standard error; the
         * socket and selector are closed in every case.
         */
        @Override
        public void run() {
            SocketChannel client = null;
            Selector selector = null;
            try {
                long start = System.currentTimeMillis();
                client = SocketChannel.open();
                client.configureBlocking(false);
                selector = Selector.open();
                if (client.connect(ip)) {
                    // Connected immediately: no OP_CONNECT event will be delivered.
                    sendGreeting(client);
                    client.register(selector, SelectionKey.OP_READ);
                } else {
                    client.register(selector, SelectionKey.OP_CONNECT);
                }
                ByteBuffer buffer = ByteBuffer.allocate(8*1024);
                long total = 0;
                DOWNLOAD: for(;;){
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isConnectable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            if (channel.isConnectionPending() && channel.finishConnect()) {
                                sendGreeting(channel);
                                channel.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // The data itself is discarded; only its size is counted.
                            buffer.clear();
                            int count = channel.read(buffer);
                            if (count > 0) {
                                total += count;
                            } else if (count < 0) {
                                // End of stream: the server finished and closed the connection.
                                break DOWNLOAD;
                            }
                            // count == 0 means nothing was available; keep waiting.
                        }
                    }

                    double last = (System.currentTimeMillis()-start)*1.0/1000;
                    System.out.println("Thread"+index+" download "+total+" bytes in "+last+"s.");

                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /** Sends the complete "Hello from N" greeting, using an encoder private to this call. */
        private void sendGreeting(SocketChannel channel) throws IOException {
            ByteBuffer greeting = encoder.charset().newEncoder()
                    .encode(CharBuffer.wrap("Hello from "+index));
            // A non-blocking write may accept only part of the buffer; the greeting
            // is a few bytes, so retrying until it is gone does not spin in practice.
            while (greeting.hasRemaining()) {
                channel.write(greeting);
            }
        }

    }

    /**
     * Starts {@link #SIZE} downloads on a fixed thread pool and lets them run to completion.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(SIZE);
        for(int index = 0;index < SIZE;index++){
            exec.execute(new DownLoad(index));
        }
        exec.shutdown();
    }

}

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯繫管理員。
 轉載本站原創文章,請註明出處,並保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!