淺析消息隊列 RabbitMQ

jgszk1986 8年前發布 | 50K 次閱讀 RabbitMQ 消息系統

什么是rabbitMQ

rabbitMQ是一款基于AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。而且使用消息中間件利于應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。

rabbitMQ工作原理

首先我們得先理解rabbitMQ里的一些基本定義,主要如下:

exchange: producer只能將消息發送給exchange。而exchange負責將消息發送到queues。Exchange必須準確的知道怎么處理它接受到的消息,是被發送到一個特定的queue還是許多quenes,還是被拋棄,這些規則則是通過exchange type來定義。主要的type有direct,topic,headers,fanout。具體針對不同的場景使用不同的type。

queue: 消息隊列,消息的載體。接收來自exchange的消息,然后再由consumer取出。exchange和queue是可以一對多的,它們通過routingKey來綁定。

Producer:生產者,消息的來源,消息必須發送給exchange。而不是直接給queue

Consumer:消費者,直接從queue中獲取消息進行消費,而不是從exchange。

從以上可以看出Rabbitmq工作原理大致就是producer把一條消息發送給exchange。rabbitMQ根據routingKey負責將消息從exchange發送到對應綁定的queue中去,這是由rabbitMQ負責做的。而consumer只需從queue獲取消息即可。

大致流程如下:

rabbitMQ工作模型

下面通過幾個列子來詳細說明一下如何使用rabbitmq

簡單發送模型

在rabbit MQ里消息永遠不能被直接發送到queue。這里我們通過提供一個空字符串來使用默認的exchange。這個exchange是特殊的,它可以根據routingKey把消息發送給指定的queue。所以我們的設計看起來如下所示

代碼如下

send.py

receive.py

工作隊列模型

這種模式常常用來處理耗資源耗時間的任務在多個workers中,主要是為了避免立即去處理一個耗時的任務而等待它的完成。代替的做法是一個稍后去處理這個任務,讓一個worker process 在后臺處理這個任務。當有許多workers的時候,消息將會以輪詢的方式被workers獲取。模型如下

這里就會有一個問題,如果consumer在執行任務時需要花費一些時間,這個時候如果突然掛了,消息還沒有被完成,消息豈不是丟失了,為了不讓消息丟失,rabbitmq提供了消息確認機制,consumer在接收到,執行完消息后會發送一個ack給rabbitmq告訴它可以從queue中移除消息了。如果沒收到ack。Rabbitmq會重新發送此條消息,如果有其他的consumer在線,將會接收并消費這條消息。消息確認機制是默認打開的。如果想關閉它只需要設置no_ack=true。在此處我們不需要設置。默認如下就行

channel.basic_consume(callback,  queue='hello')

除了consumer之外我們還得確保rabbitMQ掛了之后消息不被丟失。這里我們就需要確保隊列queue和消息messages都得是持久化的。

隊列的持久話需要設置durable屬性

channel.queue_declare(queue= task_queue, durable=True)

消息的持久話則是通過delivery_mode屬性,設置值為2即可

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                        delivery_mode = 2, # make message persistent
                      ))

還有一個屬性相對比較重要,它可以保證consumer確認消費完一條消息之后再去獲取下一條消息。如果consumer正在忙碌的狀態,消息將會被分發到下一個不是很忙的consumer。設置如下

channel.basic_qos(prefetch_count=1)

下面貼出部分代碼

producer.py

consumer.py

廣播模型

在前面2個示例我們都適用默認的exchange。這里我們將自己定義一個exchange。并設置type為fanout。它可以將消息廣播給綁定的每一個queue。而不再是某一個queue。我們在此創建一個叫logs的exchange,就像下面這樣

channel.exchange_declare(exchange='logs', type='fanout')

所以發布消息就變成了下面這樣

channel.basic_publish(exchange='logs',routing_key='', body=message)

在這里我們需要將消息發送給所有的queues。而不需要指定某些隊列。所以我們這里就用臨時隊列代替。并設置在失去連接后刪除隊列。當然我們也可以不這么做。

設置臨時隊列,讓rabbitmq給我們一個隨機的隊列名字,設置exclusive為true確保失去連接的時候隊列也被刪除了。因為我們這里不需要持久話它。

result = channel.queue_declare(exclusive=True)

下面就是要綁定queues和exchange

channel.queue_bind(exchange='logs', queue=result.method.queue)

綜上所述我們的設計就像下面這樣

部分代碼如下

producer.py

consumer.py

direct模型

在上個模型中,消息被發送給所有的消費者,而在這一部分我們將通過路由的方式使exchange通過定義的路由方式將消息發送給隊列。所以我們需要在綁定exchange和queue的時候指定routing_key字段,注意這里的routing_key不是basic_publish中的routing_key。見如下

channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

這里我們將使用type為direct的exchange。這種路由方式exchange將消息通過綁定的routing_key發送到指定的隊列。而且exchange可以通過多個routing_key把消息發送給同一個queue

通過下面這張圖我們來分析一下

在上面的圖中,我們可以看出type為direct的exchange X 綁定了2個隊列。隊列Q1與關聯路由orange。隊列Q2關聯路由black和green。所以一個帶有路由健orange消息將被exchange發送給隊列Q1。而帶有路由健black或者green的消息將被發送給隊列Q2。

我們還是通過修改前面的日志系統,來展示direct類型的exchange如何工作,如下圖

部分代碼如下

producer.py

consumer.py

讓我們運行一下看看結果是什么,我們啟動了3個consumer,routing_key分別指定為warning, error,第三個同時指定這2個。然后在運行producer時帶上路由信息routing_key。運行后可以看出指定了warning的不會收到error的消息。同時指定warning 和error的consumer則會都收到消息。

發送消息

只收到warning的消息

只收到error的消息

error和waring的都能收到

Topic模型

這種模型是最靈活的,相比較于direct的完全匹配和fanout的廣播。Topic可以用類似正則的手法更好的匹配來滿足我們的應用。下面我們首先了解一下topic類型的exchange

topic類型的routing_key不可以是隨意的單詞,它必須是一系列的單詞組合,中間以逗號隔開,譬如“quick.orange.rabbit”這個樣子。發送消息的routing_key必須匹配上綁定到隊列的routing_key。消息才會被發送。此外還有個重要的地方要說明,在如下代碼處綁定的routing_key種可以有*和#2種字符

channel.queue_bind(exchange='topic_logs',queue=queue_name,
                    routing_key=binding_key)

它們代表的意義如下

*(星號) 可以匹配任意一個單詞

#(井號) 可以匹配0到多個單詞

我們通過下圖來解釋一下

Q1匹配3個單詞中間為orange的routing_key ,而Q2可以匹配3個單詞最后一個單詞為rabbit和第一個單詞為lazy后面可以有多個單詞的routing_key

下面貼上部分示例

producer.py

consumer.py

RPC應用模型

當我們需要在遠程服務器上執行一個方法并等待它的結果的時候,我們將這種模式稱為RPC。下面我們用rabbitMQ建立一個RPC系統

在rabbit MQ中為了能讓client收到server端的response message。需要定義一個callback queue ,就像下面這樣

不過現在有一個問題,就是每次請求都會創建一個callback queue .這樣的效率是極其低下的。幸運的是我們可以通過correlation_id為每一個client創建一個單獨的callback queue。通過指定correlation_id我們可以知道callback queue中的消息屬于哪個client。要做到這樣只需client每次發送請求時帶上這唯一的correlation_id。然后當我們從callback queue中收到消息時,我們能基于 correlation_id 匹配上我們的消息。匹配不上的消息將被丟棄,看上去就像下圖這樣

總結一下流程如下

  1. client發起請求,請求中帶有2個參數reply_to和correlation_id
  2. 請求發往rpc_queue
  3. server獲取到rpc_queue中的消息,處理完畢后,將結果發往reply_to指定的callback queue
  4. client 獲取到callback queue中的消息,匹配correlation_id,如果匹配就獲取,不匹配就丟棄.

從上面的6個示例我們大致了解了如何運用rabbitMQ解決我們的實際需求,下面我們再來看看如何管理和監控rabbitMQ的實際運行情況

rabbitMQ管理和監控

rabbitmq management插件

rabbitMQ提供了一個管理插件,通過這個插件我們可以查看當前rabbitMQ服務的運行情況。在解壓縮官網提供的rabbitMQ安裝包之后,在sbin目錄可以看見rabbitmq-plugins文件,我們只需運行一下命令

rabbitmq-pluginsenablerabbitmq_management

然后再游覽器中輸入http:// server-name :15672/ 就可以查看當前rabbitMQ的一些運行狀況。如下所示

在這個管理控制臺我可以做很多事情,譬如

  1. 查看運行的exchanges,queues,users,virtual hosts還有權限
  2. 添加exchanges,queue,users,virtual host,以及給用戶賦予權限
  3. 監控消息長度,通道,消息速度。連接數
  4. 發送接收消息
  5. 關閉連接,清除隊列

rabbitmqctl使用

在rabbitMQ中,rabbitctl是一個被廣泛使用的命令。對用戶的增加,刪除,列出列表,創建權限,都是通過rabbitmqctl完成的。下面舉幾個例子來熟悉一下如何使用

創建一個用戶名和密碼都為test的新用戶

./rabbitmqctl  add_usertesttest

刪除的話使用一下命令

./rabbitmqctl  delete_usertest

列出所有用戶

./rabbitmqctl  list_users

同樣也可以用此命令為用戶賦予權限

譬如我們想為用戶test在vhost rabbitmq賦予全部訪問權限,只許執行如下命令

./rabbitmqctlset_permissions –p rabbitmqtest “.*” “.*” “.*”

列出權限

./rabbitmqctlset_permissions –p rabbitmqtest “.*” “.*” “.*”

刪除權限

./rabbitmqctlclear_permissions –p rabbitmq

同樣的rabbitmqctl也可以用來查看rabbitmq的運行狀況,如下

列出隊列和消息數目

./rabbitmqctl  list_queues –p rabbitmq

如果想要了解更多的隊列消息,譬如名字,消息數目,消費者數目,內存使用情況,以及其他屬性 。則可以發送一下命令

./rabbitmqctllist_queuesnamemessagesconsumersmemorydurableauto_delete

列出exchanges相關信息

./rabbitmqctllist_exchanges  name  type  durable  auto_delete

rabbitmqctl還有很多功能,這里不一一例舉了。有興趣的可以去官方網站查看。

 

來自:http://blog.jobbole.com/107237/

 

 本文由用戶 jgszk1986 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!