Kafka消息存儲概覽
摘要
Kafka作為一個消息中間件系統,面臨的首要問題就是消息如何持久化,如何方便地進行讀寫和解析。本文將就Kafka的消息存儲問題開一個頭,后續將會對重要的代碼部分一一講解。Kafka的消息概念,首先我們在此談論的不是網絡傳遞中的消息,而更多偏向于記錄的意思,也就是消費者和生產者所在意的實際對象。消息是Kafka造作的最小單元,并不允許更改消息的實際內容,一條消息本質上是一個鍵值可缺省的鍵值對。
消息格式
下面首先以Kafka 0.10.0版本為例來解釋一下其消息格式:
CRC+magic+attributes+wrapperTimestamp(optional)+key(長度+內容)+payload(長度+內容)
下面依次列出每一部分
* 1. 4 byte CRC32 校檢值
- 1 byte "magic" 標識符來顯示消息格式是否發生了改動,值為0/1(也可以看作是版本號)
- 1 byte "attributes" 標識符,包含以下內容:
- bit 0 ~ 2 : 壓縮編碼方式
- 0 : no compression
- 1 : gzip
- 2 : snappy
- 3 : lz4
- bit 3 : 時間戳類型
- 0 : create time
- 1 : log append time
- bit 4 ~ 7 : 保留部分
- (可選) 8 byte 時間戳,只有magic為1時才攜帶該部分
- 4 byte key length, 指定Key部分的長度
- K byte key
- 4 byte payload length, 指定值的長度
- V byte payload</code></pre>
Kafka的消息格式在設計上允許多重嵌套,這種嵌套是通過壓縮實現的。試想一下,某個消息的key為空,然后它的value部分是一個壓縮后的MessageSet,那么經過解壓縮并讀取后它就是一個鍵值對集合了,這有些類似于json了。但實際上Kafka的消息只允許二重嵌套,這并非由其消息格式的局限性決定,而是考慮到讀取消息時解析的復雜度決定。嵌套消息使得Kafka傳遞復雜類型的對象成為可能,但是出于性能因素的考慮,對象序列化和過于復雜的數據格式并不適合消息系統這一業務,或者說kafka在性能方面和表達能力上做了一個漂亮的妥協。
同時,我們還要重點來談一談時間戳相關,時間戳有三種取值,分別是-1代表不帶時間戳,0代表該消息創建的時間,1代表它持久化時間(也可以理解為入庫被kafka處理的時間)。對于嵌套的消息來說,若我們選擇時間戳類型為入庫時間,則被壓縮消息的時間戳和其外層消息一致;若我們選擇時間戳類型為創建時間,則應該從字節碼流中讀取;若magic值為0.則不管如何,都應該認為時間戳為-1,時間戳類型為CREATE_TIME。
Message類的代碼設計
雖然看起來消息格式好像比較簡單,但實際上代碼卻相對有些復雜,最重要的問題是1、要兼容magic為0的情況,2、要能為后續的版本升級留出擴展。我們首先思考一下,Message類應該具有哪些功能呢大致分為下面幾個部分:
預定義的變量
由于操作的是bytes,大量的取值需要位操作,所以我們應該預定義好一些位操作輔助變量和一些重要的偏移位置。這里面有幾個比較重要的預定義變量需要著重強調一下:
-
key length byte的位置,因為它是頭和體的分割點
-
從attributs byte讀取時間戳類型和壓縮編碼方式的輔助變量
下面上代碼
/**
- Specifies the mask for the compression code. 3 bits to hold the compression codec.
- 0 is reserved to indicate no compression
*/
val CompressionCodeMask: Int = 0x07
/**
- Specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
- 0 for CreateTime, 1 for LogAppendTime
*/
val TimestampTypeMask: Byte = 0x08
val TimestampTypeAttributeBitOffset: Int = 3
public byte updateAttributes(byte attributes) {
return this == CREATE_TIME ?
(byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
}
public static TimestampType forAttributes(byte attributes) {
int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
}
def compressionCodec: CompressionCodec =
CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)</code></pre>
各個屬性的get方法
這個不用多說,需要注意的就是magic的值不同,有些取值的偏移位置不一樣,所以需要事先寫好靜態方法快捷獲得不同magic下的位置偏移。
合法性檢查
主要檢查以下幾個方面:
-
magic值和時間戳類型和時間戳值的組合是否一致
-
CRC 校檢是否通過
不同magic值下的message相互轉換
1. 計算新message需要的空間大小并分配
- 寫入新的magic值
- 取原來的attribute并用設置的TimestampType更新它,然后寫入attribute值
- 若是0->1則寫入時間戳
5.寫入原來的消息體
6.計算新的CRC值并填充</code></pre>
一系列的構造方法
主要的構造途徑有兩條:
-
主要用于構造嵌套消息,直接傳入buffer數據,可以設定時間戳和時間戳類型
-
主要用于構造原子消息,傳入鍵值對數據,并設定主要參數(magic、壓縮編碼、時間戳類型、時間戳)
消息相關的主要類

下面就依次介紹每個類的作用
-
MessageAndOffset:在Message之外附加它處于set中的偏移
-
MessageAndMeta:主要功能是包裝解碼器將message解碼為Key和Value對象
-
MessageSet:管理message集合,事先集合的順序讀和批量寫,但個人認為其代碼的重心在于如何解決消息嵌套的解析
-
ByteBufferMessageSet:用ByteBuffer來存儲序列的Message,主要為了方便讀Message的操作
-
ByteBufferBackedInputStream:將buffer的讀寫包裝成流的模式
來自:https://segmentfault.com/a/1190000006875889