Netty5 Read事件處理過程_源碼講解

jopen 9年前發布 | 92K 次閱讀 Netty5 Netty 網絡工具包

Netty是對Nio的一個封裝,關于網絡的所有操作都是通過事件的方式完成的。例如連接創建、read事件、write事件都是通過Nio來完成 的。那netty是怎么啟動監聽的呢? 在什么地方啟動的呢?此處不為大家設置懸念,一次性告訴大家。通過循環掃描的方式來實現監聽的。具體的方法類位于NioEventLoop的run方法中 (趕緊進去看看吧!! 淺顯易懂)。

下面是netty的acceptor線程創建連接的代碼。位于類NioEventLoop的processSelectedKey中(至于 processSelectedKey是怎么被調用的,自己看看調用鏈就行了(eclipse用ctrl+Shift+H就可以查看到選中方法的調用 鏈))。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
 
        try {
            //得到當前的key關注的事件
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //一個剛剛創建的NioServersocketChannel感興趣的事件是0。
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//可以讀取操作  --對于serverSocket來說就是acceptor事件、對于socketChannel來說就是read事件 
                //INFO: channel類型為io.netty.channel.socket.nio.NioSocketChannel unsafe類型為io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe
                Object obj = k.attachment();//得到NioServerSocketChannel或者NioSocketChannel
                if(obj instanceof NioServerSocketChannel){
                    System.out.println(obj.getClass().getName()+ " 開始接收連接");
                }else{
                    System.out.println(obj.getClass().getName()+ " 開始接收字節");
                }
                //不同的socketChannel對于那個的unsafe是不同的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe
                unsafe.read();//對于接受鏈接或者read興趣都會添加進入read操作調用serverSocket->NioMessageUnsafe
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對于半包消息進行輸出操作
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
 
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

這里我們以Read事件的處理(NioByteUnsafe)為線索進行講解。后續會有基于byte的unsafe進行講解的(Unsafe不知道為啥要這 么叫,本人也感到挺費解的,不過現在看來感覺就是一個工具對象。不要從名稱上懼怕它)。下面來看NioByteUnsafe(該類是AbstractNioByteChannel的一個內部類)的read方法進行講 解。直接講代碼(后面也會有圖形講解,方便大家理解):

public void read() {
            //得到config對象、pipeline對象
            final ChannelConfig config = config();
            //得到對應的管道對象
            final ChannelPipeline pipeline = pipeline();
            //實際的內存分配器---
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                //創建一個allocHandle對象--AdaptiveRecvByteBufAllocator
                //RecvByteBufAllocator負責內存分配的算法問題 
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int byteBufCapacity = allocHandle.guess();
                int totalReadAmount = 0;
                do {
                    //可能是 direct或者 heap  從與當前socket相關的allocator得到byteBuf數組
//                    byteBuf =allocHandle.allocate(allocator);
                    //每次從內核中讀取數據netty都會分配內存
                    byteBuf = allocator.ioBuffer(byteBufCapacity);
                     //獲得可以寫入的容量的大小
                    int writable = byteBuf.writableBytes(); //分一個多大的內存就從socket中讀取多大的數據
                    int localReadAmount = doReadBytes(byteBuf);//從socket中讀取數據到bytebuf中
                    if (localReadAmount <= 0) {//發生了讀取事件,但是讀取的長度是負數,
                        // not was read release the buffer
                        byteBuf.release();//釋放到Thread Cache中
                        close = localReadAmount < 0;//是否進行關閉,關鍵要看讀取到的數據的長度是否為-1;
                        break;
                    }
                    //發起讀取事件---如果是第一次積累數據的話,那么就會將當前的bytebuf作為累積對象,供繼續使用
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;//由pipeline進行byteBuf的釋放
                    //避免內存溢出,
                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);//每次讀取的消息的數量都會有限制的,也就說,每次處理read事件的消息量是可以配置的
                //讀取完成---處理完一次 讀取事件
                pipeline.fireChannelReadComplete();
                //對本次讀取的數據量進行記錄,便于下一次為當前的Channel分配合適大小的buffer
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }
//上述代碼段說明:
/**
 config.getRecvByteBufAllocator().newHandle(); 負責內存分配算法
    而
ByteBufAllocator 負責具體的內存分配-分配到堆還是直接內存
*/

這就是對一個read的處理基本流程,就是將從socket中讀取到的放入到分配器分配的bytebuf,然后將其傳入到pipeline.fireChannelRead(byteBuf);中,至于在pipeline是怎樣的傳遞的,我們從這個方法中是無法查看到的。這也是我們這篇文章的主要內容(別的內存也很重要哦!關鍵是我已經添加了很多注釋了!)。就是要看看在得到bytebuf后,pipeline是怎么處理傳入進去的bytebuf的。我們來對pipeline.fireChannelRead(byteBuf);窮追(ctrl+shift+H eclipse)到具體的實現,

我們發現,最終會調用到的ChannelHandler接口的

void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

ChannelHandler有很多我們具體選用哪一個呢?動動腦子就知道,我們pipeline中存儲的都是ChannelHandler,有哪些個Handler,就要看我們在啟動代碼中是怎樣設置了。來看看我的啟動代碼(精簡版,沒有寫全,所以這里看不懂得話,建議你寫個Netty的小demo).上代碼:

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
//             .option(ChannelOption.ALLOCATOR, )//設置內存分配器
              .option(ChannelOption.SO_SNDBUF, 1024)//發送緩沖器
              .option(ChannelOption.SO_RCVBUF, 1024)
            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收緩沖器
             .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel對應的ChannelPipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//客戶端新接入的連接socketChannel對應的ChannelPipeline的Handler
                 @Override
                 public void initChannel(SocketChannel ch) {
                     SocketChannelConfig config=ch.config();
                     ChannelPipeline p = ch.pipeline();
                     p
                     .addLast(new LineBasedFrameDecoder(30))//也會將回車符刪除掉--是以換行符作為分隔的
                     .addLast(new DiscardServerHandler());
                 }
             });

由此可以看到,這里第一個被調用的ChannelHandler是LineBasedFrameDecoder。看看LineBasedFrameDecoder是怎么實現ChannelRead方法的。翻看了弗雷之后,我們終于找到了channelRead方法。由此可以看到,在AbstractNioByteChannel的read方法中的pipeline.fireChannelRead(byteBuf);按照我的啟動代碼(雖然說是按照我的,但是按照你們的也是這樣,因為byte在通過網絡接收之后,都要進行decode,第一個經過的channelHandler肯定是ByteToMessageDecoder,不信,你看看自己的啟動代碼試試),最終調用的是ByteToMessageDecoder.channelRead()  ,上代碼:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //緩沖區的大小沒有超過需要寫入的數據的大小
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());//擴展緩沖區--查看實現后,就是通過分配一個更大的,然后復制一下字節數據
                    }
                    cumulation.writeBytes(data);//將數據寫入到積累對象中
                    data.release();//釋放bytebuffer(heap或者direct)--通過引用的方式進行釋放緩沖區(至于什么是引用方式釋放,我們會有一個特定的章節進行講解)
                }
                //收集完畢之后解析收集到的字符串---通常調用子類的方法實現,在具體實現中,用out來承載解析出來的msg
                callDecode(ctx, cumulation, out);//實現的時候,不要釋放我們的累積對象cumulation
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
            if (cumulation != null && !cumulation.isReadable()) {//如果累積對象為null或者沒有可讀內容的話,那么就將累積對象釋放掉(因為空了或者為null了)
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();//代碼 11
                decodeWasNull = size == 0;
                //針對解析后的out結果中的msg的對象,將解析出來的message(具體的類型,請自己看實現.是怎樣做的)傳遞到pipeline中。
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();//代碼  22
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

提示: 一個pipeline,為某個socketChannel所有,也就是說pipeline里的channelHandler,也是為某個socketchannel所享用的。不會出現多個線程共享一個channelHanler的情況(我們可以讓他們共享一個handler,但是我們得保證這個共享的handler是一個無狀態的handler,例如我們現在就要講解的ByteToMessageDecoder就是一個有狀態的handler,所以就不能共享,就要在每次初始化socketChannel的pipeline時,都要重新new一個ByteToMessageDecoder,不信大家,可以可以看一下ByteToMessageDecoder的實現。我直接粘貼代碼吧!!(看看我的注釋哦)如下:).

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    ByteBuf cumulation;//因為單詞cumulation --累積 意思,也就是,這個成員對象,就是用來作為半包的累積存儲的對象來使用的
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
}

下面我們看一下callDecode()是怎樣完成的,上代碼,

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {//傳入的字節是否有可讀數據
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {//如果此handler被移除
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

上面的代碼,很容易易理解,就是進行必要的校驗,其中最惹人眼的就是decode()方法,而decode方法該類中是抽象方法:

/**
     * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
     * {@link ByteBuf} has nothing to read anymore, till nothing was read from the input {@link ByteBuf} or till
     * this method returns {@code null}.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added

     * @throws Exception    is thrown if an error accour
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

我們來看一下具體實現有哪些?見下圖

Netty5 Read事件處理過程_源碼講解

我們發現ByteToMessageDecoder的的decode子類實現有好多,我們為了講解的方便我們選擇使用,FixedLengthFrameDecoder作為研究對象。至于別的decoder大家有時間自己去看一下吧1!!(很簡單的,不要害怕).

上代碼(FixedLengthFrameDecoder.decode方法):

 protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);//out是外界傳入的一個用來盛放解析出來的message對象的容器
        }
    }

此處調用了自有的decode方法,上代碼:

Netty5 Read事件處理過程_源碼講解

這里我們看到如果可以讀取的數據長度沒有要求的長度搞的話,那么就會以傳入的ByteBuf參數(其實這里就是那個累積對象)為基礎,構建一個新的ByteBuf。

下面我們這里特意大概翻譯下in.readBytes(frameLength)方法的注釋是怎樣的情況.

ByteBuf io.netty.buffer.ByteBuf.readBytes(int length)

Transfers this buffer's data to a newly created buffer starting at the currentreaderIndexand increases thereaderIndexby the number of the transferred bytes (=length). The returned buffer'sreaderIndexandwriterIndexare0andlengthrespectively.

  • Returns:

  • the newly created buffer which contains the transferred bytes 該方法返回的是新創建的盛有傳輸數據的直接緩沖對象

將當前的ByteBuf對象的數據傳輸到一個剛剛創建的ByteBuf,就是從readerindex開始,然后增加ReaderIndex的值,增加length個字節數。返回的字節的的readerindex和writerindex分別是0和length。

通過閱讀上面的注釋的閱讀,我們可以看到,就FixedLengthFrameDecoder解析器來說,其實累積對象對readerIndex進行了改變。也就是說,累積對象可以讀取的數據的數據量是發生變化的(我們可以在源代碼中看一下在decode前后,readerindex是否發生了變化,觀察一下就知道了。這個很簡單哦,看一下我是怎么知道這一點的,見下圖)。至于除了FixedLengthFrameDecoder之外的別的decoder是否也改變了readerindex,大家可以去具體查看一下代碼(不過我個人覺得肯定都是這么做滴!!)。

Netty5 Read事件處理過程_源碼講解

----------------------------------------------------------------------------------------------------------

累積對象的內存釋放問題講解完了(其實很簡單,就是把readerindex改變了一下,具體長度就看解析出來的message的長度了,哈哈)。

講到這里,會涉及到一個解析出來的message在被pipeline中的其它handler處理完畢后的內存釋放問題。怎么解決? 什么時候釋放這些message占用的空間呢? 

我們從上面代碼11 和代碼22之間的代碼可以看出,就是在將子類解析出來的msg,傳入到后續的( 因為當前的decoder Handler負責將大的ByteBuf累積對象轉換成小的后續handler可以理解的msg對象,數據這個msg對象是個什么類型,就要看子類是怎么將什么類型的msg放入到out盛放容器的了 )handler中。由此可以看出: 對于一個socketChannel,其message的處理順序不會出現錯亂。永遠都是先處理完前一個,然后才是后一個,因為這是在一個線程里依次處理所有的msg的。

message是在什么時候釋放呢? 還是看  代碼11 和代碼22之間的代碼  大家可以自己去看看代碼。我發現就是被丟了,被JVM回收了,沒有重復利用。我個人覺得可以重復利用。關于這個問題,大家回去自己理解一下吧!!有時間得的話,我也會專門將一下的。畢竟這篇文章是講Netty read事件處理的。不是將netty內存分配的。放心我不會忘記這個問題的。我會在后續的文章中講解的。歡迎大家吐槽!!!

本文是本人學習Netty后的個人總結的分享,需要說明的是本人僅僅代表個人的看法,不是什么官方權威的版本,如果有不當之處,還請賜教!!歡迎吐槽!!

來自:http://my.oschina.net/hotbain/blog/421150


 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!