Spotify如何對Apache Storm進行規模擴展
原文 http://www.infoq.com/cn/articles/how-spotify-scales-apache-storm
【編者的話】Spotify是一家音樂流媒體服務商,最新的數據顯示他們已經有6000萬用戶。 Spotify內部使用 Apache Storm來構建實時類系統,包括廣告定位、音樂推薦以及數據可視化等。本文來自Spotify官方技術博客,介紹了Spotify公司如何使用Apache Storm來構建可擴展的個性化系統的。
Spotify已經在很多業務中使用Apache Storm來構建多種實時系統,包括 廣告定位 、 音樂推薦 以及 數據可視化 等。其中每一種實時系統都將Apache Storm與其它不同系統加以結合,例如Kafka、Cassandra、Zookeeper以及其它數據輸入和輸出的系統。Spotify在全球范圍內擁有 超過五千萬的活躍用戶 ,所以在應用構建時需要考慮可擴展性以保證應用的性能以及高可用性。
Thinking Scalability
面對資源線性增長所帶來的負載壓力提升,可擴展性已經成為軟件保持理想性能表現的前提。不過要真正實現可擴展目標,單靠添加資源并對性能進行調整 還遠遠不夠。具體而言,可擴展性要求大家對軟件方案的設計、質量、可維護性以及性能等方面整體考量。當我們構建應用程序時,首先應該從以下幾個方面對可擴 展性進行規劃(保證應用可擴展性的必要條件):
- 軟件具備良好的 架構 與較高的 質量 。
- 軟件應該易于 發布 、 監控 與 修改 。
- 軟件 性能 可以跟得上資源線性提升所帶來的額外負載增長。
Storm中的可擴展性
那么對Storm流程加以擴展需要在哪些方面做出努力?下面我將通過自己的實時個性化Storm系統作為實例,向大家闡述可擴展能力中的方方面面。
在我們的個性化系統中,使用Kafka集群的Topic(譯者注:一個Topic可以認為是一類消息,每個Topic將被分成多個 partition)來處理不同類型的事件,比如歌曲完成與廣告曝光。我們的個性化Storm拓撲可以訂閱不同的用戶事件,并將這些事件與讀取自 Cassandra的實體元數據(例如歌曲流派)相結合,然后將每位用戶的事件進行分組,進而通過某種包含聚合與推導機制的算法計算出用戶屬性。這些用戶 屬性會被寫入Canssandra,最后會被多種后端服務使用以提供個性化的用戶體驗。
設計與質量
當我們將隨著時間推移將更多新功能添加到以上個性化流程當中時,我們的拓撲結構開始變得復雜,并直接導致性能調整與事件流調試的難度越來越高。不過較高的測試覆蓋率讓我們對自身的代碼質量充滿信心,因此我們認為自己有能力對拓撲進行快速重構并使其投入正常運作。
拓撲架構
在將復雜拓撲轉化為小型可維護拓撲的整個轉換周期當中,我們通過實際操作得到了以下啟示:
- 為不同任務流創建 小型邏輯拓撲
- 通過 共享庫 而非共享邏輯的方式提高代碼的可重復使用度
- 保證方法容易被 測試
- 并行 與 批量 處理會降低IO操作
質量
我們已經使用Java開發了自己的流程,并借助JUnit對不同計算Bolt內的業務邏輯進行了測試。我們還利用backtype.storm.testing并通過集群模擬進行了端到端的測試。
可維護性
為了將軟件輕松部署在集群內的新主機中并對其運行狀況進行監控,我們采取了一系列措施來簡化維護。
配置
對外暴露所有可調參數,這可以讓我們在不變更任何代碼的前提下實現軟件調整,同時也讓我們能夠更輕松地實現小型增量變更并觀察其實際影響。我們將 bolt parallelism、source endpoints、sink endpoints以及其它拓撲性能參數映射到了一個配置文件中。
指標可視化
我們為拓撲指標創建了一套儀表板(dashboard),旨在整體評估其運作狀態并進行問題排查。我們采用高級度量指標(詳見下圖)對整套系統的運行狀態加以匯總,因為在面對一套充滿了各類指標,但又缺乏重點傾向性的儀表板時,大家往往很難從中找到真正值得關注的信息。
拓撲部署
我們這套個性化流程中的全部計算任務都是冪等的,我們還設計出了自己的部署方案,在事件處理中允許少量的重復以確保部署過程中不會丟失消息。這套 方案并不適用于全部用例,特別是在計算任務以事務形式存在的情況下。在以上圖表中,各事件由左至右依次排列,其中t1到t8代表著不同時間戳。
在我們的部署方案當中,我們需要確保Storm集群可以同時運行兩套個性化拓撲。在t1時間點上,集群運行的是個性化拓撲的v1版本。當我們準備 好發布該個性化拓撲的v2版本時,我們會構建并將v2提交至該集群。在t4時間點上,我們的集群正在同時運行這兩個版本。每套拓撲都會使用一個唯一的 Kafka consumer groupId,從而確保topic內的全部信息都被交付給這兩套版本。在此階段中信息會經過兩次處理,但由于計算的冪等屬性,這并不會造成任何問題。在 t5時間點上,我們停用v1版本,這意味著該版本將不再消費來自Kafka集群的事件。接下來,我們對v2版本的運行圖表進行監控,并確保所有指標都處于 正常范圍之內。如果一切順利,我們會移除v1版本,并在t8時間點上讓集群僅運行v2版本。不過在t7時間上,如果指標圖表顯示異常狀況,我們會激活v1 版本并使其延續停用時的狀態,繼續消費來自Kafka的事件。此時我們還將停用v2版本,這從本質上講正是我們打造的回滾機制。擁有安全的回滾機制能幫助 我們在不斷推出小型高頻度變更的同時,將相關風險控制在最低程度。
監控與警報
我們會對集群、拓撲、Source、Sink指標加以監控,并為其中的部分高級指標設定警報機制。這是為了避免冗余報警令管理員身心俱疲,甚至忽略掉真正重要的關鍵性警報。
性能
隨著時間的推移,我們已經監控到不同的系統瓶頸與相關的問題,也通過一系列調整將性能維持在理想水平。要獲得與預期相符的性能表現,大家還需要選擇正確的硬件方案。
硬件
我們最初將自己的拓撲運行在一套共享式Storm集群當中,但隨著時間推移,我們發現繁忙的拓撲導致其資源匱乏,并由此引發資源瓶頸。有鑒于此,我們開始 使用一套獨立的Storm集群,其實這并不算什么難事。現在我們的集群每天要處理超過30億個事件。整套集群中包含6臺主機,每臺主機配備24個計算核 心、雙線程以及32GB內存。即使是使用這套小型集群,我們仍然獲得了良好的運行狀態。而且在部署過程中需要并行運行2個個性化拓撲版本時,其處理強度與 最大利用率仍然相去甚遠。未來我們還會考慮在自己的Hadoop集群上將 Storm與YARN 加以結合,從而帶來更理想的資源利用率與彈性擴展能力。
吞吐能力與延遲水平
為了獲得理想的吞吐能力與延遲水平,我們需要對source與sink參數進行調節。此外我們還做出了其它一系列調整,包括緩存、并行性以及并發性等等,詳見下文。
Source和Sink調整
Kafka調整
- 我們配置了rebalancing.max.tries文件以盡可能減少常見的 rebalance錯誤 。
- 在每個版本中為不同Kafka Spout采用不同group id,從而確保新拓撲版本在部署過程中遵循冗余信息處理原則。
Cassandra調整
- 為不同TTL采用不同表。由于無需使用,我們還設置gc_grace_period=0以有效對包含TTL的行組進行讀取修復禁用。
- 使用 DateTieredCompactionStrategy 處理臨時性數據。
- 從Storm拓撲到Cassandra,對開放連接數量加以控制。
- 配置Snitch以確保調用路由的正確性。
并發問題
Storm中的OutputCollector并 非線程安全 ,也無法保證面向多線程的安全訪問——例如對異步處理流程中的Future進行回調。我們利用 java.util.concurrent.ConcurrentLinkedQueue對ack/emit Storm tuples的調用進行安全保存,并在bolt內方法執行之初對其加以刷新。
并行調節
我們從 Strata 2014 大會的Storm主題演講中得到了靈感,進而對拓撲當中的并行機制作出調節。以下幾項指導性意見在我們的實例中帶來相當出色的表現:
- 每拓撲每節點1 worker
- 對于CPU綁定類任務,每計算核心1 executor
- 對于IO綁定類任務,每計算核心1至10 executor
- 計算總體并行可能性并將其劃分為低速任務與調整任務。低速任務并行性較高,調整任務并行性較低。
Bolt緩存處理
為了維持Bolt當中用戶屬性計算擁有良好的狀態,我們需要從外部與內存兩類緩存機制當中做出選擇。我們更傾向于使用內存緩存方案,這是因為外部 緩存往往會帶來網絡IO負擔,產生不必要的延遲并增加新的故障點。不過說到內存內緩存,我們并沒有持久化或者限制內存資源。事實上,我們不太在意持久化, 因此只需要著手處理內存限制問題即可。我們最終選擇了Guava的Expirable Cache,我們可以在其中定義元素及過期數量上限,從而控制緩存的整體規模。總而言之,這套方案幫助我們在自有集群當中實現了內存的優化利用。
我們通過一整套方案來擴展Storm系統,有了這個系統的支持,我們可以為那些不斷增加的活躍用戶提供更多的新功能,同時,Storm系統也可以毫無壓力地繼續為我們提供高可用的服務。
原文英文鏈接: https://labs.spotify.com/2015/01/05/how-spotify-scales-apache-storm/