Netty那點事(2)Netty中的buffer

Alice2539 8年前發布 | 23K 次閱讀 Netty 網絡工具包

來自: http://www.importnew.com/17660.html

本系列:

Netty 那點事(1)概述

上一篇文章我們概要介紹了Netty的原理及結構,下面幾篇文章我們開始對Netty的各個模塊進行比較詳細的分析。Netty的結構最底層是buffer機制,這部分也相對獨立,我們就先從buffer講起。

What: buffer二三事

buffer中文名又叫緩沖區,按照維基百科的解釋,是”在數據傳輸時,在內存里開辟的一塊臨時保存數據的區域”。它其實是一種化同步為異步的機制,可以解決數據傳輸的速率不對等以及不穩定的問題。

根據這個定義,我們可以知道涉及I/O(特別是I/O寫)的地方,基本會有buffer的存在。就Java來說,我們非常熟悉的Old I/O– InputStream & OutputStream 系列API,基本都是在內部使用到了buffer。Java課程老師就教過,必須調用 OutputStream.flush() ,才能保證數據寫入生效!

而NIO中則直接將buffer這個概念封裝成了對象,其中最常用的大概是ByteBuffer了。于是使用方式變為了:將數據寫入Buffer,flip()一下,然后將數據讀出來。于是,buffer的概念更加深入人心了!

Netty中的buffer也不例外。不同的是,Netty的buffer專為網絡通訊而生,所以它又叫ChannelBuffer(好吧其實沒有什么因果關系…)。我們下面就來講講Netty中的buffer。當然,關于Netty,我們必須講講它的所謂”Zero-Copy-Capable”機制。

When & Where: TCP/IP協議與buffer

TCP/IP協議是目前的主流網絡協議。它是一個多層協議,最下層是物理層,最上層是應用層(HTTP協議等),而在Java開發中,一般只接觸TCP以上,即傳輸層和應用層的內容。這也是Netty的主要應用場景。

TCP報文有個比較大的特點,就是它傳輸的時候,會先把應用層的數據項拆開成字節,然后按照自己的傳輸需要,選擇合適數量的字節進行傳輸。什么叫”自己的傳輸需要”?首先TCP包有最大長度限制,那么太大的數據項肯定是要拆開的。其次因為TCP以及下層協議會附加一些協議頭信息,如果數據項太小,那么可能報文大部分都是沒有價值的頭信息,這樣傳輸是很不劃算的。因此有了收集一定數量的小數據,并打包傳輸的Nagle算法(這個東東在HTTP協議里會很討厭,Netty里可以用setOption(“tcpNoDelay”, true)關掉它)。

這么說可能太學院派了一點,我們舉個例子吧:

發送時,我們這樣分3次寫入(‘|’表示兩個buffer的分隔):

<code> +-----+-----+-----+ | ABC | DEF | GHI | +-----+-----+-----+ </code>

接收時,可能變成了這樣:

<code> +----+-------+---+---+ | AB | CDEFG | H | I | +----+-------+---+---+ </code>

很好懂吧?可是,說了這么多,跟buffer有個什么關系呢?別急,我們來看下面一部分。

Why: buffer中的分層思想

我們先回到之前的 messageReceived 方法:

public void messageReceived(
        ChannelHandlerContext ctx, MessageEvent e) {
    // Send back the received message to the remote peer.
    transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
    e.getChannel().write(e.getMessage());
}

這里 MessageEvent.getMessage() 默認的返回值是一個 ChannelBuffer 。我們知道,業務中需要的”Message”,其實是一條應用層級別的完整消息,而一般的buffer工作在傳輸層,與”Message”是不能對應上的。那么這個ChannelBuffer是什么呢?

來一個官方給的圖,我想這個答案就很明顯了:

這里可以看到,TCP層HTTP報文被分成了兩個ChannelBuffer,這兩個Buffer對我們上層的邏輯(HTTP處理)是沒有意義的。但是兩個ChannelBuffer被組合起來,就成為了一個有意義的HTTP報文,這個報文對應的ChannelBuffer,才是能稱之為”Message”的東西。這里用到了一個詞”Virtual Buffer”,也就是所謂的”Zero-Copy-Capable Byte Buffer”了。頓時覺得豁然開朗了有沒有!

我這里總結一下, 如果說NIO的Buffer和Netty的ChannelBuffer最大的區別的話,就是前者僅僅是傳輸上的Buffer,而后者其實是傳輸Buffer和抽象后的邏輯Buffer的結合。 延伸開來說,NIO僅僅是一個網絡傳輸框架,而Netty是一個網絡應用框架,包括網絡以及應用的分層結構。

當然,在Netty里,默認使用 ChannelBuffer 表示”Message”,不失為一個比較實用的方法,但是 MessageEvent.getMessage() 是可以存放一個POJO的,這樣子抽象程度又高了一些,這個我們在以后講到 ChannelPipeline 的時候會說到。

How: Netty中的ChannelBuffer及實現

好了,終于來到了代碼實現部分。之所以啰嗦了這么多,因為我覺得,關于”Zero-Copy-Capable Rich Byte Buffer”,理解為什么需要它,比理解它是怎么實現的,可能要更重要一點。

我想可能很多朋友跟我一樣,喜歡”順藤摸瓜”式讀代碼–找到一個入口,然后順著查看它的調用,直到理解清楚。很幸運, ChannelBuffers (注意有s!)就是這樣一根”藤”,它是所有ChannelBuffer實現類的入口,它提供了很多靜態的工具方法來創建不同的Buffer,靠“順藤摸瓜”式讀代碼方式,大致能把各種ChannelBuffer的實現類摸個遍。先列一下ChannelBuffer相關類圖。

此外還有 WrappedChannelBuffer 系列也是繼承自 AbstractChannelBuffer ,圖放到了后面。

ChannelBuffer中的readerIndex和writerIndex

開始以為Netty的ChannelBuffer是對NIO ByteBuffer的一個封裝,其實不是的, 它是把ByteBuffer重新實現了一遍

以最常用的 HeapChannelBuffer 為例,其底層也是一個byte[],與ByteBuffer不同的是,它是可以同時進行讀和寫的,而不需要使用flip()進行讀寫切換。ChannelBuffer讀寫的核心代碼在 AbstactChannelBuffer 里,這里通過readerIndex和writerIndex兩個整數,分別指向當前讀的位置和當前寫的位置,并且,readerIndex總是小于writerIndex的。貼兩段代碼,讓大家能看的更明白一點:

public void writeByte(int value) {
    setByte(writerIndex ++, value);
}

public byte readByte() { if (readerIndex == writerIndex) { throw new IndexOutOfBoundsException("Readable byte limit exceeded: "

            + readerIndex);
}
return getByte(readerIndex ++);

}

public int writableBytes() { return capacity() - writerIndex; }

public int readableBytes() { return writerIndex - readerIndex; }</pre>

我倒是覺得這樣的方式非常自然,比單指針與flip()要更加好理解一些。AbstactChannelBuffer還有兩個相應的mark指針 markedReaderIndex 和 markedWriterIndex ,跟NIO的原理是一樣的,這里不再贅述了。

字節序Endianness與HeapChannelBuffer

在創建Buffer時,我們注意到了這樣一個方法: public static ChannelBuffer buffer(ByteOrder endianness, int capacity); ,其中 ByteOrder 是什么意思呢?

這里有個很基礎的概念:字節序(ByteOrder/Endianness)。它規定了多余一個字節的數字(int啊long什么的),如何在內存中表示。BIG_ENDIAN(大端序)表示高位在前,整型數 12 會被存儲為 0 0 0 12 四字節,而LITTLE_ENDIAN則正好相反。可能搞C/C++的程序員對這個會比較熟悉,而Javaer則比較陌生一點,因為Java已經把內存給管理好了。但是在網絡編程方面,根據協議的不同,不同的字節序也可能會被用到。目前大部分協議還是采用大端序,可參考 RFC1700

了解了這些知識,我們也很容易就知道為什么會有 BigEndianHeapChannelBuffer 和 LittleEndianHeapChannelBuffer 了!

DynamicChannelBuffer

DynamicChannelBuffer是一個很方便的Buffer,之所以叫Dynamic是因為它的長度會根據內容的長度來擴充,你可以像使用ArrayList一樣,無須關心其容量。實現自動擴容的核心在于 ensureWritableBytes 方法,算法很簡單:在寫入前做容量檢查,容量不夠時,新建一個容量x2的buffer,跟ArrayList的擴容是相同的。貼一段代碼吧(為了代碼易懂,這里我刪掉了一些邊界檢查,只保留主邏輯):

public void writeByte(int value) {
    ensureWritableBytes(1);
    super.writeByte(value);
}

public void ensureWritableBytes(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; }

int newCapacity = capacity();
int minNewCapacity = writerIndex() + minWritableBytes;
while (newCapacity < minNewCapacity) {
    newCapacity <<= 1;
}

ChannelBuffer newBuffer = factory().getBuffer(order(), newCapacity);
newBuffer.writeBytes(buffer, 0, writerIndex());
buffer = newBuffer;

}</pre>

CompositeChannelBuffer

CompositeChannelBuffer 是由多個ChannelBuffer組合而成的,可以看做一個整體進行讀寫。這里有一個技巧:CompositeChannelBuffer并不會開辟新的內存并直接復制所有ChannelBuffer內容,而是直接保存了所有ChannelBuffer的引用,并在子ChannelBuffer里進行讀寫,從而實現了”Zero-Copy-Capable”了。來段簡略版的代碼吧:

public class CompositeChannelBuffer{

//components保存所有內部ChannelBuffer
private ChannelBuffer[] components;
//indices記錄在整個CompositeChannelBuffer中,每個components的起始位置
private int[] indices;
//緩存上一次讀寫的componentId
private int lastAccessedComponentId;

public byte getByte(int index) {
    //通過indices中記錄的位置索引到對應第幾個子Buffer
    int componentId = componentId(index);
    return components[componentId].getByte(index - indices[componentId]);
}

public void setByte(int index, int value) {
    int componentId = componentId(index);
    components[componentId].setByte(index - indices[componentId], value);
}

}</pre>

查找componentId的算法再次不作介紹了,大家自己實現起來也不會太難。值得一提的是,基于ChannelBuffer連續讀寫的特性,使用了順序查找(而不是二分查找),并且用 lastAccessedComponentId 來進行緩存。

ByteBufferBackedChannelBuffer

前面說ChannelBuffer是自己的實現的,其實只說對了一半。 ByteBufferBackedChannelBuffer 就是封裝了NIO ByteBuffer的類,用于實現堆外內存的Buffer(使用NIO的 DirectByteBuffer )。當然,其實它也可以放其他的ByteBuffer的實現類。代碼實現就不說了,也沒啥可說的。

WrappedChannelBuffer

WrappedChannelBuffer 都是幾個對已有ChannelBuffer進行包裝,完成特定功能的類。代碼不貼了,實現都比較簡單,列一下功能吧。

類名 入口 功能
SlicedChannelBuffer ChannelBuffer.slice()ChannelBuffer.slice(int,int) 某個ChannelBuffer的一部分
TruncatedChannelBuffer ChannelBuffer.slice()ChannelBuffer.slice(int,int) 某個ChannelBuffer的一部分, 可以理解為其實位置為0的SlicedChannelBuffer
DuplicatedChannelBuffer ChannelBuffer.duplicate() 與某個ChannelBuffer使用同樣的存儲, 區別是有自己的index
ReadOnlyChannelBuffer ChannelBuffers.unmodifiableBuffer(ChannelBuffer) 只讀,你懂的

可以看到,關于實現方面,Netty 3.7的buffer相關內容還是比較簡單的,也沒有太多費腦細胞的地方。

而Netty 4.0之后就不同了。4.0,ChannelBuffer改名ByteBuf,成了單獨項目buffer,并且為了性能優化,加入了BufferPool之類的機制,已經變得比較復雜了(本質倒沒怎么變)。性能優化是個很復雜的事情,研究源碼時,建議先避開這些東西,除非你對算法情有獨鐘。舉個例子,Netty4.0里為了優化,將Map換成了Java 8里6000行的 ConcurrentHashMapV8 ,你們感受一下…

下篇文章我們開始講Channel。

參考資料:

Netty那點事系列文章索引

 本文由用戶 Alice2539 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!