基于Go實現的分布式MQ

jopen 9年前發布 | 31K 次閱讀 消息系統 mq

講師:趙超(Beta版廚子3.0)

個人簡介:

6年的Java開發經驗、先后就職于淘寶Java中間件團隊、騰訊無線媒體產品部。現就職于陌陌擔任基礎業務組主管。專注于分布式消息總線、LBS技術領域、golang在大規模生產換環境應用的探索。

今天交流的內容也是我上半年主要做的一個開源的MQ的項目,希望對大家有幫助

一、RPC與MQ之間對比

基于Go實現的分布式MQ

我們通常接觸到的RPC同步調用的種類非常多比如fb 的thrift/阿里的dobbo

騰訊的taf、淘寶的hsf這類同步調用框架

從圖里面可以看到作為一個業務完成后端要發生非常多的RPC通信

隨著業務的復雜度提高,各服務間的依賴度也逐步加大,那么服務間的響應時間也就各有參差了

在一個請求鏈路上如果存在一個慢的服務那么可能會引起雪崩的效用,短板非常明顯

最重要的時在一些要求一致性高的場景下,對錯誤的處理也是非常重要的。所以個服務也都要去做容錯處理的代碼保證邏輯和數據一致

基于Go實現的分布式MQ

A、B、C。。。服務之間通過共同的消息協議進行通信,數據一致性問題完全交給MQ去處理即可

A、B、C服務的同步響應效率得到提升

總結:

基于Go實現的分布式MQ

所以在我理解的消息中間件就是以消息作為信息載體實現系統間的可靠異步的調用,減少系統間耦合的中間層框架

二、消息傳輸模型

基于Go實現的分布式MQ

隊列模式或者也可以叫點對點

很明顯看到這個圖很多人就會聯想到redis的list結構

rpush—>lpop,沒錯

通過輪訓去完成消息的送達

但是對于點對點模型來說存在的問題是,沒法做到消息被B、C、D服務都消費的目標。

例如日常開發中我們想將用戶登錄“陌陌”的消息同時要給用戶中心、anti-spam

這個時候就非常的不方便,那么PUB、SUB模式就呼之欲出了

基于Go實現的分布式MQ

pub/sub模式 以Topic為單位進行對消息的訂閱、發布

B、C、D這些訂閱了該Topic的服務均可以處理該Topic的消息

就好比你打開收音機,你在聽91.5飛魚秀、別人也訂閱了飛魚秀,可以同時收聽感興趣的主題的信息一樣的道理

三、KiteQ是什么?

基于Go實現的分布式MQ

kiteq 是我在今年2月份開始開發mq,它具有如上描述的特性,分布式、支持2PC、多種存儲方式、跨語言的特性。

首先想了解KiteQ,先對幾個概念有所了解

基于Go實現的分布式MQ

messageType產生是為了更細分業務類型下的消息。精細化控制消息的粒度

group對于kiteq來說是為了以組為單位,邏輯上認為是一個消費或者發送方,但groupId下面可以對應多臺實例用于實現負載均衡

最重要的時Binding,訂閱關系:訂閱方發給mq的簽約信條。kiteq擁有這個信條才會將訂閱方感興趣的話題消息推送給訂閱方,非常重要。

Listener當然分兩種:

第一種是MessageListener訂閱方接收方的消息送達回調

第二種是CheckMessageListener是用來解決分布式事務的本地事務提交或回滾的確認消息;

KiteQ的整體架構:

基于Go實現的分布式MQ

整體的流程就入圖描述

基于Go實現的分布式MQ

使用zk做集群發現包括 server producer consumer

同樣zk做訂閱關系的管理

基于Go實現的分布式MQ

如果(圖)描述了kiteq的三個組成部分在zk中的數據體現

基于Go實現的分布式MQ 基于Go實現的分布式MQ

普通消息就是不牽扯分布式事務的消息

生產者發送消息到kiteq,kiteq首先做的時存儲(哪怕是內存、Mysql、file)

同步給producer返回成功,或失敗

然后producer就去干別事情

kiteq內部會根據消息頭部的topic/messageType 去查詢kiteq通過zk下發的訂閱關系找到匹配的訂閱方分組id

然后通過分組ID選取客戶端物理連接 write出去

等待消費方回執:超時重發、成功記錄已經投遞成功的分組ID,下次不再投遞或失敗重投

基于Go實現的分布式MQ

事務消息,先是為了解決,比如在A系統中執行Mysql操作

同時要告訴用戶系統訂單充值成功夸兩個資源的訪問

這個時候就要保證兩個跨資源的事務同時成功同時失敗

所以就引入了事務消息

區別于普通消息

事務消息在發送階段在沒有得到本地producer回執本地事務成功或者回滾時是不會給consumer投遞該消息,這樣就保證了producer和遠端consumer服務的事務一致性

所以剛才提到的listener分兩種 CheckMessageListener就是為了做事務消息KiteQ詢問Producer本地事務處理成功或回滾的入口

要么Producer主動通知kiteq結果和KiteQ主動詢問Producer成功與否

三、KiteQ對幾種錯誤場景的處理

基于Go實現的分布式MQ

場景一:存儲失敗那就同步返回消息發送失敗,producer需要重新發送。注意我們指的MQ做到異步是指(發送方與消息被投遞給訂閱方是異步的,而發送方和MQ之間是同步方式不可避免,不然就會有丟消息不可靠的問題)

場景二:事務消息發送給KiteQ時頭部是一個未提交標識,所以Kiteq是不會投遞的,只投遞已提交消息,對本地事務結果的確認有客戶端主動 通知回滾和服務端主動詢問機制雙重保證消息最終的可投遞狀態,不會產生本地事務失敗,但是kiteq將消息投遞給訂閱者不一致的情況

基于Go實現的分布式MQ

場景三、四:訂閱方接收超時、或者宕機,KiteQ在每次投遞后會記錄當前已經投遞成功的分組也就是groupId,后續根據這些groupId再去重發

保證消息的一定送達,但是當消息超過的TTL或者最大投遞次數就會被放到deadline queue中不再投遞。所以即使機器groupId內的機器全部宕機也不會有丟消息的風險。

四、重復消息、消息順序的處理、消息堆積

重復消息:

大家其實可以想一下重復消息產生的原因

消息的ID現在采用的UUID,所以可以做到唯一

重復消息,比如消費端處理超時了(服務端不知道是否處理成功)、消費端掛了,必然需要重投,如果需要做到消息去重,我建議還是在客戶端做一個消息處理狀態用于去重

消息順序:

分布式場景下消息的順序是在是難以保證,然后很多人會說那用分布式鎖

what 's a fucking idea!

但我給的建議是消費端做狀態機。比如 處理順序是 1、2、3如果消息到來順序是2、1、3那么只需要客戶端做到2先到的時候返回處理失敗,通過下次重投自旋去保證狀態吧。

消息堆積:

所以mq設計也要考慮到這個問題,解決方案也就是你的存儲

試想redis不具有海量堆消息的能力,這是考驗MQ在消費方異常時非常關鍵的系統穩定性的指標。

在kiteq設計的時候,根據你消息的重要程度也提供了三種可靠級別的存儲mysql master-slave / file /memory供不同業務場景使用。

kiteq中的file就是模仿了kafka順序寫數據,分為data和log。data為消息體,log中保存消息的每次投遞結果(哪些失敗了哪些成功了等)、消息是否提交等數據,即對消息的更新操作記錄。內存中保持指定數目的segment

保證讀的性能,消息重投時通過邏輯id,二分查找消息所在的segment,然后按照tlv格式讀取消息體即可。

好了kiteq的東西就分享這么多吧

然后最后總結一下

Go開發的體會

基于Go實現的分布式MQ

希望對家學習go有所幫助吧

五:Q & A

1 為什么用zookeeper不用etcd

etcd在gopher china的時候不是提到了對于支持臨時序列節點簡直非常蛋疼。。。 而且我對zk比較熟吧

2. 目前這個消息系統應用于線上系統了嗎?

內部在推廣中

3 可以分別根據消息重要度來區別存儲么,比如重要的持久化到mysql,不重要的內存,還是全局只能用統一的消息存儲方式

根據消息重要性來區分,我覺得最好按照集群來劃分即可;比如可以丟掉消息就用file存儲、重要的就用Mysql的Master-slave;比較好管理

4. 看介紹采用的是推模式,那對consumer的處理速度做了適配處理么?

設計的時候有預留watermark;不同的consumer可以有不同的推送速度,歸根到底是偏向對訂閱方的保護了

5.對于分布式事務,服務器主動詢問發送方實現機制能不能詳細講解實現機制,尤其是發送方的處理

比如你購買會員:扣錢在本地mysql操作,然后要通知會員系統給用戶發送會員

這個時候都有可能失敗,所以kiteq的處理就是先發一個未提交的消息給kiteq,然后處理本地事務,本地事務處理成功就發送commit消 息、如果處理失敗就會提交rollback。如果本地事務處理時間比較長或者正好發送方掛掉了,那么服務端就會根據配置去回掃未提交的消息,并下發 txack消息用于詢問本地事務是否成功。這樣客戶端和服務端同時保證,未提交消息的最后確定的狀態。

6. 一個topic或者messagetype的msg會發送到不同的kiteq節點上嗎。然后一個group描述了topic、msgtype、produsers、consumers的關系?一個topic可以在多個group嗎

同一個topic的消息會發送到不同的kiteq上,因為KiteQ也是集群,具體的負載均衡是客戶端邏輯,當前采用的是隨機

topic這些事跟消息相關的、group是用來劃分不同集群,兩者屬于不同類別的概念

7. 推1,2,3。收1,3,2。接收端怎么知道這個順序。設計一組消息時對順序有嚴格要求的加個sort標識嗎

1、2、3是代表你的業務順序號,比如統一個訂單有訂單創建、未支付、支付完畢,1、2、3指的這個

所以你的消息里肯定有orderid,當然你可以把你的業務id放在消息頭部便于對這種進行處理

你要有序那就必須嚴格顯示跳轉啦

9. 使用zk 做訂閱關系管理 主要考慮 zk的服務發現和良好的異步通知功能吧?還要其他考量么

zk偏向成熟吧。

10 protobuf的使用是基于什么考慮呢,為何不用簡單的jsonrpc

至于json和pb的選擇 json的類型約束太弱了,而且效率和壓縮方面pb好于json。

11. 為什么不在Kafka的基礎上做擴展?

kafka還是感覺太重了些。而且如果在一個公司考慮運維成本和開發成本來說,開發一個代價相對較小吧。

12. kiteq 和 nsq 區別列舉一些,為什么說nsq可靠性不太夠

(不可靠因為)是否是消息來了持久化存儲

是否給發送方以回饋消息持久化成功

在訂閱方宕機的期間的消息是否在恢復后能夠收到期間的消息

這個也是非常重要的衡量標準

13 之前的問題,如果生產者在收到反饋前崩潰或mq崩潰,能否之后確認消息持久化

生產者和MQ之間是同步關系。所以狀態是清楚的,超時、失敗都認為是失敗需要重新發送,只有明確告知發送方成功才認為是成功的

這一階段跟普通的RPC是一樣的

調用成功就是成功失敗就是失敗哪怕是超時

所以說重復消息必然會有。發送方超時重新發送、接收方失敗重投。。。

(怎么去重?)消息ID是一方面,然后你的消息頭部可以定義你的業務ID號 然后本地數據庫保持這些業務ID號對應的狀態 同一個事務多條消息.....

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!