Storm企業級應用:實戰、運維和調優——1.1 什么是實時流計算

來自: http://www.adintellig.com/storm-in-action-1-1/

《Storm企業級應用:實戰、運維和調優》原版授權,未經允許不得轉載!

1.1    什么是實時流計算

所謂實時流計算就是近幾年由于數據得到廣泛應用之后,數據持久性建模不滿足現狀下,急需數據流的瞬時建模或者計算處理。這種實時計算的應用的實例有金融服務、網絡監控、電信數據管理、Web應用、生產制造、傳感檢測等等。在這種數據流模型中,單獨的數據單元可能是相關的元組(Tuple),例如網絡測量、呼叫記錄、網頁訪問等產生的數據。但是,這些數據以大量、快速、時變(可能是不可預知)的數據流持續到達,由此產生了一些基礎性的新的研究問題——實時計算,實時計算的一個重要方向就是實時流計算。

1.1.1   實時流計算背景

數據的價值隨著時間的流逝而降低,所以事件出現后必須盡快地對它們進行處理,最好數據出現時便立刻對其進行處理,發生一個事件進行一次處理,而不是緩存起來成一批處理。例如商用搜索引擎,像Google、Bing和Yahoo !等,通常在用戶查詢響應中提供結構化的Web結果,同時也插入基于流量的點擊付費模式的文本廣告。為了在頁面上最佳位置展現最相關的廣告,通過一些算法來動態估算給定上下文中一個廣告被點擊的可能性。上下文可能包括用戶偏好、地理位置、歷史查詢、歷史點擊等信息。一個主搜索引擎可能每秒鐘處理成千上萬次查詢,每個頁面都可能會包含多個廣告。為了及時處理用戶反饋,需要一個低延遲、可擴展、高可靠的處理引擎。

對于這些實時性要求很高的應用,若把持續到達的數據簡單地放到傳統數據庫管理系統(DBMS)中,并在其中進行操作,是不太切實的。傳統的DBMS并不是為快速連續的存放單獨的數據單元而設計的,而且也并不支持“持續處理”,而“持續處理”是數據流應用的典型特征。另外,現在人們都認識到,“近似性”和“自適應性”是對數據流進行快速查詢和其他處理(如數據分析和數據采集)的關鍵要素,而傳統DBMS的主要目標恰恰與之相反:通過穩定的查詢設計,得到精確的答案。

另外一些方案是采用MapReduce來進行實時數據流處理。但是,盡管MapReduce做了實時性改進,仍然很難穩定地滿足應用需求。這是因為Hadoop MapReduce框架為批處理做了高度優化,系統典型地通過調度批量任務來操作靜態數據,任務不是常駐服務,數據也不是實時流入;而數據流計算的典型范式之一是不確定數據速率的事件流流入系統,系統處理能力必須與事件流量匹配。

1.1.2   實時計算應用場景

互聯網領域的實時流計算一般都是針對海量數據進行的,除了像非實時計算的需求(如計算結果準確)以外,實時計算最重要的一個需求是能夠實時響應計算結果,一般要求為秒級。個人理解,互聯網行業的實時計算可以分為以下兩種應用場景。

1) 數據源是實時的不間斷的,要求對用戶的響應時間也是實時的。

主要用于互聯網流式數據處理。所謂流式數據是指將數據看作是數據流的形式來處理。數據流則是在時間分布和數量上無限的一系列數據記錄的集合體;數據記錄是數據流的最小組成單元。舉個例子,對于大型網站,活躍的流式數據非常常見,這些數據包括網站的訪問PV/UV、用戶訪問了什么內容,搜索了什么內容等。實時的數據計算和分析可以動態實時地刷新用戶訪問數據,展示網站實時流量的變化情況,分析每天各小時的流量和用戶分布情況,這對于大型網站來說具有重要的實際意義。

2) 數據量大且無法或沒必要預算,但要求對用戶的響應時間是實時的。

主要用于特定場合下的數據分析處理。當數據量很大,同時發現無法窮舉所有可能條件的查詢組合或者大量窮舉出來的條件組合無用的時候,實時計算就可以發揮作用,將計算過程推遲到查詢階段進行,但需要為用戶提供實時響應。

1.1.3   實時計算處理流程

互聯網上海量數據(一般為日志流)的實時計算過程可以被劃分為以下三個階段:數據的產生與收集階段、傳輸與分析處理階段、存儲對對外提供服務階段。下面分別進行簡單的介紹如圖1-1所示。

圖1-1    實時計算處理流程

(1)           數據實時采集

需求:功能上保證可以完整的收集到所有日志數據,為實時應用提供實時數據;響應時間上要保證實時性、低延遲在1秒左右;配置簡單,部署容易;系統穩定可靠等。

目前,互聯網企業的海量數據采集工具,有非死book開源的 Scribe 、LinkedIn開源的 Kafka 、Cloudera開源的 Flume ,淘寶開源的 TimeTunnel 、Hadoop的 Chukwa 等,均可以滿足每秒數百MB的日志數據采集和傳輸需求。

(2)           數據實時計算

傳統的數據操作,首先將數據采集并存儲在DBMS中,然后通過query和DBMS進行交互,得到用戶想要的答案。整個過程中,用戶是主動的,而DBMS系統是被動的,過程操作如圖1-2所示。

圖1-2    傳統的數據操作流程

但是,對于現在大量存在的實時數據,比如股票交易的數據,這類數據實時性強,數據量大,沒有止境,傳統的架構并不合適。 流計算 就是專門針對這種數據類型準備的。在流數據不斷變化的運動過程中實時地進行分析,捕捉到可能對用戶有用的信息,并把結果發送出去。整個過程中,數據分析處理系統是主動的,而用戶卻處于被動接收的狀態,處理流程如圖1-3所示。

圖1-3    流計算處理過程

需求:適應流式數據、不間斷查詢;系統穩定可靠、可擴展性好、可維護性好等。

有關計算的一些注意點:分布式計算,并行計算(節點間的并行、節點內的并行),熱點數據的緩存策略,服務端計算。

(3)            實時查詢服務

全內存:直接提供數據讀取服務,定期dump到磁盤或數據庫進行持久化。

半內存:使用Redis、Memcache、MongoDB、BerkeleyDB等內存數據庫提供數據實時查詢服務,由這些系統進行持久化操作。

全磁盤:使用HBase等以分布式文件系統(HDFS)為基礎的NoSQL數據庫,對于KeyValue內存引擎,關鍵是設計好Key的分布。

1.1.4   實時計算框架

最近這幾年隨著實時計算的流行,相繼出現了一下實時計算的框架,以下做了部分總結。

1.       IBM的StreamBase

StreamBase[1]是IBM開發的一款商業流式計算系統,在金融行業和政府部門使用,其本身是商業應用軟件,但提供了Develop Edition。相對于付費使用的Enterprise Edition,前者的功能更少,但這并不妨礙我們從外部使用和API接口來對StreamBase本身進行分析。

StreamBase使用Java開發,IDE是基于Eclipse進行二次開發,功能非常強大。StreamBase也提供了相當多的 Operator、Functor以及其他組件來幫助構建應用程序。用戶只需要通過IDE拖拉控件,然后關聯一下,設置好傳輸的Schema并且設置一下控件計算過程,就可以編譯出一個高效處理的流式應用程序了。同時,StreamBase還提供了類SQL語言來描述計算過程。StreamBase的架構如圖1-4所示。

圖1-4    StreamBase架構圖

StreamBase Server是節點上啟動的管理進程,它負責管理節點上Container的實例,每個Container通過Adapter獲得輸入,交給應用邏輯進行計算,然后通過Adapter進行輸出。各個Container相互連接,形成一個計算流圖。

Adapter負責與異構輸入或輸出交互,源或目的地可能包括CSV文件、JDBC、JMS、Simulation(StreamBase提供的流產生模擬器)或用戶定制。

每個StreamBase Server上面都會存在一個System Container,主要是產生系統監控信息的流式數據。

HA Container用于容錯恢復,可以看出它實際包含兩個部分:Heartbeat和HA Events,其中Heartbeat也是Tuple在Container之間傳輸。在HA方案下,HA Container監控Primary Server的活動情況,然后將這些信息轉換成為HA Events交給StreamBase Monitor來處理。

Monitor就是從System Container和HA Container中獲取數據并且進行處理。StreamBase認為HA 問題應該通過CEP方式處理,也就是說如果哪個部件出現問題,就肯定會反映在System Container和HA Container的輸出流上面,然后 Monitor通過復雜事件處理這些Tuples的話就能夠檢測到機器故障等問題,并做出相應處理。

2.       Yahoo的S4

Yahoo! S4(Simple Scalable Streaming System)是一個通用的、分布式的、可擴展的、分區容錯的、可插拔的流式系統[2]。基于S4框架,開發者可以容易地開發面向持續流數據處理的應用。S4的最新版本是v0.6.0,是Apache孵化項目,其設計特點有以下幾個方面。

■   Actor計算模型:為了能在普通機型構成的集群上進行分布式處理,并且集群內部不使用共享內存,S4架構采用了Actor模式,這種模式提供了封裝和地址透明語義,因此在允許應用大規模并發的同時,也提供了簡單的編程接口。S4系統通過處理單元(Processing Elements,PEs)進行計算,消息在處理單元間以數據事件的形式傳送,PE消費事件,發出一個或多個可能被其他PE處理的事件,或者直接發布結果。每個PE的狀態對于其他PE不可見,PE之間唯一的交互模式就是發出事件和消費事件。

■   對等集群架構:S4采用對等架構,集群中的所有處理節點都是等同的,沒有中心控制節點,這使得集群的擴展性很好,處理節點的總數理論上無上限;同時,S4將沒有單點容錯的問題。

■   可插拔體系架構:S4系統使用Java語言開發,采用了極富層次的模塊化編程,每個通用功能點都盡量抽象出來作為通用模塊,而且盡可能地讓各模塊實現可定制化。

■   支持部分容錯:基于ZooKeeper服務的集群管理層將會自動路由事件從失效節點到其他節點。除非顯式保存到持久性存儲,否則節點故障時,節點上處理事件的狀態會丟失。

S4的重要應用場景就是點擊通過率(CTR)預估這類應用。CTR是廣告點擊數除以展現數得到的比率,當擁有了足夠歷史的展現和點擊數據后,CTR是用戶點擊廣告可能性的一個很好的估算,精確的來源點擊對于個性化和搜索排名來說都價值無限。據S4的開發者稱,在線流量上的實驗顯示基于S4系統的新CTR計算框架可以在不影響收入的前提下將CTR值提高3%,這主要是通過快速檢測低質量的廣告并把它們過濾出去而獲得的收益。S4系統提供的低延遲處理能夠使得商務廣告部門獲益,但是潛在的風險也不能忽視,那就是事件流的速率快到一定程度后,S4可能無法處理,會導致事件的丟失。如圖1-5所示。

圖1-5    S4在流量壓力測試下的事件丟失情況

3.       推ter的Storm

推ter的Storm[3][4]:Storm是一個分布式的、容錯的實時計算系統。Storm用途:可用于處理消息和更新數據庫(流處理),在數據流上進行持續查詢,并以流的形式返回結果到客戶端(持續計算),并行化一個類似實時查詢的熱點查詢(分布式的RPC)。

Storm為分布式實時計算提供了一組通用原語,可被用于“流處理”之中,實時處理消息并更新數據庫。這是管理隊列及工作者集群的另一種方式。 Storm也可被用于“連續計算”(continuous computation),對數據流做連續查詢,在計算時就將結果以流的形式輸出給用戶。它還可被用于“分布式RPC”,以并行的方式運行昂貴的運算。

Storm的主要特點如下:

■   簡單的編程模型。類似于MapReduce降低了并行批處理復雜性,Storm降低了進行實時處理的復雜性。

■   可以使用各種編程語言。可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。

■   容錯性。Storm會管理工作進程和節點的故障。

■   水平擴展。計算是在多個線程、進程和服務器之間并行進行的。

■   可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。

■   快速。系統的設計保證了消息能得到快速的處理,使用?MQ作為其底層消息隊列。

■   本地模式。Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。

4.       推ter的Rainbird

Rainbird是一款分布式實時統計系統,可以用于實時數據的統計:1統計網站中每一個頁面,域名的點擊次數;2內部系統的運行監控(統計被監控服務器的運行狀態),3記錄最大值和最小值。

Rainbird構建在Cassandra之上,使用Scala編寫,依賴于ZooKeeper、Scribe和Thrift。每秒可以寫入10萬個事件,而且都帶有層次結構,或者進行各種查詢,延遲小于100ms。目前推ter已經在Promoted Tweets、微博中的鏈接、短地址、每個用戶的微博交互等生產環境使用了Rainbird。其主要組件的功能描述如下。

■   ZooKeeper:Hadoop子項目中的一款分布式協調系統,用于控制分布式系統中各個組件中的一致性。

■   Cassandra:NoSQL中一款非常出色的產品,集合了Dynamo和BigTable特性的分布式存儲系統,用于存儲需要進行統計的數據,統計數據,并且提供客戶端進行統計數據的查詢。(需要使用分布式Counter補丁CASSANDRA-1072)

■   Scribe:非死book開源的一款分布式日志收集系統,用于在系統中將各個需要統計的數據源收集到Cassandra中。

■   Thrift:非死book開源的一款跨語言C/S網絡通信框架,開發人員基于這個框架可以輕易地開發C/S應用。

5.       非死book 的Puma

Puma是非死book的數據流處理系統,早期的處理系統如圖1-6所示,即二代Puma。PTail將數據以流的方式傳遞給Puma2,Puma2每秒需要處理百萬級的消息,處理多為Aggregation方式的操作,遵循時間序列,涉及的復雜Aggregation操作諸如獨立訪次、最頻繁事件等等。

對于每條消息,Puma2發送“Increment”操作到HBase。考慮到自動負載均衡、自動容錯和寫入吞吐等因素,Puma選擇HBase而不是Mysql作為其存儲引擎。Puma2的服務器都是對等的,也即同時可能有多個Puma2服務器向HBase中修改同一行數據。因此,非死book為HBase增加了新的功能,支持一條Increment操作修改同行數據的多個列。

圖1-6    Puma2系統數據處理通路

Puma2的架構非常簡單并且易于維護,其涉及的狀態僅僅是PTail的Checkpoint ,即上游數據位置,周期性地存儲在HBase中。由于是對稱結構,集群擴容和機器故障時的處理都非常方便。不過,Puma2的缺點也很突出,首先,HBase的 Increment操作是非常昂貴的,因為它涉及讀和寫,而HBase的隨機讀效率是比較差;另外,復雜Aggregation操作也不好支持,需要在HBase上寫很多用戶代碼;再者,Puma2在故障時會產生少量重復數據,因為HBase的Increment和PTail的Checkpoint并不是一個原子操作。

但值得一提的是Puma并沒有開源出來,讀者可以了解和借鑒其實現原理。

6.       阿里的JStorm

JStorm是一個Alibaba開源的分布式實時計算引擎,可以認為是推ter Storm的Java版本,用戶按照指定的接口實現一個任務,然后將這個任務遞交給JStorm系統,JStorm會啟動后臺服務進程7 * 24小時運行,一旦某個Worker 發生故障,調度器立即分配一個新的Worker替換這個失效的Worker。

JStorm處理數據的方式是基于消息的流水線處理, 因此特別適合無狀態計算,也就是計算單元的依賴的數據全部在接受的消息中可以找到, 并且最好一個數據流不依賴另外一個數據流。因此,JStorm適用于下面的場景中:

■   日志分析。從日志中分析出特定的數據,并將結果存入外部存儲器如數據庫。

■   管道系統。 將一個數據從一個系統傳輸到另外一個系統,比如將數據庫同步到Hadoop。

■   消息轉化器。將接受到的消息按照某種格式進行轉化,存儲到另外一個系統如消息中間件。

■   統計分析器。從日志或消息中,提煉出某個字段,然后做COUNT或SUM計算,最后將統計值存入外部存儲器。

但是,JStorm的活躍度并不高,截止到本章書寫的時間,整個JStorm項目共提交過36次,并且只有1個Committer,相比推ter Storm,不管是活躍度、認可度上還不是一個數量級的產品。

7.       其他實時計算系統

(1)              HStreaming

HStreaming[5]是建立在Hadoop上的可擴展的、可持續的數據分析系統。它可以分析、可視化并處理大量連續數據比如一個金融交易系統實時展示數據圖。HStreaming由Jana Uhlig與Volkmar Uhlig聯合創立,該公司沒有提供相關產品的開源版本,從官網信息來看只提供相關的解決方案。

HStreaming公司嘗試為Hadoop環境添加一個實時的組件,當數據提交到系統,在存儲到磁盤之前就會進行數據的處理,類似開源的Storm和Kafka。目前HStreaming已經建立了一個完整的系統,該系統能夠利用實時的引擎來處理視頻、服務器、傳感器以及其他機器上生成的數據流。而且完全兼容Hadoop作為一個歸檔和批量處理系統。

(2)              Esper

Esper[6]是EsperTech公司使用Java開發的事件流處理(ESP:Event Stream Processing)和復雜事件處理(CEP:Complex Event Processing)引擎。CEP是一種實時事件處理并從大量事件數據流中挖掘復雜模式的技術。ESP是一種從大量事件數據流中過濾,分析有意義的事件,并能夠實時取得這些有意義的信息的技術。該引擎可應用于網絡入侵探測,SLA監測,RFID讀取,航空運輸調控,金融方面(風險管理、欺詐探測)等領域。Esper可以用在股票系統、風險監控系統等等要求實時性比較高的系統中。

(3)    Borealis

Brandeis University、Brown University和MIT合作開發的一個分布式流式系統,由之前的流式系統Aurora、Medusa演化而來,學術研究的一個產品,08年已經停止維護。

Borealis具有豐富的論文、完整的用戶/開發者文檔,系統是C++實現的,運行于x86-based Linux平臺。系統是開源的,同時使用了較多的第三方開源組件,包括用于查詢語言翻譯的ANTLR、C++的網絡編程框架庫NMSTL等。

Borealis系統的流式模型和其他流式系統基本一致:接受多元的數據流和輸出,為了容錯,采用確定性計算,對于容錯性要求高的系統,會對輸入流使用算子進行定序。

8.       框架對比

實時數據流計算是近年來分布式、并行計算領域研究和實踐的重點,無論是工業界還是學術界,誕生了多個具有代表性的數據流計算系統,用于解決實際生產問題和進行學術研究。不同的系統滿足不同應用的需求,系統并無好壞之分,關鍵在于服務的對象是誰。圖8-5從各個方面比較了典型的三個數據流計算系統Puma、Storm和S4,因為StreamBase是廠商發行商用版本,HStreaming只提供解決方案,而JStorm和Storm非常相似,所以這幾種產品并沒有羅列在圖1-7中。

圖1-7    Puma、Storm和S4三種數據流計算系統對比

圖1-7從開發語言、高可用機制、支持精確恢復、主從架構、資源利用率、恢復時間、支持狀態持久化及支持去重等幾個方面對這三種系統進行了對比。可以看到,為了高效開發,兩個系統使用Java,另一種系統使用函數式編程語言Clojure;高可用方案,有兩個系統使用Passive Standby方式,系統恢復時間可控,但系統復雜度增加,資源使用率也較低,因為需要一些機器用來當備機;而Storm選擇了更簡單可行的上游回放方式,資源使用率高,就是恢復時間可能稍長些;Puma和S4都支持狀態持久化,但S4目前不支持數據去重,未來可能會實現;三個系統都做不到精確恢復,即恢復后的執行結果和無故障發生時保持一致,因為即使是Passive Standby方式,也只是定期Checkpoint,并沒有跟蹤每條消息的執行。商用的StreamBase支持精確恢復,這主要應用于金融領域。

[1]有關該實現的詳細資料,請參見官網http://www.streambase.com

[2]有關該實現的詳細資料,請參見http://s4.io/。

[3]有關該實現的詳細資料,請參見http://www.slideshare.net/kevinweil/rainbird-realtime-analytics-at-推ter-strata-2011。

[4]官方指南請參見https://Storm.canonical.com/Tutorial。

[5]有關該實現的詳細資料,請參見http://www.hstreaming.com/technology/hstreaming/。

[6]官方網站請參見http://www.espertech.com/。

</div>

 本文由用戶 hrd888888 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!