ElasticSearch Aggregations 分析

來自: http://www.jianshu.com/p/56ad2b7e27b7

承接上篇文章 ElasticSearch Rest/RPC 接口解析 ,這篇文章我們重點分析讓ES步入數據分析領域的Aggregation相關的功能和設計。

</div>

前言

我記得有一次到一家公司做內部分享,然后有研發問我,即席分析這塊,他們用ES遇到一些問題。我當時直接就否了,我說ES還是個全文檢索引擎,如果要做分析,還是應該用Impala,Phenix等這種主打分析的產品。隨著ES的發展,我現在對它的看法,也有了比較大的變化。而且我認為ES+Spark SQL組合可以很好的增強即席分析能夠處理的數據規模,并且能夠實現復雜的邏輯,獲得較好的易用性。

需要說明的是,我對這塊現階段的理解也還是比較淺。問題肯定有不少,歡迎指正。

Aggregations的基礎

Lucene 有三個比較核心的概念:

  1. 倒排索引
  2. fieldData/docValue
  3. Collector

倒排索引不用我講了,就是term -> doclist的映射。

fieldData/docValue 你可以簡單理解為列式存儲,索引文件的所有文檔的某個字段會被單獨存儲起來。 對于這塊,Lucene 經歷了兩階段的發展。第一階段是fieldData ,查詢時從倒排索引反向構成doc-term。這里面有兩個問題:

  • 數據需要全部加載到內存
  • 第一次構建會很慢

這兩個問題其實會衍生出很多問題:最嚴重的自然是內存問題。所以lucene后面搞了DocValue,在構建索引的時候就生成這個文件。DocValue可以充分利用操作系統的緩存功能,如果操作系統cache住了,則速度和內存訪問是一樣的。

另外就是Collector的概念,ES的各個Aggregator 實現都是基于Collector做的。我覺得你可以簡單的理解為一個迭代器就好,所有的候選集都會調用 Collector.collect(doc) 方法,這里collect == iterate 可能會更容易理解些。

ES 能把聚合做快,得益于這兩個數據結構,一個迭代器。我們大部分聚合功能,其實都是在fieldData/docValue 上工作的。

Aggregations 分類

Aggregations種類分為:

  1. Metrics
  2. Bucket

Metrics 是簡單的對過濾出來的數據集進行avg,max等操作,是一個單一的數值。

Bucket 你則可以理解為將過濾出來的數據集按條件分成多個小數據集,然后Metrics會分別作用在這些小數據集上。

對于最后聚合出來的結果,其實我們還希望能進一步做處理,所以有了Pipline Aggregations,其實就是組合一堆的Aggregations 對已經聚合出來的結果再做處理。

Aggregations 類設計

下面是一個聚合的例子:

{
    "aggregations": {
        "user": {
            "terms": {
                "field": "user",
                "size": 10,
                "order": {
                    "_count": "desc"
                }
            }
        }
    }
}

其語義類似這個sql 語句: select count(*) as user_count group by user order by user_count desc 。

對于Aggregations 的解析,基本是順著下面的路徑分析:

TermsParser ->  
        TermsAggregatorFactory -> 
                  GlobalOrdinalsStringTermsAggregator

在實際的一次query里,要做如下幾個階段:

  1. Query Phase 此時 會調用GlobalOrdinalsStringTermsAggregator的Collector 根據user 的不同進行計數。

  2. RescorePhase

  3. SuggestPhase

  4. AggregationPhase 在該階段會會執行實際的aggregation build, aggregator.buildAggregation(0) ,也就是一個特定Shard(分片)的聚合結果

  5. MergePhase。這一步是由接受到請求的ES來完成,具體負責執行Merge(Reduce)操作 SearchPhaseController.merge 。這一步因為會從不同的分片拿到數據再做Reduce,也是一個內存消耗點。所以很多人會專門搞出幾臺ES來做這個工作,其實就是ES的client模式,不存數據,只做接口響應。

在這里我們我們可以抽取出幾個比較核心的概念:

  1. AggregatorFactory (生成對應的Aggregator)
  2. Aggregation (聚合的結果輸出)
  3. Aggregator (聚合邏輯實現)

另外值得注意的,PipeLine Aggregator 我前面提到了,其實是對已經生成的Aggregations重新做加工,這個工作是只能單機完成的,會放在請求的接收端執行。

Aggregation Bucket的實現

前面的例子提到,在Query 階段,其實就會調用Aggregator 的collect 方法,對所有符合查詢條件的文檔集都會計算一遍,這里我們涉及到幾個對象:

  1. doc id
  2. field (docValue)
  3. IntArray 對象

collect 過程中會得到 doc id,然后拿著docId 到 docValue里去拿到field的值(一般而言字符串也會被編碼成Int類型的),然后放到IntArray 進行計數。如果多個doc id 在某filed里的字段是相同的,則會遞增計數。這樣就實現了group by 的功能了。

Spark-SQL 和 ES 的組合

我之前一直在想這個問題,后面看了下es-hadoop的文檔,發現自己有些思路和現在es-hadoop的實現不謀而合。主要有幾點:

  1. Spark-SQL 的 where 語句全部(或者部分)下沉到 ES里進行執行,依賴于倒排索引,DocValues,以及分片,并行化執行,ES能夠獲得比Spark-SQL更優秀的響應時間
  2. 其他部分包括分片數據Merge(Reduce操作,Spark 可以獲得更好的性能和分布式能力),更復雜的業務邏輯都交給Spark-SQL (此時數據規模已經小非常多了),并且可以做各種自定義擴展,通過udf等函數
  3. ES 無需實現Merge操作,可以減輕內存負擔,提升并行Merge的效率(并且現階段似乎ES的Reduce是只能在單個實例里完成)
  4. </ol> </div>

 本文由用戶 JasFranklyn 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!