京東彈性數據庫之BinLake訂閱服務
京東彈性數據庫
京東彈性數據庫是京東商城十年數據庫生產經驗總結與升華,兼容MySQL協議,適合海量數據的事務處理、分析計算、動態擴展、靈活復制協議、自動備份恢復、自動歷史結轉、日志訂閱、可全面容器化部署的分布式數據庫產品系列。
BinLake訂閱服務
一站式MySQL BinLog日志實時采集、統一分發、消費訂閱和監控服務。是京東彈性數據庫產品系列中首先推出的產品。
BinLake訂閱服務平臺具有以下優勢:
BinLake架構設計
BinLake整體架構設計如下:
說明如下:
1 BinLake總共包括三大服務組件
1.1 Wave服務
Wave服務完成實際的數據庫Binary Log的持續采集、管理和分發寫入到下游的消息發布和訂閱系統中。在BinLake集群中會存在N個Wave服務,這些Wave服務共同組成一個無狀態集群。
1.2 Tower服務
Tower服務是整個BinLake的管理中心,提供BinLake接入服務的申請、完成Wave服務、數據源、接入應用的管理。當用戶申請接入到BinLake中時,會登錄到Tower服務提供的申請界面,填寫申請接入BinLake的應用信息、數據源信息和Topic信息,Tower服務會按照用戶提供的信息做如下判斷,并完成用戶接入申請,接入流程如下:
如果不同申請者申請相同數據源的數據采集,由Tower管理端依據其申請的采集規則(如指定表,指定庫),如果規則相同,默認復用相同規則的Topic,也可強制生成新的Topic進行訂閱。
1.3 Judge服務
Judge服務主要完成兩個功能:Wave節點監控信息采集和loadBalance決策。
-
Wave節點監控信息采集:
通過在各個Wave服務節點部署agent采集各個Wave服務節點上的監控信息,包括:服務器的內存使用、系統負載、CPU負載、網絡負載、JVM的堆內存使用、GC信息、每個Wave服務中的instance個數等,采集到的所有這些信息都會在后續的loadBalance中作為基礎metics,參與到最終的loadBalance決策中。
-
loadBalance決策:
新應用接入到BinLake時,若需要采集的數據源在BinLake現有的數據源池中不存在,則需要針對于新的數據源在相應的Wave服務上創建對應的instance(數據源與instance是1對1的關系)。那么在創建instance的時候,就需要選在在哪個Wave服務上創建。這時就會請求Judge服務提供的loadBalance決策接口,若Judge服務中沒有配置loadBalance plugin,則會返回一個隨機的Wave服務節點的IP,那么就會在該隨機的Wave服務上創建instance;若配置了loadBalance的plugin,則從Judge服務提供的loadBalance決策接口獲得建議Wave服務節點,并從該節點創建新的instance。
2 BinLake 依賴于兩大外部服務
2.1 ZooKeeper
BinLake使用zookeeper服務進行Wave無狀態集群的管理、狀態同步和消息通知等,包括:
【1】Instance的自動化創建與初始化
【2】Instance的HA
【3】數據源offset實時追蹤
【4】binlog分發失敗重試
【5】數據源切換自適配
【6】Tower元數據管理
【7】instance消息通知
2.2 消息發布與訂閱系統
目前BinLake可以無縫集成JMQ和Kafka,從而進行消息的發布和訂閱管理。instance采集到的BinLog Event會發布到JMQ或者Kafka的Topic中,實際的業務應用只需要訂閱和消費對應的topic,既可以實時的獲得BinLog Event,并在后續的業務邏輯中對獲得的Binlog Event進行處理即可。
BinLake部署拓撲
在BinLake服務實際部署時,其拓撲結構如下:
對上述部署拓撲圖說明如下:
(1)一臺Tower服務器:用于用戶元數據、過濾規則、應用和訂閱信息管理
(2)2N+1臺ZooKeeper服務器:用于構建一個zookeeper集群,從而進行Wave集群管理和消息通知等
(3)一臺Judge服務器:用戶采集負載信息,并提供負載均衡建議決策。其中負載信息的采集是通過部署在各個Wave服務器上的Judge-Agent進程定期推送給Judge服務的
(4)N臺Wave服務器:構成Wave集群。每臺Wave服務器上部署兩種服務:
-
Wave服務:用于數據庫binary log的采集并分發給下游MQ集群(Kafka或者JMQ)
-
Judge-Agent服務:用于定期采集Wave服務器的系統以及Wave服務的負載和監控信息,并調用Judge服務提供的Restful接口,推送給Judge服務
(5)N臺已經存在的線上MySQL服務器:不屬于BinLake提供的服務器,是使用的已經存在的MySQL服務器,作為BinLake的數據源
(6)N臺已經存在的MQ服務器:不屬于BinLake提供的服務器,是已經存在的MQ服務器,處于Wave服務的下游,Wave服務會將采集到的Bianry Log Events分發給MQ集群中的Topic
BinLake實現原理
1 Wave服務實現原理
Instance的高可用
BinLake采用ZooKeeper來實現Instance的高可用。每個wave服務在ZooKeeper上都會對應有一個節點,整體流程如下:
1、一個新的topic產生后,Tower將會創建一個新的topic節點。
2、所有的wave服務都會在新的topic節點下創建對應的節點,ZooKeeper將會根據loadBalance決策從這些節點中選取一個作為主,然后在主對應的wave上將創建instance為topic提供服務。
3、當提供服務的主節點對應的wave宕機或不可用時,該wave與節點的連接將被斷開,然后該節點會被從可用的節點信息中移除,移除后ZooKeeper集群會選出一個新的可用的節點成為主,其對應的wave會創建instance為該topic提供服務,從而實現高可用。
高效采集BinaryLog
(1) 異步NIO網絡服務器
采用java nio高性能網絡模型,盡可能提高資源利用率,增大服務吞吐量,并發量
整個工作過程如下:
1.與任何MySQL實例(數據源MySQL)都會創建一個socket長鏈接,而每個socket長鏈接都會由Wave服務進程中的一個監聽線程,這里暫稱為:WatchTread(Wave服務中有一個與CPU個數相符的線程池,該線程就從該線程池中取出)進行持續監聽
2.任何一個與MySQL實例相連的長鏈接產生EventLog,WatchTread都會得到通知(異步notify機制),然后從將該EventLog放入隊列中。
3.線程池中的Worker Thread從隊列中取出EventLog進行處理(將EventLog格式化為protobuffer格式,然后將格式化之后的消息放入Wave服務端的buffer)。
4.buffer size打滿之后,批量發往MQ的消息隊列中。
各個線程之間工作互不影響,不會因為轉換導致socket讀事件阻塞,整個設計采用生產消費模型。
分發BinLog Event到Topic
(1) 批量發送到每個topic
binlog event采用批量發送的方式發往消息隊列MQ,消除發送端性能瓶頸
流程如下:
1.獲取從轉換器converter獲取到一條消息,放入buffer
2.重復步驟1,直到buffer的size達到指定閾值,然后執行步驟3
3.將buffer中的消息批量發往MQ消息隊列,并重復步驟1,2
批量發送過程采用異步notify機制,不會導致循環cpu彪高發生。
2 Tower服務實現原理
instance復用及日志過濾
說明如下:
(1)Wave中的每個instance與實際的數據源MySQL實例一一對應,針對于同一個MySQL實例,無論多少個業務方采集,都會只有一個instance進行采集
(2)每個業務方都可以針對于一個MySQL實例中的數據設置filter,從而只過濾出自己感興趣的EventLog,也可以查閱某個instance對應的所有的過濾規則,并復用過濾規則
(3)Wave中的instance對采集到的Eventlog,使用設置的過濾規則依次進行過濾,然后發送到指定的一個或者多個Topic中。
Judge服務實現原理
從圖可以看到整個監控和負載主要由Agent、Alarm和Judge幾個模塊構成,它們作用的對象都是Wave服務, 此外還依賴JMQ、ES、Jimdb等中間件。
整個工作流程可以歸納為:
1.Agent定時采集JVM、網絡、CPU、wave服務TPS等信息,封裝成Metric發到JMQ
2.Alarm服務監聽JMQ,取到Metric后匹配報警策略,匹配成功則實時向用戶發告警信息
3.Judge服務監聽JMQ,取到Metric后:
3.1.先將原始數據存入ES
3.2. 根據Metric當前一個小時內的負載進行增量分析,分析結果放入緩存
3.3.Judge服務定期從ES獲取一個小時前Metric歷史數據分析,分析結果存入緩存
4.Wave服務調用Judge服務的獲取負載均衡接口,Judge服務取得3.2和3.3中的分析結果,通過指定的負載算法 計算得到獲取負載結果,返回給Wave。
其中,Agent中采集器模塊和Judge中的負載均衡等模塊都設計成了插件模式,可以靈活的設置或切換。
BinLake特性
(1)保證Binary Log Event的 At-Least-Once Delivery
(2)集群化服務:提供無狀態集群服務,可以通過硬件的橫向擴展提高性能
(3)高可用:在無人工參與的情況下,對宕機的服務節點自動恢復
(4)負載自適應:整個集群可以根據每個服務節點的負載情況,自動調整各個集群的工作負荷,從而使集群中的每個節點的負載相當
(5)動態擴容:借助于彈性集群,當集群整體負載升高時,自動創建Wave 容器,并自動加入到集群
BinLake使用限制
(1)目前版本只能采集和訂閱MySQL的Binary Log
(2)MySQL的Binary Log的Format只能是Row Base的
(3)MySQL中的表必須要有主鍵
(4)目前的過濾規則最細粒度支持到表級別
運維支持
京東彈性數據庫研發團隊為業務部門提供如下服務:
來自:http://mp.weixin.qq.com/s?__biz=MzI1NzQyOTM3Ng==&mid=2247484319&idx=1&sn=781a06627a31cdb55cbef344c7888f97&chksm=ea16d970dd6150661487ddfecc53640395e5faabe6341f2dcb44d3c0081e3a7c6c40c8dc3370&mpshare=1&scene=1&srcid=0302CV8KFh6W89hjfZoyY2ik#rd