海量數據實時計算利器Tec
引子
在剛剛過去的2015年雙11大促中,搜索事業部的實時計算和在線學習系統Pora經受住了前所未有的雙11巨量用戶行為消息的沖擊,在流入實時消息量持續超過300w/s,甚至峰值飆升至501w/s的壓力下始終保持了端到端秒級實時效果,助力相關的搜索和推薦實時業務取得了很好的效果。
Pora如何能在如此巨大的壓力下不延遲?除了其在業務層面所做的種種優化以及集群層面的有力支持外,與其核心層Tec的設計實現是分不開的。
另一條完全不同的戰線上,AliExpress(AE)搜索的離線數據庫Dump業務在雙11期間實現了從幾小時批量到秒級實時的歷史性飛躍,同時做到了全量增量一體化。得益于離線實時化,AE算法在雙11中也取得了驚艷的效果。
與Pora相同的是,AE的離線新架構中也采用了Tec進行實時計算。
什么是Tec?
Tec是針對海量流式數據實時計算場景的一套高效的輕量級實時計算框架,支持快速開發高吞吐、低延遲的實時應用。
Tec名稱取自于潮汐發電領域的Tidal Energy Converter(TEC),寓意其最適合的應用場景是在海量的流式數據驅動下作實時計算產出高價值的數據。
這里的實時計算具體邏輯可以依據應用需要靈活定制,可以是對流式輸入數據進行的簡單加工處理,也可以是基于輸入數據和其它已有數據的復雜加工處理。無論何種場景,Tec都會盡可能快速地完成所有邏輯的處理以保證實時,這對于海量數據實時業務中經常需要關聯查詢或更新相關數據的復雜場景特別有用。
在實時基礎上,Tec還作了大量封裝和抽象,具備很強的通用性,從而使得開發各種實時應用可以很簡單。
歷史
Tec脫胎于Pora。
從2013年開始第一代Pora系統的研發后,期間經歷海量數據和復雜實時業務場景的千錘百煉之后,終于到2014年形成了高性能的第二代Pora系統,其核心層部分已對實時數據的處理過程作了相當程度的優化,這一系統在14年的雙11大促中初試身手即通過其實時性能幫助算法取得了實時流量調控的很好效果。
我們在14年中的時候已經意識到Pora的核心層完全可以復用于更普遍的流式計算場景,解決更多業務的實時性問題,于是在14年雙11之后立刻啟動了相關的工作,將Pora核心層代碼剝離出來,將其作了更深層的抽象和改進后,形成了現在的Tec。
目前Tec除了應用于類似Pora的實時日志處理場景,也廣泛用于搜索Dump中心的其它實時場景。
特點
Tec具備以下鮮明特點。
- 低延遲
Tec內部的In-Memory DAG實時計算框架通過盡可能地將DAG節點并行處理,可最大程度加快數據處理過程,從而縮短總體端到端數據處理延遲。
- 高吞吐
除了低延遲帶來的吞吐保障外,Tec支持多線程并發處理,每個線程相互獨立,通過內部的DAG批量處理輸入數據,從而可進一步提高整體吞吐。
- 海量數據處理
Tec原生支持HBase作為海量數據存儲并在使用方式上進行了大量抽象和優化,方便應用使用,并確保支持快速的高并發隨機讀寫。
- 易嵌入
通過使用Tec提供的API,開發人員可以很方便地將Tec嵌入到自己的程序中用于數據處理。
- 跨平臺
由于易嵌入的特點,Tec可以嵌入多種計算平臺的分布式應用中(比如iStream和MapReduce),同時復用代碼邏輯。
- 易監控
Tec自帶了眾多Metric并允許使用方擴充,可方便實時或事后觀察系統運行情況,統計運行數據;同時Tec也提供Trace API,支持跟蹤單條數據處理的詳細過程。
- 少開發
Tec抽象出了通用的存儲、數據結構和常用數據處理邏輯,可復用于眾多業務場景,開發人員只需開發少量較特殊的業務邏輯;同時Tec支持和鼓勵使用配置代替代碼,從而使得業務開發維護可以進一步簡化為配置工作。
- 可定制
Tec采用松耦合的設計,對通用的存儲、數據結構和處理邏輯無法滿足的場景,用戶可遵循Tec的接口靈活擴展定制。
- 通用
Tec適用于任何流式計算場景,特別適合海量流式數據的實時處理。
定位
Tec定位于底層流式計算平臺和數據存儲之上,業務層之下,通過嵌入到底層流式計算平臺的處理進程中運行。處在這個位置,Tec可以隔離業務和底層系統,通過底層計算平臺接入數據,通過自身高度優化后的實時計算實現保證業務實際運行時具備高吞吐和低延遲的能力,通過暴露少量接口給業務層大幅降低業務開發成本。
由此帶來的另外一個好處是,業務可以在不同底層計算平臺(比如MapReduce和iStream)間復用相同代碼,這對某些既需要批處理又需要實時流程的業務(比如搜索批次全量和實時增量)來說意味著可以只需要維護一套代碼,同時也降低了未來可能的底層平臺切換成本。
目前Tec使用的底層系統和支持的上層業務如下圖所示。
業務層表示不同業務場景可以基于Tec定制自身特定的平臺框架,比如Pora和Dump領域各種完全不同的業務平臺。
計算平臺部分目前原生支持iStream和MapReduce,分別對應long-running流式計算和批次流式計算場景。后者聽上去有點奇怪,但實際上很多批次任務特別是Map Only任務在處理數據時大多是逐條處理的,其本質還是流式處理,因而完全可以使用Tec。至于其它目前未支持的流式計算平臺,理論上Tec也都可以很容易地嵌入其中。
針對海量數據,HBase既可以提供存儲支持,又可以提供快速隨機讀寫,從而使得對海量數據流的各種實時的復雜關聯計算成為可能,Tec因此選擇它作為原生支持的數據存儲進行了抽象封裝和大量的優化使用,使應用開發可以低門檻高效率的使用HBase。HBase之上,HQueue是Tec目前配合iStream使用時在上下游應用間可能需要重新分發數據時默認支持的一種基于HBase的隊列實現,OpenTsdb則用于存儲Tec的Metric歷史數據以便監控應用運行狀況。
DAG實時計算框架
基本概念
- Container
嵌入式場景下不同流式計算平臺中Tec的載體,比如對應IStream的IStreamContainer、對應MapReduce的MapContainer/ReduceContainer。
- TecWorker
每個Container實例內部通過實例化一個TecWorker對象將Tec嵌入其內部,將數據交由TecWorker處理。
- TecThread
一個TecWorker內部可以有多個TecThread線程,各自獨立地并發處理輸入數據。
- Dispatching
TecWorker將數據交由某個TecThread的過程稱為Dispatching,對應的實現類稱為Dispatcher。
- 數據源
每個輸入數據對象都有一個自己所屬數據源的標記。
- DAG處理鏈
允許對每個數據源配置處理鏈,處理鏈包含多個節點,節點間按數據處理流程彼此依賴形成一個DAG。
TecThread內部對每個數據源維護一個內存中的DAG處理鏈,輸入數據會觸發對應數據源的DAG處理鏈上每個節點依次執行,直到所有節點都執行完畢后認為該輸入數據處理完畢。
- Executor
DAG處理鏈上的每個節點稱為一個Executor,表示該節點負責的具體數據處理邏輯。
基本數據流
Container(對應MapReduce是Mapper/Reducer的Task進程,對應iStream是Role的Worker進程)獲取輸入數據,交由內部的TecWorker。
TecWorker將數據通過Dispatching機制實時轉發至緩沖區。
各個TecThread以不停循環的方式,異步地從緩沖區內獲取最新一批數據,驅動內部的DAG處理鏈,DAG上各個節點(稱為Executor)負責實現各個不同的具體處理邏輯,按照彼此依賴關系順序或并行的執行,完成對一批數據的處理。
DAG處理鏈
針對每一種輸入數據,將總的處理流程細分為一些更小的節點,每個節點實現對應的Executor Java接口中的如下3個方法。
- init
void init(String initParam)
TecThread初始化Executor的時候執行,一個Executor只會被初始化一次。
- execute
Object[] execute(Object... inObjs)
定義了處理數據的具體邏輯,對不同輸入數據反復執行該方法。
輸入參數Object數組表示來自數據源或上游Executor輸出的輸入數據。假設該Executor配置了M個輸入,則傳入數組的長度也為M,其中每個元素分別表示一個輸入。
返回值Object數組表示該Executor的輸出數據。假設該Executor配置了N個輸出,則輸出數組長度也為N,其中每個元素分別表示一個輸出。
- cleanup
void cleanup()
只在TecThread退出的時候執行一次。
實現各個Executor后需要配置在某個數據源的處理鏈上,并指定Executor的輸入輸出關系,通過這種輸入輸出依賴自動形成了DAG。某個Executor當且僅當其需要的所有輸入數據都準備好之后可以執行,這同時意味著某些情形下多個不同Executor可以并行執行。
下圖是Pora中的一個簡化了的DAG例子。

上圖中幾個g_開頭的Executor是典型的訪問存儲的Executor,通常相比其它Executor慢,因而將這些Executor并行化可以明顯縮短DAG總的處理耗時。
DAG中所有的Executor位于相同的進程內,所以其輸入輸出數據都在同一個JVM heap內,因而可以通過內存直接獲取Executor需要的輸入數據,不需要再經過任何序列化/反序列化和消息傳遞過程,既提升了處理效率也使得Executor可處理任意Object數據。
多線程并發
海量數據處理中通常需要訪問存儲,在這種場景下單線程處理通常總會受制于IO而無法充分利用cpu的計算能力,對此Tec支持多個TecThread并發處理,可更充分地利用cpu從而進一步加大吞吐。
每個TecThread相互獨立,內部使用相同拓撲的DAG處理鏈,處理不同的輸入數據。每批輸入數據處理完后,TecThread隨即從緩沖區中獲取新的一批實時輸入數據。獲取的策略是實時小批量,即以實時性為主兼顧批處理能帶來的效率提升。簡單說是在不超過batch.max前提下有多少取多少,從而使得緩沖區內的數據能被最快地處理完成。這里采用小批量的方式有助于業務針對批量作優化,比如去重、聚合、批量訪問存儲等。
Dispatching
引入多線程后,TecWorker需要選擇將數據發給哪個TecThread。目前Tec支持以下兩種分發策略,兩種方式各有利弊。
缺省的RoundRobin方式,好處是每個TecThread收到大體相同數量的數據,沒有數據傾斜;壞處是不適合多線程并發修改相同key的業務場景。
FieldDispatching方式,好處是可以將相同key的數據交由同一個TecThread處理,避免多線程并發修改的問題;壞處是有可能因為熱點數據造成數據傾斜(針對這種熱點Tec會自動識別并優化處理盡可能減輕影響)。
通用數據結構
Executor設計上支持任意Object的處理,但為了盡可能提供通用實現以減輕應用開發成本,Tec提供了FieldMap的通用數據結構。
FieldMap是一個Map實現類,其數據是字段名到字段值的映射,符合大量業務場景數據建模需求。
基于FieldMap Tec可以支持FieldDispatching,確保相同字段值的FieldMap數據被相同TecThread處理。FieldMap也支持序列化/反序列化,以便存儲至隊列或其它介質。另外,針對FieldMap Tec還提供FieldMatcher,可用來檢查FieldMap數據是否符合特定的字段值條件,從而更方便地對數據做過濾。
為了方便后續處理,Tec建議在DAG的第一個Executor中將輸入數據轉換為FieldMap數據,此類Executor稱為InputParser。為此Tec提供了一個抽象類BaseInputParser供其它具體InputParser繼承實現,BaseInputParser除定義方法接口外,還附帶了實時統計qps和gap metric的功能。如果輸入數據本身就是FieldMap類型,則可以直接使用FieldMapParser作為InputParser。
存儲抽象
Tec原生支持HBase作為海量數據存儲,并針對常用的隨機查詢模式進行了針對性的封裝、抽象和優化,使應用可以高效率低門檻地使用HBase。
Tec首先通過抽象類BaseTable在rowkey sharding、batch訪問、HTable創建等方面統一了對hbase的使用,既方便了用戶又優化了讀寫hbase的性能。
在此基礎上,Tec更進一步將常用HTable抽象為KVTable,KKVTable和KKTVTable 3種,并提供了通用實現及對應的Executor,從而使得大部分業務不需要再開發任何有關HBase讀寫的代碼。
通用計算邏輯
以FieldMap和HBase抽象為基礎,Tec提供了目前目前已知的各種FieldMap操作及HBase讀寫等通用Executor實現,新業務基于Tec開發時可直接復用這些已有的Executor實現,不需要再開發。
通過這些通用實現,Tec希望新業務開發時可以盡可能地復用各種通用Executor,只在業務確實有特殊邏輯時定制自己的Executor,從而用最少的開發工作實現業務需求。
下圖是一個典型的實時業務場景對應的DAG處理鏈。
其中,parse將輸入數據解析為FieldMap,getXXX基于解析結果查詢若干hbase表數據,joinXXX將查到的hbase數據合并到FieldMap,modify由業務根據需要對數據進行加工,最后將結果輸出回一張hbase表。
圖中只有modify Executor需要業務自主實現,其它Executor都已有通用實現可以復用。
高級功能
以上是Tec的基本功能,除此之外Tec還有其它一些高級功能。
熱點優化
針對FieldDispatching時可能出現的數據傾斜,Tec實現了相應的自動優化處理功能。對正常范圍的數據傾斜,接收數據最多的TecThread將獲得更大的緩沖區,以減輕對其它TecThread的影響;對熱點數據造成的數據傾斜,Tec可以自動識別出這些熱點數據并開辟專用的TecThread進行處理,既可以避免對其它TecThread造成影響,又可以允許業務通過批量去重、聚合等手段加快對熱點數據的處理。
LRUKVCache
Tec對hbase等存儲的抽象中還支持開啟cache,這是一個write-through的LRU kv cache,在TecWorker級別緩存存儲中訪問最頻繁的數據,供TecThread共享使用。LRUKVCache可減少對存儲的讀取,特別是配合FieldDispatching策略時可以顯著減少對主鍵關聯數據的查詢,既減少對存儲系統的壓力,減輕熱點數據影響,又可以加快DAG處理速度,進一步縮短延遲。
其它
此外,Tec還有子DAG、異步Executor實現、配合IStream熱切換配置等高級功能,這里不再展開。
總結
Tec通過DAG實時計算框架和對存儲的優化使用確保低延遲和高吞吐,通過大量的抽象和通用Executor實現大幅度減少應用開發工作量,可極大降低海量數據實時計算應用的開發成本。
來自: http://yq.aliyun.com/articles/2971