[譯]RabbitMQ系列教程(一):Hello World
來自: http://my.oschina.net/andylucc/blog/605746
RabbitMQ是一個消息中間件。基本思想非常簡單:接收和轉發消息。你可以把它想象成一個郵局:當我們往郵箱里面投遞一個信件的時候,我們非常確信郵政快遞員能夠把我們的郵件送達到接受者的手中。使用這個隱喻,RabbitMQ是一個郵箱,是一個郵政局,是一個郵政快遞員。
與傳統的郵政局不同的是,RabbitMQ處理的不是紙質的郵件,而是二進制數據構成的消息。
關于RabbitMQ和消息的一些行話:
1,生產消息即發送,一個發送消息的進程叫Producer。我們用下面的圖表示(一個被標記為P的圓圈):
2,queue代表接消息的郵箱,存在于RabbitMQ中。盡管消息在應用和RabbitMQ之間流動,但是他們只能被存儲在queue之中。queue是沒有任何限制的,它可以用來存儲任何你想存儲的消息,它本質上是一個無限制的buffer。多個producer可以向一個queue中發送消息。多個consumer可以從一個queeue中接收消息。隊列可以用下面的圖來描述:
3,consuming意味著接收,consumer是一個想要接收消息的進程,同樣,我們可以用下面的圖來表達consumer:
值得注意的是,producer、consumer和broker不一定是在同一臺機器上,的確在通常的使用中很少有人在同一臺機器上使用它。
Hello World!
(使用pika 0.10.0 Python client)
下面的例子不會太復雜——我們發送一個消息,接收它并把它打印在終端。為了完成這件小事,我們需要寫兩端小程序:一個是用來發送消息的,另一個是用來接收消息并打印的。
我們的設計大概可以用下面的圖來表示:
producer生產消息發送至hello queue,consumer從hello queue中接收消息。
RabbitMQ libraries RabbitMQ是基于AMQP 0.9.1版本,它是一個開放的,用于處理消息的協議。現在已經有基于不同語言的多種版本的RabbitMQ客戶端,我們將會使用Pika,它是RabbitMQ團隊推薦的一個客戶端,我們可以使用pip包管理工具來安裝它。
發送消息:
我們的第一個小程序是send.py,我們將會向隊列發送一個消息。這個時候我們需要做的第一件事情就是和RabbitMQ服務器建立連接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
通過上面的代碼我們和本覅RabbitMQ的broker建立了一條物理連接,如果想要和不同的機器建立連接,這里只需要修改一下IP地址或域名即可。接下來,在發送消息之前,我們需要確保接收隊列存在。如果我們將消息發送至一個不存在的隊列,RabbitMQ會直接將消息丟棄。我們先創建一個叫“hello”的隊列:
channel.queue_declare(queue='hello')
這個時候我們已經可以發送消息了,我們的第一個消息只包含一個字符串“Hello World”,我們將會把它發送至“hello”隊列。
在RabbitMQ中,消息不是被直接送到隊列的,而是首先被送到exchange中。我們目前不需要了解的那么詳細,或者我們可以通過第三方的教程來學習有關exchanges的知識。我們現在所需要知道的是如何使用空字符串標識的默認exchange。這個exchange是比較特殊的,它使得我們可以指定消息應該被送往哪里。隊列名字需要在routing_key參數中指定。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")
在退出程序之前,我們需要確認網絡緩沖區的內容被刷新,內容被發送出去,我們可以通過關閉連接來達到效果:
connection.close()
接收消息:
我們的第二個程序receive.py將會送隊列中接收消息并且將消息的內容打印出來。因此,我們需要再次連接RabbitMQ服務器,相關代碼和之前相同。下一步,和之前一樣,我們必須確保隊列存在。因此我們可以用queue_declare。對于queue_declare接口,我們可以多次調用它,而結果是只有一個隊列會被創建。
channel.queue_declare(queue='hello')
你可能有點疑問,我們為什么要再一次declare隊列,在之前的代碼中,我們已經declare了呀。主要是因為我們一定要確認在發送消息前隊列是存在的,否則消息將會被丟棄。
我們可以通過命令行來查看隊列: $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.
從消息隊列接收消息是相對復雜的一件事,我們通過給隊列綁定一個回調函數,當我們接收到一條消息的時候,回調函數會被pika庫調用,下面這個回調函數將會把消息打印出來。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
接下來,接下來我們將要告訴RabbitMQ這個回調函數將會從指定的“hello”隊列來接收消息:
channel.basic_consume(callback, queue='hello', no_ack=True)
no_ack參數將會在后面介紹。
最后,我們進入了一個死循環,監聽消息,當有消息的時候我們將消息打印出來,然后繼續監聽。
print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
下面的完整的代碼:
send.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
現在我們可以在終端試運行我們的程序,首先,我們來向隊列發送一條消息:
$ python send.py [x] Sent 'Hello World!'
然后來接收消息:
$ python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'
好了,例子到這里結束,下一節會介紹如何創建一個工作隊列。