Kafka消息存儲概覽

yhsj0976 8年前發布 | 32K 次閱讀 消息系統 Apache Kafka

摘要

Kafka作為一個消息中間件系統,面臨的首要問題就是消息如何持久化,如何方便地進行讀寫和解析。本文將就Kafka的消息存儲問題開一個頭,后續將會對重要的代碼部分一一講解。Kafka的消息概念,首先我們在此談論的不是網絡傳遞中的消息,而更多偏向于記錄的意思,也就是消費者和生產者所在意的實際對象。消息是Kafka造作的最小單元,并不允許更改消息的實際內容,一條消息本質上是一個鍵值可缺省的鍵值對。

消息格式

下面首先以Kafka 0.10.0版本為例來解釋一下其消息格式:

CRC+magic+attributes+wrapperTimestamp(optional)+key(長度+內容)+payload(長度+內容)

下面依次列出每一部分

* 1. 4 byte CRC32 校檢值

    1. 1 byte "magic" 標識符來顯示消息格式是否發生了改動,值為0/1(也可以看作是版本號)
    1. 1 byte "attributes" 標識符,包含以下內容:
  • bit 0 ~ 2 : 壓縮編碼方式
  • 0 : no compression
  • 1 : gzip
  • 2 : snappy
  • 3 : lz4
  • bit 3 : 時間戳類型
  • 0 : create time
  • 1 : log append time
  • bit 4 ~ 7 : 保留部分
    1. (可選) 8 byte 時間戳,只有magic為1時才攜帶該部分
    1. 4 byte key length, 指定Key部分的長度
    1. K byte key
    1. 4 byte payload length, 指定值的長度
    1. V byte payload</code></pre>

      Kafka的消息格式在設計上允許多重嵌套,這種嵌套是通過壓縮實現的。試想一下,某個消息的key為空,然后它的value部分是一個壓縮后的MessageSet,那么經過解壓縮并讀取后它就是一個鍵值對集合了,這有些類似于json了。但實際上Kafka的消息只允許二重嵌套,這并非由其消息格式的局限性決定,而是考慮到讀取消息時解析的復雜度決定。嵌套消息使得Kafka傳遞復雜類型的對象成為可能,但是出于性能因素的考慮,對象序列化和過于復雜的數據格式并不適合消息系統這一業務,或者說kafka在性能方面和表達能力上做了一個漂亮的妥協。

      同時,我們還要重點來談一談時間戳相關,時間戳有三種取值,分別是-1代表不帶時間戳,0代表該消息創建的時間,1代表它持久化時間(也可以理解為入庫被kafka處理的時間)。對于嵌套的消息來說,若我們選擇時間戳類型為入庫時間,則被壓縮消息的時間戳和其外層消息一致;若我們選擇時間戳類型為創建時間,則應該從字節碼流中讀取;若magic值為0.則不管如何,都應該認為時間戳為-1,時間戳類型為CREATE_TIME。

      Message類的代碼設計

      雖然看起來消息格式好像比較簡單,但實際上代碼卻相對有些復雜,最重要的問題是1、要兼容magic為0的情況,2、要能為后續的版本升級留出擴展。我們首先思考一下,Message類應該具有哪些功能呢大致分為下面幾個部分:

      預定義的變量

      由于操作的是bytes,大量的取值需要位操作,所以我們應該預定義好一些位操作輔助變量和一些重要的偏移位置。這里面有幾個比較重要的預定義變量需要著重強調一下:

      • key length byte的位置,因為它是頭和體的分割點

      • 從attributs byte讀取時間戳類型和壓縮編碼方式的輔助變量

      下面上代碼

      /**
    2. Specifies the mask for the compression code. 3 bits to hold the compression codec.
    3. 0 is reserved to indicate no compression */ val CompressionCodeMask: Int = 0x07 /**
    4. Specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
    5. 0 for CreateTime, 1 for LogAppendTime */ val TimestampTypeMask: Byte = 0x08 val TimestampTypeAttributeBitOffset: Int = 3
public byte updateAttributes(byte attributes) {
    return this == CREATE_TIME ?
        (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
}

public static TimestampType forAttributes(byte attributes) {
    int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
    return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
}


def compressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)</code></pre>

各個屬性的get方法

這個不用多說,需要注意的就是magic的值不同,有些取值的偏移位置不一樣,所以需要事先寫好靜態方法快捷獲得不同magic下的位置偏移。

合法性檢查

主要檢查以下幾個方面:

  • magic值和時間戳類型和時間戳值的組合是否一致

  • CRC 校檢是否通過

不同magic值下的message相互轉換

1. 計算新message需要的空間大小并分配

  1. 寫入新的magic值
  2. 取原來的attribute并用設置的TimestampType更新它,然后寫入attribute值
  3. 若是0->1則寫入時間戳     5.寫入原來的消息體 6.計算新的CRC值并填充</code></pre>

    一系列的構造方法

    主要的構造途徑有兩條:

    • 主要用于構造嵌套消息,直接傳入buffer數據,可以設定時間戳和時間戳類型

    • 主要用于構造原子消息,傳入鍵值對數據,并設定主要參數(magic、壓縮編碼、時間戳類型、時間戳)

    消息相關的主要類

    下面就依次介紹每個類的作用

    • MessageAndOffset:在Message之外附加它處于set中的偏移

    • MessageAndMeta:主要功能是包裝解碼器將message解碼為Key和Value對象

    • MessageSet:管理message集合,事先集合的順序讀和批量寫,但個人認為其代碼的重心在于如何解決消息嵌套的解析

    • ByteBufferMessageSet:用ByteBuffer來存儲序列的Message,主要為了方便讀Message的操作

    • ByteBufferBackedInputStream:將buffer的讀寫包裝成流的模式

     

    來自:https://segmentfault.com/a/1190000006875889

     

 本文由用戶 yhsj0976 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!