Java NIO 實現進程通訊,解決用戶自定義數據的組包和拆分粘包的問題
TCP通訊過程中,由于網絡原因或者其他原因,經常出現粘包和半包現象。所以在具體編程中需要考慮。
下邊的 java 代碼是用 NIO 實現的一個Server端,消息的通訊格式為:
4字節int類型 [包頭] + 包體.
包頭描述出包體的長度。
package com.sof.nio;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;
import java.util.Set;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class Reactor implements Runnable {
private static Logger logger = LoggerFactory.getLogger(Reactor.class); final Selector selector; final ServerSocketChannel serverSocket; public Reactor(String ip, int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { logger.debug("selector is waitting event...."); selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); if (keys.size() == 0) { logger.debug("nothing happened"); continue; } for (SelectionKey key : keys) { if (key.isAcceptable()) { logger.debug("Acceptable event happened"); } else if (key.isReadable()) { logger.debug("Readable event happened"); } else if (key.isWritable()) { logger.debug("Writeable event happened"); } else { logger.debug("others event happened"); } dispatch((SelectionKey) key); } keys.clear(); } } catch (IOException ex) { logger.error(ex.getMessage()); ex.printStackTrace(); } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } } public class Acceptor implements Runnable { public synchronized void run() { try { SocketChannel c = serverSocket.accept(); logger.info("got a new connection from: " + c.socket().toString()); if (c != null) { new Handler(selector, c); } } catch (IOException ex) { logger.error(ex.getMessage()); ex.printStackTrace(); } } }}
package com.sof.nio;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import com.sof.bas.Bytes2util; import com.sof.bas.Util2Bytes;
final public class Handler implements Runnable { private static Logger logger = LoggerFactory.getLogger(Handler.class); final SocketChannel socket; final SelectionKey sk;
static final int MESSAGE_LENGTH_HEAD = 4; byte[] head = new byte[4]; int bodylen = -1; Handler(Selector selector, SocketChannel socket) throws IOException { this.socket = socket; socket.configureBlocking(false); sk = socket.register(selector, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } public void run() { try { read(); } catch (IOException ex) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } logger.info("got a disconnect from " + socket.socket().toString()); sk.cancel(); } } public synchronized void read() throws IOException { ByteBuffer input = ByteBuffer.allocate(1024); socket.read(input); input.flip(); //讀取數據的原則: 要么讀取一個完整的包頭,要么讀取一個完整包體。不滿足這兩種情況,不對ByteBuffer進行任何的get操作 //但是要注意可能發生上次讀取了一個完整的包頭,下次讀才讀取一個完整包體情況。 //所以包頭部分必須用類的成員變量進行暫時的存儲,當完整讀取包頭和包體后,在給業務處理部分。 logger.debug("1: remain=" + input.remaining() + " bodylen=" + bodylen); while(input.remaining() > 0) { if (bodylen < 0) //還沒有生成完整的包頭部分, 該變量初始值為-1,并且在拼湊一個完整的消息包以后,再將該值設置為-1 { if ( input.remaining() >= MESSAGE_LENGTH_HEAD) //ByteBuffer緩沖區的字節數夠拼湊一個包頭 { input.get(head, 0, 4); bodylen = Util2Bytes.bytes2bigint(head); logger.debug("2: remain=" + input.remaining() + " bodylen=" + bodylen); } else//ByteBuffer緩沖區的字節數不夠拼湊一個包頭,什么操作都不做,退出這次處理,繼續等待 { logger.debug("3: remain=" + input.remaining() + " bodylen=" + bodylen); break; } } else if(bodylen > 0) //包頭部分已經完整生成. { if (input.remaining() >= bodylen) //緩沖區的內容夠一個包體部分 { byte[] body = new byte[bodylen]; input.get(body, 0, bodylen); byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen]; System.arraycopy(head, 0, headandbody, 0, head.length); System.arraycopy(body,0, headandbody, head.length, body.length); bodylen = -1; logger.debug("4: remain=" + input.remaining() + " bodylen=" + bodylen); Bytes2util.outputHex(headandbody, 16); } else ///緩沖區的內容不夠一個包體部分,繼續等待,跳出循環等待下次再出發該函數 { System.out.println("5: remain=" + input.remaining() + " bodylen=" + bodylen); break; } } else if(bodylen == 0) //沒有包體部分,僅僅有包頭的情況 { byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen]; System.arraycopy(head, 0, headandbody, 0, head.length); Bytes2util.outputHex(headandbody, 16); bodylen = -1; } } sk.interestOps(SelectionKey.OP_READ); }}</pre>