淺析消息隊列 RabbitMQ
什么是rabbitMQ
rabbitMQ是一款基于AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。而且使用消息中間件利于應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。
rabbitMQ工作原理
首先我們得先理解rabbitMQ里的一些基本定義,主要如下:
exchange: producer只能將消息發送給exchange。而exchange負責將消息發送到queues。Exchange必須準確的知道怎么處理它接受到的消息,是被發送到一個特定的queue還是許多quenes,還是被拋棄,這些規則則是通過exchange type來定義。主要的type有direct,topic,headers,fanout。具體針對不同的場景使用不同的type。
queue: 消息隊列,消息的載體。接收來自exchange的消息,然后再由consumer取出。exchange和queue是可以一對多的,它們通過routingKey來綁定。
Producer:生產者,消息的來源,消息必須發送給exchange。而不是直接給queue
Consumer:消費者,直接從queue中獲取消息進行消費,而不是從exchange。
從以上可以看出Rabbitmq工作原理大致就是producer把一條消息發送給exchange。rabbitMQ根據routingKey負責將消息從exchange發送到對應綁定的queue中去,這是由rabbitMQ負責做的。而consumer只需從queue獲取消息即可。
大致流程如下:
rabbitMQ工作模型
下面通過幾個列子來詳細說明一下如何使用rabbitmq
簡單發送模型
在rabbit MQ里消息永遠不能被直接發送到queue。這里我們通過提供一個空字符串來使用默認的exchange。這個exchange是特殊的,它可以根據routingKey把消息發送給指定的queue。所以我們的設計看起來如下所示
代碼如下
send.py
receive.py
工作隊列模型
這種模式常常用來處理耗資源耗時間的任務在多個workers中,主要是為了避免立即去處理一個耗時的任務而等待它的完成。代替的做法是一個稍后去處理這個任務,讓一個worker process 在后臺處理這個任務。當有許多workers的時候,消息將會以輪詢的方式被workers獲取。模型如下
這里就會有一個問題,如果consumer在執行任務時需要花費一些時間,這個時候如果突然掛了,消息還沒有被完成,消息豈不是丟失了,為了不讓消息丟失,rabbitmq提供了消息確認機制,consumer在接收到,執行完消息后會發送一個ack給rabbitmq告訴它可以從queue中移除消息了。如果沒收到ack。Rabbitmq會重新發送此條消息,如果有其他的consumer在線,將會接收并消費這條消息。消息確認機制是默認打開的。如果想關閉它只需要設置no_ack=true。在此處我們不需要設置。默認如下就行
channel.basic_consume(callback, queue='hello')
除了consumer之外我們還得確保rabbitMQ掛了之后消息不被丟失。這里我們就需要確保隊列queue和消息messages都得是持久化的。
隊列的持久話需要設置durable屬性
channel.queue_declare(queue= task_queue, durable=True)
消息的持久話則是通過delivery_mode屬性,設置值為2即可
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
還有一個屬性相對比較重要,它可以保證consumer確認消費完一條消息之后再去獲取下一條消息。如果consumer正在忙碌的狀態,消息將會被分發到下一個不是很忙的consumer。設置如下
channel.basic_qos(prefetch_count=1)
下面貼出部分代碼
producer.py
consumer.py
廣播模型
在前面2個示例我們都適用默認的exchange。這里我們將自己定義一個exchange。并設置type為fanout。它可以將消息廣播給綁定的每一個queue。而不再是某一個queue。我們在此創建一個叫logs的exchange,就像下面這樣
channel.exchange_declare(exchange='logs', type='fanout')
所以發布消息就變成了下面這樣
channel.basic_publish(exchange='logs',routing_key='', body=message)
在這里我們需要將消息發送給所有的queues。而不需要指定某些隊列。所以我們這里就用臨時隊列代替。并設置在失去連接后刪除隊列。當然我們也可以不這么做。
設置臨時隊列,讓rabbitmq給我們一個隨機的隊列名字,設置exclusive為true確保失去連接的時候隊列也被刪除了。因為我們這里不需要持久話它。
result = channel.queue_declare(exclusive=True)
下面就是要綁定queues和exchange
channel.queue_bind(exchange='logs', queue=result.method.queue)
綜上所述我們的設計就像下面這樣
部分代碼如下
producer.py
consumer.py
direct模型
在上個模型中,消息被發送給所有的消費者,而在這一部分我們將通過路由的方式使exchange通過定義的路由方式將消息發送給隊列。所以我們需要在綁定exchange和queue的時候指定routing_key字段,注意這里的routing_key不是basic_publish中的routing_key。見如下
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
這里我們將使用type為direct的exchange。這種路由方式exchange將消息通過綁定的routing_key發送到指定的隊列。而且exchange可以通過多個routing_key把消息發送給同一個queue
通過下面這張圖我們來分析一下
在上面的圖中,我們可以看出type為direct的exchange X 綁定了2個隊列。隊列Q1與關聯路由orange。隊列Q2關聯路由black和green。所以一個帶有路由健orange消息將被exchange發送給隊列Q1。而帶有路由健black或者green的消息將被發送給隊列Q2。
我們還是通過修改前面的日志系統,來展示direct類型的exchange如何工作,如下圖
部分代碼如下
producer.py
consumer.py
讓我們運行一下看看結果是什么,我們啟動了3個consumer,routing_key分別指定為warning, error,第三個同時指定這2個。然后在運行producer時帶上路由信息routing_key。運行后可以看出指定了warning的不會收到error的消息。同時指定warning 和error的consumer則會都收到消息。
發送消息
只收到warning的消息
只收到error的消息
error和waring的都能收到
Topic模型
這種模型是最靈活的,相比較于direct的完全匹配和fanout的廣播。Topic可以用類似正則的手法更好的匹配來滿足我們的應用。下面我們首先了解一下topic類型的exchange
topic類型的routing_key不可以是隨意的單詞,它必須是一系列的單詞組合,中間以逗號隔開,譬如“quick.orange.rabbit”這個樣子。發送消息的routing_key必須匹配上綁定到隊列的routing_key。消息才會被發送。此外還有個重要的地方要說明,在如下代碼處綁定的routing_key種可以有*和#2種字符
channel.queue_bind(exchange='topic_logs',queue=queue_name,
routing_key=binding_key)
它們代表的意義如下
*(星號) 可以匹配任意一個單詞
#(井號) 可以匹配0到多個單詞
我們通過下圖來解釋一下
Q1匹配3個單詞中間為orange的routing_key ,而Q2可以匹配3個單詞最后一個單詞為rabbit和第一個單詞為lazy后面可以有多個單詞的routing_key
下面貼上部分示例
producer.py
consumer.py
RPC應用模型
當我們需要在遠程服務器上執行一個方法并等待它的結果的時候,我們將這種模式稱為RPC。下面我們用rabbitMQ建立一個RPC系統
在rabbit MQ中為了能讓client收到server端的response message。需要定義一個callback queue ,就像下面這樣
不過現在有一個問題,就是每次請求都會創建一個callback queue .這樣的效率是極其低下的。幸運的是我們可以通過correlation_id為每一個client創建一個單獨的callback queue。通過指定correlation_id我們可以知道callback queue中的消息屬于哪個client。要做到這樣只需client每次發送請求時帶上這唯一的correlation_id。然后當我們從callback queue中收到消息時,我們能基于 correlation_id 匹配上我們的消息。匹配不上的消息將被丟棄,看上去就像下圖這樣
總結一下流程如下
- client發起請求,請求中帶有2個參數reply_to和correlation_id
- 請求發往rpc_queue
- server獲取到rpc_queue中的消息,處理完畢后,將結果發往reply_to指定的callback queue
- client 獲取到callback queue中的消息,匹配correlation_id,如果匹配就獲取,不匹配就丟棄.
從上面的6個示例我們大致了解了如何運用rabbitMQ解決我們的實際需求,下面我們再來看看如何管理和監控rabbitMQ的實際運行情況
rabbitMQ管理和監控
rabbitmq management插件
rabbitMQ提供了一個管理插件,通過這個插件我們可以查看當前rabbitMQ服務的運行情況。在解壓縮官網提供的rabbitMQ安裝包之后,在sbin目錄可以看見rabbitmq-plugins文件,我們只需運行一下命令
rabbitmq-pluginsenablerabbitmq_management
然后再游覽器中輸入http:// server-name :15672/ 就可以查看當前rabbitMQ的一些運行狀況。如下所示
在這個管理控制臺我可以做很多事情,譬如
- 查看運行的exchanges,queues,users,virtual hosts還有權限
- 添加exchanges,queue,users,virtual host,以及給用戶賦予權限
- 監控消息長度,通道,消息速度。連接數
- 發送接收消息
- 關閉連接,清除隊列
rabbitmqctl使用
在rabbitMQ中,rabbitctl是一個被廣泛使用的命令。對用戶的增加,刪除,列出列表,創建權限,都是通過rabbitmqctl完成的。下面舉幾個例子來熟悉一下如何使用
創建一個用戶名和密碼都為test的新用戶
./rabbitmqctl add_usertesttest
刪除的話使用一下命令
./rabbitmqctl delete_usertest
列出所有用戶
./rabbitmqctl list_users
同樣也可以用此命令為用戶賦予權限
譬如我們想為用戶test在vhost rabbitmq賦予全部訪問權限,只許執行如下命令
./rabbitmqctlset_permissions –p rabbitmqtest “.*” “.*” “.*”
列出權限
./rabbitmqctlset_permissions –p rabbitmqtest “.*” “.*” “.*”
刪除權限
./rabbitmqctlclear_permissions –p rabbitmq
同樣的rabbitmqctl也可以用來查看rabbitmq的運行狀況,如下
列出隊列和消息數目
./rabbitmqctl list_queues –p rabbitmq
如果想要了解更多的隊列消息,譬如名字,消息數目,消費者數目,內存使用情況,以及其他屬性 。則可以發送一下命令
./rabbitmqctllist_queuesnamemessagesconsumersmemorydurableauto_delete
列出exchanges相關信息
./rabbitmqctllist_exchanges name type durable auto_delete
rabbitmqctl還有很多功能,這里不一一例舉了。有興趣的可以去官方網站查看。
來自:http://blog.jobbole.com/107237/