RabbitMQ教程 - 路由
路由
(使用 pika 0.9.8 Python客戶端)
在這篇教程中,我們將給它添加一個功能 - 我們將使它能夠只訂閱消息的一個子集。比如,我們將能夠只把嚴重的error消息給導到log文件(保存到磁盤)中,而仍然能夠在終端打印所有的log消息。
綁定
在前一個例子中,我們已經創建了綁定。你可以回憶一下類似下面的代碼:
channel.queue_bind(exchange=exchange_name, queue=queue_name)一個綁定是一個exchange和一個隊列之間的關系。這可以被簡單地讀為:隊列對這個exchange的消息感興趣。
綁定操作可以帶一個額外的routing_key參數。為了避免與basic_publish的那個參數混淆,我們將把它稱作binding key。這是我們如何創建一個帶有key的綁定的方法:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')一個binding key的具體含義依賴于exchange type。 fanout類型的exchanges,即我們前面用到的那個,是簡單地忽略它的值。
Direct exchange
我們前一篇教程中的logging系統把所有的消息廣播給所有的消費者。我們想要擴展它,以允許基于消息的嚴重性來過濾消息。比如,我們可能希望將log消息寫入磁盤的腳本只接收嚴重的errors,而不在warning或info log消息上浪費磁盤空間。
我們之前使用了fanout exchange,但它沒有給我們太大的靈活性 - 它只會沒頭沒腦地廣播。
我們將使用一個direct exchange來替代。一個direct exchange背后的路由算法很簡單 - 一個消息將進入binding key與消息的routing key完全匹配的隊列。
此處消息的routing key是否是basic_publish的那個routing_key?但那個參數不是應該表示隊列的名字么?
為了描述那個場景,可以看下下面的圖:
在這個圖中,我們可以看到direct exchange X有兩個隊列與它綁定。第一個隊列使用了binding key orange來綁定,第二個有兩個綁定,一個的binding key是black,另一個是green。
在這個圖中,一個發送給這個exchange并帶有routing key orange的消息將被路由到隊列Q1。帶有routing key black或green的消息將被路由到Q2。所有其它的消息將被丟棄。
多綁定
使用相同的binding key綁定多個隊列也是完全合法的。在我們的例子中,我們可以使用binding key black來在X和Q1之間添加一個綁定。在那種情況下,direct exchange的行為將像fanout一樣,將消息廣播到所有的隊列。一個帶有routing key black的消息將被同時發送到Q1和Q2。
由此看來basic_publish 的routing_key參數指的只是exchange的這個binding key,而不是隊列的名字。
發送logs
我們將在我們的logging系統中使用這個模型。我們將把消息發送給一個direct exchange,而不是fanout。我們將提供log severity作為一個routing key。那樣的話,接收腳本將能夠選擇它感興趣的severity的消息來接收。讓我們先將注意力放在發射logs。
我們總是需要先創建一個exchange:
channel.exchange_declare(exchange='direct_logs', type='direct')
然后我們就為發送消息做好了準備了:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)為了簡化問題,我們將假設'severity'可能是'info','warning','error'中的一個。
訂閱
接受消息的工作方式與前一份教程中的類似,有一個例外 - 我們將能夠為我們感興趣的每個severity創建一個新的綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
代碼綜合
emit_log_direct.py的代碼:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()receive_logs_direct.py的代碼:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[0],) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
如果你只想把'warning'和'error'(而不是'info')的log消息保存進一個文件,則可以打開終端并鍵入:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你想要在你的屏幕上看到所有的log消息,則打開一個新的終端并執行:
$ python receive_logs_direct.py info warning error [*] Waiting for logs. To exit press CTRL+C
而要發射一條error log消息,則只需鍵入:
$ python emit_log_direct.py error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
(emit_log_direct.py和receive_logs_direct.py的完整的源代碼。)
進入tutorial 5來了解如何基于一個模式監聽消息。
生產者,消費者。生產者關心的是exchange和routing key。消費者關心的是exchange、隊列和routing key。
Done。
原文鏈接。