大數據系列之(一) Streaming模式基礎知識
- 作者:Tyler Akidau
- 譯者:張磊
- 原文: http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html </ul>
- Streaming 101 : 本篇會先講述一些最基礎的知識和澄清一些專業術語,之后我們深入地講述時間的作用,以及數據處理常用方法,這主要包括batch和streaming兩種模式。
- The Dataflow Model : 第二篇用一些實際例子講述Cload Dataflow, 它綜合了batch和streaming兩種模式。之后,我回總結現有batch和streaming系統在語義上的相同點和不同點。
- 術語 : 任何復雜的問題都需要精確的術語。我會嘗試把一些被濫用的術語重新定義,所以讓大家能明白我用他們的時候在說什么。
- 功能 :我會列舉現有streaming系統常見的缺點,然后提出一個我覺得streaming系統應該有的功能,這些功能能夠解決現有或者將來數據處理的需求。
- 時間問題(Time Domains) : 介紹兩個數據處理中跟時間相關的兩個定義,并分析他們是怎么互相聯系,然后指出這兩個時間給數據處理帶來怎樣的困難。
- Unbounded Data無窮數據 :一種持續產生并且無窮的數據,他們經常被稱為‘streaming data’流數據,但是用streaming和batch去描述數據是錯誤的,正如我剛才說的,這兩個詞隱喻只能用某種類型 執行引擎 去處理這類數據。數據只分成兩類:無窮和有限,不能說數據是streaming或者batch。我建議當我說infinite ‘streaming’ data無窮流的數據請直接用unbounded data無窮數據,當我們說finite batch data有限‘batch’數據時直接用bounded data有限數據。(譯者:作者只是強調streaming和batch不能用來描述數據,它們只是數據處理的方式。)
- Unbounded data processing無窮數據處理 :一種用來處理無窮數據的方式。我自己也常常使用streaming來代表這種方式,但是不對的,batch也能實現它。因此,為了減少誤解,我不會用streaming來代替它,我會直接使用無窮數據處理這個術語。
- 低延遲,近似結果 :他們經常跟流處理引擎連在一起。批處理系統通常不是設計用來解決低延遲近似結果,這是現狀,不代表它不可以。事實上,批處理引擎是可以提供近似結果的。因此,低延遲就是低延遲(譯者:延遲是指從系統收到消息,到完成整個消息計算的時間差),近似結果就是近似結果(譯者:很多計算開銷很大,近似結果用很少的開銷就能提供可控的精度的正確性,比如yahoo最近開源的 Sketches 就能解決unique users count),不要用streaming流處理引擎去代表他們。
- Event time發生時間 :事件實際觸發時間(譯者:我常叫client time,比如你了解手機app5分鐘活躍度,那Event time就是你實際用手機的時間,由手機app打的時間戳)
- Processing time處理時間 :時間被系統獲知的時間(譯者:我常叫server time,當事件進入這個系統的時間,大部分是應用層收到消息后加的時間戳。)
- 無序數據 :意味著你需要處理時序問題,如果你關心event time。(譯者:所有事件從絕對時間上看,一定都是有順序的,如果一切都是單機,你一定定保證順序。)
- event time時間差 :你不能保證在X+Y時能看到大部分X發生的數據,這里Y是固定的數值。(譯者:還是強調消息到底的不確定性)
- Time-agnostic 時間無關的邏輯
- Approximation 近似算法
- Processing time分塊
- Event time分塊
- Fixed windows 固定窗口 :按時間分成固定大小的塊。
- Sliding windows 滑動窗口 :更一般的固定窗口,滑動窗口一般都是固定長度加上固定時間段。如果段小于長度,滑動窗口就是重疊的,否則就是sampling窗口,只使用部分數據。跟fixed窗口一樣,大部分滑動窗口都是對齊的,有時候我們用不對齊的方式優化性能。
- Sessions 序列 :動態窗口,用不活躍段將所有事件分成一個個session,所以session是一串連續的事件,不活躍段都是用超時時間。session一般用來分析用戶行為數據,取決于實際時間情況,不能預先計算。sessions是經典的不對齊窗口,因為沒有兩個人的數據是完全相同的。
- 簡單 :你不需要管時間是否亂,只需要保存來的數據,到時扔給下游
- 完整性 :系統能清楚的知道數據是否完整,沒必要處理“晚來”的數據。
- 如果你想推斷一些關于上游的情況,只需要用processing time。 : 監控系統就是最好的例子,比如你想知道一個全球化web service的QPS,使用processing time計算QPS是最好的方法。
- Buffering :更長的生命要求我們保存更多的數據。很幸運,現在持久化已經是數據處理系統中最便宜的資源了,相對于CPU,網絡帶寬和內存來說,因此buffer不是太大的問題,至少比設計強一致性存儲和內存cache要容易。很多聚合操作不需要保存整個輸入,比如和或者均值。
- Completeness 完整性 :我們都沒有好方法知道所有數據都到了,我們如何知道何時發布結果?而且我們沒法簡單計算“何時”。(譯者:發布數據是指讓下游感知,比如把數據結果更細到DB)對于大部分情況,系統能使用類似MillWheel的watermarks去可靠地預測數據是否完整(我會在下一章講)。但是在某些情況下,正確性非常重要,想想計費系統,我們唯一的方法是提供一個方法讓系統自己能夠控制什么時候發布數據,并且讓系統自己能反復修改最終數據。完整性是一個非常有趣的話題,最好能用一些強有力的例子來說明,我會在下一篇分享。
- 澄清術語,特別是streaming的定義只限于執行引擎,而將其他術語,比如unbounded data和近似算法都放在streaming的概念下。
- 分析了設計正確的batch和streaming系統,總結出 streaming是batch的功能超集 ,Lambda架構最終會被streaming取代。
- 提出兩個重要的概念,能夠幫助streaming追趕并超越batch: 完整性 和 時間工具
- 列出了 event time和processing time 的關系,并且指出兩個時間給我們帶來的困難,根據完整性的概念,提出系統應該能夠擁抱在時間上的變化,提供完整精確的結果。
- 分析了常用數據處理方法,包括bounded和unbounded數據,主要是batch和streaming引擎,并且把unbounded數據處理分成4類: time-agnostic, approximation, windowing by processing time, and windowing by event time 。
- 從更高層面分析 數據處理 的概念,主要從4個方面入手: what, where, when, and how 。
- 詳細分析如何用Dataflow Model來完成各種不同的需求。他講幫助你更徹底的理解啥是event time和processing time,也包括一個新的概念:watermarks
- 比較現有數據處理系統,特別是一些最要的特性,讓我們更好的選擇他們,并且鼓勵大家改善他們,幫助我實現我的最終目標:讓streaming成為大數據處理的最好形式。
譯者摘要
現在大數據,云計算已經成為互聯網的標配,但是現在主流的大數據處理依舊是使用batch模式,batch模式就是將數據按某種規則分成塊,然后對整個塊跑計算邏輯,缺點是延遲太高(至少是分鐘),常用的工具就是Hadoop。在日益變化的需求面前,高延遲越來越不能忍受,因此Streaming模式應運而生,他最大的特點就是低延遲,最快能到毫秒級別,常用的Streaming工具主要是storm,spark等,但是這些工具都有各自的優缺點,功能上不能完全取代batch,這篇文章就是想深入分析什么樣的Streaming系統能徹底替代batch,并最終打敗batch。
序
盡管現在市場上對streaming越來越關注,但是相對于batch來說,大部分streaming系統還不是很成熟,所以streaming系統正處于積極開發和進步中。
作為一個在分布式steaming系統上工作過5年的從業者(MillWeel, Cloud DataFlow), 我非常榮幸給大家分享streaming系統能解決什么問題,以及batch系統和streaming系統在語義上的區別,這非常重要,會幫助我們更好地使用它。內容眾多,我會分成兩篇文章來講述:
下面是一篇很長的文章,不要分心。(譯者:原文是nerdy,主要指呆滯但是專注的技術宅)
背景知識
本節講述一些非常重要的背景知識,幫助理解剩下的部分。
術語:什么是streaming?
在我們深入之前,我想先解決:什么是streaming。現在‘Streaming’有很多不一樣的意義,讓我們誤解streaming到底能解決什么問題,所以我一定要先 精確 定義它。
問題的本質是:很多術語不是告訴我們他們是什么,而是告訴我們他們是怎么實現的,比如Unbounded data processing無窮數據處理和Approximate resulte近似結果處理被認為就是streaming,而實際上只是streaming常常用來解決這類問題,其實batch也能解決。沒有精確定義的streaming被人們狹隘的認為streaming只是擁有某些常被稱做‘streaming’的特性,比如近似結果。一個設計合理的streaming系統是可以提供 correct正確 , consistent一致 并且 repeatable可重復計算 的結果,就像現在所有批處理引擎,我覺得 streaming定義 是: 一種能處理無窮數據的數據處理引擎 。(為了這個定義更完整,我必須強調它包含了我們常說的streaming和micro-batch)。
下面是一些經常和‘streaming’聯系在一起的幾個術語,我分別為他們給出了更精確,更清楚的定義,我建議整個工業界都應該采納它們:
到此,我已經定義了什么是流處理引擎:一種用來處理無窮數據的引擎,對于其他術語,我不會用streaming來替代他們。我們已經在Cloud Dataflow中使用這些術語了,我建議其他人也可以采用它們。
(譯者:在下面的文章中,我會直接用streaming和batch來代替流處理引擎和批處理引擎,這樣可以少敲幾個字,敲字很辛苦)
泛化streaming的功能
下面讓我們看看streaming到底能做什么和不能做什么,特別是設計合理的streaming能做什么。長久以來,streaming被工業界狹隘地認為是提供低延遲,近似結果,批處理是用來提供正確結果的,比如 Lambda Architecture (譯者注:下文我會用Lambda架構表示,我理解為啥叫它Lambda)。
如果你熟悉Lambda架構,你應該知道這個架構最基本的思想就是使用批處理引擎和流處理引擎,執行一樣的計算,流處理引擎提供低延遲不精確結果(可能是使用近似算法,或者流處理就不準備提供正確性),之后批處理引擎提供正確的結果,覆蓋流處理引擎的結果。起初,這個架構,由 Nathna Marz, Storm創始人 提出,真的獲得不少成功,因為這個想法在當時看來真是不錯,那時候流處理引擎還不令人滿意,批處理引擎也不能像我們預期的那樣發展得很好,Lambda架構卻提供一種短期解決方案,并且很多項目真的用了。Lambda架構也存在缺點:維護成本高,你需要維護兩套獨立的系統,而且需要在最后合并他們的結果。
作為一個在 強一致 流處理引擎工作多年的從業人員,我不同意Lambda架構的出發點,而且我特別喜歡 Jay Kreps’ Questioning the Lambda Architecture 這篇文章,它很有遠見地質疑了雙引擎的必要性,并且給出很強的理由。Kreps使用repeatability可重放的消息隊列系統 kafka 去解決repeatability可重放問題,然后他提出一種新的架構:Kappa架構:基本思想就是只使用一套合理設計的引擎。雖然我不認為這個概念需要一個新的名字,但是我非常贊同他的思想。
(譯者:可重放是指你可以隨時回到一個時間點順序讀取任何信息,kafka能做到這點,但現實是kafka也有開銷,比如你的model需要一年的歷史數據,你會讓kafka存下一年的數據?基本上不會,你應該把數據存在開銷更低的系統,比如hadoop,但是streaming系統可以讀取回這些歷史數據)
現在我們應該更近一步,我認為:設計合理的streaming是可以提供比batch更多的功能,到那時我們不再需要batch。(譯者:當前所有streaming比batch占用更多的資源,從商業上說batch一定會持續存在直到streaming能更加高效利用資源)Flink就是利用這個想法創造了一個完全的streaming系統,并且支持batch模式,我非常喜歡它。
隨著streaming系統越來越成熟,它將提供一種無窮流處理的框架,并且讓Lambda架構消失在歷史中。我相信它已經在發生。如果我想徹底打敗batch,我們需要完成兩件事:
正確性:這讓streaming和batch能夠等同。
本質上,正確性取決于consistent一致的存儲。steaming需要一種類似checkpointing持久化存儲,正如Kreps在它這篇文字所寫,這種存儲在機器掛掉的情況也能保證一致性。當幾年前Spark streaming首次出現時,它在streaming世界里就是一致性的代名詞。一切都在向前發展,仍有很多streaming系統不提供強一致性,我實在是不理解為啥at-most-once仍然存在,但是實際情況它還在。(譯者:如果at-most-once指的是系統保證這個消息最多被處理一次,其他方式是:at-least-once至少一次和exactly-once只有一次,作者想表達的是最多一次就是系統會丟數據,除非你不關心正確性,否則這種方式就不該存在。但實際是上實現三種方式的開銷不一樣,隨著系統越來越成熟,可能三種開銷就不會那么大,到那時候估計就沒人愿意使用最多一次的方式了。)
重要的事情再說一次:強一致性必須要exactly-once處理,否則無正確性可言,而且只有這樣才能追上并且最終超越batch系統。除非你毫不在乎結果的正確性,否則我真誠地建議你放棄任何不提供強一致性的streaming系統,不要浪費時間在他們身上。
如果你真的想學習如何給streaming系統設計強一致性,我推薦你可以讀讀 MillWheel 和 Spark Streaming 的論文,這兩篇論文都花費了很長的時間講述一致性。本文時間有限,就不在此詳述了。 (譯者:還沒讀,看完會給大家分享下)
時間工具:這讓streaming超越batch。
當我們處理無窮無序數據時,時間工具是一切的基礎。當前,越來越多的需求要求我們處理無窮無序數據,而現有batch系統(包括大多數streaming系統)都缺乏必要的工具去解決這個困難。我會用本文剩下的部分和下一篇文章著重解釋它。(譯者:無序是難點,大部分分布式系統都不可能提供順序保證,這里時間工具是指系統提供api,讓我們自己控制超時以及如何按時間分塊,下面會有詳述。)
我們首先會了解時間問題中重要的概念,而且深入分析什么是無窮無序數據的時間差,之后我會用剩下的部分講解batch和streaming系統處理有限和無窮數據的常用方法。
Event Time和 Processing Time
(譯者:下面我會直接用event time 和processing time)
很多需求都不關注event time,這樣的生活會簡單很多,但是還是有不少需求是關心的,比如為帶時序的用戶行為建立特征,大多數付費應用和很多異常檢查。(譯者:廣告的attribution就是帶時序的行為,你只能在看過廣告后點擊)
完美情況下,Event time和processing time應該永遠是相等的,事件發生后立即被處理。現實是殘酷的,兩者的時間差不僅不是0,而是一個與輸入,執行引擎和硬件相關的函數。下面幾個是經常影響時間差:
* 有限的共享資源:比如網絡阻塞,網絡分區,不獨占條件下的共享CPU
* 軟件:分布式系統邏輯,競爭
* 數據本身的特征:包括key的分布,吞吐量差異,無序(比如:很多人在坐飛機時關閉飛行模式使用手機,等手機網絡恢復后手機會把事件發給手機)
下圖就是一個真實的event time和processing time差異圖:
黑色點線代表兩個事件完全一致。在這個例子中,系統延遲在中間到最低點。橫向距離差代表兩個時間的時間差,基本上這個時間差都是由延遲導致。
這兩個時間沒有固定的相關性(譯者:不能簡單的用一個函數去計算),所以如果你真的關心event time,你就不能用processing time去分析你的數據。不幸的是大多數現有系統都是用processing time去處理無窮數據,他們一般都會將輸入按processing time用一些臨時的分界線拆分小塊去處理。
如果你們系統關心正確性,那就千萬不能用processing time去分塊,否則一部分消息會因此被分到錯誤的塊,無正確性而言。我們會在下面看到更多這樣的例子。
即使我們用event time作為分界線,其實也不能實現正確性。如果event time和processing time之間沒有一個可預測的關系,你怎么能確定你已經收到所有消息?(比如:你要統計5分鐘的數據,你怎么能保證收到所有5分鐘的數據)現在大部分數據處理系統都依賴某種“完整性”,但是這么做讓他們處理無窮數據遇到嚴重的困難。(譯者:完整性一般都是用超時來實現,等一段時間發現沒有了就放棄)
我們應該設計工具能夠讓我們生活在這種不確定性中(譯者:不確定性是指時間差不能預測)。當新數據到底時,我們可以獲取或者修改老數據,所有系統都應該自然而然去優化“完整性”,而不是認為它至少可有可無的語義。(譯者:優化完整性的意思是系統能夠提供api控制超時。)
在我們深入如何實現類似Cloud Dataflow數據模型前,我們先了解一個更有用的話題:常見數據處理方式。
數據處理方式
現在我們可以開始談一些重要的數據處理模式了:batch和streaming(我把micro-batch歸到streaming中,因為兩者差異在這里不是很重要)。
有限數據
處理有限數據很簡單,大多數都已經熟悉。在下圖中,左邊是一個完整數據集,中間是數據處理引擎(一般是batch,當然一些設計合理的streaming也可以),比如 MapReduce ,右邊則是處理的結果:
更讓我們感興趣的是無窮數據,我們會先分析傳統的batch,然后分析常見streaming或者micro-batch。
無窮數據 —— batch
雖然batch從字面上看不像用來處理無窮數據,但是從它出生就已經被業界使用了。大家都能想到,只要我們能把無窮數據分塊,我們就能用batch引擎出處理。
固定窗口
最常見的方法就是把數據拆分成固定大小的塊,然后依次處理各個塊數據。特別是日志數據,我們可以自然而然把數據拆分成以時間為單位的樹狀結構,這個很直觀,因為基本上就是按event time分塊。
實際上,大部分系統仍然要處理完整性問題:如果一些事件由于網絡延遲到達,怎么辦?如果消息是全球的,但是存在一個公共地方,怎么辦?如果事件來自手機,怎么辦?我們可能需要用一些特別的辦法解決它們:比如延遲事件處理直到所有數據都到達,或者重新計算整個數據集只要有新的數據到達。
** Sessions 序列 **這個比簡單分塊復雜,我們需要把事件拆的更細。Sessions一般是指把事件按活躍間隔拆分。如果在batch中計算sessions,你會出現一個Session橫跨多個塊,比如下圖紅色部分。當然,當你增加每個batch的時間間隔,這種橫跨多個batch的概率會降低,但是它會帶來更高的延遲。另外一個方法是增加額外的邏輯出處理這種情況,代價是邏輯復雜。
兩者都不是完美的方法,更好的方法是用streaming的方式處理session,我們下面會講。
無窮數據 —— streaming
streaming從開始就是設計用來處理無窮數據,這跟大多數batch引擎不太一樣。正如我們上面說的,大多數分布式系統的數據不只是無窮,還是一些其他讓人討厭的特性:
因此出現了一批處理這類數據的方法,我大致把他們分為4類:
我們簡單分析下他們
Time-agnostic
這類處理邏輯完全跟時間沒關心,只是更數據本身有關,batch和streaming在處理這種邏輯時沒有本質區別,所以大部分streaming都支持。當然batch引擎也支持,你可以用任意方法把數據分塊,再依次處理。下面是一些實際例子:
Filtering過濾
最基礎的邏輯,比如你在處理web日志,你想要某些域名的數據,你可以在每個事件到底的時候丟掉那些不想要的,跟無窮,無序,時間差一點關系都沒有。
Inner-joins
當join兩類數據時,你只關系兩者都到達的情況,如果一方先到,你可以把它存起來,等第二個到底之后取回前一個,然后計算。當然你想回收一些單個事件占用的資源,那就是跟時間有關了。(譯者:超時回收)
當然如果要支持某些outer-join,就存在數據完整性問題:當你看到一方時,你怎么知道另一方是否還會來?你肯定不知道,除非你設計超時,超時又跟時間相關,本質上又是另一種形式的分塊,我們會詳細分析。
近似算法
第二大類就是近似算法,比如近似Top-N,streaming K-kmeans等等。近似算法的好處是開銷低,可以處理無窮數據。壞處是這類算法數量有限,而且實現復雜,近似的本質限制他們的使用,不可能所有場景都能用近似。這些算法大多數都有一些時間特征,比如decay,當然他們大多用processing time。另一個很重要的特點就是他們都提供可控的錯誤率。如果錯誤率可以根據數據到達的順序預測出來,這些錯誤率就可以忽略不計,即便是無窮數據,這點很重要,你最好能記住這點。
近似算法讓人很興奮,但本質上是另一種形式的時間無關算法。
分塊
剩下兩個方法都是講如何將數據按時間分塊。我想先講明白啥是windowing,它就是把無窮或者有限數據按分界線拆分成有限的塊。下圖是三種不同的分塊策略:
我們會看看用processing time和event time去分窗口到達有什么不同,當然從processing time開始,因為它更常用。
** Processing time分塊 **
系統只需要保存來的數據直到一段時間完成。比如:5分鐘分塊,系統可以保存5分鐘的數據,然后認為所有數據都到了,發給下游處理就行。這種方式有下面幾個很好的特性:
當然processing time也有缺點: 如果數據含有event time,并且你想用processing time來分塊解決,那么這些數據必須是有序的。 不幸的是分布式上游大部分無法保證有序。
一個簡單的例子:手機app想了解用戶使用情況。如果一個手機斷網一段時間,這段時間的數據不會即時發到服務器端直到手機再次連網。我們無法用processing time分塊獲得任何有用的信息。
另一個例子:有些分布式上游在正常情況下可以保證event time順序,但是不代表上游一直能保持一樣的時間差。比如一個全球系統,由于網絡問題,可能某個州的網絡導致很高的延遲,如果你是processing time分塊,分塊的數據已經不能代表當時的情況了,可能是新舊數據的混合。
我們希望這兩個例子都是用event time分塊,保證邏輯的正確性。
** Event time分塊 **
Event time分塊能讓我們觀察到上游數據在事件發生時的情況,這是最正確的方法。但是,大多數數據處理系統都沒能從語義很好的支持它,雖然我們知道任何強一致性的系統(Hadoop或者Spark Streaming)經過一些修改是能解決的。(譯者:作者說從語義上支持就是系統能保證按Processing time分塊,我常用的工具都沒有這樣的語義)
下圖顯示了一個按event time分成一小時的塊:
白色的線代表事件的processing time不同于event time。毋容置疑,event time分塊一定能保證event time正確性。
另一個好處是你可以創建動態大小的塊,比如session,但是不存在session跨越多個固定的塊。
當然,任何事情都是有代價的,包括event time分塊。它有兩個很明顯的缺點,因為塊必須比實際長度存活更久:
總結
這篇包含太多信息了。如果你讀到這里,你應該受到表揚,我們已經完成了一半了,讓我們回顧下我們講了什么,并且知道下一篇講什么。讓人高興的是這篇無聊些,但是下一篇一定有趣。