Java NIO 實現進程通訊,解決用戶自定義數據的組包和拆分粘包的問題

jopen 12年前發布 | 95K 次閱讀 Java NIO 網絡工具包

TCP通訊過程中,由于網絡原因或者其他原因,經常出現粘包和半包現象。所以在具體編程中需要考慮。

下邊的 java 代碼是用 NIO 實現的一個Server端,消息的通訊格式為:


4字節int類型 [包頭] + 包體.

包頭描述出包體的長度。


package com.sof.nio;

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;

import java.util.Set;

import org.slf4j.Logger; import org.slf4j.LoggerFactory;

public class Reactor implements Runnable {

private static Logger logger = LoggerFactory.getLogger(Reactor.class);

final Selector selector;
final ServerSocketChannel serverSocket;

public Reactor(String ip, int port) throws IOException
{
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    serverSocket.configureBlocking(false);
    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    sk.attach(new Acceptor());
}

public void run()
{
    try
    {
        while (!Thread.interrupted())
        {
            logger.debug("selector is waitting  event....");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            if (keys.size() == 0)
            {
                logger.debug("nothing happened");
                continue;
            }

            for (SelectionKey key : keys)
            {
                if (key.isAcceptable())
                {
                    logger.debug("Acceptable event happened");
                }
                else if (key.isReadable())
                {
                    logger.debug("Readable event happened");
                }
                else if (key.isWritable())
                {
                    logger.debug("Writeable event happened");
                }
                else
                {
                    logger.debug("others event happened");
                }
                dispatch((SelectionKey) key);
            }
            keys.clear();
        }
    }
    catch (IOException ex)
    {
        logger.error(ex.getMessage());
        ex.printStackTrace();
    }
}

void dispatch(SelectionKey k)
{
    Runnable r = (Runnable) (k.attachment());
    if (r != null)
    {
        r.run();
    }
}

public class Acceptor implements Runnable
{
    public synchronized void run()
    {
        try
        {
            SocketChannel c = serverSocket.accept();
            logger.info("got a new connection from:  " + c.socket().toString());
            if (c != null)
            {
                new Handler(selector, c);
            }
        }
        catch (IOException ex)
        {
            logger.error(ex.getMessage());
            ex.printStackTrace();
        }
    }
}

}

package com.sof.nio;

import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import com.sof.bas.Bytes2util; import com.sof.bas.Util2Bytes;

final public class Handler implements Runnable { private static Logger logger = LoggerFactory.getLogger(Handler.class); final SocketChannel socket; final SelectionKey sk;

static final int MESSAGE_LENGTH_HEAD = 4;
byte[] head = new byte[4];
int bodylen = -1;

Handler(Selector selector, SocketChannel socket) throws IOException
{
    this.socket = socket;
    socket.configureBlocking(false);
    sk = socket.register(selector, 0);
    sk.attach(this);
    sk.interestOps(SelectionKey.OP_READ);
    selector.wakeup();
}

public void run()
{
    try
    {
        read();
    }
    catch (IOException ex)
    {
        try
        {
            socket.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        logger.info("got a disconnect from " + socket.socket().toString());
        sk.cancel();
    }
}

public synchronized void read() throws IOException
{
    ByteBuffer input = ByteBuffer.allocate(1024);
    socket.read(input);
    input.flip();

    //讀取數據的原則: 要么讀取一個完整的包頭,要么讀取一個完整包體。不滿足這兩種情況,不對ByteBuffer進行任何的get操作
    //但是要注意可能發生上次讀取了一個完整的包頭,下次讀才讀取一個完整包體情況。
    //所以包頭部分必須用類的成員變量進行暫時的存儲,當完整讀取包頭和包體后,在給業務處理部分。
    logger.debug("1: remain=" + input.remaining() + " bodylen=" + bodylen);
    while(input.remaining() > 0)
    {
        if (bodylen < 0) //還沒有生成完整的包頭部分, 該變量初始值為-1,并且在拼湊一個完整的消息包以后,再將該值設置為-1
        {
            if ( input.remaining() >= MESSAGE_LENGTH_HEAD) //ByteBuffer緩沖區的字節數夠拼湊一個包頭
            {
                input.get(head, 0, 4);
                bodylen = Util2Bytes.bytes2bigint(head);
                logger.debug("2: remain=" + input.remaining() + " bodylen=" + bodylen);
            }
            else//ByteBuffer緩沖區的字節數不夠拼湊一個包頭,什么操作都不做,退出這次處理,繼續等待
            {
                logger.debug("3: remain=" + input.remaining() + " bodylen=" + bodylen);
                break;
            }
        }
        else if(bodylen > 0) //包頭部分已經完整生成. 
        {
            if (input.remaining() >= bodylen) //緩沖區的內容夠一個包體部分
            {
                byte[] body = new byte[bodylen];
                input.get(body, 0, bodylen);
                byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];
                System.arraycopy(head, 0, headandbody, 0, head.length);
                System.arraycopy(body,0, headandbody, head.length, body.length);            
                bodylen = -1;
                logger.debug("4: remain=" + input.remaining() + " bodylen=" + bodylen);
                Bytes2util.outputHex(headandbody, 16);
            }
            else  ///緩沖區的內容不夠一個包體部分,繼續等待,跳出循環等待下次再出發該函數
            {
                System.out.println("5: remain=" + input.remaining() + " bodylen=" + bodylen);
                break;
            }
        }
        else if(bodylen == 0) //沒有包體部分,僅僅有包頭的情況
        {
            byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];
            System.arraycopy(head, 0, headandbody, 0, head.length);
            Bytes2util.outputHex(headandbody, 16);
            bodylen = -1;
        }
    }

    sk.interestOps(SelectionKey.OP_READ);
}

}</pre>

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!