RabbitMQ高級指南——AMQP解析(上)
AMQP(Advanced Message Queuing Protocol) 高級消息隊列協議 是一個規范,它規定了
-
系統模型,定義了AMQP中各個 實體 的名稱,交互流程
-
通訊規范,定義了實體之間交互的數據包格式、 實體 之間通訊的具體命令
-
序列化標準,定義了通訊數據的序列化和反序列化格式
《AMQP解析》分為上下,主要討論系統模型和通訊規范。這兩部分是AMQP的核心內容,理解這兩部分內容也就掌握了AMQP的精髓。
AMQP模型
也許你見過下面這幅圖
這就是AMQP的 實體 模型,它把整個系統分代表 客戶端 的Publisher(生產者)、Consumer(消費者);代表 服務器端 的Broker。 不同于我們用過的其他MQ系統"消息=隊列",AMQP模型解耦和 消息 和 隊列 ,我們模擬一個Publisher發送log到Consumer。
-
Publisher產生一條數據,發送 到Broker。例子中就是發送 到服務器
-
Broker中的Exchange可以理解為一個 規則表 (routing key和queue的映射關系——Binding),Broker收到消息后根據 routing key 查詢投遞的目標 queue
-
Consumer向Broker發送 訂閱 消息的時候會指定自己監聽哪個queue,當有數據到達queue的時broker會推送數據到consumer。
Publisher指明誰可以收到消息(routing key)而不具體指定某個Queue;Consumer只是純粹的監聽Queue而不會關心數據從而來 。Exchange非常像計算機網絡通訊交換機, 收到數據后查詢轉發表決定下一跳出口 ;實際上Exchange并不真實的存在它僅僅是一個規則表。匹配->投遞的動作是由Broker內部的進程完成的。 Publisher、Consumer連接到Broker都是通過 Connection 完成的,它代表了一個TCP連接。TCP連接對于操作系統來說是非常昂貴的資源,如果每秒鐘千上萬個消息發送給Broker,每次發送都是一個TCP連接這是一筆非常大的系統開銷。為了提高通道的利用率AMQP規定了channel的概念,可以把它理解為和broker的一次會話。如果你接觸過Http2.0應該知道多路復用(Multiplexing),這個特性和AMQP是一模一樣的。
Exchange的類型
Exchange代表了消息和Queue的映射關系,它非常靈活。AMQP定義了三種類型的關聯方式
-
直接關聯(direct)
這種類型的Exchange看似簡單其實非常有威力,它的邏輯是:Exchange根據routing key尋找到對應的Queue。比如我定義下面的Exchange
我們需要把“性能數據”分別存放在Mysql和InfluxDB,可以定義一個“metric”Exchange,把“mysql”、“InfluxDB”和“mertic”做綁定(binding)。 Broker收到發送給Exchange的數據后會判斷routing key,如果publisher沒有指定routing key則分別投遞給InfluxDB、mysql。所以看起來就像是一條數據被鏡像成兩份分別發給兩個Queue。 “空”也是一種routing key。設置為“空”其實還是會發生“比較”,只不過publisher剛好也是“空”所以才會往兩個隊列中同時發送。 如果我們把mysql的綁定修改一下
Broker收到發送給Exchange的數據后會判斷routing key,如果publisher沒有指定routing key則投遞給InfluxDB;如果routing key是mysql則投遞給mysql。
-
廣播(fanout)
這種類型的Exchange最簡單,無視routing key,有多少個Queue和Exchange綁定就直接推送到Queue。 比如上面的例子中如果Exchange是fanout類型的,則無論你設置不設置routing key,publisher發送不發送routing key;Exchange都會分別投遞到兩個Queue。一般這種類型的Exchange很少用。
-
模糊匹配(topic)
和direct很像,direct比較routing key的時候是嚴格比較。topic則允許你用“模糊匹配”。
上面的映射可以翻譯為:
-
所有cpu、mem開頭的數據給mysql Queue
-
所有disk、network開頭的數據給InfluxDB Queue
發送數據的時候我們可以這樣
內存數據我們可以用“mem.free”、“mem.usage”,它們都會被投遞到mysql queue。同樣的道理,disk、network開頭的數據會被投遞到InfluxDB隊列中。 需要特別注意的是
如果你routing key什么都不設置,它的含義不是指“匹配所有的消息”;而是指匹配“routing key為空”的數據(publisher發送的routing key是空)。如果你要匹配所有的數據,正確的方法應該是——“#”。 提示:RabbitMQ還有一個擴展類型的Exchange——header。它會根據publisher的發送數據的頭部信息選擇Queue。比如可以根據header部分的userId選擇Queue其實有點像IM了(userId代表的是發送者用戶名)
小技巧
學習的最好方式是實踐,AMQP的規范并不像想想的那么復雜,特別是RabbitMQ的實現代碼非常簡潔,甚至有人懷疑AMQP協議是專門為Erlang定制的(其實二者真沒有任何關系) RabbitMQ提供了一個漂亮的Web界面可以讓我們不用寫一行代碼就能實驗AMQP的所有功能,修改rabbitmq的配置文件啟用rabbitmq_management插件
配置文件enabled_plugins [rabbitmq_management].
你可以用命令行搞定 rabbitmq-plugins enable rabbitmq_management 重啟RabbitMQ后訪問15672端口
過命令行新建一個用戶
rabbitmqctl add_user fireflyc 123 #新建用戶fireflyc,密碼123 rabbitmqctl set_user_tags fireflyc administrator #設置fireflyc是超級管理員
登錄
-
Exchange標簽我們可以看到所有的Exchange,最底下可以Add a new exchange
-
Queue標簽我們可以看到所有的Queue,最底下可以Add a new queue
-
點擊某個Exchange我們可以看到Binding,可以通過Publish Message發送消息
-
點擊某個Queue我們可以通過Get message獲取隊列中的消息
來自:http://mp.weixin.qq.com/s?__biz=MzIxMjAzMDA1MQ==&mid=2648945635&idx=1&sn=966633eeba2567e7759b597e43568054&chksm=8f5b54efb82cddf9678821ad9708fc404c087034471f3385ccac09dae0392a146b3673e3ccbd#rd