RabbitMQ高級指南——AMQP解析(上)

nb400195 8年前發布 | 43K 次閱讀 AMQP RabbitMQ 消息系統

AMQP(Advanced Message Queuing Protocol) 高級消息隊列協議 是一個規范,它規定了

  • 系統模型,定義了AMQP中各個 實體 的名稱,交互流程

  • 通訊規范,定義了實體之間交互的數據包格式、 實體 之間通訊的具體命令

  • 序列化標準,定義了通訊數據的序列化和反序列化格式

《AMQP解析》分為上下,主要討論系統模型和通訊規范。這兩部分是AMQP的核心內容,理解這兩部分內容也就掌握了AMQP的精髓。

AMQP模型

 

也許你見過下面這幅圖

這就是AMQP的 實體 模型,它把整個系統分代表 客戶端 的Publisher(生產者)、Consumer(消費者);代表 服務器端 的Broker。 不同于我們用過的其他MQ系統"消息=隊列",AMQP模型解耦和 消息隊列 ,我們模擬一個Publisher發送log到Consumer。

  • Publisher產生一條數據,發送 到Broker。例子中就是發送 到服務器

  • Broker中的Exchange可以理解為一個 規則表 (routing key和queue的映射關系——Binding),Broker收到消息后根據 routing key 查詢投遞的目標 queue

  • Consumer向Broker發送 訂閱 消息的時候會指定自己監聽哪個queue,當有數據到達queue的時broker會推送數據到consumer。

Publisher指明誰可以收到消息(routing key)而不具體指定某個Queue;Consumer只是純粹的監聽Queue而不會關心數據從而來 。Exchange非常像計算機網絡通訊交換機, 收到數據后查詢轉發表決定下一跳出口 ;實際上Exchange并不真實的存在它僅僅是一個規則表。匹配->投遞的動作是由Broker內部的進程完成的。 Publisher、Consumer連接到Broker都是通過 Connection 完成的,它代表了一個TCP連接。TCP連接對于操作系統來說是非常昂貴的資源,如果每秒鐘千上萬個消息發送給Broker,每次發送都是一個TCP連接這是一筆非常大的系統開銷。為了提高通道的利用率AMQP規定了channel的概念,可以把它理解為和broker的一次會話。如果你接觸過Http2.0應該知道多路復用(Multiplexing),這個特性和AMQP是一模一樣的。

Exchange的類型

 

Exchange代表了消息和Queue的映射關系,它非常靈活。AMQP定義了三種類型的關聯方式

  • 直接關聯(direct)

這種類型的Exchange看似簡單其實非常有威力,它的邏輯是:Exchange根據routing key尋找到對應的Queue。比如我定義下面的Exchange

我們需要把“性能數據”分別存放在Mysql和InfluxDB,可以定義一個“metric”Exchange,把“mysql”、“InfluxDB”和“mertic”做綁定(binding)。 Broker收到發送給Exchange的數據后會判斷routing key,如果publisher沒有指定routing key則分別投遞給InfluxDB、mysql。所以看起來就像是一條數據被鏡像成兩份分別發給兩個Queue。 “空”也是一種routing key。設置為“空”其實還是會發生“比較”,只不過publisher剛好也是“空”所以才會往兩個隊列中同時發送。 如果我們把mysql的綁定修改一下

Broker收到發送給Exchange的數據后會判斷routing key,如果publisher沒有指定routing key則投遞給InfluxDB;如果routing key是mysql則投遞給mysql。

  • 廣播(fanout)

這種類型的Exchange最簡單,無視routing key,有多少個Queue和Exchange綁定就直接推送到Queue。 比如上面的例子中如果Exchange是fanout類型的,則無論你設置不設置routing key,publisher發送不發送routing key;Exchange都會分別投遞到兩個Queue。一般這種類型的Exchange很少用。

  • 模糊匹配(topic)

和direct很像,direct比較routing key的時候是嚴格比較。topic則允許你用“模糊匹配”。

上面的映射可以翻譯為:

  • 所有cpu、mem開頭的數據給mysql Queue

  • 所有disk、network開頭的數據給InfluxDB Queue

發送數據的時候我們可以這樣

內存數據我們可以用“mem.free”、“mem.usage”,它們都會被投遞到mysql queue。同樣的道理,disk、network開頭的數據會被投遞到InfluxDB隊列中。 需要特別注意的是

如果你routing key什么都不設置,它的含義不是指“匹配所有的消息”;而是指匹配“routing key為空”的數據(publisher發送的routing key是空)。如果你要匹配所有的數據,正確的方法應該是——“#”。 提示:RabbitMQ還有一個擴展類型的Exchange——header。它會根據publisher的發送數據的頭部信息選擇Queue。比如可以根據header部分的userId選擇Queue其實有點像IM了(userId代表的是發送者用戶名)

小技巧

學習的最好方式是實踐,AMQP的規范并不像想想的那么復雜,特別是RabbitMQ的實現代碼非常簡潔,甚至有人懷疑AMQP協議是專門為Erlang定制的(其實二者真沒有任何關系) RabbitMQ提供了一個漂亮的Web界面可以讓我們不用寫一行代碼就能實驗AMQP的所有功能,修改rabbitmq的配置文件啟用rabbitmq_management插件

配置文件enabled_plugins
[rabbitmq_management].

你可以用命令行搞定 rabbitmq-plugins enable rabbitmq_management 重啟RabbitMQ后訪問15672端口

過命令行新建一個用戶

rabbitmqctl add_user fireflyc 123 #新建用戶fireflyc,密碼123
rabbitmqctl set_user_tags fireflyc administrator #設置fireflyc是超級管理員

登錄

  • Exchange標簽我們可以看到所有的Exchange,最底下可以Add a new exchange

  • Queue標簽我們可以看到所有的Queue,最底下可以Add a new queue

  • 點擊某個Exchange我們可以看到Binding,可以通過Publish Message發送消息

  • 點擊某個Queue我們可以通過Get message獲取隊列中的消息

 

 

 

 

來自:http://mp.weixin.qq.com/s?__biz=MzIxMjAzMDA1MQ==&mid=2648945635&idx=1&sn=966633eeba2567e7759b597e43568054&chksm=8f5b54efb82cddf9678821ad9708fc404c087034471f3385ccac09dae0392a146b3673e3ccbd#rd

 

 本文由用戶 nb400195 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!