Kafka 設計詳解之隊列
前言
本文打算詳細分析 Kafka 的核心 — 隊列 的設計和實現,來對 Kafka 進行更深一步的了解。
如何設計隊列
隊列是一種先進先出(FIFO)的數據結構,它是 Kafka 中最重要的部分,負責收集生產者生產的消息,并將這些消息傳遞給消費者。要實現一個隊列有多種方式,Kafka 作為一個消息隊列中間件,在設計隊列時主要要考慮兩個問題:
1. 隊列數據是寫到內存還是寫到磁盤
乍一看到這個問題,我們會想,內存的讀取速度遠快于磁盤,如果追求性能,內存也充足的話,當然是將生產者產生的消息數據寫到內存(比如用一個數組或者鏈表來存儲隊列數據),供消費者消費。真的是這樣嗎?
下面我們依次分析下寫內存和寫磁盤文件的優缺點,首先,內存的優點是讀寫速度非常快,但是,如果我們的目標是設計「大數據量」下的「高吞吐量」的消息隊列,會有以下幾個問題:
- GC 的時間消耗:如果隊列數據都在內存,數據會非常大(幾十 G), 因為消息隊列需要不斷地接受新產生的消息和刪除已經被消費的消息(不然內存很快會被撐爆),Java GC 的消耗不容忽視。
- Java 內存存儲效率:如我們所知,一個 Java 對象的內存開銷會大于其對象數據本身,通常對象的內存開銷是數據本身的一倍(甚至更多)。
- 設計復雜度增加:為保證數據不丟失,需要一個預寫日志(WAL),如果程序異常掛掉,重啟時可以從 WAL 恢復數據。
- 數據初始化的時間消耗:程序重啟時需要從文件將數據 load 到內存,如果數據對象大小是 10G 級別的,也會消耗大量的時間(10 分鐘左右)。
接下來我們來分析一下磁盤,寫磁盤文件方式存儲隊列數據的優點就是能規避上述內存的缺點,但其有很嚴重的缺點,就是讀寫速度慢,如果純依靠磁盤,那消息隊列肯定做不到「高吞吐量」這個目標。
分析了內存跟磁盤的優缺點,好像我們還是只能選寫內存,但我們忽視了磁盤的兩個情況:一是磁盤慢是慢在隨機讀寫,如果是順序讀寫,他的速度能達到 600MB/sec(RAID-5 磁盤陣列),并不慢,如果我們盡可能地將數據的讀寫設計成順序的,可以大大提升性能。二是 現代的操作系統會(盡可能地)將磁盤里的文件進行緩存 。
有了操作系統級別的文件緩存,那用磁盤存儲隊列數據的方式就變得有優勢了。首先,磁盤文件的數據會有文件緩存,所以不必擔心隨機讀寫的性能;其次,同樣是使用內存,磁盤文件使用的是操作系統級別的內存,相比于在 Java 內存堆中存儲隊列,它沒有 GC 問題,也沒有 Java 對象的額外內存開銷,更可以規避應用重啟后的內存 load 數據耗時的問題,而且,文件緩存是操作系統提供的,因為我們只要簡單的寫磁盤文件,系統復雜性大大降低。
因此,Kafka 直接使用磁盤來存儲消息隊列的數據。
2. 以何種數據結構存儲隊列數據
剛才我們已經決定用磁盤文件來存儲隊列數據,那么要如何選擇數據結構呢?一般情況下,如果需要查找數據并隨機訪問,我們會用 B+ 樹來存儲數據,但其時間復雜度是 O(log N),由于我們設計的是消息隊列,我們可以完全順序的寫收到的生產者消息,消費者消費時,只要記錄下消費者當前消費的位置,往后消費就可以了,這樣可以對文件盡可能的進行順序讀寫,同時,時間復雜度是O(1)。其實,這跟我們寫日志的方式很像,每條日志順序 append 到日志文件。
隊列實現
之前我們已經確定采用直接順序寫磁盤文件的方式來存儲隊列數據,下面我們來剖析下具體的實現細節。
在 Kafka 中,用一個文件夾存儲一條消息隊列,成為一個 Log,每條消息隊列由多個文件組成,每個文件稱為一個 LogSegment,每當一個 LogSegment 的大小到達閾值,系統就會重新生成一個 LogSegment;當舊的 LogSegment 過期需要清理時(雖然磁盤空間相對于內存會寬裕很多,我們可以保存更長時間的消息數據,比如一周,以供消費者更靈活的使用,但還是需要定期清理太老的數據),系統會根據清理策略刪除這些文件。
現在我們知道一個隊列(Log)是由多個隊列段文件(LogSegment)組成的,那么 Kafka 是如何將這些文件邏輯上連接從而組成一條有序隊列的呢?在生成每個隊列段文件時,Kafka 用該段的初始位移來對其命名,如在新建一個隊列時,會初始化第一個隊列段文件,那么其文件名就是0,假設每個段的大小是固定值 L,那么第二個段文件名就是 L,第 N 個就是 (N - 1)* L。這樣,我們就可以根據文件名對段文件進行排序,排序后的順序就是整個隊列的邏輯順序。
隊列讀寫
了解了隊列的基本實現,下面我們就來分析下隊列的核心操作—讀和寫。
寫
寫操作發生在生產者向隊列生產消息時,在上篇文章講網絡通信時我們已經說到,所有的客戶端請求會根據協議轉到一個 Handler 來具體處理,負責寫操作的 Handler 叫 ProducerHandler,整個寫請求的流程如下:
寫消息流程
-
根據消息的 topic、partition 信息(跟其分布式有關,以后文章會專門說到),定位到具體的隊列,即上面我們說到的 Log。
-
檢驗生產者消息是否合法
-
我們知道一個 Log 由多個 LogSegment 組成,在任意時刻, 只有最新一個 LogSegment 是可寫的 ,其他的都是只讀的,所以,接下來我們通過排序后的 LogSegment 獲取到最新一個可寫的 LogSegment。
-
用 NIO,將消息(一個 ByteBuffer)寫到 LogSegment 所屬的文件
-
上一步中說的寫文件,考慮到效率問題,并沒有直接將消息 flush 到磁盤,所以,這里其實 存在一個丟消息的風險 。在本步驟中,主要檢查待 flush 的消息大小是否到達指定閾值,如果到了,就 flush 到磁盤
-
檢查最新的 LogSegment 大小是否到達閾值,如果是,則保存關閉當前文件,新建一個 LogSegment 文件。
之前我們說過,如果是順序寫,由于省掉了磁頭尋址的時間,磁盤的性能還是很高的,我們看到 Kakfa 隊列是以順序方式寫的,所以性能很高。但是,如果一臺 Kafka 服務器有很多個隊列,而硬盤的磁頭是有限的,所以還是得在不同的隊列直接來回切換尋址,性能會有所下降。
讀
隊列的讀操作發送在消費者消費隊列數據時,由于隊列是線性的,只需要記錄消費者上次消費到了哪里(offset),接下去消費就好了。那么首先會有一個問題,由誰來記消費者到底消費到哪里了?
一般情況下,我們會想到讓服務端來記錄各個消費者當前的消費位置,當消費者來拉數據,根據記錄的消費位置和隊列的當前位置,要么返回新的待消費數據,要么返回空。讓服務端記錄消費位置,當遇到網絡異常時會有一些問題,比如服務端將消息發給消費者后,如果網絡異常消費者沒有收到消息,那么這條消息就被「跳過」了,當然我們可以借鑒二階段提交的思想,服務端將消息發送給消費者后,標記狀態為「已發送」,等消費者消費成功后,返回一個 ack 給服務端,服務端再將其標記為「成功消費」。不過這樣設計還是會有一個問題,如果消費者沒有返回 ack 給服務端,此時這條消息可能在已經被消費也可能還沒被消費,服務端無從得知,只能根據人為策略跳過(可能會漏消息)或者重發(可能存在重復數據)。另一個問題是,如果有很多消費者,服務端需要記錄每條消息的每個消費者的消費狀態,這在大數據的場景下,非常消耗性能和內存。
Kafka 將每個消費者的消費狀態記錄在消費者本身(隔一段時間將最新消費狀態同步到 zookeeper),每次消費者要拉數據,就給服務端傳遞一個 offset,告訴服務端從隊列的哪個位置開始給我數據,以及一個參數 length,告訴服務端最多給我多大的數據(批量順序讀數據,更高性能),這樣就能使服務端的設計復雜度大大降低。當然這解決不了一致性的問題,不過消費者可以根據自己程序特點,更靈活地處理事務。
下面就來分析整個讀的流程:
讀消息流程
-
在收到消費者讀的請求后,根據請求中的 topic、partition 信息,定位到具體的隊列(Log)。
-
根據 offset(我們要開始消費的隊列位置),因為每個 LogSegment 文件都是以隊列的位置命名的,所以可以利用 offset 進行二分查找尋找具體的 LogSegment。
-
在找到具體的 LogSegment 后,就可以讀數據了,不過,在這里并不真正讀數據,而是生成一個引用,記錄該文件的 channel,這次要讀取數據在文件中的起始位置以及結束位置,在真正進行網絡傳輸時,我們利用 零拷貝(zero-copy) 將數據傳輸,即從文件的 channel 直接向 socket channel 傳輸數據(Java 中是 channel.transferTo() 方法)。
-
消費者收到返回的數據后,解碼成真正的 message 列表。
一致性問題
分布式系統中不可避免的會遇到一致性問題,主要是兩塊:生產者與隊列服務端之間的一致性問題、消費者與隊列服務端之間的一致性問題,下面依次展開。
生產者與隊列服務端之間的一致性
當生產者向服務端投遞消息時,可能會由于網絡或者其他問題失敗,如果要保證一致性,需要生產者在失敗后重試,不過重試又會導致消息重復的問題,一個解決方案是每個消息給一個唯一的 id,通過服務端的主動去重來避免重復消息的問題,不過這一機制目前 Kafka 還未實現。目前 Kafka 提供配置,供用戶不同場景下選擇允許漏消息(失敗后不重試)還是允許重復消息(失敗后重試)。
消費者與隊列服務端之間的一致性
由于在消費者里我們可以自己控制消費位置,就可以更靈活的進行個性化設計。如果我們在拉取到消息后,先增加 offset,然后再進行消息的后續處理,如果在消息還未處理完消費者就掛掉,就會存在消息遺漏的問題;如果我們在拉取到消息后,先進行消息處理,處理成功后再增加 offset,那么如果消息處理一半消費者掛掉,會存在重復消息的問題。要做到完全一致,最好的辦法是將 offset 的存儲與消費者放一起,每消費一條數據就將 offset+1。
總結
本文介紹了 Kafka 的隊列實現以及其讀寫過程。Kafka 認為操作系統級別的文件緩存比 Java 的堆內存更省空間和高效,如果生產者消費者之間比較「和諧」的話,大部分的讀寫操作都會落在文件緩存,且在順序讀寫的情況下,硬盤的速度并不慢,因此選擇直接寫磁盤文件的方式存儲隊列。在隊列的讀寫過程中,Kafka 盡可能地使用順序讀寫,并使用零拷貝來優化性能。最后,Kafka 讓消費者自己控制消費位置,提供了更加靈活的數據消費方式。
來自:http://www.jianshu.com/p/6b2e39ba7787