kafka學習

jopen 11年前發布 | 38K 次閱讀 消息系統 Kafka

關于kafka的架構圖在之前的一篇文章中有:

http://wiki.corp.qunar.com/pages/viewpage.action?pageId=27866816

kafka的一些設計理念:

1、關注大吞吐量,而不是別的特性

2、針對實時性場景

3、關于消息被處理的狀態是在consumer端維護,而不是由kafka server端維護。

4、分布式,producer、broker和consumer都分布于多臺機器上。

以下內容基本是翻譯 加總結kafka的官方文檔:http://incubator.apache.org/kafka/design.html

基本術語和概念:

Message 是通信的基本單位。

Messages被一個producer發布給一個topic,也就是說它們被物理地傳遞給作為一個broker的服務器(一般是另外一臺機器)。一些consumer訂閱一個topic,每一個被發布的消息就被轉交給這些consumer了。

每一個consumer進程屬于一個consumer  group 并且一條message會傳遞給consumer group中但是(全局看)只有一個consumer進程。這種consumer group 概念非常有用,因為,consumer group 下面的多個進程或機器在邏輯上可以作為一個consumer。

為支持 queue語義,我們可以把所有的consumers 放到一個consumer group,然后一個消息只會進入一個consumer。

為支持topic語義,consumer可以作為單獨的consumer group,這樣所有的消息都會傳給每一個consumer。

Kafka在大規模數據面前有更多的優勢,因為不管一個topic有多少consumer,一個消息只存儲一次。

Message持久化和緩存:

Kafka是依賴文件系統來存儲和緩存消息的,(但是大家都覺得磁盤是比較慢的),磁盤不同用法會造成速度上的巨大差別。

一個67200rpm SATA磁盤 線性寫可達到300M/s,但是如果是隨機寫,只有50k/s

并且,kafka是運行在JVM上的,JVM兩個特性:

         1、object 的內存開銷是非常大的,經常是要存儲數據的兩倍(或者更高)

         2、Java的內存回收機制隨著堆內存的數據的增加變得頻繁。

作為這些因素的結果,使用文件系統和依賴于頁緩存比維持一個內存的存儲或者其他的結構有優勢------我們至少通過自動訪問所有的空閑內存使得可用的緩存加倍,而且可能通過存儲一個緊湊的字節結構而不是單獨的對象使

得可用的緩存又增加一倍的大小。這么做將導致在在一個32GB的機器上有28到30GB的緩存,而且還不會有GC帶來的損失。而且這種緩存將保持可用即使服務被重新啟動,但是進程中的緩存將需要在內存中重建(這對于一

個10GB的緩存將需要大概10分鐘的時間)或者需要用一個完全的冷備份啟動(這將是一個非常可怕的初始化過程)。它也將極大的簡化了編碼因為所有在緩存和文件系統里的相關維護邏輯現在都歸操作系統里了,這將比在

進程中的一次性嘗試的效率和正確度都要高。如果你的磁盤支持一次的讀取那么read-ahead 將有效地用每一個磁盤上讀取的有用數據填充這個緩存。

這表明了一種很簡單的設計:我們不是把數據盡量多的維持在內存中并只有當需要的時候在將數據刷到文件系統,我們是反其道而行之。所有的數據不用進行任何的刷數據的調用就立刻被寫入到文件系統的一個持久化的日志

記錄中。事實上這只是意味著轉移到了內核的頁緩存中,OS將在之后將它刷出。接著我們添加一個配置驅動器刷數據策略來允許系統的用戶控制數據被刷入物理磁盤的頻率(每多少消息或者每多少秒)來設置一個在臨界磁

盤崩潰時數據量的一個限制。

這種頁緩存為中心的設計在一片關于Varnish的設計的文章 中有描述。

滿足長時間保存消息:

一般消息系統持久化數據結構是用BTree,使得在消息系統中支持一個廣泛的各種各樣的事務性的和非事務性的語義。但是BTree的開銷還是比較高的:B樹操作的復雜度是O(log N),這個開銷貌似是固定的。但是對磁盤操作

卻不是這樣的,因為需要考慮磁盤尋道的開銷。此外,為滿足事務性語義,BTree還要考慮row-lock,無疑這樣的開銷是非常大的。

直觀上一個持久化的隊列可以進行簡單讀寫和添加數據到文件。盡管不能支持B數的豐富語義,但是他的優勢是:快!O(1)并且讀寫不相互阻塞。

這樣還有個好處,可以長時間存儲消息,只要磁盤沒有限制并且不出現損失,kafka可以存儲相當長時間的消息(一周)。

效率最大化:

通常有兩種原因造成效率低下: 太多的網絡請求,過多的字節拷貝。

為提供效率,kafka的API圍繞 “message set”概念構建,這種方式是天然的將消息分組。這樣可以允許一次請求一組消息,并且分攤了網絡往返的開銷。

Lazily desialized :MessageSet  實現本身是一個封裝了字節數組或者文件的API。因此,在處理messageset時可以用lazy deserialize。(如果不需要反序列化,就不做反序列化)

被broker維護的message的記錄本身只是個被寫入磁盤的message sets的目錄。

維護字節數組或者文件對網絡傳輸是非常方便的,現代的unix操作系統提供了一個非常高效的方法將數據從頁緩存發送到socket------sendfile,java通過FileChannel.transferTo.api提供對這個系統調用的訪問。

通常的數據從file傳輸到socket的路徑有:

1、操作系統從磁盤讀取文件到內核空間的pagecache。

2、應用程序從內核空間讀取數據到用戶空間的緩存。

3、應用程序將數據寫回內核空間的socket buffer。

4、操作系統將socket  buffer的數據拷貝到NIC buffer,數據從NIC被發送到網絡。

kafka使用了zero copy技術:

使用zero copy 方法優化: 數據只被拷貝到pagecache一次,每一次consumer請求都會重用,這就要求限制連接到服務器的consumer的數量。

點對點的批量壓縮

高效的壓縮需要將多個消息一起壓縮而不是對單個消息單獨壓縮。

點對點壓縮:producer端:定期的對數據進行壓縮,然后發送給服務端。服務端以壓縮的形式存儲數據,只有當consumer請求數據時進行解壓。

Kafka支持 GZIP 和 Snappy 壓縮協議,詳情請看這里

Consumer 狀態:

對于一個消息系統來說,保持消耗消息的紀錄,是一個必不可少的功能。

一般的消息系統是將消息追蹤紀錄寫在broker端,因此,broker可以及時的將發出消息從消息系統刪除,從而保障消息系統存儲的數據盡量的少。

但是,如果consumer處理消息失敗,此時broker已經把相應消息刪除,會造成消息丟失。因此,許多消息系統引入 acknowledge機制,只有消息確認處理完成,broker再刪除消息。

這種機制雖然解決消息丟失問題,但它又引入了別的問題:

1、如果consumer處理完一個消息,在發生確認之前fail掉,會造成消息被處理兩次。

2、在性能上,broker要記錄每一個消息的多種狀態,開銷太大。

消息交付語義:

以下幾個條件是消息交付必須保證的:

1、最多一次:消息處理之后應該立即被標記,以免重復處理,但是,很多失敗場景會造成消息丟失。

2、至少一次:應該保證每一個消息至少被交付一次,但是,失敗場景會造成交付多次。

3、恰好一次:這正是我們想要的。

確保恰好交付一次的算法有:two- or three- phase commits  和 Paxos 的變體。但是它們有一些缺陷:They typically require multiple round trips and may have poor guarantees of liveness (they can halt indefinitely). The FLP result provides some of the fundamental limitations on these algorithms.(求解釋)

在metadata 方面,kafka有兩樣是與眾不同的。

* *1、數據流將被劃分成獨立的部分(partitions)。這個語義是將劃分的過程留給了producer,producer來指定一個消息 屬于哪個部分。在一個partition中,消息是按照接受順序排序的,并且會以相同的順序發送給consumer。這意味著我們需要記錄 每一部分(consumer、topic、and partition)的“high water mark”。

    2、在kafka中,high-water mark 以“offset”的形式記錄,這對于各方面來說將變的非常清晰。

在kafka中,consumer將保持記錄已經處理的消息狀態(offset),consumer會將這些記錄信息寫到zookeeper中,但 是如果把這些信息存儲到consumer正在寫入數據的數據庫會更好,例如:consumer可能會簡單將一些統計信息寫到OLTP數據庫。這樣 consumer可以把狀態信息寫到正在進行事務操作的數據庫,這樣解決了分布式一致性問題。

很多應用場景下,可以講consumer存儲的狀態信息(offset)同步到別的地方(如搜索系統的索引字段里,HDFS中),這樣,如果consumer掛了,還能從上次處理的記錄的offset繼續,避免重復處理。

另外,這種機制還可以這樣用,直接從頭開始處理,雖然會有出發處理,但是有些場景確實需要這樣做。例如:發現處理過程有BUG,等BUG修復了可以從頭開始處理。

推 VS 拉:

有一個問題是:應該讓consumer從broker拉數據還是應該讓broker向consumer推數據?

在這一點上,kafka遵從傳統的做法,數據從producer被推到broker,然后consumer從broker拉取數據,類似的系統有 scribe 和flume(大家可以查查相關資料)。

在push-based的系統中,broker將控制著consumer的處理數據的速度(依賴broker向其推送數據的速度);pull-based系統就不會這樣,道理很簡單,consumer處理數據的速度主要受其自身拉數據的能力影響。

當然,可以結合一些backoff協議,以防consumer拉數據的速度過載,讓其達到滿載即可。

結合以上的說明,kafka采用的是        Producer ----- 推 --->broker<--- 拉  -----consumer;

Producer:

producer的自動負載均衡:

kafka支持producer端的自動負載均衡和用戶指定的TCP連接負載均衡器;kafka的broker支持用戶指定的第四層TCP連接負載均衡器(求解釋)。

使用第四層TCP連接負載均衡器 好 處是每一個producer只需要一個TCP連接與broker連接,并且不需要與zookeeper連接。它的弊端是負載均衡是在TCP層進行的,這樣 會造成負載分配不夠均衡(如果一個producer產生的消息比其他producer多,producer到broker的tcp連接均衡并不會導致消息 的傳遞的均衡)。

基于zookeeper客戶端的負載均衡解決了這個問題,它允許producer自動發現broker,并且對每一個請求都會做負載均衡。

并且zookeeper會通過一個key而不是隨機的將producer與broker連接起來(例如按照用戶ID來劃分數據)。這種機制稱為“語義劃分”;

基于zookeeper的負載均衡描述如下:通過以下幾種事件在zookeeper watcher上注冊:

1、新的broker出現。

2、一個broker掛了。

3、一個新的topic被注冊。

4、一個broker注冊到一個已經存在的topic。

在內部,producer對每一個broker會建一個彈性的連接池。這個連接池通過zookeeper watchers的回調會持續更新與每一個broker的新建或者已經連好的連接(保持心跳)。

當一個producer針對某個topic產生數據,broker會挑選一個partition,然后,在連接池中選一個可用的連接進行傳輸數據。

異步發送:

異步非阻塞操作是一個可擴展消息系統的基本操作,kafka當然也提供這樣一個操作(producer.type=async)。producer 可以在內存中緩存要發送的消息,然后等到觸發時間或者緩存內容達到配置好的buffer的大小,就會批量發送消息。由于產生消息的機器一般都是異構的,產 生數據的速度是不同的,這種異步緩存機制會對broker產生統一的通信量,會更好的提高網絡利用率和更高的吞吐量。

語義劃分:

kafka的producer可以將消息與可用的broker和partition進行映射,這樣允許對數據流以語義進行劃分,就是基于關鍵字將這些消息 劃分到不同的broker和partition。這個劃分機制可以由用戶自己通過實現kafka.producer.Partitioner接口進行設 置,默認的是隨機劃分。例如:關鍵詞可以是消息的ID,劃分方法可以是hash(消息id)%劃分的總數

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!