Kafka 設計與原理詳解
kafka 應用場景
-
日志收集
-
消息系統 解耦生產者和消費者、緩存消息。
-
用戶活動跟蹤: 就是我們在做的。
-
運營指標:生產各種操作的集中反饋。
-
流式處理:比如spark steaming
kafka的發布對象是topic。每類數據我們可以歸為一個topic。向topic發送消息的我們稱為生產者、從topic訂閱消息的稱為consumer。producer 和 consumer 可以同時讀寫數據。
-
topic: 消息主題。
-
producer: 生產者到topic的一方。
-
consumer: 訂閱topic消費消息的一方。
-
broker :
kafka topic & partition
kafka 集群會保存所有消息,不管消息有沒有被消費;通過設置消息過期時間,可以來定制的刪除消息。比如我們設置過期時間為2天。
一個消息被生產出來,寫入到多個partition。消息就是以partition作為存儲單位,每個partition可以通過調整以適應它所在的機器,而一個topic對應多個partition,這樣整個集群就可以適應各個大小的數據了。第二,也可以提高并發,因為可以以partition 為單位來讀寫了。
Kafka 核心組件
replications partitions and leaders
怎么實現持久化?
kafka能夠做數據持久化。可以為每個topic設置副本容量。 如果副本容量設為3,那么一份數據就會被放在3臺不同的機器上。一般設為2.
關于partition。
topic的存放形式是partition。每一個topic都可以設置partition數量。partition的數量決定了log的數量。producer 在生產消息時,會把消息發布到topic的各個partition中。
上面說的副本都是以partition為單位的,不過只有一個partition的副本會被選為leader作為讀寫用。
kafka從0.8開始提供partition級別的replication,replication的數量可在$KAFKA_HOME/config/server.properties中配置。
default.replication.factor = 1
如何設置partition值要考慮的因素?
一個partition只能被一個消費者消費(但是一個消費者可以同時消費多個partition。),所以,運行的partition的數量要大于運行的comsumer的數量,否則就會有消費者消費不到數據。另一方面,建議partition的數量大于broker 的數量。這樣leader partition 的數據就能均勻的分布在各個broker中,最終使得集群負載均衡。
(如果小于會怎樣樣,會造成比較集中的存儲在單個broker之中嗎。)。注意:kafka需要為每個partition分配一些內存來緩存消息數據,如果parttion數量越大,分配更大的heap space。
partition每一個都會保存作為一個repilca么? 不是的。partition的概念是根據partition 方法來將數據分布存儲。
producers
producer發送消息。
producer 可以直接發送到broker對應的leader partition中,不需要經歷任何一個中介的轉發。為實現這個特性,每個broker都可以響應producer的請求,并返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的你leader partition都在哪。現階段哪些leader partition 是可以直接訪問的?
如果訪問的不是leader partition 怎么搞? 而且我看是可以指定多個進行訪問的。
producer 和 partition 。
producer 可以控制以什么樣的將消息推送到客戶端。實現方法包括隨機,實現一類隨機負載均衡的算法,或者指定一些分區算法。kafka 提供了用戶自定義分區的方法,用戶可以為每一個消息指定一個partitionkey,通過這個key來實現一些hash 分區算法。
效率。
batch的方式將有效的提高效率,減少網絡和磁盤io的占用。這里batch的大小,可以再producer來設置,比如煥春100s,緩存1000條,或者數據的大小。
關于消息的完整性。
producer 可以異步的并行的向kafka發送消息,但是通常你producer在發送完消息之后會得到一個future的響應,返回的是offset或者發送過程遇到的錯誤。這里,acks 這參數很重要,這個參數決定了producer要求leader partition收到的確認副本數。如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這有可能會導致數據丟失,但這也是吞吐量最大的方式。
如果acks設置為1,表示producer 和laeder partition收到消息的時得到的broker的一個確認,這樣會有更好的可靠性。如果設置為-1,則組要等待所有partition收到消息。這樣能保持最高的可靠性。
kafka 消息。
kafka消息有一個定長的header和變長的字節組成。kafka沒有限定單個消息的大小,但一般不超過一mb,通常控制在1-10kb之間。
Consumers
kafka 提供了兩套api。sample api 。是一套無狀態的api。每次請求都需要指定offset。所以也是最靈活的。
在kafka中,當前消息的offset是由consumer來維護的。consumer可以自己決定讀哪些數據。比如,consumer 可以重新消費已經消費國的數據。這些數據有一個過期限制。這個限制是可配置的。
high-level api 封裝了對集群的訪問。可以透明的消費一個topic。自己本身維持了一個消費隊列,每次消費下一個。
這里consumer 用組來模擬了廣播和訂閱兩個功能。組是嫁接topic和consumer 的橋梁。 組對topic是來說是組內的成員都可以接受到消息,相當于廣播,組對成員來說,是訂閱,即你在這個組里才能接受到這個消息。所以都在一個組,就相當于一個大廣播。
kafka 的核心特性
壓縮
kafka 支持以batch的方式來發送消息。在此之上,還支持對消息的壓縮。 producer端進行壓縮之后,在consumer端進行解壓。這么做的好處是,往往大數據的瓶頸在于網絡,而不是cpu(所以會損耗一定的cpu。)
消息壓縮的信息,存儲在消息頭部的描述壓縮屬性字節。這個字節的后兩位表示消息的壓縮采用的編碼,若后兩位為0,則表示消息未被壓縮。
消息可靠性
在消息系統中,保證消息的可靠性是很重要的。在實際消息的傳遞過程中,會出現如下3種情況:
-
一個消息傳遞失敗
-
一個消息被發送多次
-
exactly once,一個消息發送成功并且僅發送了一次。
有許多系統聲稱它們實現了exactly-once,但一般沒有考慮生產者或消費者在生產和消費過程中有可能失敗的情況。比如雖然一個 producer成功發送一個消息,但消息丟失,或者成功發送到broker,也被consumer成功取走,但是這個consumer在處理消息時失敗了。
這里從兩個角度來分析這個。
從producer的角度:在發送端,看producer會等待broker成功接收到消息的反饋,如果沒有接到broker的反饋信息,producer 會重新發送,(我們知道kafka有備份機制,可以通過參數設置是否等待所有節點都收到消息,而本身的消息也有緩存)
從consumer的角度:因為consumer 可以調整offset,所以可以重復消費消息。也保證了,一條消息被發送一次就ok。
備份機制
備份機制是Kafka0.8版本的新特性,備份機制的出現大大提高了Kafka集群的可靠性、穩定性。有了備份機制后,Kafka允許集群中的節點掛掉后而不影響整個集群工作。一個備份數量為n的集群允許n-1個節點失敗。在所有備份節點中,有一個節點作為lead節點,這個節點保存了其它備份節點列表,并維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:
kafka 高效性相關設計
消息持久化
首先這里,kafka是高度依賴文件系統和緩存的。
文件系統的速度。文件系統的速度并不是想象中的慢或者快。對于,順序寫入和隨機寫入兩者有很大速度差。一個7200的硬盤順序寫入有600m/s的速度,隨機寫入有100k/s的速度。
緩存思路。所以,基本的數據寫入思路是,先拿內存緩存數據再刷新到磁盤。但是,眾所周知,內存的垃圾回收的代價很大,尤其當數據量過大的時候,垃圾回收會非常昂貴。
感覺這塊理解的不是很好
基于以上,得到的一個結論就是利用文件系統并且依靠頁緩存比維護一個內存緩存或者其他結構要好。而事實上,數據被傳輸到內核頁,稍后會被刷新。這里加上了一個配置項來控制讓系統的用戶來控制數據在什么時候被刷新到物理硬盤上。
常數時間性能保證
沒太理解
消息系統中持久化數據結構的設計通常是維護者一個和消費隊列有關的B樹或者其它能夠隨機存取結構的元數據信息。B樹是一個很好的結構,可以用在事務型與非事務型的語義中。但是它需要一個很高的花費,盡管B樹的操作需要O(logN)。通常情況下,這被認為與常數時間等價,但這對磁盤操作來說是不對的。磁盤尋道一次需要10ms,并且一次只能尋一個,因此并行化是受限的。
直覺上來講,一個持久化的隊列可以構建在對一個文件的讀和追加上,就像一般情況下的日志解決方案。盡管和B樹相比,這種結構不能支持豐富的語義,但是它有一個優點,所有的操作都是常數時間,并且讀寫之間不會相互阻塞。這種設計具有極大的性能優勢:最終系統性能和數據大小完全無關,服務器可以充分利用廉價的硬盤來提供高效的消息服務。
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味著我們可以提供一般消息系統無法提供的特性。比如說,消息被消費后不是立馬被刪除,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。
進一步提高效率
在web開發中,每次一條log都會產生一次寫操作,這些小的寫操作的量非常大,另外這些log也要至少被一個或以上consumer消費。
所以,這里出現了兩個比較低效的場景。
-
太多小的io操作。
-
過多的字節拷貝。
太多小的io操作。可以通過發送messageset來搞定。所以對消息的處理,這里沒有分開的序列化和反序列化的上步驟,消息的字段可以按需反序列化(如果沒有需要,可以不用反序列化)。
過多的字節拷貝。 為了解決這個問題,kafka設計了一個標準字節消息。producer,broker,consumer共享這一種消息格式。kafka的message log 在broker端就是一些目錄文件。這些文件都是按照message set 來存的。
而這種通用的方式,非常重要: 持久化log塊的網絡傳輸。這傳輸通過一鐘非常搞笑的途徑來實現頁面緩存和socket之間的數據傳遞。 叫sendfile
這里解釋下sendfile的作用,先聲明下一般的數據從文件到socket的路徑:
-
操作系統將數據從磁盤讀到內核空間的頁緩存中。
-
應用將數據從內核空間讀到用戶空間的頁緩存中。
-
應用將數據從用戶空間寫到內核空間的socket緩存。
-
操作系統將數據從socket緩存寫入到網卡緩存中。
這鐘方式非常低效,因為這里有四次拷貝,兩次系統調用。如果使用sendfile,就可以避免兩次拷貝:操作系統將數據直接從頁緩存發送到網絡上。所以這個過程,只有第一步和最后一步是需要的。利用上述zero copy,數據只需要拷貝到頁緩存一次,就可以重復被consumer利用。這樣通過頁緩存和sendfile的結合,下游有很多consumers,也不會對整個集群服務造成壓力。
kafka 集群部署
為了提高性能,盡量與hadoop的集群分開部署。如果共享節點的話,會影響其使用頁面緩存的性能。
kafka的性能主要在磁盤上。
kafka依賴于zookper,一般使用專用服務器來管理。zookeeper集群的節點采用偶數個。注意,zookeeper集群越大其讀寫性能越慢,因為zookeeper 要在節點之間同步數據。一個3節點的zookeeper集群允許一個節點失敗,一個5節點的集群允許2個節點失敗。
集群大小
衡量kafka集群所需的大小,最好是用模擬負載來測算一下。如果不想用模擬實驗,最好的方法是根據磁盤。
kafka 主要配置
broker config
log.dirs /tmp/kafka-logs Kafka數據存放的目錄。可以指定多個目錄,中間用逗號分隔,當新partition被創建的時會被存放到當前存放partition最少的目錄。