基于Solr的淘寶商家交易數據實時查詢方法

JasminUDKU 8年前發布 | 42K 次閱讀 Solr 搜索引擎

來自: https://yq.aliyun.com/articles/4237

前言

DT時代對平臺或商家來說最有價值的就是數據了,在大數據時代數據呈現出數據量大,數據的維度多的特點,用戶會使用多維度隨意組合條件快速召回數據。數據處理業務場景需要實時性,需要能夠快速精準的獲得到需要的數據。之前的通過數據庫的方式來處理數據的方式,由于數據庫的某些固有特性已經很難滿足大數據時代對數據處理的需求。

`

所以,在大數據時代使用hadoop,hive,spark,作為處理離線大數據的補充手段已經大行其道。以上提到的這些數據處理手段,只能離線數據處理方式,無法實現實時性。Solr作為補充,能夠很好地解決大數據的多維度查詢和數據召回實時性要求。

`

本文通過分析阿里淘寶聚石塔環境中遇到的一個具體需求是如何實現的,通過這個例子,拋磚引玉來體現SORL在數據處理上的優勢。

需求說明

阿里聚石塔是銜接淘寶大賣家,軟件開發者和平臺提供者這三者的生態圈,阿里通過聚石塔平臺,將阿里云底層的PAAS,IAAS環境提供給第三方開發者,而第三方開發者可以通過自己開發的軟件產品,比如ERP,CRM系統販賣給淘寶上的大賣家,提高大賣家的工作效率。

`

賣家的交易數據是最有價值的數據,通過交易數據可以衍生出很多產品,例如管理交易的ERP軟件,會員營銷工具CRM,在聚石塔環境中通過大賣家授權,這部分數據可以授權給獨立軟件開發者ISV。

`

在CRM系統中需要能夠通過設置買家的行為屬性快速過濾出有價值的買家記錄,進行精準會員營銷。

以下是兩個具體需求,首先看兩個線框圖:

以上是賣家需要實時篩選一段時間內購買數量在一個區間之內的買家。

再看一個線框圖:

賣家需要實時搜索一個時間段內,消費金額在某個區間之內的買家會員。這里的區間是以天為單位的,時間跨度可長可短。

`

了解了線框圖之后,我們還要再看看對應的數據庫ER圖:

`

表結構相當簡單,只有兩張表,稍微有點經驗的開發工程師就會寫出以下SQL:

select  buyer.buyer_id , count(trade.trade_id) as pay_count
From buyer 
inner join trade on(
buyer.buyer_id = trade.buyer_id and buyer.seller_id = trade.seller_id)
where trade.trade_time> ? and trade.trade_time < ? and buyer.seller_id=?
group by buyer.buyer_id
having pay_count > =1 AND pay_count <=5

第二個線框圖會用以下SQL語句來實現:

select  buyer.buyer_id , sum(trade.fee) as pay_sum
From buyer 
inner join trade on(
buyer.buyer_id = trade.buyer_id and buyer.seller_id = trade.seller_id)
where trade.trade_time> ? AND trade.trade_time < ? and buyer.seller_id=?
group by buyer.buyer_id
having pay_sum > =20 and pay_sum <=100

以上,兩個SQL語句大同小異,having部分稍有不同, SQL語句并不算復雜,但是在大數據情況下,無法在毫秒級反饋給用戶。另外,假如where部分有其他查詢條件,比如,買家的性別,買家所屬的地區等,就需要數據庫上設置更多的聯合索引,所以這個需求使用SQL語句根本無法實現的。

查詢加速

問題已經明確,那么解決的辦法是什么呢?是使用數據的存儲過程?存儲過程底層還是依賴數據庫表的固有特性,無非是提供一些以時間換空間的策略來實現罷了,換湯不換藥,而且各個數據庫產品的存儲過程實現很很大差別,一旦選擇了某一個數據的存儲過程之后以后再要遷移數據到其他數據平臺上就非常困難了。

`

這里要向大家隆重介紹搜索引擎Solr。因為,搜索引擎在底層使用倒排索引,這和數據庫有本質區別,倒排索引在數據查詢的性能上天生就比數據的Btree樹好上百倍,具體原因不在這里展開了。雖然某些數據庫也支持了倒排索引例如PG,但畢竟不是通用的解決辦法。一旦添加了這類型的索引會影響數據的寫入吞吐量,因為重建索引非常耗時間。

`

開源JAVA社區中使用最廣泛的應該屬Solr了,筆者所在的團隊就是長期研究將Solr應用到企業級應用場景中,在原生Solr之上做了很多優化和適配,方便企業級用戶使用。

`

言歸正傳,先講講大致思路,實現的架構圖如下:

全量數據準備

這里要說明的一點,發送到搜索引擎中的數據是一條寬表數據,所謂寬表數據是將ER關系為1對N的實體,聚合成一條記錄。聚合方式有兩種,一種是向1的維度聚合,比如用戶實體和消費記錄實體,寬表記錄如果是以用戶維度來聚合和話,就會將所有的消費記錄以某個特殊字符作為分割符,聚合成一個字段,作為用戶記錄的一個冗余字段。也可以以消費記錄為維度聚合,將關聯的用戶信息作為一個冗余字段,可想而知這樣的聚合方式用戶數據在索引數據中會有很多重復。

`

打寬表這個環節看似和搜索不怎么相關,但是合理的寬表數據結構能大幅度地提高用戶數據查詢效率。

`

全量流程用Hive來實現的,如果是在阿里云公有云環境中可以用ODPS,因ODPS是PAAS服務。

`

增量通道,需要寫一個打寬表操作。因為搜索引擎特有的結構,增量同步更新持續一段時間之后會生成很多索引碎片,所以必須要隔一段時間從數據源重新導出并構建一次全量索引數據。

`

這里介紹一下上面提交到用戶-消費記錄的寬表結構(簡單起見,去掉了表中和問題域不相關的字段):

Buyer表:

買家id 賣家id
Buyer_id Seller_id

Trade表:

買家id 賣家id 交易id 交易時間 單筆費用
Buyer_id Seller_id trade_id trade_time Fee

聚合寬表結構:

買家id 賣家id dynamic_info(聚合字段)
Buyer_id Seller_id sellerId_date_buyerId_payment_payCount[;sellerId_date_buyerId_payment_payCount]
`

這里需要對dynamic_info 聚合字段詳細說明一下:

`

sellerId_date_buyerId_payment_payCount 這是一個聚合單元,從左向右依次的含義是:賣家ID,購買的日期(精確到天),買家ID,購買天之內的費用總和,購買天之內的購買次數總和。

`

Dynamic_info字段可以有多個聚合單元組成,每個單元中的date是按天去重的,假如一個用戶在某一天在一家店中有多條購買記錄最終也會聚合成一個單元。給一個聚合字段的實際示例:

`

Dynamic_info:9999_20151111_222_345.6_3;9999_20151212_222_627.5_1
`

這個字段的意義就是,一個id為222的用戶在2015年雙11當天購買了3筆價值345元的商品,在雙12當天在這個商家處又購買了一筆價值627.5元的商品。

`

之所以在Solr上進行快速數據查詢的原因是,Solr的數據源是一個已經聚合好的一份數據,數據庫上執行的join操作會耗費大量IO,在Solr查詢省去了這部分時間。

`

寬表數據從多個分表聚合,數據的語義沒有變化,只是組織形式發生了變化,如果一個SAAS的服務提供上同時為十幾萬個大賣家提供篩選服務,而每個大賣家又積累的交易數據是非常大的,全部加在一起,要將數據進行聚合化操作,有非常大的CPU和IO開銷,好在在云服務時代有強大的離線計算工具如hadoop,ODPS可以將大數據如同肉面粉一般揉(處理)成任何你想要的結構,分分鐘不在話下。

Solr引擎端數據處理

準備好全量源數據,之后就是將其轉化為Lucene的索引文件了,這個過程請查閱Solr Wiki便可,這里不進行闡述。這里要重點描述的是Solr服務端如何響應用戶的查詢請求,返回給用戶需要的查詢結果。

`

處理用戶在時間段內購買量或購買額度進行過濾,需要構建一個QParser的插件,這個插件的作用是遍歷和查參數中匹配的條件項生成命中的DocSet命中結果集。

Qparser代碼實現

下面是QparserPlugin.java節選:

for (LeafReaderContext leaf : readerContext.leaves()) {
                docBase = leaf.docBase;
                reader = leaf.reader();
                liveDocs = reader.getLiveDocs();
                terms = reader.terms("dynamicinfo");
                termEnum = terms.iterator();
                String prefixStart = sellerId + "" + startTime;
                String prefixEnd = sellerId + "_" + endTime;
                String termStr = null;
                int docid = -1;
                if ((termEnum.seekCeil(new BytesRef(prefixStart))) != SeekStatus.END) {

                do {
                    Matcher matcher = DYNAMIC_INFO
                            .matcher(termStr = termEnum.term()
                                    .utf8ToString());

                    if (!matcher.matches()) {
                        continue;
                    }

                    posting = termEnum.postings(posting);
                    docid = posting.nextDoc();

if (!(docid != PostingsEnum.NO_MORE_DOCS && (liveDocs == null || (liveDocs != null && liveDocs.get(docid))))) { continue; }

                    if ((matcher.group(1) + "_" + matcher.group(2))
                            .compareTo(prefixEnd) > 0) {
                        break;
                    }

                    addStatis(buyerStatis, docBase, docid, matcher);

                } while (termEnum.next() != null);
            }

        }</code></pre><code data-language="java"> 

以上代碼的執行邏輯是,截取prefixStart和prefixEnd之間的term序列,進行分析如果符合過濾條件就將對應docid插入buyerStatis收集器中。

`

等第一輪數據處理過程中就在對聚合結果進行增量累加,代碼如下:

private static StaticReduce addStatis(
            Map<Integer, StaticReduce> buyerStatis, int docBase, int docid,
            Matcher matcher) {
        StaticReduce statis = buyerStatis.get(docBase + docid);
        if (statis == null) {
            statis = new StaticReduce(docBase + docid, Long.parseLong(matcher
                    .group(3))/ buyerid /);
            buyerStatis.put(docBase + docid, statis);
        }

    if (statis.buyerId != Long.parseLong(matcher.group(3))) {
        return statis;
    }

    try {
        statis.addPayCount(Integer.parseInt(matcher.group(5)));
    } catch (Exception e) {

    }
    try {
        statis.addPayment(Float.parseFloat(matcher.group(4)));
    } catch (Exception e) {

    }
    return statis;
}</code></pre><code data-language="java"> 

同時對購買數量,和購買金額進行累加。

`

最后對累加結果進行過濾,符合過濾條件的,將docid插入到bitset中:

for (StaticReduce statis : buyerStatis.values()) {
                // TODO 這里自己判斷是否要收集這條記錄
        if (statis.payCount > Integer.MAX_VALUE
                || statis.paymentSum > 1) {
            System.out.println("count:" + statis.payCount + ",sum:"

                    + statis.paymentSum);
                bitSet.set(statis.luceneDocId);
    }

} BitDocIdSet docIdSet = new BitDocIdSet(bitSet); DocIdSetIterator it = docIdSet.iterator(); final BitQuery bitquery = new BitQuery(it); return new QParser(qstr, localParams, params, req) { @Override public Query parse() throws SyntaxError {

                return bitquery;
            }
        };</code></pre><code data-language="java"> 

最后將bitSet包裝成BitQuery作為Qparser的parse函數的返回值,返回有solr進一步和其他結果集進行過濾。

`

Solrconfig配置實現

需要將以上的QparserPlugin插件注入到solr中,需要在solrconfig中寫以下配置:

  <queryParser name="timesegstats" class="com.xxx.qp.TimeSegStatsQParserPlugin" >                                  
      <str name="buyerField">buyer_id</str>                                                                                                              
      <str name="compoundField">dynamic_info </str>                                                                          
      <str name="countField">emailSendCount</str>                                                                                              
<str name="statsFields"></str>                                                                                                                  
  </queryParser> 

Solr查詢語句Q參數設置:

q={!multiqp q.op=AND}seller_id:1441097932588 
AND {!timesegstats sellerId=1441097932588 statsField=buyActivity startTime=20150901 endTime=20150924 startValue=1 endValue=200}
AND {!timesegstats sellerId=1441097932588 statsField=paycount startTime=20140901 endTime=20150924 startValue=2 endValue=100}  

總結

以上是一個用Solr搜索引擎解決數據庫查詢瓶頸的實例,其實搜索引擎的使用場景非常廣泛,不僅可以用在像百度這樣的大規模非結構化的數據查詢,可以定制比較復雜的排序規則。Solr更可以解決像本文講到的數據庫加速的場景,使得原本在數據庫上沒有無法實現的SQL查詢,可以通過Solr搜索引擎上輕松實現。

`

本文講到的需求,也可以使用像hive這樣的離線處理工具來實現,每次處理完成后將結果再導入到mysql中,業務端通過讀取數據庫表中的數據來向用戶展示處理結果。這樣做雖然可行,但是,沒有辦法將處理結果的實時性沒有辦法保證,而且,離線處理結果的數據結構是固化的,沒有辦法做到將處理結果靈活調整。而用Solr做到數據的查詢出口,可以很好地解決以上兩個問題。

`
</code></code></code></code></code></code></code></code></div>

 本文由用戶 JasminUDKU 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!