通過JAVA NIO實現Socket服務器與客戶端功能

dwd4 9年前發布 | 2K 次閱讀 Java

package niocommunicate;

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;

public class Server {

private Selector selector = getSelector();
private ServerSocketChannel ss = null;
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(20));

private static Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();

public Selector getSelector() {
    try {
        return Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * 創建非阻塞服務器綁定5555端口
 */
public Server() {
    try {
        ss = ServerSocketChannel.open();
        ss.bind(new InetSocketAddress(5555));
        ss.configureBlocking(false);
        if (selector == null) {
            selector = Selector.open();
        }
        ss.register(selector, SelectionKey.OP_ACCEPT);
    } catch (Exception e) {
        e.printStackTrace();
        close();
    }
}

/**
 * 關閉服務器
 */
private void close() {
    threadPool.shutdown();
    try {
        if (ss != null) {
            ss.close();
        }
        if (selector != null) {
            selector.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
 * 啟動選擇器監聽客戶端事件
 */
private void start() {
    threadPool.execute(new Runnable() {

        @Override
        public void run() {
            try {
                while (true) {
                    if (selector.select(10) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectedKey = iterator.next();
                        iterator.remove();
                        try {
                            if (selectedKey.isReadable()) {

                                if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                    selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                                    threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                }

                            } else if (selectedKey.isWritable()) {
                                Object responseMessage = selectedKey.attachment();
                                SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                selectedKey.interestOps(SelectionKey.OP_READ);
                                if (responseMessage != null) {
                                    threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
                                            responseMessage));
                                }
                            } else if (selectedKey.isAcceptable()) {
                                ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                SocketChannel clientSocket = ssc.accept();
                                if (clientSocket != null) {
                                    clientSocket.configureBlocking(false);
                                    clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                }
                            }
                        } catch (CancelledKeyException cc) {
                            selectedKey.cancel();
                            selectionKeyMap.remove(selectedKey.hashCode());
                        }
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
                close();
            }
        }

    });
}

/**
 * 響應數據給客戶端線程
 * @author haoguo
 *
 */
private class WriteClientSocketHandler implements Runnable {
    SocketChannel client;
    Object respnoseMessage;

    WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
        this.client = client;
        this.respnoseMessage = respnoseMessage;
    }

    @Override
    public void run() {
        byte[] responseByteData = null;
        String logResponseString = "";
        if (respnoseMessage instanceof byte[]) {
            responseByteData = (byte[]) respnoseMessage;
            logResponseString = new String(responseByteData);
        } else if (respnoseMessage instanceof String) {
            logResponseString = (String) respnoseMessage;
            responseByteData = logResponseString.getBytes();
        }
        if (responseByteData == null || responseByteData.length == 0) {
            System.out.println("響應的數據為空");
            return;
        }
        try {
            client.write(ByteBuffer.wrap(responseByteData));
            System.out.println("server響應客戶端[" + client.keyFor(selector).hashCode() + "]數據 :[" + logResponseString
                    + "]");
        } catch (IOException e) {
            e.printStackTrace();
            try {
                client.close();
            } catch (IOException e1) {

                e1.printStackTrace();
            }
        }
    }
}

/**
 * 讀客戶端發送數據線程
 * @author haoguo
 *
 */
private class ReadClientSocketHandler implements Runnable {
    private SocketChannel client;
    private ByteBuffer tmp = ByteBuffer.allocate(1024);
    private SelectionKey selectionKey;

    ReadClientSocketHandler(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
        this.client = (SocketChannel) selectionKey.channel();
    }

    @Override
    public void run() {
        try {
            tmp.clear();
            byte[] data = new byte[0];
            int len = -1;
            while ((len = client.read(tmp)) > 0) {
                data = Arrays.copyOf(data, data.length + len);
                System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                tmp.rewind();
            }
            if (data.length == 0) {
                return;
            }
            System.out.println("接收到客戶端[" + client.keyFor(selector).hashCode() + "]數據 :[" + new String(data) + "]");
            // dosomthing
            byte[] response = "response".getBytes();
            client.register(selector, SelectionKey.OP_WRITE, response);
        } catch (IOException e) {

            System.out.println("客戶端[" + selectionKey.hashCode() + "]關閉了連接");
            try {
                SelectionKey selectionKey = client.keyFor(selector);
                selectionKey.cancel();
                client.close();
            } catch (IOException e1) {

                e1.printStackTrace();
            }
        } finally {
            selectionKeyMap.remove(selectionKey.hashCode());
        }
    }
}

public static void main(String[] args) {
    Server server = new Server();
    server.start();
}

}</pre>

package niocommunicate;

import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List;

public class Client { SocketChannel client; Selector selctor = getSelector();

private volatile boolean run = true;

private List<Object> messageQueue = new LinkedList<>();

public Selector getSelector() {
    try {
        return Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

public Client() {
    try {
        client = SocketChannel.open();
        client.configureBlocking(false);
        client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
        client.register(selctor, SelectionKey.OP_CONNECT);
    } catch (IOException e) {
        e.printStackTrace();
    }
    new Thread(new Runnable() {

        @Override
        public void run() {
            while (run) {
                try {
                    if (selctor.select(20) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        if (selectionKey.isConnectable()) {
                            SocketChannel sc = (SocketChannel) selectionKey.channel();
                            sc.finishConnect();
                            sc.register(selctor, SelectionKey.OP_READ);
                        } else if (selectionKey.isWritable()) {
                            selectionKey.interestOps(SelectionKey.OP_READ);
                            Object requestMessage = selectionKey.attachment();
                            SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
                            byte[] requestByteData = null;
                            if (requestMessage instanceof byte[]) {
                                requestByteData = (byte[]) requestMessage;
                            } else if (requestMessage instanceof String) {
                                requestByteData = ((String) requestMessage).getBytes();
                                System.out.println("client send Message:[" + requestMessage + "]");
                            } else {
                                System.out.println("unsupport send Message Type" + requestMessage.getClass());
                            }
                            System.out.println("requestMessage:" + requestMessage);
                            if (requestByteData != null && requestByteData.length > 0) {
                                try {
                                    writeSocketChannel.write(ByteBuffer.wrap(requestByteData));
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        } else if (selectionKey.isReadable()) {
                            SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
                            ByteBuffer tmp = ByteBuffer.allocate(1024);
                            int len = -1;
                            byte[] data = new byte[0];
                            if ((len = readSocketChannel.read(tmp)) > 0) {
                                data = Arrays.copyOf(data, data.length + len);
                                System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                                tmp.rewind();
                            }
                            if (data.length > 0) {
                                System.out.println("客戶端接收到數據:[" + new String(data) + "]");
                            }
                        }
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                    close();
                }
            }
        }
    }).start();
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {

        e.printStackTrace();
    }
}

public void close() {
    try {
        SelectionKey selectionKey = client.keyFor(selctor);
        selectionKey.cancel();
        client.close();
        run = false;
    } catch (IOException e) {

        e.printStackTrace();
    }
}

public void writeData(String data) {
    messageQueue.add(data);
    while (messageQueue.size() > 0) {
        Object firstSendData = messageQueue.remove(0);
        try {
            client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(40);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {

    Client client = new Client();
    long t1 = System.currentTimeMillis();
    for (int i = 10; i < 200; i++) {
        client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
    }
    long t2 = System.currentTimeMillis();
    System.out.println("總共耗時:" + (t2 - t1) + "ms");
    client.close();
}

}</pre>

package niocommunicate;

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;

public class Server {

private Selector selector = getSelector();
private ServerSocketChannel ss = null;
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(20));

private Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
private Map<Integer, List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
private volatile boolean run = true;
private volatile boolean isClose = false;

public Selector getSelector() {
    try {
        return Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * 創建非阻塞服務器綁定5555端口
 */
public Server() {
    try {
        ss = ServerSocketChannel.open();
        ss.bind(new InetSocketAddress(5555));
        ss.configureBlocking(false);
        if (selector == null) {
            selector = Selector.open();
        }
        ss.register(selector, SelectionKey.OP_ACCEPT);
    } catch (Exception e) {
        e.printStackTrace();
        close();
    }
}

public boolean isClose() {
    return isClose;
}

/**
 * 關閉服務器
 */
private void close() {
    run = false;
    isClose = true;
    threadPool.shutdown();
    try {
        if (ss != null) {
            ss.close();
        }
        if (selector != null) {
            selector.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
 * 啟動選擇器監聽客戶端事件
 */
private void start() {
    threadPool.execute(new Runnable() {

        @Override
        public void run() {
            try {
                while (run) {
                    if (selector.select(10) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectedKey = iterator.next();
                        iterator.remove();
                        try {
                            if (selectedKey.isReadable()) {

                                if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                    selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                                    threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                }

                            } else if (selectedKey.isWritable()) {
                                SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                selectedKey.interestOps(SelectionKey.OP_READ);
                                List<Object> list = responseMessageQueue.get(selectedKey.hashCode());
                                if (list == null) {
                                    list = new LinkedList<Object>();
                                    responseMessageQueue.put(selectedKey.hashCode(), list);
                                }
                                while (list.size() > 0) {
                                    Object responseMessage = list.remove(0);
                                    if (responseMessage != null) {
                                        threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
                                                responseMessage));
                                    }
                                }
                            } else if (selectedKey.isAcceptable()) {
                                ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                SocketChannel clientSocket = ssc.accept();
                                if (clientSocket != null) {
                                    clientSocket.configureBlocking(false);
                                    clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                }
                            }
                        } catch (CancelledKeyException cc) {
                            selectedKey.cancel();
                            int hashCode = selectedKey.hashCode();
                            selectionKeyMap.remove(hashCode);
                            responseMessageQueue.remove(hashCode);
                        }
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
                close();
            }
        }

    });
}

/**
 * 響應數據給客戶端線程
 *
 * @author haoguo
 *
 */
private class WriteClientSocketHandler implements Runnable {
    SocketChannel client;
    Object respnoseMessage;

    WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
        this.client = client;
        this.respnoseMessage = respnoseMessage;
    }

    @Override
    public void run() {
        byte[] responseByteData = null;
        String logResponseString = "";
        if (respnoseMessage instanceof byte[]) {
            responseByteData = (byte[]) respnoseMessage;
            logResponseString = new String(responseByteData);
        } else if (respnoseMessage instanceof String) {
            logResponseString = (String) respnoseMessage;
            responseByteData = logResponseString.getBytes();
        }
        if (responseByteData == null || responseByteData.length == 0) {
            System.out.println("響應的數據為空");
            return;
        }
        try {
            client.write(ByteBuffer.wrap(responseByteData));
            System.out.println("server響應客戶端[" + client.keyFor(selector).hashCode() + "]數據 :[" + logResponseString
                    + "]");
        } catch (IOException e) {
            e.printStackTrace();
            try {
                SelectionKey selectionKey = client.keyFor(selector);
                if (selectionKey != null) {
                    selectionKey.cancel();
                    int hashCode = selectionKey.hashCode();
                    responseMessageQueue.remove(hashCode);
                }
                if (client != null) {
                    client.close();
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
}

/**
 * 讀客戶端發送數據線程
 *
 * @author haoguo
 *
 */
private class ReadClientSocketHandler implements Runnable {
    private SocketChannel client;
    private ByteBuffer tmp = ByteBuffer.allocate(1024);
    private SelectionKey selectionKey;
    int hashCode;

    ReadClientSocketHandler(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
        this.client = (SocketChannel) selectionKey.channel();
        this.hashCode = selectionKey.hashCode();
    }

    @Override
    public void run() {
        try {
            tmp.clear();
            byte[] data = new byte[0];
            int len = -1;
            while ((len = client.read(tmp)) > 0) {
                data = Arrays.copyOf(data, data.length + len);
                System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                tmp.rewind();
            }
            if (data.length == 0) {
                return;
            }
            String readData = new String(data);
            System.out.println("接收到客戶端[" + hashCode + "]數據 :[" + readData.substring(0, 3) + "]");
            // dosomthing
            byte[] response = ("response" + readData.substring(0, 3)).getBytes();
            List<Object> list = responseMessageQueue.get(hashCode);
            list.add(response);
            client.register(selector, SelectionKey.OP_WRITE);
            // client.register(selector, SelectionKey.OP_WRITE, response);
        } catch (IOException e) {
            System.out.println("客戶端[" + selectionKey.hashCode() + "]關閉了連接");
            try {
                SelectionKey selectionKey = client.keyFor(selector);
                if (selectionKey != null) {
                    selectionKey.cancel();
                }
                if (client != null) {
                    client.close();
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        } finally {
            selectionKeyMap.remove(hashCode);
        }
    }
}

public static void main(String[] args) {
    Server server = new Server();
    server.start();
}

}</pre>

 本文由用戶 dwd4 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!