基于go+protobuff實現的多種持久化方案的mq框架:kiteq
基于go+protobuff實現的多種持久化方案的mq框架
簡介
* 基于zk維護發送方、訂閱方、broker訂閱發送關系、支持水平、垂直方面的擴展 * 基于與topic以及第二級messageType訂閱消息 * 基于mysql、文件存儲方式多重持久層消息存儲 * 保證可靠異步投遞 * 支持兩階段提交分布式事務
工程結構
kiteq/ ├── README.md ├── binding 訂閱關系管理處理跟ZK的交互 ├── build.sh 安裝腳本 ├── client KiteQ的客戶端 ├── doc 文檔 ├── handler KiteQ所需要的處理Handler ├── kite_benchmark.go KiteQ的Benchmark程序 ├── kiteq.go KiteQ對外啟動入口 ├── pipe 類似netty的pipeline結構的框架,組織event和handler流轉 ├── protocol KiteQ的協議包,基于PB和定義的Packet ├── remoting 網絡層包括remoting-server和client及重連邏輯、客戶端管理 ├── server KiteQ的Server端組裝需要的組件 ├── stat 狀態信息統計 └── store KiteQ的存儲結構
概念:
* Binding:訂閱關系,描述訂閱某種消息類型的數據結構 * Consumer : 消息的消費方 * Producer : 消息的發送方 * Topic: 消息的主題比如 Trade則為消息主題,一般可以定義為某種業務類型 * MessageType: 第二級別的消息類型,比如Trade下存在支付成功的pay-succ-200的消息類型
架構圖
Zookeeper數據結構
KiteServer : /kiteq/server/${topic}/ip:port Producer : /kiteq/pub/${topic}/${groupId}/ip:port Consumer : /kiteq/sub/${topic}/${groupId}-bind/#$data(bind)
流程:
1. KiteQ啟動會將自己可以接受和投遞的Topics列表給到zookeeper 2. KiteQ拉取Zookeeper上的Topics下的訂閱關系(Bingding:訂閱方推送上來的訂閱消息信息)。 3. Consumer推送自己需要訂閱的Topic+messageType的消息的訂閱關系(Binding)到Zookeeper 4. Consumer拉取當前提供推送Topics消息的KiteQ地址列表,并發起TCP長連接 5. Producer推送自己可以發布消息Topics列表到Zookeeper 6. Producer拉取當前提供接受Topics消息的KiteQ地址列表,并發起TCP長連接
訂閱方式:
Direct (直接訂閱): 明確的Topic+MessageType訂閱消息 Regx(正則式訂閱): Topic級別下,對MessageType進行正則匹配方式訂閱消息 Fanout(廣播式訂閱): Topic級別下,訂閱所有的MessageType的消息
兩階段提交:
因為引入了異步投遞方案,所以在有些場景下需要本地執行某個事務成功的時候,本條消息才可以被訂閱方消費。 例如: 用戶購買會員支付成功成功需要修改本地用戶賬戶Mysql的余額、并且告知會員系統為用戶的會員期限延長。 這個時候就會碰到、必須在保證mysql操作成功的情況下,會員系統才可以接收到會員延期的消息。 對于以上的問題,KiteQ的處理和ali的Notify系統一樣, 1. 發送一個UnCommit的消息到KiteQ ,KiteQ 不會對Uncommite的消息做投遞操作 2. KiteQ定期對UnCommit的消息向Producer發送TxAck的詢問 3. 直到Producer明確告訴Commit或者Rollback該消息 4. Commit會走正常投遞流程、Rollback會對當前消息回滾即刪除操作。
QuickStart
1.編譯:sh build.sh 2.安裝裝Zookeeper:省略 啟動KiteQ: ./kiteq -bind=172.30.3.124:13800 -pport=13801 -db="mock://kiteq" -topics=trade,feed -zkhost=localhost:2181 -bind //綁定本地IP:Port -pport //pprof的Http端口 -db //存儲的協議地址 mock:// 啟動mock模式 mysql:// mmap:// -topics //本機可以處理的topics列表逗號分隔 -zkhost //zk的地址 啟動客戶端: 對于KiteQClient需要實現消息監聽器,我們定義了如下的接口: type IListener interface { //接受投遞消息的回調 OnMessage(msg *protocol.StringMessage) bool //接收事務回調 // 除非明確提交成功、其余都為不成功 // 有異常或者返回值為false均為不提交 OnMessageCheck(tx *protocol.TxResponse) error } 啟動Producer : producer := client.NewKiteQClient(${zkhost}, ${groupId}, ${password}, &defualtListener{}) producer.SetTopics([]string{"trade"}) producer.Start() //構建消息 msg := &protocol.StringMessage{} msg.Header = &protocol.Header{ MessageId: proto.String(store.MessageId()), Topic: proto.String("trade"), MessageType: proto.String("pay-succ"), ExpiredTime: proto.Int64(time.Now().Unix()), DeliveryLimit: proto.Int32(-1), GroupId: proto.String("go-kite-test"), Commit: proto.Bool(true)} msg.Body = proto.String("echo") //發送消息 producer.SendStringMessage(msg) 啟動Consumer: consumer:= client.NewKiteQClient(${zkhost}, ${groupId}, ${password}, &defualtListener{}) consumer.SetBindings([]*binding.Binding{ binding.Bind_Direct("s-mts-test", "trade", "pay-succ", 1000, true), }) consumer.Start() 就可以完成發布和訂閱消息的功能了.....
本文由用戶 dd2d 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!