RabbitMQ教程 - 路由

jopen 9年前發布 | 21K 次閱讀 RabbitMQ 消息系統

路由

(使用 pika 0.9.8 Python客戶端)

在這篇教程中,我們將給它添加一個功能 - 我們將使它能夠只訂閱消息的一個子集。比如,我們將能夠只把嚴重的error消息給導到log文件(保存到磁盤)中,而仍然能夠在終端打印所有的log消息。

綁定

在前一個例子中,我們已經創建了綁定。你可以回憶一下類似下面的代碼:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)
一個綁定是一個exchange和一個隊列之間的關系。這可以被簡單地讀為:隊列對這個exchange的消息感興趣。

綁定操作可以帶一個額外的routing_key參數。為了避免與basic_publish的那個參數混淆,我們將把它稱作binding key。這是我們如何創建一個帶有key的綁定的方法:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')
一個binding key的具體含義依賴于exchange type。 fanout類型的exchanges,即我們前面用到的那個,是簡單地忽略它的值。

Direct exchange

我們前一篇教程中的logging系統把所有的消息廣播給所有的消費者。我們想要擴展它,以允許基于消息的嚴重性來過濾消息。比如,我們可能希望將log消息寫入磁盤的腳本只接收嚴重的errors,而不在warning或info log消息上浪費磁盤空間。

我們之前使用了fanout exchange,但它沒有給我們太大的靈活性 - 它只會沒頭沒腦地廣播。

我們將使用一個direct exchange來替代。一個direct exchange背后的路由算法很簡單 - 一個消息將進入binding key與消息的routing key完全匹配的隊列。

處消息的routing key是否是basic_publish的那個routing_key?但那個參數不是應該表示隊列的名字么?

為了描述那個場景,可以看下下面的圖:

RabbitMQ教程 - 路由

在這個圖中,我們可以看到direct exchange X有兩個隊列與它綁定。第一個隊列使用了binding key orange來綁定,第二個有兩個綁定,一個的binding key是black,另一個是green

在這個圖中,一個發送給這個exchange并帶有routing key orange的消息將被路由到隊列Q1。帶有routing key blackgreen的消息將被路由到Q2。所有其它的消息將被丟棄。

多綁定

RabbitMQ教程 - 路由

使用相同的binding key綁定多個隊列也是完全合法的。在我們的例子中,我們可以使用binding key black來在XQ1之間添加一個綁定。在那種情況下,direct exchange的行為將像fanout一樣,將消息廣播到所有的隊列。一個帶有routing key black的消息將被同時發送到Q1Q2

由此看來basic_publish 的routing_key參數指的只是exchange的這個binding key,而不是隊列的名字。

發送logs

我們將在我們的logging系統中使用這個模型。我們將把消息發送給一個direct exchange,而不是fanout。我們將提供log severity作為一個routing key。那樣的話,接收腳本將能夠選擇它感興趣的severity的消息來接收。讓我們先將注意力放在發射logs。

我們總是需要先創建一個exchange:

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

然后我們就為發送消息做好了準備了:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
為了簡化問題,我們將假設'severity'可能是'info','warning','error'中的一個。

訂閱

接受消息的工作方式與前一份教程中的類似,有一個例外 - 我們將能夠為我們感興趣的每個severity創建一個新的綁定。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

代碼綜合

emit_log_direct.py的代碼:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
receive_logs_direct.py的代碼:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

如果你只想把'warning'和'error'(而不是'info')的log消息保存進一個文件,則可以打開終端并鍵入:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

如果你想要在你的屏幕上看到所有的log消息,則打開一個新的終端并執行:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

而要發射一條error log消息,則只需鍵入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

emit_log_direct.pyreceive_logs_direct.py的完整的源代碼。

進入tutorial 5來了解如何基于一個模式監聽消息。

生產者,消費者。生產者關心的是exchange和routing key。消費者關心的是exchange、隊列和routing key。

Done。

原文鏈接

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!