通過HBase Observer同步數據到ElasticSearch
原文 http://guoze.me/2015/04/23/hbase-observer-sync-elasticsearch/
Observer希望解決的問題
眾所周知,HBase是一個分布式的存儲體系,數據按照RowKey分成不同的Region,再分配給RegionServer管理。但是 RegionServer只承擔了存儲的功能,如果Region能擁有一部分的計算能力,從而實現一個HBase框架上的MapReduce,那 HBase的操作性能將進一步提升。正是為了解決這一問題,HBase 0.92版本后推出了Coprocessor — 協處理器,一個工作在Master/RegionServer中的框架,能運行用戶的代碼,從而靈活地完成分布式數據處理的任務。
Coprocessor包含兩個組件,一個是EndPoint(類似關系型數據庫的存儲過程),用以加快特定查詢的響應,另一個就是 Observer(類似關系型數據庫的觸發器)。Observer也分為幾個類型,其中RegionObserver提供了一組表數據操作的鉤子函數,覆蓋了Get、Put、Scan、Delete等操作(通常有pre和post兩種情況,表示在操作發生之前或發生之后),我們可以通過重載這些鉤子函數,利用RegionServer實現特定的數據處理需求。
應用場景
我們在同一批主機上同時建立了一個HBase集群和一個ElasticSearch集群,然后存儲到HBase的數據必須實時地同步到ElasticSearch。而恰好HBase和ElasticSearch都沒有更新的概念,我們的需求可以簡化為兩步:
- 當一個新的Put操作產生時,將Put數據轉化為json,索引到ElasticSearch,并把RowKey作為新文檔的ID
- 當一個新的Delete操作產生時,獲取Delete數據的RowKey,刪除ElasticSearch中對應的ID
Java實現
Observer的Java實現并不復雜,只需要繼承BaseRegionObserver的基類,然后重載postPut和postDelete兩個函數。考慮到未來HBase的寫入比較頻繁,我們利用ElasticSearch的 Bulk API 做了一層緩沖,不是每次提交HBase數據都觸發索引操作,而是積累到一定數量或者到達一定時間間隔才去批量操作,從而降低了RegionServer的網絡I/O壓力。
完整項目請參見: HBaseObserver
Observer的部署
Observer提供了兩種部署方式:
- 全局部署。把jar包的路徑加入HBASE_CLASSPATH并且修改hbase-site.xml,這樣Observer會對每一個表都生效。
- 單表部署。通過HBase Shell修改表結構,加入coprocessor信息。
顯然后一種更加靈活。通過HBase Shell安裝Observer的詳細步驟如下:
- 把Java項目打包為jar包,上傳到HDFS的特定路徑
- 進入HBase Shell,disable你希望加載的表
- 通過以下指令激活Observer:
alter 'table_name' , METHOD => ' table_att ', ' coprocessor ' => ' hdfs : ///your/jar/path/on/hdfs|com.foo.bar|1001|arg1=1,arg2=2'
coprocessor對應的格式以|分隔,依次為:
- jar包的HDFS路徑
- Observer的主類
- 優先級(一般不用改)
- 參數(一般不用改)
新安裝的coprocessor會自動生成名稱:coprocessor + $ + 序號(通過describe table_name可查看)
因為一張表可能擁有多個coprocessor,卸載coprocessor需要輸入對應的coprocessor名稱,比如:
alter 'table_name' , METHOD => ' table_att_unset ', NAME => ' coprocessor $1'
需要注意的是,HBase Observer的部署有一個大坑:
修改Java代碼后,上傳到HDFS的jar包文件必須和之前不一樣,否則就算卸載掉原有的coprocessor再重新安裝也不能生效