搜狗商業平臺分布式消息中間件應用實踐
背景概述
搜狗商業平臺負責搜狗商業廣告平臺的研發,其廣告平臺中存在大量的數據,包括廣告物料、操作日志、PV 點擊、上下線報文等。整個廣告平臺涉及實時 PV/UV 統計分析、實時安全分析、廣告審核、日志匯總等通用功能和操作日志、上下線報文、賬戶優化等與業務線相關的功能,系統繁多,依賴關系復雜。早期,我們主要 通過 rsync、ActiveMQ 等方式在不同系統間進行日志和數據的同步,然而,這些方式存在著時效性、性能、吞吐能力、消息堆積、失效轉移時消息丟失等方面的問題,所以我們一直在探索 一種有效方案來統一商業平臺內部系統間的交互,降低數據處理的成本。Apache Kafka 是 Linkin 開源的高吞吐分布式消息系統,它具備高穩定、高吞吐、低延遲、分布式等方面的特征,同時能夠支持消息消費者的自動水平擴展,有效解決消息堆積問題。因此, 我們選用 Apache Kafka 作為我們內部統一的消息系統。
事實上,我們對消息系統有更嚴格的要求。1,對于關鍵操作(例如,上下線報文,操作日志),我們需要在任何情況下都盡量保證其端到端的消息可靠 性。否則對于關鍵的操作,例如,該下線的廣告沒有下線,有可能會造成廣告投放端資源的浪費;2,由于消息基礎設施一般由多個應用共享,因此必須保證其合理 的權限,約束其讀寫端的行為,防止由于越權導致數據方面的問題。
針對上述問題,我們在 Apache Kafka 的基礎上進行了一系列的定制和優化,下面將分別闡述。
回頁首
端到端的消息可靠性機制的保證
此處端到端的消息可靠性指生產者生產的消息,一定可以被消費者所消費。在 Kafka 典型的應用場景中,它包含三個層面的意思:1,在絕大部分場景下,生產者產生的消息能夠發布到 Kafka Broker 集群上;2,消息在 Kafka Broker 集群上的存儲是高可靠的,不會因為機器故障、網絡故障而造成消息丟失;3,消費者端從 Kafka Broker 集群中獲取并消費消息至少一次。其中,這里主要分析第 1 點和第 2 點。
在生產者端,Kafka 提供了一系列的生產者 API,生產者可直接通過 API 發布消息。然而,由于網絡故障、硬件故障以及其它不可預期的原因,嚴重時可能會導致生產者進程掛掉。因此在生產者必須要對消息進行可靠傳輸,同時對消息傳 輸的斷點進行保存,便于極端場景下通過重試保證消息的可靠性。為此,我們開發了通用組件 Kafka Producer 用于最大程度的保證發送者的消息可靠性。為了保證消息的持久化,生產者會將通過追加寫將消息持久化到本地日志文件(本地日志文件一般采用 RAID,具備一定的可靠性),Kafka Producer 是一個單獨的進程,它通過監控本地日志文件的變化,并將其中的消息按照給定的主題發送到 Kafka Broker 集群中。其概念架構如圖 1 所示。
圖 1.Kafka Producer 概念架構圖

例如,如下配置:
[topic_dubheapi_access] request.required.acks=-1 producer.type=sync log_base_path=/opt/logs/dubhe-api base_log_fullpath=/opt/logs/dubhe-api/access.log log_name_regx=/opt/logs/dubhe-api/access.log.*(?<!swp$) topic=dubheapi_access client.id=dubhe-api sender.batch.num.messages=200 sender.waiting.max.ms=50 checkpoint_name=topic_dubheapi_access
上述配置監控的文件目錄 log_base_path 是/opt/logs/dubhe-api,其日志名稱 log_name_regx 為/opt/logs/dubhe-api/access.log.*(?<!swp$),其基準日志文件 base_log_fullpath 為/opt/logs/dubhe-api/access.log,其發送模式 producer.type 是同步發送,其對應的主題是 dubheapi_access, 其檢查點文件為 topic_dubheapi_access。因此,該 Kafka Producer 進程對監控文件夾/opt/logs/dubhe-api 下的所有名稱類似于 access.log.2015-09-06,access.log 的文件并將其中的消息按行發送到 Kafka Broker 集群的 dubheapi_access 主題,同時定時將檢查點寫入 topic_dubheapi_access 文件中。
在 Kafka Producer 中,我們主要關注日志的順序性以及可恢復性。
順序性能夠保證其監控的文件中的消息能夠按順序發送給 Kafka Broker,而順序性主要受到日志切分影響。Kafka Producer 內部會維護一個按照更新時間正序排列的文件隊列,同時也維護了當前處理的文件的位置,一般說來,當前處理的文件一般處于隊列的最前面。當發生日志切分時, 可能有兩種場景。1,切分時當前隊列中大于一個文件,此時切分的文件非當前日志文件,文件變更監控線程將其添加到文件隊列即可;2,切分時當前隊列中僅一 個文件,那么一定是此文件發生了切分,在文件發生切分時,當前正在處理的文件的位置需要重置成文件隊列最前面的文件,否則就可能造成部分消息丟失,無法保 證順序性。例如,若 access.log 發生切分,切分為 access.log.2015-09-02 和 access.log,若切分前的 access.log 日志文件中的消息未完全發送,則切分后仍處理新的 access.log 日志的話,則切分前的 access.log 日志文件中未處理的消息將丟失。
可恢復性指生產者進程在進程退出后能夠盡快的恢復到之前的狀態。為了保證這一點,我們通過檢查點的機制來保證。檢查點持久化到本地硬盤上,在每發 送完一批消息(由參數 sender.batch.num.messages 確定)或者等待一定的時間(由參數 sender.waiting.max.ms 確定)就會將當前的文件位置,就會將當前位置和最后修改時間持久化到本地硬盤上,以便在災難情況下能夠迅速恢復到當前狀態。當然,從性能方面來考慮,不可 能在每條消息發送完之后進行檢查點持久化,因此,有部分消息可能會被重發。
最后,在整個 Kafka Broker 集群不可用的情況下,Kafka Producer 將會等待,消息將在生產者端堆積,同時會觸發堆積報警監控,便于后續人工介入處理。
當然,該方案僅能滿足追加式寫日志的需要,如果其監控的源日志文件是通過 rsync 或者可編輯,則不可使用該方案。然而,在我們的實踐中,此種方式基本滿足我們的絕大部分需求,在實踐中被證明是一種有效的日志收集方法。
Broker 端消息可靠性的保證主要依賴于 Kafka Broker 自身的高可用機制。
在 Kafka Broker 中,每個 Topic 分為多個 Partition,每個 Partition 對應于一個主 Replica 和若干從 Replica。每個 Replica 若有對應的數據存儲在本地,則對應于 LogManager 所管理的一個日志文件 Log,每個日志文件 Log 對應于當前 Broker 所在物理機文件系統中的一個目錄,其概念架構圖 2 所示。
圖 2.Kakfa Replica 概念架構圖

Broker 上消息的可靠性主要涉及到消息的可靠存儲、消息復制以及節點失效處理等。
在消息的可靠存儲中,消息是通過追加寫寫入日志文件中,由于 Linux 文件緩存機制,消息首先被寫入緩存,需要通過 flush 將其持久化到硬盤。在數據寫入到持久化之間出現故障,則可能導致數據丟失。Kafka 提供了兩個參數:log.flush.interval.messages 和 log.flush.interval.ms 來控制其 flush 頻率。然而,在系統故障(例如掉電)等的情況下,未 flush 到硬盤上的數據將會丟失。
為了保證高穩定性,Kafka 采用復制機制,主 Replica 提供讀寫,從 Replica 僅提供備份和災備功能。在主 Replica 出現故障的情況下會將從 Replica 提升為主 Replica。主從間的數據復制由 ReplicaFetcherThread 完成,ReplicaFetcherThread 通過 FetchRequest 從相應的 Borker 中獲取其所負責處理 Partition 的主 Replica 的數據。在 FetchRequest 請求中提供了其所取數據的偏移量,該偏移量有兩個作用:1,用于 ReplicaFetcherThread 向主 Replica 指示獲取數據的起始位置;2,用于 ReplicaManager 記錄每個從 Replica 同步的位置,也稱為高水位線。HighWatermarkCheckPointThread 會定期(由 log.flush.offset.checkpoint.interval.ms 參數控制)將其持久化到文件中,以便故障恢復。因此,在 Replica 的復制過程中,在發生故障的情況下,若偏移量沒有及時上報,可能會重新獲取消息,造成數據的重復。
Partition 中也同時采用了 ISR 來維護處于“同步”狀態中的 Replica 列表。此同步指主 Replica 以及從 Replica 間的差距不會太大,此差距可通過參數 replica.lag.max.messages 和 replica.lag.time.max.ms 配置,分別表示消息偏移量差值大小以及從 Replica 通過 ReplicaFetcherThread 獲取主 Replica 時的延遲時間。
生產者端可通過配置參數 request.required.acks 來在高吞吐和數據可靠性間獲得平衡。其參數值及其說明如表 1 所示。
表 1.request.required.acks 參數值及其說明
參數值 | 說明 |
---|---|
0 | Broker 接收到數據后就返回,不關注寫入狀態。 |
1 | Broker 寫入主 Replica 就返回,會匯報寫入狀態給生產者。 |
-1 | Broker 寫入所有 ISR 中的 replica 之后才返回 |
事實上,在 Kafka 0. 8.1.X 以及其前的版本中,即使設置 log.flush.interval.ms=-1,在極端場景下可能發生數據丟失,例如:某個 Topic Partition 有 2 個 Replica, broker1 是 leader,broker2 是 follower,按如下步驟進行處理:
- 停止 broker2,觀察到 broker2 被從該 Partition 的 ISR 之中去掉之后
- 停止 broker1
- 啟動 broker2
- 啟動 broker1
可以看到啟動后此 Topic Partition 的 leader 變成了 broker2,如果在上述過程 1、2 之間持續發送消息的話,就會造成消息丟失。其本質原因是配置項 request.required.acks=-1 并不是指消息寫入到所有 Replicas 之后才返回,而是指寫入到在 ISR(in-sync replicas) 之中的 replicas 就返回。而 ISR 是會隨著系統運行而動態增加或縮減的,極端情況下 ISR 的列表可能為 1 或者 0;當在 ISR 列表大小為 1 時,若該 Broker 由于故障而掛起,則在此期間寫入到該 Broker 的數據可能丟失。這實際上是在 CAP 當中是選擇 C(Consistency,一致性) 還是選擇 A(Availability,可用性) 的問題,是一個設計權衡問題,并沒有完美的解決方案。Kafka 0.8.1.x 強制選擇可用性,犧牲一致性,用戶無法通過配置項來更改。Kafka 最新的 0.8.2 新增了配置項 unclean.leader.election.enable 和 min.insync.replicas,用戶可以根據場景的不同,自行決定選擇可用性還是選擇一致性。
目前的部署是 0.8.1.1版本,在我們的實踐中若需要保證高可靠性,會將 request.required.acks 設置成-1,并其復制集設置為 3,即對每個 Topic Partition 都有 3 個 replica(其中包含一個主 replica)。然而,復制集設置為 3 會對發送者的性能有一定的影響,若需要更高的性能,則可將 Kafka 升級到 0.8.2 以上版本并將其 min.insync.replicas 設置為 2。
回頁首
Kafka 權限管理
通過分析 Kafka 的設計架構,可以看到其中存在較多安全方面的隱患,具體體現在以下幾方面:
- 向 Kafka 集群加入 Broker 節點時缺少認證授權,同一網絡下任何一臺服務器只需要在配置中指定集群的 Zookeeper 配置路徑,并啟動 Broker 服務,即可加入集群,獲取 Producer 寫入的數據。存在寫入消息被新加入 Broker 惡意篡改的風險,且不易排查。
- 任何服務器都可以向 Kafka 的任意 Topic 寫數據,一方面,存在垃圾數據被惡意寫入集群的風險,另一方面,在存在多套集群的情況下,也可能因配置出錯導致消息發送者把數據誤寫入其他集群的風險,從而導致集群間的數據干擾。
- 任何服務器都可以讀任意 Topic 數據,敏感數據存在泄漏風險。
- Zookeeper 中保存的配置數據沒有設置讀寫權限,若被惡意篡改,會直接影響 Kafka 集群的穩定性。
隨著 Kafka 的廣泛應用,上述安全問題也日益凸顯,截至目前最新的版本,Kafka 都沒有提供任何安全機制,在社區對 Kafka 未來的路線圖規劃中,Kafka 安全是一個重要的方向。我們認為較好的 Kafka 權限設計應該滿足以下特點:
- 實現起來簡單,不需要大量修改源碼,不依賴第三方服務
- 提供安全保障的同時不降低 Kafka 讀寫性能
- 權限配置簡單,操作方便
因此,在 Kafka 的應用實踐過程中,為規避可能出現的安全風險,我們根據以上目標設計了一套 Kafka 安全架構,如圖 3 所示。
圖 3.Kafka 安全架構

Kafka 以 Topic 組織數據,權限設計也以 Topic 為單位進行管理。我們以當前請求機器的 IP、請求機器提供的 ClientId(對于 Producer)或者 GroupId(對于 Consumer) 以及請求的 Topic 進行權限判斷。在 Kafka Broker 中,我們新增了一個權限刷新線程,用于從遠程或本地加載權限信息,可通過配置文件配置是否開啟權限控制、權限刷新周期、本地備份路徑等。Kafka Broker 中處理客戶端(包括 Producer 和 Consumer)的請求時,是通過 KafkaApis 的 handle() 方法根據請求類型執行相應的處理。對于寫請求,我們在處理寫數據的請求 RequestKeys.ProduceKey 前加上權限判斷攔截,獲取 Producer 的 IP、ClientId 以及請求寫入的 Topic 名,判斷是否存在寫權限列表中,若存在則通過權限校驗,否則拒絕寫入。對于讀請求,我們在處理讀數據請求 RequestKeys.FetchKey 前獲取 Consumer 的 ClientId,IP 和請求讀取的 Topic,判斷是否存在讀權限列表中,若存在則通過權限校驗,否則拒絕讀取。默認情況下,Consumer 的 ClientId 等于其 GroupId。
我們也提供了 Kafka Admin 端,可通過 Web 方式針對 Topic 配置其讀寫權限。
最后,對于 Zookeeper 上數據安全問題,可簡單的通過部署私有集群以及 iptables 的方式來解決。其實踐較簡單,不再贅述。
回頁首
應用實踐
Kafka 作為基礎服務,在搜狗商業平臺廣泛應用于各類數據業務,為了便于采集各業務系統的日志數據,我們通過自行開發的 Kafka Producer 收集日志,它支持日志文件切分、故障恢復、斷點續傳和失敗重試,已作為基礎組件部署在產生數據的各應用服務器上,各應用只需把數據寫入日志文件,即可將數 據按需收集并傳輸至 Kafka 集群。下游的消費系統可進一步對系統日志數據做實時處理。商業平臺 Kafka 應用架構如圖 4 所示。通過 Kafka,我們將很多統計分析類應用的響應時間提升到了秒級。
圖 4. 商業平臺 Kafka 應用架構

在搜狗商業平臺,每天會產生上億級的廣告的狀態變化數據,包括新增/修改廣告的審核、關鍵詞的調價、暫停投放、恢復投放等。這些廣告狀態變化的數據都需要實時監控獲取并傳輸到后端服務系統,再進行廣告的相關處理。
在引入 Kafka 之前,原廣告狀態變更處理架構如圖 5 所示,對每類廣告狀態變化業務,都需要單獨設立一條數據傳輸通道,如下圖所示。
圖 5. 原廣告狀態變更處理架構

廣告主新提交廣告后,需要把新增廣告的消息傳輸給廣告審核系統進行實時審核;廣告主對投放中的廣告做改價操作后,需要把該廣告的改價消息傳輸給廣 告展現系統進行展現策略調整;廣告主暫停對某個廣告的投放后,需要通知展現系統停止展現該廣告;而對于每條廣告的各種狀態變化,也都需要統一收集并做進一 步數據分析。如此一來,每新增一類廣告樣式或新增一種狀態變化,都需要在報文系統層單獨定制狀態變化的監控,并與后端服務系統進行單對單的數據傳輸,系統 的擴展性和開發維護成本都比較高,并且發給各后端業務系統的數據之間可能還存在交集,比如發給展現系統的數據也需要發給數據分析系統一份,作業也就存在一 定的資源浪費。
引入 Kafka 后,廣告狀態變更處理架構如圖 6 所示,我們用 Kafka 來統一收集各種廣告狀態的變化數據,由下游系統訂閱獲取,下圖所示。
圖 6:引入 Kafka 后廣告狀態變更處理架構

引入 Kafka 后,報文系統負責統一收集廣告的狀態變化消息并寫入 Kafka,供下游系統使用。一方面,系統規模可橫向擴展,消息傳輸的吞吐率得以極大提升,能夠滿足日均億級消息量的實時傳輸,另一方面,也滿足了各類業 務場景的向后兼容性,對于新增類型的廣告狀態變化,只需數據訂閱端按需定制自己關心的數據并對消息進行解析過濾即可。
同時,Kafka 作為商業平臺的重要基礎服務,我們也根據實際需要,部署了完備的監控。監控分兩大類:系統常規監控和特定監控。系統常規監控主要包括三部分:
- 系統級的監控:如 Kafka Broker 的 CPU 負載、內存使用率、存儲空間、I/O 負載。
- 錯誤日志監控:主要是 Kafka Broker 的 error 日志。
- JVM 監控:主要是 Broker 進程的線程信息、JVM 堆、GC 情況等。
特定監控中,主要包括以下幾點:
1. Kafka 集群的 QPS 監控
Kafka 提供了 JMX 監控接口,可通過獲取 MBean "Kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec" 中 OneMinuteRate 屬性的值得到。QPS 超出一定閾值時發出報警。
2. Producer 端消息積壓監控
日志收集中間件在發送消息至 Kafka 時會在 checkpoint 文件中記錄當前處理的文件的最后修改時間和偏移量,通過計算當前的偏移量和系統中最新的文件的偏移量的差值即可得知消息積壓量。
3. Topic 級別的消息寫入監控
正常情況下,系統在一段時間內向一個 Topic 寫入的數據量應該有一個大致的范圍,寫入量過多時,可能是 QPS 過高,寫入量過少時,有可能是 producer 端出現寫入堵塞,或者業務出現驟降,都屬于異常情況,可以發出報警通知相關人員。此類監控可通過 Kafka 提供的工具實現,兩次運行$KAFKA_HOME/bin/Kafka-run-class.sh Kafka.tools.GetOffsetShell --broker-list ${brokerhostname:port} --time -1 --topic${topicName},如果兩次運行獲取的 logEndOffset 差值不在既定范圍內,說明流量可能異常,發出報警。
4. 消費延遲監控
當消息寫入速率高于 consumer 處理消息速率時,會出現消息在 Kafka 中堆積的情況,為避免大量消息積壓在 Kafka 集群內部,可增加消費延遲監控,在出現消息堆積時發出報警,相關人員可據此判斷是否可以優化 consumer 處理效率或對 consumer 做擴容。具體的監控方法也可以使用 Kafka 提供的工具實現,運行$KAFKA_HOME/bin/Kafka-run-class.sh Kafka.tools.ConsumerOffsetChecker --zkconnect ${zkPath} --group ${consumer.groupid} -topic ${topicName},可得到指定消費組對指定 topic 的每個 partition 的未消費消息量,當該值大于指定閾值時,發出報警。
上述監控主要對端到端的消息積壓和延時以及 QPS 進行監控,使得端到端的響應時間是可控的,并且能夠在出現異常時及時報警。
回頁首
總結
搜狗商業平臺致力于使用 Kafka 來統一商業平臺基礎消息設施,目前正支撐著眾多線上業務的發展,每天承載著十億級消息的流入流出。我們接下來會在 Kafka 安全方面做進一步的探索和研究,包括用戶級的認證機制、增加客戶端配額限制等,此外,我們也會持續完善 Kafka Admin端,在集群易用性方面做更多的優化。