ElasticSearch Aggregations 分析
來自: http://www.jianshu.com/p/56ad2b7e27b7
承接上篇文章 ElasticSearch Rest/RPC 接口解析 ,這篇文章我們重點分析讓ES步入數據分析領域的Aggregation相關的功能和設計。
</div>
前言
我記得有一次到一家公司做內部分享,然后有研發問我,即席分析這塊,他們用ES遇到一些問題。我當時直接就否了,我說ES還是個全文檢索引擎,如果要做分析,還是應該用Impala,Phenix等這種主打分析的產品。隨著ES的發展,我現在對它的看法,也有了比較大的變化。而且我認為ES+Spark SQL組合可以很好的增強即席分析能夠處理的數據規模,并且能夠實現復雜的邏輯,獲得較好的易用性。
需要說明的是,我對這塊現階段的理解也還是比較淺。問題肯定有不少,歡迎指正。
Aggregations的基礎
Lucene 有三個比較核心的概念:
- 倒排索引
- fieldData/docValue
- Collector
倒排索引不用我講了,就是term -> doclist的映射。
fieldData/docValue 你可以簡單理解為列式存儲,索引文件的所有文檔的某個字段會被單獨存儲起來。 對于這塊,Lucene 經歷了兩階段的發展。第一階段是fieldData ,查詢時從倒排索引反向構成doc-term。這里面有兩個問題:
- 數據需要全部加載到內存
- 第一次構建會很慢
這兩個問題其實會衍生出很多問題:最嚴重的自然是內存問題。所以lucene后面搞了DocValue,在構建索引的時候就生成這個文件。DocValue可以充分利用操作系統的緩存功能,如果操作系統cache住了,則速度和內存訪問是一樣的。
另外就是Collector的概念,ES的各個Aggregator 實現都是基于Collector做的。我覺得你可以簡單的理解為一個迭代器就好,所有的候選集都會調用 Collector.collect(doc) 方法,這里collect == iterate 可能會更容易理解些。
ES 能把聚合做快,得益于這兩個數據結構,一個迭代器。我們大部分聚合功能,其實都是在fieldData/docValue 上工作的。
Aggregations 分類
Aggregations種類分為:
- Metrics
- Bucket
Metrics 是簡單的對過濾出來的數據集進行avg,max等操作,是一個單一的數值。
Bucket 你則可以理解為將過濾出來的數據集按條件分成多個小數據集,然后Metrics會分別作用在這些小數據集上。
對于最后聚合出來的結果,其實我們還希望能進一步做處理,所以有了Pipline Aggregations,其實就是組合一堆的Aggregations 對已經聚合出來的結果再做處理。
Aggregations 類設計
下面是一個聚合的例子:
{ "aggregations": { "user": { "terms": { "field": "user", "size": 10, "order": { "_count": "desc" } } } } }
其語義類似這個sql 語句: select count(*) as user_count group by user order by user_count desc 。
對于Aggregations 的解析,基本是順著下面的路徑分析:
TermsParser -> TermsAggregatorFactory -> GlobalOrdinalsStringTermsAggregator
在實際的一次query里,要做如下幾個階段:
-
Query Phase 此時 會調用GlobalOrdinalsStringTermsAggregator的Collector 根據user 的不同進行計數。
-
RescorePhase
-
SuggestPhase
-
AggregationPhase 在該階段會會執行實際的aggregation build, aggregator.buildAggregation(0) ,也就是一個特定Shard(分片)的聚合結果
-
MergePhase。這一步是由接受到請求的ES來完成,具體負責執行Merge(Reduce)操作 SearchPhaseController.merge 。這一步因為會從不同的分片拿到數據再做Reduce,也是一個內存消耗點。所以很多人會專門搞出幾臺ES來做這個工作,其實就是ES的client模式,不存數據,只做接口響應。
在這里我們我們可以抽取出幾個比較核心的概念:
- AggregatorFactory (生成對應的Aggregator)
- Aggregation (聚合的結果輸出)
- Aggregator (聚合邏輯實現)
另外值得注意的,PipeLine Aggregator 我前面提到了,其實是對已經生成的Aggregations重新做加工,這個工作是只能單機完成的,會放在請求的接收端執行。
Aggregation Bucket的實現
前面的例子提到,在Query 階段,其實就會調用Aggregator 的collect 方法,對所有符合查詢條件的文檔集都會計算一遍,這里我們涉及到幾個對象:
- doc id
- field (docValue)
- IntArray 對象
collect 過程中會得到 doc id,然后拿著docId 到 docValue里去拿到field的值(一般而言字符串也會被編碼成Int類型的),然后放到IntArray 進行計數。如果多個doc id 在某filed里的字段是相同的,則會遞增計數。這樣就實現了group by 的功能了。
Spark-SQL 和 ES 的組合
我之前一直在想這個問題,后面看了下es-hadoop的文檔,發現自己有些思路和現在es-hadoop的實現不謀而合。主要有幾點:
- Spark-SQL 的 where 語句全部(或者部分)下沉到 ES里進行執行,依賴于倒排索引,DocValues,以及分片,并行化執行,ES能夠獲得比Spark-SQL更優秀的響應時間
- 其他部分包括分片數據Merge(Reduce操作,Spark 可以獲得更好的性能和分布式能力),更復雜的業務邏輯都交給Spark-SQL (此時數據規模已經小非常多了),并且可以做各種自定義擴展,通過udf等函數
- ES 無需實現Merge操作,可以減輕內存負擔,提升并行Merge的效率(并且現階段似乎ES的Reduce是只能在單個實例里完成)
</ol> </div>