[Apache Kafka]Kafka設計

jopen 8年前發布 | 18K 次閱讀 Kafka 消息系統

在開始開發producer和consumer之前,先從設計的角度看一看Kafka。

由于重度依賴JMS,且實現方式各異、對可伸縮架構的支持不夠,LinkedIn開發了Kafka來實現對活動流數據和運營指標數據的監控,這些數據包括CPU、I/O使用數據、請求響應時間等。開發Kafka時主要目標是提供以下特性:

  • 支持自定義實現的producer和consumer的API
  • 以低開銷的網絡和存儲實現消息持久化
  • 支持百萬級消息訂閱和發布的高吞吐量
  • 分布式和高可伸縮性架構以實現低延遲傳輸
  • 在consumer失效時自動平衡多個consumer
  • 服務器故障的容錯保證

Kafka基本設計

Kafka既不是隊列平臺(消息由consumer pool中的某一個consumer接收),也不是發布訂閱平臺(消息被發布給每個consumer)。在一個基本的結構中,producer發布消息到Kafka的topic中(message queue的同義詞)。topic也可以看做是消息類別。topic是由作為Kafka server的broker創建的。如果需要,broker也會存儲消息。conmuser訂閱(一個或多個)topic來獲取消息。這里,broker和consumer之間分別使用ZooKeeper記錄狀態信息和消息的offset。如下圖所示:

圖中展示的是單節點單broker架構,一個topic有四個partition。圖中包含了Kafka集群的五個組成部分:Zookeeper,broker,topic,producer,consumer。

在topic中,每個partition映射一個邏輯日志文件,由一系列大小相同的segment文件組成。每個分區是有序的且順序不可改變的消息,當消息發布至一個partition時,broker將消息添加到最后一個segment文件中。segment文件在發布的消息達到配置的數量時或一段時間后會寫入磁盤中。一旦segment文件寫入磁盤,消息就可以被consumer獲取到了。所有的消息partition被分配了一個唯一的序列號,稱之為offset,用來識別partition內的每個消息。每個partition可配置為在多個服務器之間復制,實現容錯保證。

每個partition在作為leader的服務器和有零個或多個作為follower的服務器上都是可用的。leader負責所有的讀寫請求,而follower從leader異步復制數據。Kafka動態維護一個in-sync replicas(ISR)集合,作為候選leader,并且始終將最新的ISR集合持久化到ZooKeeper中。如果當前leader失效了,某一個follower(ISR)會自動成為新的leader。在Kafka集群中,每個服務器扮演者兩個角色,它既作為它的一些partition的leader,也是其它partition的follower。這樣保證了集群中的負載均衡。

還有一個概念是consumer group。每個consumer看作一個進程,多個進程組織成群組就稱為consumer group。

topic內的一個消息被consumer group內的某個進程(consumer)消費,如果需要,一個消息也可以被多個consumer消費,只要這些consumer在不同的group中。consumer始終從一個partition中順序地消費消息,并且知道消息的offset。這意味著consumer消費了之前的所有消息。consumer向broker發起同步的pull請求,請求中包含待消費消息的offset。

Kafka中的broker是無狀態的,這意味著消息的消費狀態由consumer維護,broker并不記錄消息的消費者。如果消息被broker刪除(而broker不知道該消息是否已被消費),Kafka定義了time-based SLA(service level aggrement)作為消息保留策略。該策略中,消息在broker中停留的時間超過定義的SLA時長后會自動被刪除。盡管與傳統的消息系統相比,這樣會允許consumer回退到之前的offset重新消費數據,但這是consumer違反了隊列的約定。

來看一下Kafka提供的producer和consumer之間的消息傳遞方式:

  • 消息不允許重新傳遞但可能丟失
  • 消息可能重新傳遞但不會丟失
  • 消息只傳遞一次

發布消息時,消息會被提交到日志中。如果producer在發布時出現網絡故障,這時不能確定故障是在信息提交前還是提交后發生的。一旦提交成功,只要消息被復制到任意一個broker的partition中,消息將不會丟失。為了保證消息發布,需要配置producer提供的參數,如消息提交的確認時間和等待時間等。

從consumer視角來看,每個副本有著同樣的日志,日志中是同樣的offset信息。對于consumer,Kafka保證consumer在讀取消息、處理消息、保存位置過程中至少被傳遞一次。如果consumer進程在保存位置之前崩潰了,該topic partition的另一個consumer進程將會接收到這小部分已經被處理的消息。

日志精簡(log compaction)

日志精簡是一種實現細粒度的消息保留,而不是粗粒度、基于時間保留的機制。它確保了topic partition內的每個消息key最后的值保持到該key有更新操作后才刪除該記錄。

在Kafka中,可以對每個topic設置如基于時間、基于大小、基于日志精簡的保留策略。日志精簡確保了:

  • 始終維護消息的順序
  • 消息擁有順序的offset,且offset不會改變
  • consumer從offset 0開始讀取,或者從日志的開始處開始讀取,可以看出所有記錄的寫順序的最終狀態

日志精簡由后臺線程池處理,復制日志segment文件,刪除在日志最前面出現的key對應的記錄。

Kafka的一些重要的設計如下:

  • Kafka的根本是消息緩存和基于文件系統的存儲。數據會被立即寫入OS kernel page。緩存和寫入磁盤是可配置的。
  • Kafka可在消息被消費后仍被保留,如果需要可以被consumer重新消費
  • Kafka使用消息集分組管理消息,可減少網絡開銷
  • 大部分消息系統將消息的消費狀態信息記錄在服務器層,而在Kafka中是由consumer層。這解決了兩個問題:由于故障導致的消息丟失、同一個消息的多次傳遞。默認情況下,consumer將狀態信息存儲在ZooKeeper中,以可以使用其它的Online Transaction Processing(OLTP)系統。
  • 在Kafka中,producer和consumer使用傳統的推拉模式,producer將消息push到broker中,consumer從broker中pull消息。
  • Kafka中的broker沒有主從概念,所有的broker都看所peers。這意味著broker可以在任意時候添加或刪除,因為broker的元數據維護在ZooKeeper中并共享給consumer。
  • producer可以選擇向broker發送數據是同步模式還是異步模式。

消息壓縮(message compression)

考慮到網絡帶寬的瓶頸,Kafka提供了消息組壓縮特性。Kafka通過遞歸消息集來支持高效壓縮。高效壓縮需要多個消息同時壓縮,而不是對每個消息單獨壓縮。一批消息壓縮在一起發送給broker。壓縮消息集降低了網絡的負載,但是解壓縮也帶來了一些額外的開銷。消息集的解壓縮是由broker處理消息offset時完成的。

每個消息可通過一個不可比較的、遞增的邏輯offset訪問,這個邏輯offset在每個分區內是唯一的。接收到壓縮數據后,lead broker將消息集解壓縮,為每個消息分配offset。offset分配完成后,leader再次將消息集壓縮并寫入磁盤。

在Kafka中,數據的壓縮由producer完成,可使用GZIP或Snappy壓縮協議。同時需要在producer端配置相關的參數:

  • compression.codec:指定壓縮格式,默認為none,可選的值還有gzip和snappy。
  • compressed.topics:設置對指定的topic開啟壓縮,默認為null。當compression.codec不為none時,對指定的topic開啟壓縮;如果compressed.topics為null則對所有topic開啟壓縮。

消息集ByteBufferMessageSet可能既包含壓縮數據也包含非壓縮數據,為了區分開來,消息頭中添加了壓縮屬性字節。在該字節中,最低位的兩位表示壓縮格式,如果都是0表示非壓縮數據。

復制機制

在Kafka中,broker使用消息分區策略。消息如何分區由producer決定,broker會按照消息到達的順序存儲。每個topic的partition數量可在broker里配置。

盡管Kafka具有高可伸縮性,為了集群的更好的穩定性和高可用性,復制機制保證了即使broker故障消息也會被發布和消費。producer和consumer都是replication-aware的。如下圖所示:

在復制機制中,每個消息partition有n個副本,可以支持n-1個故障。n個副本中有一個作為lead。ZooKeeper保存了lead副本和follow的in-sync replicas(ISR)。lead副本負責維護所有的follow的in-sync replicas(ISR)。

每個副本將消息存儲到本地日志和offset,并定時同步到磁盤中。這樣保證了消息要么被寫到所有副本中,要么都沒寫。

Kafka支持以下復制模式:

  • 同步復制:同步復制模式時,producer首先從ZooKeeper識別lead副本并發布消息。一旦消息被提交,它會被寫到lead副本的日志中,所有的follower開始拉取消息;通過使用一個單獨的通道,保證了消息的順序。每個follow副本在將消息寫入日志后會向lead副本發送反饋信息。復制完成后,lead副本向producer發送反饋信息。在consumer這一端,所有的消息從lead副本拉取。
  • 異步復制:異步復制模式時,lead副本將消息寫入日志后就向peoducer發送反饋信息,不等待follow副本的反饋。這樣有個缺點,就是在broker故障時不能保證消息的傳遞。

如果任意的follower in-sync replicas故障了,leader會在配置的時間超時后將其從ISR列表中刪掉。當故障的follower恢復了時,它首先將日志截斷到最后的checkpoint(最后提交消息的offset),然后開始從checkpoint開始,從leader獲取所有消息。當follower完全同步了leader時,leader將其加回到ISR列表中。

在將信息寫入本地日志時或向producer發送反饋信息前,如果leader故障了,producer會將消息發送給新的leader。

新leader的選擇跟所有的follower ISR注冊它們自己到ZooKeeper的順序有關。最先注冊的將會成為新的leader,它的log end offset(LEO)成為最后提交消息的offset(也被稱為high watermark(HW))。剩下的follower稱為新的候選leader。每個副本向ZooKeeper注冊一個監聽器,在leader變化時會收到通知。當新的leader被選出,被通知的副本不再是leader時,它將日志截斷到最后提交消息的offset,并開始趕上新的leader。新leader在配置的時間超時或者所有活躍的副本都同步完成了時,將當前ISR寫入ZooKeeper,并開啟自己的消息讀取和寫入。

更多關于Kafka復制的實現方式,見 wiki .

參考資料

Learing Apache Kafka-Second Edition

來自: http://www.cnblogs.com/w1991/p/5093752.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!