使用Akka來優化Spark+ElasticSearch的準實時系統
假如有這樣一個場景:系統每秒鐘都會收到大量的事件,每個事件又包含很多參數,用戶不僅需要準實時地還需要定期地判斷每一種事件、事件的每一種參數值的組合是否超過了系統設定的閾值。面對這一場景,用戶應該采用什么樣的方案呢?最近,來自于 Premium Minds 的軟件架構師 André Camilo 在博客上發表了一篇文章,介紹了他們是 如何使用Akka解決這一棘手問題的 。
在該文章中André Camilo首先介紹了他們的應用場景:
我們的系統每秒鐘最多會收到幾百個事件,有些事件有8個參數,有些事件有超過240,000個參數值的組合(*假如有一個 PhoneCall(phoneNumber, countryCode, geoZone)事件,該事件有三個參數,其中phoneNumber有4,000 個值, countryCode有5個值,geoZone有10個值,那么可能的參數值組合約為(4000+1)(5+1)(10+1)=240k個*),我們不 僅需要實時地判斷這些事件以及參數值的組合是否超過了系統設定的閥值,還要保留最近30分鐘的數據,以便于判斷在這段時間內它們出現的頻率是否也超過了閥 值。
處理該問題最簡單的方式或許就是將這些數據都存起來,然后每隔一秒鐘就去計算每一種組合出現的頻率,但是事實上這是無法實現的,因為這樣每秒鐘會有超過240,000個查詢,系統是無法承受的。 André Camilo 給出的第一種方案是使用Spark和ElasticSearch:
我們創建了一個Spark Streaming的數據流管道,該管道首先從JMS隊列中讀取消息并將其轉換成PhoneCall事件,然后根據事件的參數值將一個事件分離成多個事 件,之后再使用countByWindow函數計算每一種事件組合的頻率,最后檢查每種組合的平均頻率是否超過了閾值。在使用countByWindow 計算時,每秒鐘都會設置一個30分鐘的窗口,同時函數輸出值會除以1800秒以得到每個窗口的平均頻率,最終結果使用ElasticSearch集群存 儲。
該方案的流程如下:
這一方案雖然可行,但是并沒有解決André Camilo的問題,不是因為Spark不行,而是因為雖然Spark Streaming能夠處理大量的實時數據,但是卻無法處理大量的窗口。在André Camilo的實驗中,如果組合數低于1000,那么這種方案能夠工作的很好,但是如果超出了這一數量,那么就會導致內存溢出問題。
André Camilo給出的第二種方案是使用 Akka :
- 對每一種參數值的組合創建一個組合Actor
- 創建一個負責接收所有事件的Actor,該Actor根據事件的參數值將一個事件分離成多個事件,并根據參數組合的對應關系將分離后的事件發送到步驟1創建的組合Actor
- 每一個組合Actor通過 環形緩沖區 存儲最近30分鐘的事件數(單位為秒),每過一秒,該緩沖區就滾動一個位置,同時該Actor會計算事件的頻率,檢查該頻率是否超過了系統設定的閾值,并將結果發送到ElasticSearch Actor
- ElasticSearch Actor僅僅是一個ActorPublisher,負責將數據發送到 ElasticSearch流驅動
第二種方案的流程如下:
環形緩沖區的結構如下:
你可能會問,為每一種組合創建一個Actor會不會導致Actor太多?André Camilo告訴我們,對Akka這個超輕量級的事件驅動框架來說這都不是問題。使用該方案André Camilo在一個i7 4GB的筆記本上輕松解決了800個事件的分離處理。更為重要的是,Akka支持水平擴展,如果系統有更多的參數值組合,或者需要更大的吞吐量,那么只需 要增加更多的機器即可。
最后,André Camilo的結論是:Spark有非常好的特性,它的解決方案更簡單、更直觀,但不太適合這個場景。Akka非常適合處理CPU敏感的問題,Actor模型更適合處理高并發的問題。