大數據-數據采集和集成
最近在對已有的大數據采集和數據集成工具進行梳理,并考慮進行相關的產品整合工作,經過對已有的產品的測試和驗證,已經實際需要的業務場景,初步考慮清楚后續需要進行新增和完善部分的內容。
數據庫實時同步和復制
對于數據庫實時同步和復制一定會談到的兩款商用產品就是Oracle GoldenGate和Quest SharePlex,具體的介紹網上也比較多,其核心特點就是支持異構數據庫之間的實時數據同步和負責,而且對源數據庫本身侵入性很小。兩個商用產品基本都是對各種數據庫的Log日志文件進行分析,然后進行復制。
那對于這塊如果要自研來實現有無可能,對于Mysql來說由于采用Binlog日志方式,類似淘寶的Otter已經可以完整的實現數據庫的實體同步復制。如果單純是Oracle-Oracle數據庫之間,我們也可以采用Oracle DataGuard或者Oracle Stream流復制技術進行復制,還有就是基于Oracle LogMiner進行redo log日志分析后進行兩個數據庫之間的同步。因此關鍵問題還是在異構數據庫之間的同步復制上。
對于數據庫復制,Oracle當前常用的解決方案主要有:
oracle日志文件,比如LogMiner,OGG,SharePlex
oracle CDC(Change Data Capture)
oracle trigger機制,比如DataBus , SymmetricDS
oracle 物化視圖(materialized view)比如淘寶的yugong開源
在這些解決方案里面可以看到有開源的SymmetricDS解決方案,但是是基于觸發器機制,侵入性還是相對較大。也有淘寶的yugong可以實現Oracle->mysql的全量或增量復制,但是基于增量物化視圖方式,本身會影響到源庫數據表的CUD操作。
而實際上最佳的解決方案仍然是基于log日志的實時同步復制,其核心思路包括三個步驟
1. 在源庫設置為記錄日志或歸檔模式,源庫首先能夠記錄下日志信息。
2. 實時的能夠讀取到日志信息,并對日志信息進行解析或適當轉換映射,包括和目標庫的適配。
3. 在目標數據庫直接運行相應解析后的日志SQL語句,實現同步更新。
由于Mysql本身提供可讀性很強的Binlog日志,因此可以看到Mysql->Mysql,Mysql->Oracle的實時同步日志問題是可以得到很好解決的。而對于Oracle->Oracle也可以解決,較難的就是Oracle->Mysql或其它異構數據庫,這里需要分析Oracle本身的redo log日志(當前Oracle提供有logminer工具),如果我們自己寫一個解析包的話就需要對Oracle redo log結構有完整的了解。
而結合Oracle 流復制技術,我們可以考慮Oracle首先將變更信息寫入到自己的AQ,然后我們從AQ訂閱消息后直接處理或者寫入到我們自己的消息隊列或流處理軟件,然后在流處理軟件中完成相關的映射轉換后寫入到目標異構數據庫中。
數據庫采集同步
對于數據庫采集同步,當前談到比較多的工具主要有Sqoop和結構化數據庫間的ETL工具,當然當前對于開源的Kettle和Talend本身也集成了大數據集成內容,可以實現和hdfs,hbase和主流Nosq數據庫之間的數據同步和集成。而淘寶的DataX則主要可以實現常見主流的結構化數據庫(Oracle, Mysql,SqlServer)和hdfs之間的數據集成和同步。
對于Sqoop和DataX等當前也支持基于Key關鍵字段或時間戳的數據增量導入。即我們可以將導入命令行語句或Shell腳本通過任務或調度管理平臺(類似開源的Chronos)配置為定時的調度作業,來實現對數據庫的定時增量采集。
而對于常規的數據庫包括大數據存儲之間的采集和集成,再充分考慮性能的情況下,核心思路為:
1. 將源數據庫數據進行導出,使用Sql或DB原生的導出命令直接導出為txt文件,字段以分隔符進行分隔。
1.1 可以部署多個代理端,對數據庫數據啟用多個線程進行導出
1.2 支持基于key值或時間戳的增量數據導出
2. 對導出的數據進行壓縮后進行傳輸(特別是在源和目標庫不在同一個數據中心時)
3. 在目標庫端基于數據庫原生的load命令對數據進行bulk批量導入。
在整個實現里面有兩個核心,一個就是可以啟用多個代理端和多線程機制并行導出數據庫,其次就是導出數據壓縮傳輸,然后在通過load data原生命令進行數據庫的bulk批量裝載提升性能。
如果基于以上思路我們可以看到數據采集的重點還是在性能上面,不會去實現ETL工具本身復雜的數據映射和轉化,數據聚合等操作。核心只是做異構數據庫和Hdfs文件存儲之間的數據搬移。而我們完全自己研發的DataPipe產品基本參考上述思路實現,其測試性能對于結構化數據庫之間采集和集成是Sqoop或DataX的2-3倍左右,而對于hdfs之間的集成則在5-10倍左右的性能提升。
對于這種采集存在的約束就是不要去處理數據變更的問題,僅僅是做數據的全量同步或者是數據庫表數據的簡單Append處理,否則性能本身會下降很多。如果有大量數據更新需要同步,最好的方式還是首先Truncate掉目標數據庫表,然后再進行全量同步。簡單驗證對于Mysql數據庫間100萬數據,180M左右數據量的全量同步整體同步時間在14秒左右即全部完成。
文件采集
對于文件的采集大家談的比較多的還是flume進行實時的文件采集和處理,當然對于ELK(Elasticsearch、Logstash、Kibana三者的組合)雖然是處理日志,但是也有基于模板配置的完整增量實時文件采集實現。如果是僅僅是做日志的采集和分析,那么用ELK解決方案就完全夠用的。
我們談的文件采集還是是采集文件后進行預處理,然后將采集的文件導入到hdfs庫進行存儲。對于這種方式即需要實現flume和hdfs的集成。同時我們看到如果對采集的數據要進行實時的處理和分析,則還需要結合kafka和storm來共同完成。這是一個完整的組合,網上也有完整的例子可以參考。
對于文件采集,其核心的實現思路可以概括為如下幾個步驟來完成:
1. 實現對服務器源目錄的實時監聽,同時對文件流實時采集。
2. 實現對采集的文件流進行預處理,如通過flume 定制sink或其它相應插件。
3. 對采集的文件流輸出到txt文件再導入到hdfs或者直接通過hdfs api接口增量導入hdfs文件系統。
4. 對于流處理模式則需要將flume接到kafa+storm上,實現流處理,storm處理結果可以存到redis庫。