使用Java NIO編寫高性能的服務器
從JDK 1.4開始,Java的標準庫中就包含了NIO,即所謂的“New IO”。其中最重要的功能就是提供了“非阻塞”的IO,當然包括了Socket。NonBlocking的IO就是對select(Unix平臺下)以及 WaitForMultipleObjects(Windows平臺)的封裝,提供了高性能、易伸縮的服務架構。
Server:
/**
*
*/
package nio.file;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
/**
* @author ztz
*
*/
public class NIOServer {
static int BLOCK=4096;
//處理與客戶端的交互
public class HandleClient{
protected FileChannel channel;
protected ByteBuffer buffer;
public HandleClient() throws IOException{
this.channel = new FileInputStream(filename).getChannel();
this.buffer = ByteBuffer.allocate(BLOCK);
}
public ByteBuffer readBlock(){
try {
buffer.clear();
int count = channel.read(buffer);
buffer.flip();
if(count<=0)
return null;
} catch (IOException e) {
e.printStackTrace();
}
return buffer;
}
public void close(){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
protected Selector selector;
protected String filename = "XXX";
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
public NIOServer(int port) throws IOException{
selector = this.getSelector(port);
Charset charset = Charset.forName("GB2312");
decoder = charset.newDecoder();
}
//獲取Selector
protected Selector getSelector(int port) throws IOException{
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
//監聽端口
public void listen(){
try {
for(;;){
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
handleKey(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//處理事件
protected void handleKey(SelectionKey key) throws IOException {
if(key.isAcceptable())//接收請求
{
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable())//讀信息
{
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(clientBuffer);
if(count>0){
clientBuffer.flip();
CharBuffer charBuffer = decoder.decode(clientBuffer);
System.out.println("Client>>"+charBuffer.toString());
SelectionKey wkey = channel.register(selector, SelectionKey.OP_WRITE);
wkey.attach(new HandleClient());
}else{
channel.close();
clientBuffer.clear();
}
}else if(key.isWritable())//寫事件
{
SocketChannel channel = (SocketChannel) key.channel();
HandleClient handle = (HandleClient) key.attachment();
ByteBuffer block = handle.readBlock();
if(block!=null){
channel.write(block);
}else{
handle.close();
channel.close();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
try {
int port = 12345;
NIOServer server = new NIOServer(port);
System.out.println("Listening on "+port);
while(true){
server.listen();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client:
/**
*
*/
package nio.file;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 文件下載客戶端
* @author ztz
*
*/
public class NIOClient {
static int SIZE = 100;
static InetSocketAddress ip = new InetSocketAddress("localhost",12345);
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
static class DownLoad implements Runnable{
int index;
public DownLoad(int index){
this.index = index;
}
@Override
public void run() {
try {
long start = System.currentTimeMillis();
SocketChannel client = SocketChannel.open();
client.configureBlocking(false);
Selector selector = Selector.open();
client.register(selector, SelectionKey.OP_CONNECT);
client.connect(ip);
ByteBuffer buffer = ByteBuffer.allocate(8*1024);
int total = 0;
FOR: for(;;){
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if(key.isConnectable()){
SocketChannel channel = (SocketChannel) key.channel();
if(channel.isConnectionPending()){
channel.finishConnect();
channel.write(encoder.encode(CharBuffer.wrap("Hello from "+index)));
channel.register(selector, SelectionKey.OP_READ);
}
}else if (key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if(count > 0){
total += count;
buffer.clear();
}else{
client.close();
break FOR;
}
}
}
double last = (System.currentTimeMillis()-start)*1.0/1000;
System.out.println("Thread"+index+" download "+total+" bytes in "+last+"s.");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(SIZE);
for(int index = 0;index < SIZE;index++){
exec.execute(new DownLoad(index));
}
exec.shutdown();
}
}
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!