學習筆記:The Log(我所讀過的最好的一篇分布式技術文章)

jopen 9年前發布 | 53K 次閱讀 The Log 分布式/云計算/大數據

前言

這是一篇學習筆記。
學習的材料來自Jay Kreps的一篇講Log的博文。
原文很長,但是我堅持看完了,收獲頗多,也深深為Jay哥的技術能力、架構能力和對于分布式系統的理解之深刻所折服。同時也因為某些理解和Jay哥觀點吻合而略沾沾自喜。

Jay Kreps是前Linkedin的Principal Staff Engineer,現任Confluent公司的聯合創始人和CEO,Kafka和Samza的主要作者。

所謂筆記,就是看了文章,提筆就記,因為Jay哥本身本章組織的太好,而其本身的科學素養及哲學素養也很高,所以私以為出彩的東西就不省略了。

一、資料來源

The Log: What every software engineer should know about real-time data's unifying abstraction

二、筆記

2.1 Log的價值

1) Log是如下系統的核心:

  • 分布式圖數據庫
  • 分布式搜索引擎
  • Hadoop
  • 第一代和第二代K-V數據庫

2) Log可能跟計算機的歷史一樣長,并且是分布式數據系統和實時計算系統的核心。
3) Log的名字很多:

  • Commit log
  • Transaction log
  • Write-ahead log

4) 不理解Log,你就不可能充分理解

  • 數據庫
  • NoSQL存儲
  • K-V存儲
  • 復制
  • Paxos算法
  • Hadoop
  • Version Control
  • 或者,任何軟件系統

2.2 什么是Log?

2.2.1 概述

這里寫圖片描述

  • 記錄會附加到log的尾部。
  • 從左到右讀取記錄。
  • 每個entry都有唯一且有序的log entry 序號。

記錄的順序定義了這樣的一個概念:時間。
因為越靠左的記錄越早。
Entry的序號可以當作一種時間戳,將記錄的順序當作時間這一概念看起來很奇怪,但是很快你就會發現,這樣做:可以方便地將“時間”與任一特定的物理時鐘解耦。
Log和常見的文件、表(table)沒有那么大的差別。

  • 文件是一組字節
  • 表是一組記錄
  • Log可以說是某種將記錄按時間排序的文件或者表

這樣說,可能你會覺得log如此簡單,還有討論的必要嗎?
其實,log的核心意義在于:

Log記錄了何時發生了什么(they record what happened and when.)。

而這一條,通常是分布式系統最最最核心的東西。
注意,這里有必要澄清幾個概念:

  • 本篇所討論的Log和程序員通常接觸的應用日志(application logs)不同
  • 應用日志通常是一種非結構化的,記錄錯誤信息、調試信息,用于追蹤應用的運行的,給人看的日志,比如通過log4j或者 syslog來寫入本地文件的日志。
  • 而本篇所討論的log是通過編程方式訪問的,不是給人看的,比如“journal”、“data logs”。
  • 應用日志是本篇所討論的log的一種特化。

2.2.2 數據庫中的Logs

Log的起源不得而知,就像發明二分查找的人,難以意識到這種發明是一種發明。
Log的出現和IBM的System R 一樣早。
在數據庫中,需要在數據庫崩潰時,保持多種多樣的數據結構和索引保持同步。
為保證原子性和持久性,數據庫需要在對數據結構和索引進行修改提交之前,記錄其要修改的內容。
所以log記錄了何時發生了什么,而每一張表和索引本身,都是這種歷史信息的映射。
因為log是立即持久化的,所以當crash發生時,其成為恢復其它持久化結構的可靠來源。

Log從保證ACID特性的一種實現,發展成了一種數據庫之間數據復制的手段。

很顯然,數據庫中發生的一系列的數據變更,成為數據庫之間 保持同步最需要的信息。
Oracle、MySQL、PostgreSQL,都包含了log傳輸協議,將log的一部分發送到用于保持復制的從數據庫(Slave)。
Oracle的XStreams和GoldenState,將log當作一種通用的數據訂閱機制,以提供給非Oracle的數據庫訂閱數據。
MySQL和PostgreSQL也提供了類似的組件,這些組件是數據系統架構的核心。
面向機器的Log,不僅僅可被用在數據庫中,也可以用在:

  • 消息系統
  • 數據流(data flow)
  • 實時計算

2.2.3 分布式系統中的logs

Log解決了兩個很重要的分布式數據系統中的問題:
1) 有序的數據變化
2) 數據分布式化

所謂的狀態機復制原理(State Machine Replication Principle):

如果兩個確定的處理過程,從相同的狀態開始,按照相同的順序,接收相同的輸入,那么它們將會產生相同的輸出,并以 相同的狀態結束。

所謂確定的(deterministic),是指處理過程是時間無關的,其處理結果亦不受額外輸入的影響。
可以通過非確定的例子來理解:

  • 多線程的執行順序不同導致不同的結果
  • 執行getTimeOfDay()方法
  • 其它的不能重復的處理過程

所謂狀態,可以是機器上的任意數據,無論在處理結束后,是在機器的內存中還是磁盤上。
相同的輸入按照相同的順序,產生相同的結果,這一點值得引起你的注意,這也是為什么log會如此重要,這是一個直覺性的概念:如果你將同一個log輸入兩個確定性的程序,它們將產生相同的輸出。
在分布式系統的構建中,意識到這一點,可以使得:
讓所有的機器做同樣的事,規約為:
構建分布式的、滿足一致性的log系統,以為所有處理系統提供輸入。

Log系統的作用,就是將所有的輸入流之上的不確定性驅散,確保所有的處理相同輸入的復制節點保持同步。

這種方法的最妙之處在于,你可以將索引日志的時間戳,作為所有復制節點的時鐘來對待:

通過將復制節點所處理過的log中最大的時間戳,作為復制節點的唯一ID,這樣,時間戳結合log,就可以唯一地表達此節點的整個狀態。

應用這種方法的方式也很多:

  • 在log中記錄對一個服務的請求
  • 在回復請求的前后,記錄服務狀態的變化
  • 或者,服務所執行的一系列轉換命令,等等。

理論上來講,我們可以記錄一系列的機器指令,或者所調用方法的名稱及參數,只要數據處理進程的行為相同,這些進程就可以保證跨節點的一致性。
常玩兒數據庫的人,會將邏輯日志和物理日志區分對待:

  • 物理日志:記錄了所有的行內容的變化。
  • 邏輯日志:不是記錄內容的變化,而是Insert , update , delete等導致行內容變化的SQL語句。

對分布式系統,通常有兩種方式來處理復制和數據處理:
1) State machine model(active - active)
2) Primary-back model (active - passive)

如下圖所示:
這里寫圖片描述

為了理解上述兩種方式的不同,來看個簡單的例子:
現在,集群需要提供一個簡單的服務,來做加法、乘法等算術運算。初始,維護一個數字,比如0。

  • Active – active :在日志記錄這樣的一些操作,如“+1”、“*2”等,這樣,每個復制節點需要執行這些操作,以保證最后的數據狀態是一致的。
  • Active – passive:一個單獨的master節點,執行“+1”、“*2”等操作,并且在日志中記錄操作的結果,如“1”、“3”、“6”等。

上面的例子也揭示了,為什么順序是復制節點之間保持一致性的關鍵因素,如果打亂了這些操作的順序,就會得到不同的運算結果。
分布式log,可以當做某些一致性算法的數據結構:

  • Paxos
  • ZAB
  • RAFT
  • Viewstamped Replication

一條log,表征了一系列的關于下一個值是什么的決定。

2.2.4 Changelog

從數據庫的角度來看,一組記錄數據變化的changelog和表,是對偶和互通的。
1) 依據記錄了數據變化的log,可以重構某一狀態的表(也可以是非關系型存儲系統中有key的記錄)
2) 相反,表如果發生了變化,可以將變化計入log。

這正是你想要的準實時復制的秘籍所在!

這一點和版本控制所做的事情極為類似:管理分布式的、并發的、對狀態進行的修改。

版本控制工具,維護了反映修改的補丁,這其實就是log,你和一個被簽出(checked out)的分支快照進行交互,這份快照就相當于數據庫中的表。你會發現,版本控制與分布式系統中,復制都是基于log的:當你更新版本時,你只是拉取了反 映了版本變化的補丁,并應用于當前的分支快照。

2.3 數據集成(Data integration)

2.3.1 數據集成的含義

所謂數據集成,就是將一個組織中的所有服務和系統的數據,變得可用。

實際上,對數據進行有效利用,很符合馬斯洛的層次需求理論。
金字塔的最底層,是收集數據,將其整合進應用系統中(無論是實時計算引擎,還是文本文件,還是python腳本)。
而這些數據,需要經過轉換,保持一個統一、規范、整潔的格式,以易于被讀取和處理。
當上面的要求被滿足后,就可以開始考慮多種多樣的數據處理方式,比如map – reduce 或者實時查詢系統。
很顯然,如果沒有一個可靠的、完備的數據流,Hadoop就僅僅是一個昂貴的、難以整合的加熱器(集群很費電么?)。
相反,如果能保證數據流可靠、可用且完備,就可以考慮更高級的玩法、更好的數據模型和一致的、更易被理解的語義。
接著,注意力就可以轉移到可視化、報表、算法和預測上來(挖啊機啊深度啊)。

2.3.2 數據集成的兩個復雜性

事件

事件數據,記錄了事件是怎么發生的,而不僅僅是發生了什么,這一類log通常被當做應用日志,因為一般是由應用系統寫入的。但這一點,其實混淆了log的功能。
Google的財富,其實,是由一個建立在(用戶)點擊流和好惡印象(體驗)之上的相關性pipeline產生的,而點擊流和印象,就是事件。

各種各樣的專業數據系統的爆發

這些系統存在的原因:

  • 聯機分析(OLAP)
  • 搜索
  • 簡單的在線存儲
  • 批處理
  • 圖譜分析
  • 等等(如spark)

顯然,要將數據整合進這樣的系統中,對于數據集成來講,極為困難。

2.3.3 基于日志結構的數據流

每種邏輯意義上的數據源,都可以依據log進行建模。

數據源可以是記錄了事件(點擊和PV)的應用程序,可以是接受更改的數據庫表。

每個訂閱者,都盡可能快地從這些數據源產生的log中獲取新的記錄,應用于本地的存儲系統,并且提升其在log中的讀取偏移(offset)。訂閱者可以是任何數據系統,比如緩存、Hadoop、另一個站點的數據庫,或者搜索引擎。

Log,實際上提供了一種邏輯時鐘,針對數據變化,可以測量不同的訂閱者所處的狀態,因為這些訂閱者在log中的讀取偏移不同且相互獨立,這種偏移就像一個時間意義上的“時刻”一樣。

這里寫圖片描述

考慮這樣一個例子,一個數據庫,和一些緩存服務器:
Log提供了這樣一種能力,可以使得所有的緩存服務器得到同步,并推出它們所處的“時刻”。

假設我們寫入了一個編號為X的log,要從某個緩存服務器讀取數據,為了不讀到老數據,只需要保證:在緩存服務器將數據(同步)復制到X這個位置前,我們不從這個緩存中讀取任何東西即可。

此外,log還提供了作為緩沖區的能力,以支持生產者和消費者的行為以異步的方式進行。

最關鍵的一個支持異步的原因,是訂閱系統可能會發生崩潰、因維護而下線,接著恢復上線,而在這種情況下,每個訂閱者都以自己的步調消費數據。

一個批處理系統,比如Hadoop,或者一個數據倉庫,是以小時或天為單位消費數據,而一個實時系統,通常在秒級消費數據。
而數據源或者log,對消費數據的訂閱者一無所知,所以,需要在pipeline中做到無縫的添加訂閱者和移除訂閱者。

更重要的是,訂閱者,只需要知道log,而不需要對其所消費的數據的來源有任何了解,無論這個數據源是RDBMS、Hadoop,還是一個最新流行的K-V數據庫,等等。

之所以討論log,而不是消息系統,是因為不同的消息系統所保證的特性不同,并且用消息系統這個詞,難以全面和精確表達某種語義,因為消息系統,更重要的在于重定向消息。

但是,可以將log理解為這樣一種消息系統,其提供了持久性保證及強有序的語義,在通訊系統中,這稱作原子廣播。

2.4 在Linkedin

Linkedin目前的主要系統包括(注:2013年):

  • Search
  • Social Graph
  • Voldemort (K-V存儲)
  • Espresso (文檔存儲)
  • Recommendation engine
  • OLAP query engine
  • Hadoop
  • Terradata
  • Ingraphs (監控圖譜及metrics服務)

每個系統,都在其專業的領域提供專門的高級功能。

(這一段太長太長了,Jay兄十分能侃啊,所以挑重點的來記吧!)

1) 之所以引入數據流這個概念,是因為要在oracle數據庫的表之上,建立一個抽象的緩存層,為搜索引擎的索引構建和社交圖譜更新,提供拓展能力。

2) 為了更好的處理linkedin的一些推薦算法,開始搭Hadoop集群,但團隊在此塊的經驗尚淺,所以走了很多彎路。

3) 開始時,簡單粗暴地認為只要將數據從oracle數據倉庫中拉出來,丟進hadoop就可以了。結果發現:第一,將數據從oracle數據倉庫快速導出是 個噩夢;第二,也是更糟糕的一點,數據倉庫中某些數據的處理不對,導致了hadoop的批處理任務不能按預期輸出結果,且通過hadoop批處理執行任 務,通常不可逆,特別是在出了報表之后。

4) 最后,團隊拋棄了從數據倉庫中出數據的方式,直接以數據庫和logs為數據源。接著,造出了一個輪子:K-V 存儲(Voldemort)。

5) 即使是數據拷貝這樣不高大上的活兒,也占據了團隊大量的時間去處理,更糟的是,一旦數據處理的pipeline中有個點出錯,hadoop立馬變得廢柴,因為再牛逼的算法跑在錯誤的數據上,只有一個后果,就是產生更多的錯誤數據。

6) 即使團隊構建的東西抽象層次很高,針對每種數據源還是需要特定的配置,而這也是很多錯誤和失敗的根源。

7) 一大批程序員想跟進,每個程序員都有一大批的想法,集成這個系統,添加這個功能,整合這個特色,或者想要自定義的數據源。

8) Jay哥開始意識到:
第一, 雖然他們構建的pipelines還很糙,但是卻極其有價值。即使是解決了數據在新的系統(如hadoop)中可用的問題,也解鎖了一大批可能性。以前難做的計算開始變為可能。新的產品和分析,僅需要解鎖其它系統中的數據,并且進行整合,就可以容易地做出來。

第二, 很明顯,可靠地數據裝載需要更堅實的支撐,如果能夠捕獲所有的結構,就可以讓hadoop數據裝載完全自動化,不需要加入新的數據源或人工修改數據的模式。數據會神奇地出現在HDFS中,而新的數據源加入后,Hive的表會用合適的列自動化地、自適應地生成。

第三,數據覆蓋度遠遠不足。因為要處理很多新的數據源,很難。

9) 為了解決新數據源加入后的數據裝載問題,團隊開始了這樣的嘗試:

這里寫圖片描述

很快,他們發現這樣搞行不通,因為發布和訂閱、生產和消費,數據流通常還是雙向的,這成了一個O(n^2)的問題。
所以,他們需要的是這樣的模型:

這里寫圖片描述

需要將每個消費者從數據源隔離,理想的情況下,這些消費者只和一個data repository進行交互,而這個repository可以提供它們訪問任意數據的能力。

10)消息系統 + log = Kafka,kafka橫空出世。

2.5 Log和ETL、數據倉庫的關系

2.5.1 數據倉庫

1) 一個裝有干凈的、結構化的、集成的數據repository,用于分析。
2) 雖然想法很美好,但是獲取數據的方式有點過時了:周期性地從數據庫獲取數據,將其轉換為某種可讀性更佳的格式。
3) 之前的數據倉庫問題在于:將干凈的數據和數據倉庫高度耦合

數據倉庫,應該是一組查詢功能的集合,這些功能服務于報表、搜索、ad hot 分析,包含了計數(counting)、聚合(aggregation)、過濾(filtering)等操作,所以更應該是一個批處理系統。

但是將干凈的數據和這樣的一種批處理系統高度耦合在一起,意味著這些數據不能被實時系統消費,比如搜索引擎的索引構建、實時計算和實時監控系統,等等。

2.5.2 ETL

Jay哥認為,ETL無非做兩件事:

1) 對數據進行抽取和清洗,將數據從特定的系統中解鎖
2) 重構數據,使其能通過數據倉庫進行查詢。比如將數據類型變為適配某個關系型數據庫的類型,將模式轉換為星型或者雪花模式,或者將其分解為某種面向列的存儲格式。

但是,將這兩件事耦合在一起,問題很大,因為集成后的、干凈的數據,本應能被其它實時系統、索引構建系統、低延時的處理系統消費。

數據倉庫團隊,負責收集和清洗數據,但是,這些數據的生產者往往因為不明確數據倉庫團隊的數據處理需求,導致輸出很難被抽取和清洗的數據。
同時,因為核心業務團隊對和公司的其它團隊保持步調一致這件事兒不敏感,所以真正能處理的數據覆蓋度很低,數據流很脆弱,很難快速應對變化。

所以,更好的方式是:

這里寫圖片描述

如果想在一個干凈的數據集上做點搜索、實時監控趨勢圖、實時報警的事兒,以原有的數據倉庫或者hadoop集群來作為基礎設施,都是不合適的。更糟的是,ETL所構建的針對數據倉庫的數據加載系統,對其它(實時)系統點兒用沒有。

最好的模型,就是在數據發布者發布數據之前,就已經完成了數據的清洗過程,因為只有發布者最清楚它們的數據是什么樣的。而所有在這個階段所做的操作,都應該滿足無損和可逆

所有豐富語義、或添加值的實時轉換,都應在原始的log發布后處理(post-processing),包括為事件數據建立會話,或者添加某些感興趣的字段。原始的log依舊可被單獨使用,但是此類實時應用也派生了新的參數化的log。

最后,只有對應于具體的目標系統的數據聚合操作,應作為數據裝載的一部分,比如轉換為星型或雪花型模式,以在數據倉庫中進行分析和出報表。因為這個階段,就像傳統的ETL所做的那樣,因為有了非常干凈和規范的數據流,(有了log后)現在變得非常簡單。

2.6 Log文件和事件

以log為核心的架構,還有個額外的好處,就是易于實現無耦合的、事件驅動的系統。

傳統的 捕獲用戶活動和系統變化的方式,是將此類信息寫入文本日志,然后抽取到數據倉庫或者hadoop集群中進行聚合和處理,這個問題和前面所述的數據倉庫和ETL問題類似:數據與數據倉庫的高度耦合。

在Linkedin,其基于kafka構建了事件數據處理系統。為各種各樣的action定義了成百上千種事件類型,從PV、用戶對于廣告的趕腳(ad impressions)、搜索,到服務的調用和應用的異常,等等。

為了體會上述事件驅動系統的好處,看一個簡單的關于事件的例子:
在工作機會頁面上,提供一個機會。這個頁面應該只負責如何展示機會,而不應該過多地包含其它邏輯。但是,你會發現,在一個具有相當規模的網站中,做這件事,很容易就會讓越來越多的與展示機會無關的邏輯牽扯進來。

比如,我們希望集成以下系統功能:
1) 我們需要將數據發送到hadoop和數據倉庫做離線處理。
2) 我們需要統計頁面瀏覽次數,以確保某些瀏覽不是為了抓取網頁內容什么的。
3) 我們需要聚合對此頁面的瀏覽信息,在機會發布者的分析頁面上呈現。
4) 我們需要記錄某用戶對此頁面的瀏覽記錄,以確保我們對此用戶提供了有價值的、體驗良好的任何適宜此用戶的工作機會,而不是對此用戶一遍又一遍地重復展示某 個機會(想想老婆不在家才能玩的游戲吧,那紅綠藍閃爍的特效,配合那勁爆的DJ風舞曲,或者那搖擺聚焦的事業峰和齊X小短裙的girls,然后點進去才發 現是標題黨的ad吧!)。
5) 我們的推薦系統需要記錄對此頁面的瀏覽記錄,以正確地追蹤此工作機會的流行度。

很快,僅僅展示機會的頁面邏輯,就會變得復雜。當我們在移動端也增加了此機會的展示時,不得不把邏輯也遷移過去,這又加劇了復雜程度。還沒完,糾結的東西是,負責處理此頁面的工程師,需要有其它系統的知識,以確保上述的那些功能能正確的集成在一起。

這只是個極其簡單的例子,在實踐中,情況只會更加復雜。
事件驅動可以讓這件事變得簡單。

負責呈現機會的頁面,只需要呈現機會并記錄一些和呈現相關的因素,比如工作機會的相關屬性,誰瀏覽了這個頁面,以及其它的有用的與呈現相關的信息。 頁面不需要保持對其它系統的知識和了解,比如推薦系統、安全系統、機會發布者的分析系統,還有數據倉庫,所有的這些系統只需要作為訂閱者,訂閱這個事件, 然后獨立地進行它們各自的處理即可,而呈現機會的頁面不需要因為新的訂閱者或消費者的加入而做出修改。

2.7 構建可擴展的log

分離發布者和訂閱者不新鮮,但是要保證多個訂閱者能夠實時處理消息,并且同時保證擴展能力,對于log系統來說,是一件比較困難的事。

如果log的構建不具備快速、低開銷和可擴展能力,那么建立在此log系統之上的一切美好都免談。

很多人可能認為log系統在分布式系統中是個很慢、重型開銷的活兒,并且僅用來處理一些類似于ZooKeeper更適合處理的元數據等信息。

但是Linkedin現在(注:2013年),在kafka中每天處理600億條不同的消息寫入(如果算數據中心的鏡像的話,那就是幾千億條寫入)。

Jay哥他們怎么做到的呢?

1) 對log進行分割(partitioning the log)
2) 通過批量讀寫優化吞吐量
3) 避免不必要的數據拷貝

通過將log切為多個partition來提供擴展能力:

這里寫圖片描述

1) 每個partition都是有序的log,但是partitions之間沒有全局的順序。

2) 將消息寫入哪個partition完全由寫入者控制,通過依照某種類型的key(如user_id)進行分割。

3) 分割使得log的附加操作,可以不用在分片(sharding)之間進行協調就進行,同時,保證系統的吞吐量和kafka集群的規模呈線性關系。

4) 雖然沒有提供全局順序(實際上消費者或者訂閱者成千上萬,討論它們的全局順序一般沒有啥價值),但是kafka提供了這樣一種保證:發送者按照什么順序將 消息發給某個partition,從這個partition遞交出去的消息就是什么順序(什么順序進,什么順序出)。

5) 每個partition都按照配置好的數目進行復制,如果一個leader節點掛了,其它的節點會成為新的leader。

6) 一條log,同文件系統一樣,線性的讀寫模式可被優化,將小的讀寫log可以組成更大的、高吞吐量的操作。Kafka在這件事上做的很猛。批處理用在了各 種場景之下,比如客戶端將數據發送到服務端、將數據寫入磁盤、服務器之間的數據復制、將數據傳送給消費者,以及確認提交數據等場景。

7) 最后,kafka在內存log、磁盤log、網絡中發送的log上,采用了很簡單的二進制格式,以利于利用各種優化技術,比如零拷貝數據傳輸技術(zero-copy data transfer)。

諸多的優化技術,匯聚起來,可以讓你即使在內存爆滿的情形下,也能按照磁盤或網絡能提供的最大能力進行數據讀寫。

2.8 Logs和實時處理

你以為Jay哥提供了這么個美麗的方法把數據復制來復制去就完了?
你!錯!了!

Log是流的另一種說法,logs是流處理的核心。

2.8.1 什么是流處理

Jay哥認為:
1)流處理是連續數據處理的基礎設施。
2)流處理的計算模型,就如同MapReduce或其它分布式處理框架一樣,只是需要保證低延遲。
3)批處理式的收集數據模式,導致了批處理式的數據處理模式。
4)連續的收集數據模式,導致了連續的數據處理模式。
5)Jay哥講了個美國人口普查的方式來解釋批處理。

在linkedin,無論是活動數據還是數據庫的變化,都是連續的。
批處理按天處理數據,和連續計算將窗口設為一天雷同。

所以,流處理是這樣一種過程:
6)在處理數據時,帶了一個時間的概念,不需要對數據保持一個靜態的快照,所以可以在用戶自定義的頻率之下,輸出結果,而不必等數據集到達某種“結束”的狀態。
7)從這個意義上講,流處理是批處理的一種泛化,并且考慮到實時數據的流行程度,這是一種極其重要的泛化。
8)許多商業公司無法建立流處理引擎,往往因為無法建立流數據收集引擎。
9)流處理跨越了實時響應式服務和離線批處理的基礎設施之間的鴻溝。
10)Log系統,解決了很多流處理模式中的關鍵問題,其中最大的一個問題就是如何在實時的多個訂閱者模式下,提供可用數據的問題(流數據收集)。

2.9 數據流圖譜

流處理中最有趣的地方在于,其拓展了什么是數據源(feeds)這一概念。
無論是原始數據的logs、feeds,還是事件、一行一行的數據記錄,都來自應用程序的活動。
但是,流處理還可以讓我們處理來自其它feeds的數據,這些數據和原始數據,在消費者看來,并無二致,而這些派生的feeds可以包含任意程度的復雜性。

這里寫圖片描述

一個流處理任務,應該是這樣的:從logs讀取數據,將輸出寫入logs或者其它系統。

作為輸入和輸出的logs,連通這些處理本身,和其它的處理過程,構成了一個圖。

事實上,以log為核心的系統,允許你將公司或機構中的數據捕獲、轉換以及數據流,看作是一系列的logs及在其上進行寫入的處理過程的結合。

一個流處理程序,其實不必很高大上:可以是一個處理過程或者一組處理過程,但是,為了便于管理處理所用的代碼,可以提供一些額外的基礎設施和支持。

引入logs有兩個目的:

1) 保證了數據集可以支持多個訂閱者模式,及有序。
2) 可以作為應用的緩沖區。這點很重要,在非同步的數據處理進程中,如果上游的生產者出數據的速度更快,消費者的速度跟不上,這種情況下,要么使處理進程阻塞,要么引入緩沖區,要么丟棄數據。
丟 棄數據似乎不是個好的選擇,而阻塞處理進程,會使得所有的數據流的處理圖譜中的處理進程卡住。而log,是一種很大,特大,非常大的緩沖區,它允許處理進 程的重啟,使得某個進程失敗后,不影響流處理圖譜中的其它進程。這對于一個龐大的機構去擴展數據流是非常關鍵的,因為不同的團隊有不同的處理任務,顯然不 能因為某個任務發生錯誤,整個流處理進程都被卡住。

Storm和Samza就是這樣的流處理引擎,并且都能用kafka或其它類似的系統作為它們的log系統。

(注:Jay哥相當猛,前有kafka,后有samza。)

2.10 有狀態的實時處理

很多流處理引擎是無狀態的、一次一記錄的形式,但很多用例都需要在流處理的某個大小的時間窗口內進行復雜的counts , aggregations和joins操作。
比如,點擊流中,join用戶信息。

那么,這種用例,就需要狀態的支持。在處理數據的地方,需要維護某個數據的狀態。

問題在于,如何在處理者可能掛掉的情況下保持正確的狀態?

將狀態維護在內存中可能是最簡單的,但抵不住crash。

如果僅在某個時間窗口內維護狀態,當掛掉或者失敗發生,那么處理可以直接回退到窗口的起點來重放,但是,如果這個窗口有1小時那么長,這可能行不通。

還有個簡單的辦法,就是把狀態存在某個遠程的存儲系統或數據庫中,但是這會損失數據的局部性并產生很多的網絡間數據往返(network round-trip)。

回憶下,上文中曾提到的數據庫中的表和log的對偶性
一個流處理組件,可以使用本地的存儲或索引來維護狀態:

  • Bdb
  • Leveldb
  • Lucene
  • Fastbit

通過記錄關于本地索引的changelog,用于在crash后恢復狀態。這種機制,其實也揭示了一種一般化的,可以存儲為任意索引類型的,與輸入流同時被分割(co-partitioned)的狀態。

當處理進程崩潰,其可以從changelog中恢復索引,log充當了將本地狀態轉化為某種基于時間備份的增量記錄的角色。

這種機制還提供了一種很優雅的能力:處理過程本身的狀態也可以作為log被記錄下來,顯然,其它的處理過程可以訂閱這個狀態。

結合數據庫中的log技術,針對數據集成這一場景,往往可以做出很強大的事:

將log從數據庫中抽取出來,并在各種各樣的流處理系統中進行索引,那么,與不同的事件流進行join就成為可能。

2.11 Log 合并

顯然,用log記錄全時全量的狀態變更信息,不太可能。

Kafka使用了log合并或者log垃圾回收技術:

1) 對于事件數據,kafka只保留一個時間窗口(可在時間上配置為幾天,或者按空間來配置)
2) 對于keyed update,kafka采用壓縮技術。此類log,可以用來在另外的系統中通過重放技術來重建源系統的狀態。

如果保持全時全量的logs,隨著時間增長,數據將會變得越來越大,重放的過程也會越來越長。
Kafka不是簡單地丟棄老的日志信息,而是采用合并的方式,丟棄廢棄的記錄,比如,某個消息的主鍵最近被更新了。

這里寫圖片描述

2.12 系統構建

2.12.1 分布式系統

Log,在分布式數據庫的數據流系統和數據集成中所扮演的角色是一致的:

  • 抽象數據流
  • 保持數據一致性
  • 提供數據恢復能力

你可以將整個機構中的應用系統和數據流,看作是一個單獨的分布式數據庫。
將面向查詢的獨立系統,比如Redis , SOLR , Hive tables 等等,看作是一種特別的、數據之上的索引。
將Storm、Samza等流處理系統,看做一種精心設計過的觸發器或者物化視圖機制。

各式各樣的數據系統,爆發性的出現,其實,這種復雜性早已存在。
在關系型數據庫的輝煌時期(heyday),某個公司或者機構光關系型數據庫就有很多種。

顯然,不可能將所有的東西都丟進一個Hadoop集群中,期望其解決所有的問題。所以,如何構建一個好的系統,可能會像下面這樣:

構建一個分布式系統,每個組件都是一些很小的集群,每個集群不一定能完整提供安全性、性能隔離、或者良好的擴展性,但是,每個問題都能得到(專業地)解決。

Jay哥覺得,之所以各式各樣的系統爆發性地出現,就是因為要構建一個強大的分布式系統十分困難。而如果將用例限制到一些簡單的,比如查詢這樣的場景下,每個系統都有足夠的能力去解決問題,但是要把這些系統整合起來,很難。

Jay哥覺得在未來構建系統這事兒有三種可能:

1) 保持現狀。這種情況下,數據集成依然是最頭大的問題,所以一個外部的log系統就很重要(kafka!)
2) 出現一個強大的(如同輝煌時期的關系型數據庫)能解決所有問題的系統,這似乎有點不可能發生。
3) 新生代的系統大部分都開源,這揭示了第三種可能:數據基礎設施可被離散為一組服務、以及面向應用的系統API,各類服務各司其事,每個都不完整,卻能專業滴解決專門的問題,其實通過現存的java技術棧就能看出端倪:

  • ZooKeeper:解決分布式系統的同步、協作問題(也可能受益于更高抽象層次的組件如helix、curator).
  • Mesos、YARN:解決虛擬化和資源管理問題。
  • 嵌入式的組件Lucene、LevelDB:解決索引問題。
  • Netty、Jetty及更高抽象層次的Finagle、rest.li解決遠程通訊問題。
  • Avro、Protocol Buffers、Thrift及umpteen zlin:解決序列化問題。
  • Kafka、bookeeper:提供backing log能力。

從某種角度來看,構建這樣的分布式系統,就像某個版本的樂高積木一樣。這顯然跟更關心API的終端用戶沒有太大關系,但是這揭示了構建一個強大系統并保持簡單性的一條道路:
顯然,如果構建一個分布式系統的時間從幾年降到幾周,那么構建一個獨立的龐大系統的復雜性就會消失,而這種情況的出現,一定是因為出現了更可靠、更靈活的“積木”。

2.12.2 Log在系統構建中的地位

如果一個系統,有了外部log系統的支持,那么每個獨立的系統就可以通過共享log來降低其自身的復雜性,Jay哥認為log的作用是:

1) 處理數據一致性問題。無論是立即一致性還是最終一致性,都可以通過序列化對于節點的并發操作來達到。

2) 在節點間提供數據復制。

3) 提供“提交”的語義。比如,在你認為你的寫操作不會丟失的情況下進行操作確認。

4) 提供外部系統可訂閱的數據源(feeds)。

5) 當節點因失敗而丟失數據時,提供恢復的能力,或者重新構建新的復制節點。

6) 處理節點間的負載均衡。

以上,大概是一個完整的分布式系統中應提供的大部分功能了(Jay哥確實愛Log!),剩下的就是客戶端的API和諸如一些構建索引的事了,比如全文索引需要獲取所有的partitions,而針對主鍵的查詢,只需要在某個partition中獲取數據。

(那把剩下的事情也交代下吧,Jay哥威武!)

系統可被分為兩個邏輯組件(這強大的理解和功力):

1) Log層
2) 服務層

Log層,以序列化的、有序的方式捕獲狀態的變化,而服務層,則存儲外部查詢需要的索引,比如一個K-V存儲可能需要B-tree、sstable索引,而一個搜索服務需要倒排索引。

寫操作既可以直接入log層,也可以通過服務層做代理。寫入log會產生一個邏輯上的時間戳(log的索引),比如一個數字ID,如果系統partition化了,那么,服務層和log層會擁有相同的partitions(但其各自的機器數可能不同)。

這里寫圖片描述

服務層訂閱到log層,并且以最快的速度、按log存儲的順序追log,將數據和狀態變化同步進自己的本地索引中。

客戶端將會得到read-your-write的語義:

通過對任一一個節點,在查詢時攜帶其寫入時的時間戳,服務層的節點收到此查詢,通過和其本地索引比較時間戳,如果必要,為了防止返回過期的老數據,推遲請求的執行,直到此服務節點的索引同步跟上了時間戳。

服務層的節點,也許需要、也許不需要知道leader的概念。在很多簡單的用例中,服務層可不構建leader節點,因為log就是事實的來源。

還有一個問題,如何處理節點失敗后的恢復問題。可以這樣做,在log中保留一個固定大小的時間窗口,同時對數據維護快照。也可以讓log保留數據的 全量備份并使用log合并技術完成log自身的垃圾回收。這種方法,將服務層的眾多復雜性移至log層,因為服務層是系統相關(system- specific)的,而log層確可以通用。

基于log系統,可以提供一組完備的、供開發使用的、可作為其它系統的ETL數據源、并供其它系統訂閱的API。

Full Stack !:

這里寫圖片描述

顯然,一個以log為核心的分布式系統,其本身立即成為了可對其它系統提供數據裝載支持及數據流處理的角色。同樣的,一個流處理系統,也可以同時消費多個數據流,并通過對這些數據流進行索引然后輸出的另一個系統,來對外提供服務。

基于log層和服務層來構建系統,使得查詢相關的因素與系統的可用性、一致性等因素解耦。

也許很多人認為在log中維護數據的單獨備份,特別是做全量數據拷貝太浪費、太奢侈,但事實并非如此:

1) linkedin(注:2013年)的kafka生產集群維護了每數據中心75TB的數據,而應用集群需要的存儲空間和存儲條件(SSD+更多的內存)比kafka集群要高。
2) 全文搜索的索引,最好全部裝入內存,而logs因為都是線性讀寫,所以可以利用廉價的大容量磁盤。
3) 因為kafka集群實際運作在多個訂閱者的模式之下,多個系統消費數據,所以log集群的開銷被攤還了。
4) 所有以上原因,導致基于外部log系統(kafka或者類似系統)的開銷變得非常小。

2.13 結語

Jay哥在最后,不僅厚道地留下了很多學術、工程上的有價值的論文和參考鏈接,還很謙遜地留下了這句話:

If you made it this far you know most of what I know about logs.

終。

【版權所有@foreach_break】 【博客地址 http://www.cnblogs.com/foreach-break】 可以轉載,但必須注明出處并保持博客超鏈接
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!