Kafka通訊協議指南
官方英文版本: A Guide To The Kafka Protocol
中文翻譯: watchword 翻譯于2016年1月31日,修改于6月17日,基于原文2016年5月5日修改版本(v.106)修改翻譯: Kafka通訊協議指南
smallnest 基于原文 Jan 20, 2017版本修改。
如果想深入了解Kafka的通訊協議的話,這篇文章不可不讀。感謝 watchword 將原文翻譯成了中文,我基于最新版進行了修訂,修訂和完善翻譯中的錯誤。
簡介
此文檔涵蓋了Kafka 0.8及之后版本實現的通訊協議。其目的是提供一個易讀的協議文檔, 包含可用的請求API及其二進制格式, 以及如何正確使用他們來實現一個客戶端。本文假設您已經了解了Kafka基本的設計以及 術語 。
0.7 和更早的版本所使用的協議與此類似,但我們(希望)通過一次性地斬斷兼容性,以便清理原有設計上的沉疴,并且泛化一些概念。
概述
Kafka協議是相當簡單的,只有六種核心的客戶端請求的API:
- 元數據(Metadata) – 描述可用的brokers,包括他們的主機和端口信息,并給出了每個broker上分別存有哪些分區Partition;
- 發送(Send) – 發送消息到broker;
- 獲取(Fetch) – 從broker獲取消息,其中,一個獲取數據,一個獲取集群的元數據,還有一個獲取topic的偏移量信息;
- 偏移量(Offsets) – 獲取給定topic的分區的可用偏移量信息;
- 偏移量提交(Offset Commit) – 提交消費者組(Consumer Group)的一組偏移量;
- 偏移量獲取(Offset Fetch) – 獲取一個消費者組的一組偏移量;
每一種API都將在下面詳細說明。此外,從0.9版本開始,Kafka支持為消費者和Kafka Connect提供通用的分組管理。為此客戶端API又提供五個請求:
- 分組協調者(GroupCoordinator) – 用來定位一個分組當前的協調者。
- 加入分組(JoinGroup) – 成為某一個分組的一個成員,當分組不存在(沒有一個成員時)創建分組。
- 同步分組(SyncGroup) – 同步分組中所有成員的狀態(例如分發分區分配信息(Partition Assignments)到各個組員)。
- 心跳(Heartbeat) – 保持組內成員的活躍狀態。
- 離開分組(LeaveGroup) – 直接離開一個組。
最后,有幾個管理API,可用于監控/管理的Kafka集群( KIP-4 完成時,這個列表將變長):
- 描述消費者組(DescribeGroups) – 用于檢查一組群體的當前狀態(如:查看消費者分區分配)。
- 列出組(ListGroups) – 列出某一個broker當前管理的所有組
開場白
網絡
Kafka使用基于TCP的二進制協議。該協議定義了所有API的請求及響應消息。所有消息都是通過長度來分隔,并且由后面描述的基本類型組成。
客戶端初始化一個socket連接,并且寫入請求的消息序列和讀回相應的響應消息。連接和斷開時均不需要握手消息。如果保持長連接,那么TCP協議本身將會節省很多TCP握手時間,但如果真的重新建立連接,那么代價也相當小。
客戶可能需要維持到多個broker的連接,因為數據是被分區的,而客戶端需要和存儲這些分區的broker服務器進行通訊。當然,一般而言,不需要為單個服務端和單個客戶端間維護多個連接(即連接池技術)。
服務器保證單一的TCP連接中,請求將被順序處理,響應也將按該順序返回。為保證broker的處理請求的順序,單個連接同時也只會處理一個請求指令。請注意,客戶端可以(也應該)使用非阻塞IO實現請求流水線,從而實現更高的吞吐量。也就是說,客戶可以在等待上次請求應答的同時發送下個請求,因為待完成的請求將會在底層操作系統套接字緩沖區進行緩沖。除非特別說明,所有的請求是由客戶端啟動,并從服務器獲取到相應的響應消息。
服務器能夠配置請求大小的最大限制,超過這個限制將導致socket連接被斷開。
分區和引導(Partitioning and bootstrapping)
Kafka是一個分區系統,所以不是所有的服務器都有完整的數據集。Topic被分為P(預先定義的分區數量)個分區,每個分區被復制N(復制因子)份,Topic Partition根據順序在“提交日志”中編號為0,1,…,P。
所有具有這種特性的系統都有一個如何制定某個特定數據應該被分配給哪個特定的分區的問題。Kafka中它由客戶端直接控制分配策略,broker則沒有特別的語義來決定消息發布到哪個分區。相反,生產者直接將消息發送到一個特定的分區,獲取消息時,消費者也直接從某個特定的分區獲取。如果兩個生產者要使用相同的分區方案,那么他們必須用同樣的方法來計算Key到分區映射關系。
這些發布或獲取數據的請求必須發送到指定分區的 leader broker中。此條件同時也會由broker檢查,發送到不正確的broker的請求將會返回NotLeaderForPartition 錯誤代碼(后文所描述的)。
那么客戶端如何找出存在的topic呢,這些topic有哪些分區,以及這些分區被哪些broker保存,以便它可以直接將請求發送到正確的主機上?這個信息是動態的,因此你不能只是為每個客戶端配置一些靜態映射。相反所有的Kafka broker都可以回答描述集群當前狀態的數據請求:有哪些topic,這些topic都有多少分區,哪個broker是這些分區的Leader,以及這些broker主機的地址和端口信息。
換句話說,客戶端只需要找到一個broker,broker將會告知客戶端所有其他存在的broker,以及這些broker上面的所有分區。這個broker本身也可能會掉線,因此客戶端實現的最佳做法是保存兩三個broker地址。用戶可以選擇使用負載均衡器或只是靜態地配置兩個或三個客戶的Kafka主機。
客戶并不需要輪詢地查看集群是否已經改變;它可以獲取元數據一次然后緩存起來,等到它接收到所用的元數據過期的錯誤信息時再更新元數據。這種錯誤有兩種形式:(1)套接字錯誤指示客戶端不能與特定的broker進行通信,
(2)請求響應表明該broker不再是其請求數據分區的Leader的錯誤。
輪詢“初始”Kafka的URL列表,直到我們找到一個我們可以連接到的broker,獲取集群元數據。
處理獲取數據或者生產消息請求,根據這些請求所發送的topic和分區,將這些請求發送到合適的broker。
如果我們得到一個合適的錯誤(顯示元數據已經過期時),刷新元數據,然后再試一次。
分區策略(Partitioning Strategies
上面提到消息的分區分配是由生產者客戶端控制,那么,這個功能是如何暴露給最終用戶的?
在Kafka中,分區有兩個目的:
- 它平衡了broker的數據和請求的負載
- 它允許多個消費者之間處理分發消息的同時,能夠維護本地狀態,并且在分區中維持消息的順序。我們稱之為語義的分區(semantic partitioning)。
對于給定的使用場景下,你可能只關心其中的一個或兩個。
為了實現簡單的負載均衡,一個簡單的策略是客戶端發布消息是對所有broker進行輪詢請求(round robin requests)。另一種選擇,在那些生產者比消費者多的場景下,給每個客戶機隨機選擇一個分區并發布消息到該分區。后一種的策略能夠使用少得多的TCP連接。
語義分區是指使用消息中的 key 來決定分配的分區。例如,如果你正在處理一個點擊消息流時,你可能想通過用戶ID來劃分流,使得特定用戶的所有數據會被單個消費者消費。要做到這一點,客戶端可以采取與消息相關聯的key,并使用key的某種Hash值來選擇要發送的分區。
批處理(Batching)
我們的API鼓勵將小的請求批量處理以提高效率。我們發現這能非常顯著地提升性能。我們兩個用來發送消息和獲取消息的API,總是鼓勵處理一連串的消息,而不是單一的消息。聰明的客戶端可以利用這一點,并支持“異步”操作模式,以此進行批處理哪些單獨發送的消息,并把它們以較大的塊進行發送。我們可以進一步允許跨多個主題和分區的批處理,所以生產請求可能包含追加到許多分區的數據,一個讀取請求可以一次性從多個分區提取數據的。
當然,如果他們喜歡,客戶端實現者可以選擇忽略這一點,所有消息一次都發送一個。
版本和兼容性(Versioning and Compatibility)
該協議的目的要達到在向后兼容的基礎上漸進演化。我們的版本是基于每個API基礎之上,每個版本包括一個請求和響應對。每個請求包含API Key,里面包含了被調用的API標識,以及表示這些請求和響應格式的版本號。
這樣做的目的是允許客戶端實現相應特定版本的請求,在請求中標志版本信息。目標主要是為了在不允許停機的環境下進行更新,這種環境下,客戶端和服務器不能一次性都切換所使用的API。
服務器會拒絕它不支持的版本的請求,并始終返回它期望收到的能夠完成請求響應的版本的協議格式。建議的升級路徑方式是,新功能將首先部署到服務器(老客戶端無法完全利用他們的新功能),然后隨著新的客戶端的部署,這些新功能將逐步被利用。
目前,所有版本從0開始,當我們介紹這些API時,我們將分別顯示每個版本的格式。
通訊協議(The Protocol)
基本數據類型(Protocol Primitive Types)
該協議是建立在下列基本類型之上。
-
定長基本類型(Fixed Width Primitives)
int8, int16, int32, int64 – 不同精度(以bit數區分)的帶符號整數,以大端(Big Endiam)方式存儲.
-
變長基本類型(Variable Length Primitives)
bytes, string – 這些類型由一個表示長度的帶符號整數N以及后續N字節的內容組成。長度如果為-1表示空(null). string 使用int16表示長度,bytes使用int32.
-
數組(Arrays)
這個類型用來處理重復的結構體數據。他們總是由一個代表元素個數int32整數N,以及后續的N個重復結構體組成,這些結構體自身是有其他的基本數據類型組成。我們后面會用BNF語法展示一個foo的結構體數組[foo]
請求格式語法要點
后面的 BNF 明確地以上下文無關的語法展示了請求和響應的二進制格式。每個API都會一起給出請求和響應定義,以及所有的子定義(sub-definitions)。BNF使用沒有經過縮寫的便于閱讀的名稱(比如我使用一個符號化了的名稱來定義了一個production 錯誤碼,即便它只是int16整數)。一般在BNF中,一個production序列表示一個連接,所以下面給出的MetadataRequest將是一個含有VersionId,然后clientId,然后TopicNames的數組(每一個都有其自身的定義)。自定義類型一般使用駝峰法拼寫,基本類型使用全小寫方式拼寫。當存在多中可能的自定義類型時,使用’|’符號分割,并且用括號表示分組。頂級定義不縮進,后續的子部分會被縮進。
通用的請求和響應格式
所有請求和響應都從以下語法為基礎,其余的會在本文剩下部分中進行增量描述:
RequestOrResponse => Size (RequestMessage | ResponseMessage) Size => int32
域(FIELD) | 描述 |
---|---|
MessageSize | MessageSize 域給出了后續請求或響應消息的字節(bytes)長度。客戶端可以先讀取4字節的長度N,然后讀取并解析后續的N字節請求內容。 |
請求(Requests)
所有請求都具有以下格式:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage ApiKey => int16 ApiVersion => int16 CorrelationId => int32 ClientId => string RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
域(FIELD) | 描述 |
---|---|
ApiKey | 這是一個表示所調用的API的數字id(即它表示是一個元數據請求?生產請求?獲取請求等). |
ApiVersion | 這是該API的一個數字版本號。我們為每個API定義一個版本號,該版本號允許服務器根據版本號正確地解釋請求內容。響應消息也始終對應于所述請求的版本的格式。 |
CorrelationId | 這是一個用戶提供的整數。它將會被服務器原封不動地回傳給客戶端。用于匹配客戶機和服務器之間的請求和響應。 |
ClientId | 這是為客戶端應用程序的自定義的標識。用戶可以使用他們喜歡的任何標識符,他們會被用在記錄錯誤時,監測統計信息等場景。例如,你可能不僅想要監視每秒的總體請求,還要根據客戶端應用程序進行監視,那它就可以被用上(其中每一個都將駐留在多個服務器上)。這個ID作為特定的客戶端對所有的請求的邏輯分組。 |
下面我們就來描述各種請求和響應消息。
響應(Responses)
Response => CorrelationId ResponseMessageCorrelationId => int32ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
域(FIELD) | 描述 |
---|---|
CorrelationId | 服務器傳回給客戶端它所提供用作關聯請求和響應消息的整數。 |
所有響應都是與請求成對匹配(例如,我們將發送回一個元數據請求,會得到一個元數據響應)。
消息集(Message sets)
生產和獲取消息指令請求共享同一個消息集結構。在Kafka中,消息是由一個鍵值對以及少量相關的元數據組成。消息只是一個有偏移量和大小信息的消息序列。這種格式正好即用于在broker上的磁盤上存儲,也用在線上數據交換。
消息集也是Kafka中的壓縮單元,我們也允許消息遞歸包含壓縮消息從而允許批量壓縮。
注意, 在通訊協議中,消息集之前沒有類似的其他數組元素的int32。
MessageSet => [Offset MessageSize Message] Offset => int64 MessageSize => int32
消息格式
v0Message => Crc MagicByte Attributes Key Value Crc => int32 MagicByte => int8 Attributes => int8 Key => bytes Value => bytes v1 (supported since 0.10.0)Message => Crc MagicByte Attributes Key Value Crc => int32 MagicByte => int8 Attributes => int8 Timestamp => int64 Key => bytes Value => bytes
域(FIELD) | 描述 |
---|---|
Offset | 這是在Kafka中作為日志序列號使用的偏移量。當生產者發送非壓縮消息,這時候它可以填寫任意值。當生產者發送壓縮消息,為了避免服務端重新壓縮,每個壓縮消息的內部消息的偏移量應該從0開始,然后依次增加(kafka壓縮消息詳見后面的描述)。 |
Crc | Crc是的剩余消息字節的CRC32值。broker和消費者可用來檢查信息的完整性。 |
MagicByte | 這是一個用于允許消息二進制格式的向后兼容演化的版本id。當前值是0。 |
Attributes | 這個字節保存有關信息的元數據屬性。最低的3位包含用于消息的壓縮編解碼器。第四位表示時間戳類型,0代表CreateTime,1代表LogAppendTime。生產者必須把這個位設成0。所有其他位必須被設置為0。 |
Timestamp | 消息的時間戳。時間戳類型在Attributes域中體現。單位為從UTC標準準時間1970年1月1日0點到所在時間的毫秒數。 |
Key | Key是一個可選項,它主要用來進行指派分區。Key可以為null。 |
Value | Value是消息的實際內容,類型是字節數組。Kafka支持本身遞歸包含,因此本身也可能是一個消息集。消息可以為null。 |
壓縮(Compression)
Kafka支持壓縮多條消息以提高效率,當然,這比壓縮一條原始消息要來得復雜。因為單個消息可能沒有足夠的冗余信息以達到良好的壓縮比,壓縮的多條信息必須以特殊方式批量發送(當然,如果真的需要的話,你可以自己壓縮批處理的一個消息)。要被發送的消息被包裝(未壓縮)在一個MessageSet結構中,然后將其壓縮并存儲在一個單一的“消息”中,一起保存的還有相應的壓縮編解碼集。接收系統通過解壓縮得到實際的消息集。外層MessageSet應該只包含一個壓縮的“消息”(詳情見 Kafka-1718 )。
Kafka目前支持一下兩種壓縮算法:
壓縮算法(COMPRESSION) | 編碼器編號(CODEC) |
---|---|
None | 0 |
GZIP | 1 |
Snappy | 2 |
接口 (API)
本節將給出每個API的用法、二進制格式,以及它們的字段的含義的細節。
元數據接口(Metadata API)
這個API回答下列問題:
- 存在哪些Topic?
- 每個主題有幾個分區(Partition)?
- 每個分區的Leader分別是哪個broker?
- 這些broker的地址和端口分別是什么?
這是唯一一個能發往集群中任意一個broker的請求消息。
因為可能有很多topic,客戶端可以給一個的可選topic列表,以便只返回一組topic元數據。
返回的元數據信息是分區級別的信息,為了方便和以避免冗余,以topic為組集中在一起。每個分區的元數據中包含了leader以及所有副本以及正在同步的副本的信息。
注意: 如果broker配置中設置了”auto.create.topics.enable”, topic元數據請求將會以默認的復制因子和默認的分區數為參數創建topic。
topic元數據請求(Topic Metadata Request)
TopicMetadataRequest => [TopicName] TopicName => string
域(FIELD | ) 描述 |
---|---|
TopicName | 要獲取元數據的主題數組。 如果為空,就返回所有主題的元數據 |
元數據響應(Metadata Response)
響應包含的每個分區的元數據,這些分區元數據以topic為組組裝在一起。該元數據以broker id來指向具體的broker。每個broker有一個地址和端口。
MetadataResponse => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 Replicas => [int32] Isr => [int32]
域(FIELD) | 描述 |
---|---|
Leader | 該分區作為Leader節點的Kafka broker id。如果在一個Leader選舉過程中,沒有Leader存在,這個id將是-1。 |
Replicas | 該分區中,其他活著的作為slave的節點集合。 |
Isr | 副本集合中,所有處在與Leader跟隨(“caught up”,表示數據已經完全復制到這些節點)狀態的子集 |
Broker | kafka broker節點的id, 主機名, 端口信息 |
可能的錯誤碼(Possible Error Codes)
- UnknownTopic (3)
- LeaderNotAvailable (5)
- InvalidTopic (17)
- TopicAuthorizationFailed (29)
生產接口(Produce API)
生產者API用于將消息集發送到服務器。為了提高效率,它允許在單個請求中發送多個不同topic的不同分區的消息。
生產者API使用通用的消息集格式,但由于發送時還沒有被分配偏移量,因此可以任意填寫該值。
Produce Request
v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] RequiredAcks => int16 Timeout => int32 Partition => int32 MessageSetSize => int32
v1及以后版本表示客戶端可以在response中解析quota throttle time。
v2及以后版本表示客戶端可以解析response中的時間戳域。
域(FIELD) | 描述 |
---|---|
RequiredAcks | 這個值表示服務端收到多少確認后才發送反饋消息給客戶端。如果設置為0,那么服務端將不發送response(這是唯一的服務端不發送response的情況)。如果這個值為1,那么服務器將等到數據寫入到本地日之后發送response。如果這個值是-1,那么服務端將阻塞,知道這個消息被所有的同步副本寫入后再發送response。 |
Timeout | 這個值提供了以毫秒為單位的超時時間,服務器可以在這個時間內可以等待接收所需的Ack確認的數目。超時并非一個確切的限制,有以下原因:(1)不包括網絡延遲,(2)計時器開始在這一請求的處理開始,所以如果有很多請求,由于服務器負載而導致的排隊等待時間將不被包括在內,(3)如果本地寫入時間超過超時,我們將不會終止本地寫操作,這樣這個超時時間就不會得到遵守。要使硬超時時間,客戶端應該使用套接字超時。 |
TopicName | 該數據將會發布到的topic名稱 |
Partition | 該數據將會發布到的分區 |
MessageSetSize | 后續消息集的長度,字節為單位 |
MessageSet | 上面描述的標準格式的消息集合 |
Produce Response
v0ProduceResponse => [TopicName [Partition ErrorCode Offset]] TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 v1 (supported in 0.9.0 or later)ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 ThrottleTime => int32 v2 (supported in 0.10.0 or later)ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 Timestamp => int64 ThrottleTime => int32
域 | 描述 |
---|---|
Topic | 此響應對應的主題。 |
Partition | 此響應對應的分區。 |
ErrorCode | 如果有,此分區對應的錯誤信息。錯誤以分區為單位提供,因為可能存在給定的分區不可用或者被其他的主機維護(非Leader),但是其他的分區的請求操作成功的情況 |
Offset | 追加到該分區的消息集中的分配給第一個消息的偏移量。 |
Timestamp | 如果該主題使用了LogAppendTime,這個時間戳就是broker分配給這個消息集。這個消息集中的所有消息都有相同的時間戳。如果使用的是CreateTime,這個域始終是-1。如果沒有返回錯誤碼,生產者可以假定消息的時間戳已經被broker接受。單位為從UTC標準準時間1970年1月1日0點到所在時間的毫秒數。 |
ThrottleTime | 由于限額沖突而導致的時間延遲長度,以毫秒為單位。(如果沒有違反限額條件,此值為0) |
可能的錯誤碼(Possible Error Codes):( TODO)
獲取消息接口(Fetch API)
獲取消息接口用于獲取一些topic分區的一個或多個的日志塊。邏輯上根據指定topic,分區和消息起始偏移量開始獲取一批消息。在一般情況下,返回消息的偏移量將大于或等于開始偏移量。然而,如果是壓縮消息,有可能返回的消息的偏移量比起始偏移量小。這類的消息的數量通常較少,并且調用者必須負責過濾掉這些消息。
獲取數據指令請求遵循一個長輪詢模型,如果沒有足夠數量的消息可用,它們可以阻塞一段時間。
作為優化,服務器被允許在消息集的末尾返回一個消息的一部分。客戶端應處理這種情況。
有一點要注意的是,獲取消息API需要指定消費的分區。現在的問題是如何讓消費者知道消費哪個分區?特別地,作為一組消費者,如何使得每個消費者獲取分區的一個子集,并且平衡這些分區。我們使用zookeeper動態地為Scala和Java客戶端完成這個任務。這種方法的缺點是,它需要一個相當胖的客戶端并且需要客戶端與zookeeper連接。我們尚未創建一個Kafka接口(API),允許該功能被移動到在服務器端并被更方便地訪問。一個簡單的消費者的客戶端可以通過配置指定訪問的分區,但這樣將不能在某些消費者失效后做到分區的動態重新分配。我們希望能在下一個主要版本解決這一空白。
Fetch Request
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]] ReplicaId => int32 MaxWaitTime => int32 MinBytes => int32 TopicName => string Partition => int32 FetchOffset => int64 MaxBytes => int32
域 | 描述 |
---|---|
ReplicaId | 副本ID的是發起這個請求的副本節點ID。普通消費者客戶端應該始終將其指定為-1,因為他們沒有節點ID。其他broker設置他們自己的節點ID。基于調試目的,以非代理身份模擬副本broker發出獲取數據指令請求時,這個值填-2。 |
MaxWaitTime | 如果沒有足夠的數據可發送時,最大阻塞等待時間,以毫秒為單位。 |
MinBytes | 返回響應消息的最小字節數目,必須設置。如果客戶端將此值設為0,服務器將會立即返回,但如果沒有新的數據,服務端會返回一個空消息集。如果它被設置為1,則服務器將在至少一個分區收到一個字節的數據的情況下立即返回,或者等到超時時間達到。通過設置較高的值,結合超時設置,消費者可以在犧牲一點實時性能的情況下通過一次讀取較大的字節的數據塊從而提高的吞吐量(例如,設置MaxWaitTime至100毫秒,設置MinBytes為64K,將允許服務器累積數據達到64K前等待長達100ms再響應)。 |
TopicName | topic名稱 |
Partition | 獲取數據的Partition id |
FetchOffset | 獲取數據的起始偏移量 |
MaxBytes | 此分區返回消息集所能包含的最大字節數。這有助于限制響應消息的大小。 |
Fetch Response
v0FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64 MessageSetSize => int32 v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime => int32 TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64 MessageSetSize => int32
域 | 描述 |
---|---|
ThrottleTime | 由于限額沖突而導致的時間延遲長度,以毫秒為單位。(如果沒有違反限額條件,此值為0) |
TopicName | 返回消息所對應的Topic名稱。 |
Partition | 返回消息所對應的分區id。 |
HighwaterMarkOffset | 此分區日志中最末尾的偏移量。此信息可被客戶端用來確定后面還有多少條消息。 |
MessageSetSize | 此分區中消息集的字節長度 |
MessageSet | 此分區獲取到的消息集,格式與之前描述相同 |
v1只會包含v0格式的消息.
v2可能即包含v0又包含v1版本格式的消息.
可能的錯誤碼(Possible Error Codes)
- OFFSET_OUT_OF_RANGE (1)
- UNKNOWN_TOPIC_OR_PARTITION (3)
- NOT_LEADER_FOR_PARTITION (6)
- REPLICA_NOT_AVAILABLE (9)
- UNKNOWN (-1)
偏移量接口(又稱ListOffset)(Offset API)
此API描述了一組topic分區的偏移量有效范圍。生產者和獲取數據API的請求必須發送到分區Leader所在的broker上,這需要通過使用元數據的API來確定。
自版本0, response包含請求的分區的起始偏移量以及“log end offset”,即,將被追加到給定分區中的下一個消息的偏移量。kafka 0.10.1.0開始支持版本1, 它開始支持根據時間戳進行基于時間索引的查找,API做了一點改變來支持這個特性。注意這個API只支持開啟 0.10消息格式的topic,否則返回UNSUPPORTED_FOR_MESSAGE_FORMAT錯誤。
Offset Request
// v0ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] ReplicaId => int32 TopicName => string Partition => int32 Time => int64 MaxNumberOfOffsets => int32 // v1 (supported in 0.10.1.0 and later)ListOffsetRequest => ReplicaId [TopicName [Partition Time]] ReplicaId => int32 TopicName => string Partition => int32 Time => int64
域 | 描述 |
---|---|
Time | 用來請求一定時間(毫秒)前的所有消息。這里有兩個特殊取值:-1表示獲取最后一個offset(也就是后面即將到來消息的offset值); -2表示獲取最早的有效偏移量。注意,因為獲取到偏移值都是降序排序,因此請求最早Offset的請求將總是返回一個值 |
Offset Response
// v0OffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode [Offset] Partition => int32 ErrorCode => int16 Offset => int64 // v1ListOffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode Timestamp [Offset] Partition => int32 ErrorCode => int16 Timestamp => int64 Offset => int64
可能的錯誤碼(Possible Error Codes)
- UNKNOWN_TOPIC_OR_PARTITION (3)
- NOT_LEADER_FOR_PARTITION (6)
- UNKNOWN (-1)
- UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
偏移量提交/獲取接口(Offset Commit/Fetch API)
這些API使得偏移量的能夠集中管理。了解更多 偏移量管理 。按照 Kafka-993 的評論,直到Kafka 0.8.1.1,這些API調用無法全部正常使用,他們這將在0.8.2版本中提供。
消費者組協調員請求(Group Coordinator Request)
消費者組(Consumer Group)偏移量信息,由一個特定的broker維護,這個broker稱為消費者組協調員。即消費者需要向從這個特定的broker提交和獲取偏移量。可以通過發出一組協調員發現請求從而獲得當前協調員信息。
GroupCoordinatorRequest => GroupId GroupId => string
消費者組協調員響應(Group Coordinator Response)
GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort ErrorCode => int16 CoordinatorId => int32 CoordinatorHost => string CoordinatorPort => int32
可能的錯誤碼(Possible Error Codes)
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- GROUP_AUTHORIZATION_FAILED (30)
偏移量提交請求(Offset Commit Request)
v0 (supported in 0.8.1 or later)OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]] ConsumerGroupId => string TopicName => string Partition => int32 Offset => int64 Metadata => string v1 (supported in 0.8.2 or later)OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]] ConsumerGroupId => string ConsumerGroupGenerationId => int32 ConsumerId => string TopicName => string Partition => int32 Offset => int64 TimeStamp => int64 Metadata => string v2 (supported in 0.9.0 or later)OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]] ConsumerGroupId => string ConsumerGroupGenerationId => int32 ConsumerId => string RetentionTime => int64 TopicName => string Partition => int32 Offset => int64 Metadata => string
在V0和v1版本中,每個分區的時間戳作為提交時間戳定義,偏移量協調員將保存消費者所提交的偏移量,直到當前時間超過提交時間戳+偏移量保留時間,此偏移量保留時間在broker配置中指定;如果時間戳域沒有設值,那么broker會將此值設定為接收到提交偏移量請求的時間,用戶可以通過設置這個提交時間戳達到延長偏移量保存時間的目的。
在v2版本中,我們移除了時間戳域,但是增加了一個全局保存時間域(詳情參見 KAFKA-1634 );broker會設置提交時間戳為接收到請求的時間,但是提交的偏移量能被保存到提交請求中用戶指定的保存時間,如果這個保存時間沒有設值(-1),那么broker會使用默認的保存時間。
注意,當這個API在“simple consumer”模式下使用,并非作為消費者組一員時,那么generationId必須被設置成-1,并且memberId必須為空(非null)。另外,如果有一個活動的消費者組有同樣的groupId,那么提交Offset的請求將會被拒絕(一般會返回UNKNOWN_MEMBER_ID或者ILLEGAL_GENERATION錯誤)。
Offset Commit Response
v0, v1 and v2:OffsetCommitResponse => [TopicName [Partition ErrorCode]]] TopicName => string Partition => int32 ErrorCode => int16
可能的錯誤碼(Possible Error Codes)
OFFSET_METADATA_TOO_LARGE (12)
GROUP_LOAD_IN_PROGRESS (14)
GROUP_COORDINATOR_NOT_AVAILABLE (15)
NOT_COORDINATOR_FOR_GROUP (16)
ILLEGAL_GENERATION (22)
UNKNOWN_MEMBER_ID (25)
REBALANCE_IN_PROGRESS (27)
INVALID_COMMIT_OFFSET_SIZE (28)
TOPIC_AUTHORIZATION_FAILED (29)
GROUP_AUTHORIZATION_FAILED (30)
Offset Fetch Request
根據 KAFKA-1841 的注釋,V0和V1是相同的,但V0(0.8.1或更高版本支持)從zookeeper讀取的偏移量,而V1(0.8.2或更高版本支持)從Kafka讀偏移量。
v0 and v1 (supported in 0.8.2 or after):OffsetFetchRequest => ConsumerGroup [TopicName [Partition]] ConsumerGroup => string TopicName => string Partition => int32
Offset Fetch Response
v0 and v1 (supported in 0.8.2 or after):OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]] TopicName => string Partition => int32 Offset => int64 Metadata => string ErrorCode => int16
請注意,消費者組中一個topic的分區如果沒有偏移量,broker不會設定一個錯誤碼(因為它不是一個真正的錯誤),但會返回空的元數據并將偏移字段為-1。
偏移量獲取請求v0和v1版本之間沒有格式上的區別。功能實現上來說,v0版本從zookeeper獲取偏移量,v1版本從Kafka中獲取偏移量。
可能的錯誤碼(Possible Error Codes)
- UNKNOWN_TOPIC_OR_PARTITION (3) <- 只在v0版本的請求中出現
- GROUP_LOAD_IN_PROGRESS (14)
- NOT_COORDINATOR_FOR_GROUP (16)
- ILLEGAL_GENERATION (22)
- UNKNOWN_MEMBER_ID (25)
- TOPIC_AUTHORIZATION_FAILED (29)
- GROUP_AUTHORIZATION_FAILED (30)
組成員管理接口(Group Membership API)
這些請求用于客戶端參加Kafka所管理的消費者組。從更高層次上看,集群中每個消費者組都會分配一個broker(即消費者組協調員),以簡化消費者組管理。一旦得到了組協調員地址(使用上面的消費者組協調員請求),組成員可以加入該組,同步狀態,然后用心跳消息保持在組中的活躍狀態。當客戶端關閉時,它會使用離開組請求從消費者組中注銷。此協議的語義在Kafka客戶端分配協議中有詳細描述。
組建管理接口的主要使用場景是消費者組,但這些請求也盡量設計得一般化以便支持其他應用場景(例如,Kafka Connect組)。這種設計的帶來的代價就是是一些特定的組語義(group semantics)被推到了客戶端實現。例如,下面定義的JoinGroup和SyncGroup請求無明確定義的字段以支持消費者組分區分配。相反,它們在其中包含有一些通用的字節數組(byte arrays),用這些字節數組就可以使得分區分配切入在消費者客戶端實現。
但是,盡管這種實現允許每個客戶端來實現來定義它們自己的嵌入schema,但是Kafka工具的兼容性要求這些客戶端使用Kafka客戶端使用的標準方案。例如,consumer-groups.sh這個應用程序會假定用這種格式來顯示分區分配。因此,我們建議客戶遵循相同的模式,使這些工具對所有客戶端實現都可以正常工作。
Join Group Request
加入組請求用于讓客戶端成為組的成員。當新成員加入一個現有組,之前加入的所有的會員必須通過發送一個新加入組的要求來重新入組。當成員第一次加入該組,成員編號將是空的(即“”),但重新加入的成員都應該使用與之前生成的相同的會員ID。
v0 supported in 0.9.0.0 and greaterJoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols GroupId => string SessionTimeout => int32 MemberId => string ProtocolType => string GroupProtocols => [ProtocolName ProtocolMetadata] ProtocolName => string ProtocolMetadata => bytes v1 supported in 0.10.1.0 and greaterJoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols GroupId => string SessionTimeout => int32 RebalanceTimeout => int32 MemberId => string ProtocolType => string GroupProtocols => [ProtocolName ProtocolMetadata] ProtocolName => string ProtocolMetadata => bytes
SessionTimeout字段指示客戶端的存活。如果組協調員在session過期前沒有收到一個心跳, 那么組員會被移除組。0.10.1前的版本, session timeout也被用作完成所需的 rebalance。一旦組管理員開始rebalance, 每一個組員會觸發session timeout以便發送新的JoinGroup請求。 如果它們失敗了,它們會從組中移除。 在0.10.1中,新版JoinGroup會使用一個獨立的RebalanceTimeout來創建,一旦rebanlance開始,每個客戶端觸發過期以便重新加入,但是如果session timeout小于rebalance timeout, 客戶端還是會持續發送heatbeat。
ProtocolType字段定義了該組實現的嵌入協議。組協調器確保該組中的所有成員都支持相同的協議類型。組中包含的協議(GroupProtocols)字段中的協議名稱和元數據的含義取決于協議類型。請注意,加入群請求允許多協議/元數據對。這使得滾動升級時無需停機。協調器會選擇所有成員支持的一種協議,升級后的成員既包括新版本和老版本的協議,一旦所有成員都升級,協調器將選擇列在數組中最前面的組協議(GroupProtocol)。
消費者組: 下文我們定義了消費者組使用的嵌入協議。我們建議所有消費者客戶端實現遵循這個格式,以便Kafka工具能夠對所有的客戶端正常工作
ProtocolType => "consumer" ProtocolName => AssignmentStrategy AssignmentStrategy => string ProtocolMetadata => Version Subscription UserData Version => int16 Subscription => [Topic] Topic => string UserData => bytes
UserData域的可以用來自定義分配策略。例如,在一個粘性分區策略實現中,這個字段可以包含之前的分配。在基于資源的分配策略,也可以包括每個運行消費者主機上的CPU個數等信息。
Kafka Connect使用“connect”的協議類型,和協議細節也是基于Connect的內部實現。
Join Group Response
接收到來自該組中的所有成員組的加入組請求后,協調器將選擇一個成員作為Leader,并且選擇所有成員支持的協議。Leader將收到會員的完整列表與選擇的協議相關的元數據。其他追隨者成員,會收到一個空會員數組。Leader需要檢查每個成員的元數據,并且使用下文中描述的SyncGroup請求來分配狀態。
一旦加入組階段完成,協調器會增加該組的GenerationId,這個Id是發送給每個成員的response中的一個字段,同時也會在心跳和偏移量提交請求中。當協調器重新rebalance了一個組,協調器將發送一個錯誤碼,表示客戶端成員需要重新加入組。如果重新平衡完成前成員未重入組(rejoin),那么它將有一個舊generationId,在新的請求使用這個舊Id時,這將導致ILLEGAL_GENERATION錯誤。
v0 and v1 supported in 0.9.0 and greaterJoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members ErrorCode => int16 GenerationId => int32 GroupProtocol => string LeaderId => string MemberId => string Members => [MemberId MemberMetadata] MemberId => string MemberMetadata => bytes
消費者組: 協調器負責選擇所有成員都兼容協議(即分區分配策略),Leader是實際執行分配的成員,加入群請求可以包含多個分配策略,從而支持現有版本升級或者更改不同的分配策略。
可能的錯誤碼(Possible Error Codes):
- GROUP_LOAD_IN_PROGRESS (14)
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- NOT_COORDINATOR_FOR_GROUP (16)
- INCONSISTENT_GROUP_PROTOCOL (23)
- UNKNOWN_MEMBER_ID (25)
- INVALID_SESSION_TIMEOUT (26)
- GROUP_AUTHORIZATION_FAILED (30)
SyncGroup Request
組長(group leader)使用同步組請求用來向當前組中的所有成員進行狀態分配(例如分區分配)。所有成員加入該組后,立即發送SyncGroup,但只有Leader承擔這個工作。
SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment GroupId => string GenerationId => int32 MemberId => string GroupAssignment => [MemberId MemberAssignment] MemberId => string MemberAssignment => bytes
消費者組: 消費則組中MemberAssignment字段的格式如下:
MemberAssignment => Version PartitionAssignment Version => int16 PartitionAssignment => [Topic [Partition]] Topic => string Partition => int32 UserData => bytes
所有實現了“consumer”協議類型的客戶端實現都需要支持這個scheme。
Sync Group Response
組中的每個成員都會接收到leader發出的同步組響應。
SyncGroupResponse => ErrorCode MemberAssignment ErrorCode => int16 MemberAssignment => bytes
可能的錯誤代碼(Possible Error Codes):
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- NOT_COORDINATOR_FOR_GROUP (16)
- ILLEGAL_GENERATION (22)
- UNKNOWN_MEMBER_ID (25)
- REBALANCE_IN_PROGRESS (27)
- GROUP_AUTHORIZATION_FAILED (30)
Heartbeat Request
每當一個成員加入并同步完成,他將開始發送心跳請求使自己留在組里。當協調器在配置的會話超時時間內沒有他的收到心跳請求,該成員會被踢出該組。
HeartbeatRequest => GroupId GenerationId MemberId GroupId => string GenerationId => int32 MemberId => string
Heartbeat Response
HeartbeatResponse => ErrorCode ErrorCode => int16
可能的錯誤代碼(Possible Error Codes):
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- NOT_COORDINATOR_FOR_GROUP (16)
- ILLEGAL_GENERATION (22)
- UNKNOWN_MEMBER_ID (25)
- REBALANCE_IN_PROGRESS (27)
- GROUP_AUTHORIZATION_FAILED (30)
LeaveGroup Request
當想要離開組群時,用戶可以發送一個退組請求。這優先于會話超時,因為它能使該組快速再平衡,這對于消費者而言這意味著可以用更短的時間將分區分配到一個活動的成員。
LeaveGroupRequest => GroupId MemberId GroupId => string MemberId => string
LeaveGroup Response
LeaveGroupResponse => ErrorCode ErrorCode => int16
可能的錯誤代碼(Possible Error Codes):
- GROUP_LOAD_IN_PROGRESS (14)
- CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- NOT_COORDINATOR_FOR_CONSUMER (16)
- UNKNOWN_CONSUMER_ID (25)
- GROUP_AUTHORIZATION_FAILED (30)
管理接口(Administrative API)
ListGroups Request
該API可用于找到當前被broker管理的組群。為了得到集群內的所有組列表,你必須向所有broker發送組列表請求。
ListGroupsRequest =>
ListGroups Response
ListGroupsResponse => ErrorCode Groups ErrorCode => int16 Groups => [GroupId ProtocolType] GroupId => string ProtocolType => string
可能的錯誤代碼(Possible Error Codes):
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- AUTHORIZATION_FAILED (29)
DescribeGroups Request
DescribeGroupsRequest => [GroupId] GroupId => string
DescribeGroups Response
DescribeGroupsResponse => [ErrorCode GroupId State ProtocolType Protocol Members] ErrorCode => int16 GroupId => string State => string ProtocolType => string Protocol => string Members => [MemberId ClientId ClientHost MemberMetadata MemberAssignment] MemberId => string ClientId => string ClientHost => string MemberMetadata => bytes MemberAssignment => bytes
可能的錯誤代碼(Possible Error Codes):
- GROUP_LOAD_IN_PROGRESS (14)
- GROUP_COORDINATOR_NOT_AVAILABLE (15)
- NOT_COORDINATOR_FOR_GROUP (16)
- AUTHORIZATION_FAILED (29)
常量(Constants)
Api Keys And Current Versions
下面是請求中ApiKey的數字值,用來表示上面所述的請求類型。
接口名稱(API NAME) | APIKEY值 |
---|---|
ProduceRequest | 0 |
FetchRequest | 1 |
OffsetRequest | 2 |
MetadataRequest | 3 |
Non-user facing control APIs | 4-7 |
OffsetCommitRequest | 8 |
OffsetFetchRequest | 9 |
GroupCoordinatorRequest | 10 |
JoinGroupRequest | 11 |
HeartbeatRequest | 12 |
LeaveGroupRequest | 13 |
SyncGroupRequest | 14 |
DescribeGroupsRequest | 15 |
ListGroupsRequest | 16 |
Error Codes
我們用數字代碼表示服務器發生的問題。這些可以由客戶端轉換成客戶端中的異常(Exceptions)或者其他任何適當的錯誤處理機制。這里是當前正在使用的錯誤代碼表:
錯誤名稱(Error) | 編碼(Code) | 是否可重試(Retriable) | Description | 描述 |
---|---|---|---|---|
NoError | 0 | No error–it worked! | 沒有錯誤 | |
Unknown | -1 | An unexpected server error | 服務器未知錯誤 | |
OffsetOutOfRange | 1 | The requested offset is outside the range of offsets maintained by the server for the given topic/partition. | 請求的偏移量超過服務器維護的主題分區的偏移量。 | |
InvalidMessage / CorruptMessage | 2 | Yes | This indicates that a message contents does not match its CRC | 這個錯誤表示消息的內容與它的CRC校驗碼不符合。 |
UnknownTopicOrPartition | 3 | Yes | This request is for a topic or partition that does not exist on this broker. | broker上不存在所請求的主題或者分區。 |
InvalidMessageSize | 4 | The message has a negative size | 消息長度為負數。 | |
LeaderNotAvailable | 5 | Yes | This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes. | 這個錯誤會在leader選舉之間拋出,一樣那位此時這個分區沒有leader因此不能被寫入。 |
NotLeaderForPartition | 6 | Yes | This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date. | 這個錯誤表示客戶端正在把消息發送給副本,而不是分區的leader。這說明客戶端的元數據已經過期。 |
RequestTimedOut | 7 | Yes | This error is thrown if the request exceeds the user-specified time limit in the request. | 當這個請求超過了用戶自定義的請求時間限制拋出此錯誤 |
BrokerNotAvailable | 8 | This is not a client facing error and is used mostly by tools when a broker is not alive. | 這個不是客戶端所能接受到的錯誤,一般被工具用在broker沒有活動的場合。 | |
ReplicaNotAvailable | 9 | If replica is expected on a broker, but is not (this can be safely ignored). | 當broker希望有副本而實際上并沒有時拋出(這個錯誤可以被安全地忽略)。 | |
MessageSizeTooLarge | 10 | The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum. | 當服務器配置了一個最大消息長度以避免無限制的內存分配時,客戶端產生了一個超過這個最大值的消息會拋出此錯誤。 | |
StaleControllerEpochCode | 11 | Internal error code for broker-to-broker communication. | broker之間內部通訊是的錯誤。 | |
OffsetMetadataTooLargeCode | 12 | If you specify a string larger than configured maximum for offset metadata | 如果你賦了一個超過所配置的最大偏移量元數據的字符串時觸發。 | |
GroupLoadInProgressCode | 14 | Yes | The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition), or in response to group membership requests (such as heartbeats) when group metadata is being loaded by the coordinator. | broker會在以下情況下返回這個錯誤:當broker人在加載偏移量時(主題分區的leader發生變化后)請求偏移量獲取請求;或者正在反饋組成員請求(比如心跳)時,組的元數據正在被協調器加載。 |
GroupCoordinatorNotAvailableCode | 15 | Yes | The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active. | 組協調器請求,偏移量提交和大部分組管理請求時,偏移量主題還沒有被建立或者組協調器還沒有激活是broker會返回此錯誤。 |
NotCoordinatorForGroupCode | 16 | Yes | The broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for. | 非該組協調器的broker接收到一個偏移量獲取或提交請求時返回此錯誤。 |
InvalidTopicCode | 17 | For a request which attempts to access an invalid topic (e.g. one which has an illegal name), or if an attempt is made to write to an internal topic (such as the consumer offsets topic). | 請求指令嘗試訪問一個非法的主題(例如,一個包含非法名稱的主題),或者嘗試寫入一個內部主題(例如消費者偏移量主題)。 | |
RecordListTooLargeCode | 18 | If a message batch in a produce request exceeds the maximum configured segment size. | 批處理消息片段數組的長度超過了配置的最大消息片段數。 | |
NotEnoughReplicasCode | 19 | Yes | Returned from a produce request when the number of in-sync replicas is lower than the configured minimum and requiredAcks is -1. | 當同步中的副本數量小于配置的最小數量,并且requiredAcks設置為-1時返回此錯誤 |
NotEnoughReplicasAfterAppendCode | 20 | Yes | Returned from a produce request when the message was written to the log, but with fewer in-sync replicas than required. | 消息已經寫入日志文件,但是同步中的副本數量比請求中要求的數量少時返回此錯誤碼 |
InvalidRequiredAcksCode | 21 | Returned from a produce request if the requested requiredAcks is invalid (anything other than -1, 1, or 0). | 請求的requiredAcks非法(任何非-1,1或者0)時返回此錯誤碼。 | |
IllegalGenerationCode | 22 | Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation. | 組籍管理請求(諸如心跳請求)時generation id不是與當前不一致時返回此錯誤碼 | |
InconsistentGroupProtocolCode | 23 | Returned in join group when the member provides a protocol type or set of protocols which is not compatible with the current group. | 加入組請求時成員提供的協議類型或者協議類型組與當前組不兼容時返回。 | |
InvalidGroupIdCode | 24 | Returned in join group when the groupId is empty or null. | 加入組請求時groupId為空或者null是返回。 | |
UnknownMemberIdCode | 25 | Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation. | 組請求(偏移量提交/獲取,心跳等)時memberId不在當前的generation。 | |
InvalidSessionTimeoutCode | 26 | Return in join group when the requested session timeout is outside of the allowed range on the broker | 加入組請求時請求的會話超時超過broker允許的限制。 | |
RebalanceInProgressCode | 27 | Returned in heartbeat requests when the coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group. | 心跳請求時協調器已經開始了組的再平衡,這意味著客戶端必須重新加入組。 | |
InvalidCommitOffsetSizeCode | 28 | This error indicates that an offset commit was rejected because of oversize metadata. | 這個錯意味著偏移量提交因為超過了元數據大小而被拒絕。 | |
TopicAuthorizationFailedCode | 29 | Returned by the broker when the client is not authorized to access the requested topic. | 客戶端沒有訪問請求主題的權限時,broker返回此錯誤。 | |
GroupAuthorizationFailedCode | 30 | Returned by the broker when the client is not authorized to access a particular groupId. | 客戶端沒有訪問特定groupId的權限時,broker返回此錯誤。 | |
ClusterAuthorizationFailedCode | 31 | Returned by the broker when the client is not authorized to use an inter-broker or administrative API. | 客戶端沒有權限訪問broker之間的接口或者管理接口時,broker返回此錯誤。 |
一些常見的哲學問題(Some Common Philosophical Questions)
有些人問,為什么我們不使用HTTP。有許多原因,最主要的是客戶端實現可以使用一些更高級的TCP特性–請求的多工(multiplex)能力(譯者注:同一個TCP連接中同時發送多個請求,http長連接必須等到前一次請求結束才能發送后一個請求,否則需要多個http連接),同時輪詢多個連接的能力,等等。我們還發現HTTP庫在許多編程語言中非常是出奇地破舊(shabby -_-!)。
還有人問,也許我們可以支持許多不同的協議。此前的經驗是,多協議支持的是很難添加和測試新功能,因為他們要被移植到許多協議實現中。我們感覺,大多數用戶并不在乎支持多個協議這些特性,他們只是希望在自己選擇的語言中實現了良好可靠的客戶端。
另一個問題是,為什么我們不采用XMPP,STOMP,AMQP或現有的協議。這個問題的不同協議有不同答案,但在共通的問題是,這些協議的確確定了大部分實現,但如果我們沒有協議的控制權,我們就實現不了我們的功能。我們相信,我們可以實現比現有消息系統更好的真正的分布式消息系統,但要做到這一點,我們需要建立不同的工作模式。
最后一個問題是,為什么我們不使用的Protocol Buffers或Thrift來定義我們的請求消息格式。這些庫擅長幫助您管理非常多的序列化的消息。然而,我們只有幾個消息。而且這些庫跨語言的支持是有點參差不齊(取決于軟件包)。最后,我們頗為謹慎地管理二進制日志格式和傳輸協議之間的映射,而用如果使用這些系統將變得不太可能。最后,我們比較喜歡讓API有明確的版本并且通過檢查版來引入原本為空的新值,因為它能更細致地控制兼容性。
來自:http://www.udpwork.com/item/16080.html