【譯】使用Apache Kafka構建流式數據平臺(1)
前言:前段時間接觸過一個流式計算的任務,使用了阿里巴巴集團的JStorm,發現這個領域值得探索,就發現了這篇文章——Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform(Part 1)。在讀的過程中半總結半翻譯,形成本文,跟大家分享。
最近你可能聽說很多技術名詞,例如“流式處理”、“事件數據”以及“實時”等,與之相關的技術有Kafka、Storm、Samza或者是Spark的Steam Model。這些新興的技術令人興奮,不過還沒有多少人知道如何將這些技術添加到自己的技術棧中,如何實際應用于項目中。
這篇指南討論我們關于實時數據流的工程經驗:如何在你的公司內部搭建實時數據平臺、如何使用這些數據構建應用程序,所有這些都是基于實際經驗——我們在Linkdin花了五年時間構建Apache Kafka,將Linkdin轉換為流式數據架構,并幫助硅谷的很多技術公司完成了同樣的工作。
這份指南的第一部分是關于流式數據平臺(steam data platform)的概覽:什么是流式數據平臺,為什么要構建流式數據平臺;第二部分將深入細節,給出一些操作規范和最佳實踐。
何為流式數據平臺?
流式數據平臺:簡潔、輕量的事件處理
我們在Linkein構建Apache Kafka的目的是讓它作為數據流的中央倉庫工作,但是為什么要做這個工作,有下面兩個原因:
- 數據整合:數據如何在各個系統之間流轉和傳輸;
- 流式處理:通常在數據倉庫或者Hadoop集群中需要做豐富的數據分析,同時實現低延時。
接下來介紹下上述兩個理論的提出過程。起初我們并沒有意識到這些問題之間有聯系,我們采取了臨時方案:只要需要,就在系統和應用程序之間建造數據通道或者給web服務發送異步請求。隨著時間推移,系統越來越復雜,我們在幾乎所有子系統之間都建立了不同的數據通道:

每個數據通道都有自己的問題:日志數據的規模很大但是數據有缺失,并且數據傳輸的延遲很高;Oracle數據庫實例之間的數據傳輸速度快、準確而且實時性好,但是其他系統不能及時快速得獲得這些數據;Oracle數據庫的數據到Hadoop集群的數據通道吞吐量很高,但是只能進行批次操作;搜索系統數據通道的延遲低,不過數據規模小,并且是直接連接數據庫;消息系統數據通道的延遲低,但是不可靠且規模小。
隨著我們在全球各地添加數據中心,我們也要為這些數據流添加對應的副本;隨著系統規模的增長,對應的數據通道規模也應該相應得增長,整個系統面臨的壓力越來越大。我認為我的團隊與其說是由分布式系統工程師組成,還不如說是由一些管道工組成。
更糟的是,復雜性過高導致數據不可靠。由于數據的索引和存儲存在問題,導致我們的報告可信度降低。員工需要花費大量時間處理各種類型的臟數據,記得有在處理一起故障中,我們在兩個系統中發現一些非常類似但存在微小差異的數據,我們費了很大力氣檢查這兩個數據哪個是爭取額的,最后發現兩個都不對。
與此同時,我們除了要做數據遷移,還想對數據進行進一步的處理和分析。Hadoop平臺提供了批處理、數據打包和專案(ad hoc)處理能力,但是我們還需要一個實時性更好的數據處理平臺。我們的很多系統——特別是監控系統、搜索索引的數據通道、數據分析應用以及安全分析應用,都需要秒級的響應速度,但是這類型的應用在上圖的系統架構中表現很差。
2010年左右,我們開始構建一個系統:專注于實時獲取流式數據(stream data),并規定各個系統之間的數據交互機制也以流式數據為承載,同時還允許對這些流式數據進行實時處理。這就是Apache Kafka的原型。
我們對整個系統的構想如下所示:

很長一段時間內我們都沒有為我們所構建的這個系統取名字,僅僅稱之為“Kafka stuff”或者“global commit log thingy”,隨著時間推移,我們開始將這個系統中的數據稱之為流式數據(steam data),而負責處理這種類型的數據的平臺稱之為流式數據平臺(steam data platform)。
最終我們的系統從前文描述的跟“意大利面條”一樣雜亂進化為清晰的以流式數據平臺為中心的系統:

在這個系統中Kafka的角色是通用數據管道。每個子系統都可以很容易得接入到這個中央數據管道上;流式處理應用可以接入到該數據管道上,并對外提供經過處理后的流式數據。這種固定格式的數據類型成為各個子系統、應用和數據中心之間的通用語言。舉個例子說明:如果一個用戶更新了他的個人信息,這個更新信息會流入我們的系統處理層,在系統處理層會對該用戶的公司信息、地理位置和其他屬性進行標準化處理;然后這個數據流會流入搜索引擎和社區地圖用于查詢和檢索、這個數據也會流入推薦系統進行工作匹配;所有的這些動作只需要毫秒量級的時間,最后這些數據會流入Hadoop數據倉庫。
LinkedIn內部在大量使用這套系統,每天為數百個數據中心處理超過5000億事件請求,該系統已經成為其他系統的數據后臺、成為Hadoop集群的數據管道,以及流式處理的Hub。
由于Kafka開源,因此有很多公司在做類似的事情:Kafka Powered By
接下來我們將論述流式數據平臺的一些細節:該平臺的工作原理、該平臺解決了什么重要問題。
流式數據(Steam Data)
大部分業務邏輯可以理解為事件流(steam of events)。零售業有訂單流、交易流、物流信息流、價格調整事件流,以及各類調用的返回值等等;金融行業有訂單流、股票價格變更事件流,以及其他金融行業的信息流;網站有點擊流、關注流(impressions)、搜索流等等。在大規模的軟件系統中還有請求流、錯誤流、機器監控信息流和日志流。總之,業務邏輯可以從整體上當作一種數據處理系統——接收多種輸入流并產生對應的輸出流(有時還會產生具體的物理產品)。
這種概念對于習慣于將數據想象為數據庫中的一行的同學可能有點陌生,接下來我們看一點關于事件流數據的實際例子。
事件觸發和事件流
數據庫中存放的是數據的當前狀態,當前狀態是過去的某些動作(action)的結果,這些動作就是事件。庫存表保存購買和交易事件產生的結果,銀行結余存放信貸和借記事件的結果;Web Server的延時圖是一系列HTTP請求的聚合。
當談論大數據時,很多人更青睞于記錄上述提到的這些事件流,并在此基礎上進行分析、優化和決策。某種層度上來說,這些事件流是傳統的數據庫沒有反應出來的一面:它們表示業務邏輯。
事件流數據在金融行業已經廣泛使用:股票發行、市場預測、股票交易等數據都可以當作是事件流,但是技術屆使得搜集和使用這些數據的現代技術開始流行。Google將廣告點擊流和廣告效果轉化為幾十億美金的收入。在web開發屆,這些事件數據又被稱為日志數據,由于缺乏針對日志處理的模塊,這些日志事件就存放在日志文件中。Hadoop之類的系統經常用于日志處理,但是根據實際情況,稱之為“批量事件存儲和處理(batch event storage and processing)”更合適。
網絡公司應該是最早開始記錄事件流的公司,搜集網站上的事件數據非常容易:在某些特定節點加一些代碼即可記錄和跟蹤每個用戶在改網站上的行為。即使是一個單頁面或者是某個流行網站上的移動窗口也能記錄很多類似的行為數據用于分析和監控。
你可能聽說過“機器產生的數據”這個概念,其實跟事件數據表示相同的含義。某種程度上所有的數據都是機器產生的,因為這些數據來自計算機系統。
還有很多人在談論設備數據和“物聯網(internet of things)”。不同的人對這些名詞有各自的理解,但是這個物聯網的核心也在于針對某些數據集進行分析和決策,只不過我們這里的分析對象是大規模網絡系統,而物聯網的分析對象是工業設備或者消費產品。
數據庫是事件流
事件流數據很適合描述日志數據或諸如訂單、交易、點擊和貿易這些具備明顯事件特征的數據。和大多數開發人員相同,你可能將自己系統的大部分數據保存在各種數據庫中:關系型數據庫(Oracle、MySQL和Postgres)或者新興的分布式數據庫(MongoDB、Cassandra和Couchbase),這些數據可能不容易理解為事件或者事件流。
但實際上,數據庫中存儲的數據也可理解為一種事件流(event steam),簡單來說,數據庫可以理解為創建數據備份或者建立備庫的過程。做數據備份的主要方法是周期性得導出數據庫內容,然后將這些數據導入到備庫中。如果我很少進行數據備份,或者是我的數據量不大,那么可以進行全量備份。實際上,隨著備份頻率的提高,全量備份不再可行:如果兩天做一次全量備份,將會耗費兩倍的系統資源、如果每個小時做一次全量備份,則會耗費24倍的系統資源。在大規模數據的備份中,顯然增量備份更加有效:只增加新創建的、更新的數據和刪除對應的數據。利用增量備份,如過我們將備份頻率提高為原來的1倍,則每次備份的數量將減少幾乎一半,消耗的系統資源也差不多。
那么為什么我們不盡可能提高增量備份的頻率呢?我們可以做到,但是最后只會得到一系列單行數據改變的記錄——這種事件流稱之為變更記錄,很多數據庫系統都有負責這個工作的模塊(Oracle數據庫系統中的XStreams和GoldenGate、MySQL有binlog replication、Postgres有Logical Log Steaming Replication)。
綜上,數據庫的變更過程也可以作為事件流的一部分。你可以通過這些事件流同步Hadoop集群、同步備庫或者搜索索引;你還可以將這些事件流接入到特定的應用或者流式處理應用中,從而發掘或者分析出新的結論。
流式數據平臺解決的問題?
流式數據平臺有兩個主要應用:
- 數據整合:流式數據平臺搜集事件流或者數據變更信息,并將這些變更輸送到其他數據系統,例如關系型數據庫、key-value存儲系統、Hadoop或者其他數據倉庫。
- 流式處理:對流式數據進行持續、實時的處理和轉化,并將結果在整個系統內開放。
在角色1中,流式數據平臺就像數據流的中央集線器。與之交互的應用程序不需要考慮數據源的細節,所有的數據流都以同一種數據格式表示;流式數據平臺還可以作為其他子系統之間的緩沖區(buffer)——數據的提供者不需要關心最終消費和處理這些數據的其他系統。這意味著數據的消費者與數據源可以完全解耦合。
如果你需要部署一個新的系統,你只需要將新系統接入到流式數據平臺,而不需要為每個特定的需求選擇(并管理)各自的數據庫和應用程序。不論數據最初來自日志文件、數據庫、Hadoop集群或者流式處理系統,這些數據流都使用相同的格式。在流式數據平臺上部署新系統非常容易,新系統只需要跟流式數據平臺交互,而不需要跟各種具體的數據源交互。
Hadoop集群的設計目標是管理公司的全量數據,直接從HDFS中獲取數據是非常耗費時間的方案,而且直接獲取的數據不能直接用于實時處理和同步。但是,這個問題可以反過來看:Hadoop等數據倉庫可以主動將結果以流式數據的格式推送給其他子系統中。
流式數據平臺的角色2包含數據聚合用例,系統搜集各類數據形成數據流,然后存入Hadoop集群歸檔,這個過程就是一個持續的流式數據處理。流式處理的輸出還是數據流,同樣可以加載到其他數據系統中。
流式處理可以使用通過簡單的應用代碼實現,這些處理代碼處理事件流并產生新的事件流,這類工作可以通過一些流行的流式處理框架完成——Storm、Samza或Spark Streaming,這些框架提供了豐富的API接口。這些框架發展得都不錯,同時它們跟Apache Kafka的交互都很好。
流式數據平臺需要提供的能力?
在上文中我提到了一些不同的用例,每個用例都有對應的事件流,但是每個事件流的需求又有所不同——有些事件流要求快速響應、有些事件流要求高吞吐量、有些事件流要求可擴展性等等。如果我們想讓一個平臺滿足這些不同的需求,這個平臺應該提供什么能力?
我認為對于一個流式數據平臺,應該滿足下列關鍵需求:
- 它必須足夠可靠,以便于處理嚴苛的更新,例如將某個數據庫的更新日志變更為搜索索引的存儲,能夠順序傳輸數據并保證不丟失數據;
- 它必須具備足夠大的吞吐量,用于處理大規模日志或者事件數據;
- 它必須具備緩沖或者持久化數據的能力,用于與Hadoop這類批處理系統交互。
- 它必須能夠為實時處理程序實時提供數據,即延時要足夠低;
- 它必須具備良好的擴展性,可以應付整個公司的滿負載運行,并能夠集成成百上千個不同團隊的應用程序,這些應用以插件的形式與流式數據平臺整合。
- 它必須能和實時處理框架良好得交互
流式數據平臺是整個公司的核心系統,用于管理各種類型的數據流,如果該系統不能提供良好的可靠性以及可擴展性,系統會隨著數據量的增長而再次遭遇瓶頸;如果該系統不支持批處理和實時處理,那么就不能與Hadoop或者Storm這類系統整合。
Apache Kafka
Apache Kafka是專門處理流式數據的分布式系統,它具備良好的容錯性、高吞吐量、支持橫向擴展,并允許地理位置分布的流式數據處理。
Kafka常常被歸類于消息處理系統,它確實扮演了類似的角色,但同時也提供了其他的抽象接口。在Kafka中最關鍵的抽象數據結構是用于記錄更新的commit log:

數據生產者向commit log隊列中發送記錄流,其他消費者可以像水流一樣在毫秒級延時處理這些日志的最新信息。每個數據消費者在commit log中有一個自己的位置(指針),并獨立移動,這使得可靠、順序更新能夠分布式得發送給每個消費者。
這個commit log的作用非常關鍵:可以多個生產者和消費者共享,并覆蓋一個集群中的多臺機器,每臺機器都可用作容錯保障;可以提供一個并行模型,其具備的順序消費的特點使得Kafka可以用于記錄數據庫的變更。
Kafka是一個現代的分布式系統,存儲在一個集群的數據(副本和分片存儲)可以水平擴張和縮小,同時上層應用對此毫無感知。數據消費者的機器數量可以隨數據規模的增長而水平增加,同時可以自動應對數據處理過程中發生的錯誤。
Kafka的一個關鍵設計是對持久化的處理相當好,Kafka的消息代理(broker)可以存儲TB量級的數據,這使得Kafka能夠完成一些傳統數據庫無法勝任的任務:
- 接入Kafka的Hadoop集群或者其他離線系統可以放心得停機維護,間隔幾小時或者幾天后再平滑接入,因為在它停機期間到達的流式數據被存儲在Kafka的上行集群。
- 在首次執行同步數據庫的任務時可以執行全量備份,以便讓下行消費者訪問全量數據。
上述這些特性使得Kafka能夠提供比傳統的消息系統更廣的應用范圍。
事件驅動的應用
自從我們將Kafka開源后,我們有很多機會與其他想做類似的事情的公司交流和合作:研究如何Kafka系統的部署以及Kafka在該公司內部技術架構的角色如何隨著時間演進和改變。
初次部署常常用于單個的大規模應用:日志數據處理,并接入Hadoop集群;也可能是其他數據流,該數據流的規模太大以至于超出了該公司原有的消息系統的處理能力。
從這些用例延伸開來,在接入Hadoop集群后,很快就需要提供實時數據處理的能力,現存的應用需要擴展和重構,利用現有的實時處理框架更高效得處理流式數據。以LinkedIn為例,我們最開始是利用Kafka處理job信息流,并將job信息存入Hadoop集群,然后很多ETL-centric的應用需求開始出現,這些job信息流開始用于其他子系統,如下圖所示:

在這張圖中,job的定義不需要一些定制就可以與其他子系統交互,當上游應用(移動應用)上出現新的工作信息時,就會通過Kafka發送一個全局事件,下游的數據處理應用只需要響應這個事件即可。
流式數據平臺與現存中間件的關系
我們簡單講下流式數據平臺與現存的類似系統的關系。
消息系統(Messaging)
流式數據平臺類似于企業消息系統——它接收消息事件,并把它們發布到對應事件的訂閱者。不過,二者有三個重要的不同:
- 消息系統通常是作為某個應用中的一個組件來部署,不同的應用中有不同的消息系統,而流式數據平臺希望成為整個企業的數據流Hub。
- 消息系統與批處理系統(數據倉庫或者Hadoop集群)的交互性很差,因為消息系統的數據存儲容量有限;
- 消息系統并未提供與實時處理框架整合的API接口。
換句話說,流式數據平臺可以看作在公司級別(消息系統的級別是項目)設計的消息系統。
數據聚合工具(Data Integration Tools)
為了便于跟其他系統整合,流式數據平臺做了很多工作。它的角色跟Informatica這類工具不同,流式數據平臺是可以讓任何系統接入,并可以圍繞該平臺構建不同的應用。
流式數據平臺與數據聚合工具有一點重合的實踐:使用一個統一的數據流抽象,保證數據格式相同,這樣可以避免很多數據清洗任務。我會在這個系列文章的第二篇仔細論述這個主題。
企業服務總線(Enterprise Service Buses)
我認為流式數據平臺借鑒了很多企業服務總線的設計思想,不過提供了更好的實現方案。企業服務總線面臨的挑戰就是自身的數據傳輸效率很低;企業服務總線在部署時也面臨一些挑戰:不適合多租戶使用(PS,此處需要看下原文,歡迎指導)。
流式數據平臺的優勢在于數據的傳輸與系統本身解耦合,數據的傳輸由各個應用自身完成,這樣就能避免平臺自身成為瓶頸。
變更記錄系統(Change Capture Systems)
常規的數據庫系統都有類似的日志機制,例如Golden Gate,然而這個日志記錄機制僅限于數據庫使用,并不能作為通用的事件記錄平臺。這些數據庫自帶的日志記錄機制主要用于同類型數據庫(eg:Oracle-to-Oracle)之前的互相備份。
數據倉庫和Hadoop
流式數據平臺并不能替代數據倉庫,恰恰相反,它為數據倉庫提供數據源。它的身份是一個數據管道,將數據傳輸到數據倉庫,用于長期轉化、數據分析和批處理。這個數據管道也為數據倉庫提供對外輸出結果數據的功能。
流式處理系統(Steam Processing Systems)
常用的流式處理框架,例如Storm、Samza或Spark Streaming可以很容易得跟流式數據平臺整合。這些流式數據處理框架提供了豐富的API接口,可以簡化數據轉化和處理。
流式數據平臺的落地與實踐
我們不只是提出了一個很好的想法,我們面臨的需求很適合將自己的想法落地。過去五年我們都在構建Kafka系統,幫助其他公司落地流式數據平臺。今天,在硅谷有很多公司在實踐這套設計思路,每個用戶的行為都被實時記錄并處理。
前瞻
我們一直在思考如何使用公司掌握的數據,因此構建了Confluent平臺,該平臺上有一些工具用來幫助其他公司部署和使用Apache Kafka。如果你希望在自己的公司部署流式數據處理平臺,那么Confluent平臺對你絕對有用。
還有一些用的資源:
- 我之前寫過的blog post和小書,討論的主題包括Kafka中的日志抽象、數據流和數據系統架構等;
- Kafka的官方文檔也很有用;
- 在這里有關于Confluent平臺的更多介紹
這個教程的下篇將會論述在構建和管理數據流平臺中的一些實踐經驗。