RabbitMQ基本概念和使用
RabbitMQ是一個消息代理,核心原理:發送消息,接收消息。
RabbitMQ主要用于組件之間的解耦,消息發送者無需知道消息使用者的存在,反之亦然。
單向解耦 雙向解耦(如:RPC)
例如一個日志系統,很容易使用RabbitMQ簡化工作量,一個Consumer進行消息的正常處理,另一個Consumer復制對消息進行日志記錄,只要在程序中指定兩個Consumer所監聽的queue以相同的方式綁定到同一個exchange即可,剩下的消息分發工作由RabbitMQ完成。
安裝rabbitmq,請參考官網。
首先通過一個非常簡單的”hello world“例子介紹如何使用RabbitMQ,然后再介紹其涉及的基本概念并對交換機和隊列多做點介紹。
一、helloworld例子
本例非常簡單——發送一個消息”hello world“,然后獲取它并輸出到屏幕。
總共需要兩個程序,一個發送消息叫send.py,一個接受消息并打印消息內容叫receive.py。
該圖為helloworld例子的原理圖:生產者send.py(Productor)把消息(”hello world“)發送到一個名為”queue“的隊列中,消費者receive.py從這個隊列中獲取消息。接下來看代碼:
send.py(發送消息)
#!/usr/bin/env python import pika第一步,連接RabbitMq服務器
rabbit_username='xxx' rabbit_password='xxx' credentials = pika.PlainCredentials(rabbit_username, rabbit_password) connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials))
channel是進行消息讀寫的通道
channel = connection.channel()
第二步,創建一個名為queue的隊列,然后把消息發送到這個隊列
channel.queue_declare(queue='queue')
第三步,現在可以發送消息,但是RabbitMQ不能把消息直接發送到隊列,要發送到交換器,這個稍后介紹,這里使用默認交換器(exchange),它使用一個空字符串標
識,routing_key參數必須指定為隊列名稱,這里為queue
channel.basic_publish(exchange='', routing_key='queue', body='hello world') print "send.py:send message 'hello world',wait for receive.py deal with this message"
退出程序前,通過關閉連接保證消息已經投遞到RabbitMq
connection.close()</pre>
</div>receive.py(獲取數據)
print ' [*] Waiting for messages. To exit press CTRL+C'!/usr/bin/env python
import pika
第一步,同樣連接RabbitMq服務器
rabbit_username='xxx' rabbit_password='xxx' credentials = pika.PlainCredentials(rabbit_username, rabbit_password) connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials)) channel = connection.channel()
為確保隊列存在,再次執行queue_declare創建一個隊列,我們可以多次運行該命令,但是只要一個隊列會創建
因為不能保證send.py先執行還是receive.py先執行,所以重復聲明隊列來確保其存在
channel.queue_declare(queue='hellolxy')
第三步,定義一個回調函數,當獲得消息時,Pika庫調用這個回調函數來處理消息,該回調函數將消息內容打印到屏幕
def callback(ch, method, properties, body): print "receive.py: Received message %r" % (body,)
第四步,告訴rabbbitMq回調函數將從queue隊列接收消息
channel.basic_consume(callback, queue='queue', no_ack=True)
第五步,輸入一個無限循環來等待消息數據并運行回調函數
print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()</pre>
</div>現在在終端運行程序。首先,用send.py發送一條消息:
$ python send.py send.py:send message 'hello world',wait for receive.py deal with this message生產者(producer)程序send.py每次運行后就會停止。現在運行receive.py來接收消息:
$ python receive.py receive.py: Received message 'hello world' [*] Waiting for messages. To exit press CTRL+C成功了!現在已經通過RabbitMQ發送了第一條消息。但是receive.py程序并沒有退出,它一直在準備獲取消息,可以通過ctrl-c來中斷它。
二、RabbitMQ基本概念
總結一下發送接收消息的過程:
通過上面例子對RabbitMQ有一個感性認識后,現在來介紹RabbitMQ中的基本概念。
Broker:消息隊列服務器實體
消息:每個消息都有一個路由鍵(routing key)的屬性。就是一個簡單的字符串。
connection:應用程序與broker的網絡連接。
channel:幾乎所有的操作都在channel中進行,channel是進行消息讀寫的通道。客戶端可建立多個channel,每個channel代表一個會話任務。
交換機:接收消息,根據路由鍵轉發消息到綁定的隊列。
綁定:一個綁定就是基于路由鍵將交換機和隊列連接起來的路由規則,所以交換機不過就是一個由綁定構成的路由表。
舉例:一個具有路由鍵“key1”的消息要發送到兩個隊列,queueA和queueB。要做到這點就要建立兩個綁定,每個綁定連接一個交換機和一個隊列。兩者都是由路由鍵“key1”觸發,這種情況,交換機會復制一份消息并把它們分別發送到兩個隊列中。
隊列:消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
三、交換機
交換機用來接收消息,轉發消息到綁定的隊列,是rabbitMq中的核心。
交換機共有4種類型:direct,topic,headers和fanout。
為什么不創建一種交換機來處理所有類型的路由規則?因為每種規則匹配時的CPU開銷是不同的,所以根據不同需求選擇合適交換機。
舉例:一個"topic"類型的交換機會將消息的路由鍵與類似“dog.*”的模式進行匹配。一個“direct”類型的交換機會將路由鍵與 “dogs”進行比較。匹配末端通配符比直接比較消耗更多的cpu,所以如果用不到“topic”類型交換機帶來的靈活性,就通過“direct”類型交 換機獲得更高的處理效率。
1、Direct交換機:轉發消息到routingKey指定隊列(完全匹配,單播)。
routingKey與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發routingkey標記為dog的消息,不會轉發dog.puppy,也不會轉發dog.guard等。
2、Topic交換機:按規則轉發消息(最靈活,組播)
Topic類型交換機通過模式匹配分配消息的routing-key屬性。將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。
它將routing-key和binding-key的字符串切分成單詞。這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配不多不少一個單詞。
例如,binding key:*.stock.#匹配routing key: usd.stock和eur.stock.db,但是不匹配stock.nana。
例如,“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。
3、Fanout交換機:轉發消息到所有綁定隊列(最快,廣播)
fanout交換機不處理路由鍵,簡單的將隊列綁定到交換機上,每個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。
很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。
4、Note
- 如果沒有隊列綁定在交換機上,則發送到該交換機上的消息會丟失。
- 一個交換機可以綁定多個隊列,一個隊列可以被多個交換機綁定。
- 還有一些其他類型的交換機類型,如header、failover、system等,現在在當前的RabbitMQ版本中均未實現。
- 因為交換機是命名實體,聲明一個已經存在的交換機,但是試圖賦予不同類型是會導致錯誤。客戶端需要刪除這個已經存在的交換機,然后重新聲明并且賦予新的類型。
- 交換機的屬性:
- 持久性:如果啟用,交換機將會在server重啟前都有效。
- 自動刪除:如果啟用,那么交換機將會在其綁定的隊列都被刪掉之后刪除自身。
- 惰性:如果沒有聲明交換機,那么在執行到使用的時候會導致異常,并不會主動聲明。
</ul> </li> </ul>四、隊列
- 隊列的屬性:
- 持久性:如果啟用,隊列將在Server服務重啟前都有效。
- 自動刪除:如果啟用,那么隊列將會在所有的消費者停止使用之后自動刪除自身。
- 惰性:如果沒有聲明隊列,那么在執行到使用的時候會導致異常,并不會主動聲明。
- 排他性:如果啟用,隊列只能被聲明它的消費者使用。
</ul> </li> </ul>深入了解可參考:
官網教程
上面兩篇是翻譯成中文的,剩下的都沒有翻譯,可參考英文版
python采用pika庫使用rabbitmq總結,多篇筆記和示例
</div>
來自:http://www.cnblogs.com/starof/p/4173413.html本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!