基于go+protobuff實現的多種持久化方案的mq框架:kiteq

dd2d 9年前發布 | 23K 次閱讀 kiteq 消息系統

基于go+protobuff實現的多種持久化方案的mq框架

簡介

* 基于zk維護發送方、訂閱方、broker訂閱發送關系、支持水平、垂直方面的擴展
* 基于與topic以及第二級messageType訂閱消息
* 基于mysql、文件存儲方式多重持久層消息存儲
* 保證可靠異步投遞
* 支持兩階段提交分布式事務

工程結構

kiteq/
├── README.md
├── binding           訂閱關系管理處理跟ZK的交互
├── build.sh          安裝腳本
├── client            KiteQ的客戶端
├── doc               文檔
├── handler           KiteQ所需要的處理Handler
├── kite_benchmark.go KiteQ的Benchmark程序
├── kiteq.go          KiteQ對外啟動入口
├── pipe              類似netty的pipeline結構的框架,組織event和handler流轉
├── protocol          KiteQ的協議包,基于PB和定義的Packet
├── remoting          網絡層包括remoting-server和client及重連邏輯、客戶端管理
├── server            KiteQ的Server端組裝需要的組件
├── stat              狀態信息統計
└── store             KiteQ的存儲結構
概念:
* Binding:訂閱關系,描述訂閱某種消息類型的數據結構
* Consumer : 消息的消費方
* Producer : 消息的發送方
* Topic: 消息的主題比如 Trade則為消息主題,一般可以定義為某種業務類型
* MessageType: 第二級別的消息類型,比如Trade下存在支付成功的pay-succ-200的消息類型

架構圖

image

Zookeeper數據結構

KiteServer : /kiteq/server/${topic}/ip:port
    Producer   : /kiteq/pub/${topic}/${groupId}/ip:port
    Consumer   : /kiteq/sub/${topic}/${groupId}-bind/#$data(bind)
流程:
1. KiteQ啟動會將自己可以接受和投遞的Topics列表給到zookeeper
2. KiteQ拉取Zookeeper上的Topics下的訂閱關系(Bingding:訂閱方推送上來的訂閱消息信息)。
3. Consumer推送自己需要訂閱的Topic+messageType的消息的訂閱關系(Binding)到Zookeeper
4. Consumer拉取當前提供推送Topics消息的KiteQ地址列表,并發起TCP長連接
5. Producer推送自己可以發布消息Topics列表到Zookeeper
6. Producer拉取當前提供接受Topics消息的KiteQ地址列表,并發起TCP長連接
訂閱方式:
Direct (直接訂閱): 明確的Topic+MessageType訂閱消息
Regx(正則式訂閱):  Topic級別下,對MessageType進行正則匹配方式訂閱消息
Fanout(廣播式訂閱): Topic級別下,訂閱所有的MessageType的消息
兩階段提交:
因為引入了異步投遞方案,所以在有些場景下需要本地執行某個事務成功的時候,本條消息才可以被訂閱方消費。
例如:
    用戶購買會員支付成功成功需要修改本地用戶賬戶Mysql的余額、并且告知會員系統為用戶的會員期限延長。
    這個時候就會碰到、必須在保證mysql操作成功的情況下,會員系統才可以接收到會員延期的消息。

對于以上的問題,KiteQ的處理和ali的Notify系統一樣,
    1. 發送一個UnCommit的消息到KiteQ ,KiteQ 不會對Uncommite的消息做投遞操作
    2. KiteQ定期對UnCommit的消息向Producer發送TxAck的詢問
    3. 直到Producer明確告訴Commit或者Rollback該消息
    4. Commit會走正常投遞流程、Rollback會對當前消息回滾即刪除操作。
QuickStart
1.編譯:sh build.sh 
2.安裝裝Zookeeper:省略
啟動KiteQ:
    ./kiteq -bind=172.30.3.124:13800 -pport=13801 -db="mock://kiteq" -topics=trade,feed -zkhost=localhost:2181
    -bind  //綁定本地IP:Port
    -pport //pprof的Http端口
    -db //存儲的協議地址  mock:// 啟動mock模式 mysql:// mmap:// 
    -topics //本機可以處理的topics列表逗號分隔
    -zkhost //zk的地址

啟動客戶端:
    對于KiteQClient需要實現消息監聽器,我們定義了如下的接口:
    type IListener interface {
        //接受投遞消息的回調
        OnMessage(msg *protocol.StringMessage) bool
        //接收事務回調
        // 除非明確提交成功、其余都為不成功
        // 有異常或者返回值為false均為不提交
        OnMessageCheck(tx *protocol.TxResponse) error
    }

啟動Producer :
    producer := client.NewKiteQClient(${zkhost}, ${groupId}, ${password}, &defualtListener{})
    producer.SetTopics([]string{"trade"})
    producer.Start()
    //構建消息
    msg := &protocol.StringMessage{}
    msg.Header = &protocol.Header{
        MessageId:     proto.String(store.MessageId()),
        Topic:         proto.String("trade"),
        MessageType:   proto.String("pay-succ"),
        ExpiredTime:   proto.Int64(time.Now().Unix()),
        DeliveryLimit: proto.Int32(-1),
        GroupId:       proto.String("go-kite-test"),
        Commit:        proto.Bool(true)}
    msg.Body = proto.String("echo")
    //發送消息
    producer.SendStringMessage(msg)

啟動Consumer:
    consumer:= client.NewKiteQClient(${zkhost}, ${groupId}, ${password}, &defualtListener{})
    consumer.SetBindings([]*binding.Binding{
        binding.Bind_Direct("s-mts-test", "trade", "pay-succ", 1000, true),
    })
    consumer.Start()

就可以完成發布和訂閱消息的功能了.....

項目主頁:http://www.baiduhome.net/lib/view/home/1426562780351

 本文由用戶 dd2d 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!