[譯]RabbitMQ系列教程(一):Hello World

HugRenner 8年前發布 | 38K 次閱讀 ActiveMQ 消息系統

來自: http://my.oschina.net/andylucc/blog/605746


RabbitMQ是一個消息中間件。基本思想非常簡單:接收和轉發消息。你可以把它想象成一個郵局:當我們往郵箱里面投遞一個信件的時候,我們非常確信郵政快遞員能夠把我們的郵件送達到接受者的手中。使用這個隱喻,RabbitMQ是一個郵箱,是一個郵政局,是一個郵政快遞員。

與傳統的郵政局不同的是,RabbitMQ處理的不是紙質的郵件,而是二進制數據構成的消息。

關于RabbitMQ和消息的一些行話:

1,生產消息即發送,一個發送消息的進程叫Producer。我們用下面的圖表示(一個被標記為P的圓圈):

2,queue代表接消息的郵箱,存在于RabbitMQ中。盡管消息在應用和RabbitMQ之間流動,但是他們只能被存儲在queue之中。queue是沒有任何限制的,它可以用來存儲任何你想存儲的消息,它本質上是一個無限制的buffer。多個producer可以向一個queue中發送消息。多個consumer可以從一個queeue中接收消息。隊列可以用下面的圖來描述:


3,consuming意味著接收,consumer是一個想要接收消息的進程,同樣,我們可以用下面的圖來表達consumer:



值得注意的是,producer、consumer和broker不一定是在同一臺機器上,的確在通常的使用中很少有人在同一臺機器上使用它。


Hello World!

(使用pika 0.10.0 Python client)

下面的例子不會太復雜——我們發送一個消息,接收它并把它打印在終端。為了完成這件小事,我們需要寫兩端小程序:一個是用來發送消息的,另一個是用來接收消息并打印的。

我們的設計大概可以用下面的圖來表示:

producer生產消息發送至hello queue,consumer從hello queue中接收消息。

RabbitMQ libraries

RabbitMQ是基于AMQP 0.9.1版本,它是一個開放的,用于處理消息的協議。現在已經有基于不同語言的多種版本的RabbitMQ客戶端,我們將會使用Pika,它是RabbitMQ團隊推薦的一個客戶端,我們可以使用pip包管理工具來安裝它。

發送消息:

我們的第一個小程序是send.py,我們將會向隊列發送一個消息。這個時候我們需要做的第一件事情就是和RabbitMQ服務器建立連接。

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

通過上面的代碼我們和本覅RabbitMQ的broker建立了一條物理連接,如果想要和不同的機器建立連接,這里只需要修改一下IP地址或域名即可。接下來,在發送消息之前,我們需要確保接收隊列存在。如果我們將消息發送至一個不存在的隊列,RabbitMQ會直接將消息丟棄。我們先創建一個叫“hello”的隊列:

channel.queue_declare(queue='hello')

這個時候我們已經可以發送消息了,我們的第一個消息只包含一個字符串“Hello World”,我們將會把它發送至“hello”隊列。

在RabbitMQ中,消息不是被直接送到隊列的,而是首先被送到exchange中。我們目前不需要了解的那么詳細,或者我們可以通過第三方的教程來學習有關exchanges的知識。我們現在所需要知道的是如何使用空字符串標識的默認exchange。這個exchange是比較特殊的,它使得我們可以指定消息應該被送往哪里。隊列名字需要在routing_key參數中指定。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')print(" [x] Sent 'Hello World!'")

在退出程序之前,我們需要確認網絡緩沖區的內容被刷新,內容被發送出去,我們可以通過關閉連接來達到效果:

connection.close()

接收消息:

我們的第二個程序receive.py將會送隊列中接收消息并且將消息的內容打印出來。因此,我們需要再次連接RabbitMQ服務器,相關代碼和之前相同。下一步,和之前一樣,我們必須確保隊列存在。因此我們可以用queue_declare。對于queue_declare接口,我們可以多次調用它,而結果是只有一個隊列會被創建。

channel.queue_declare(queue='hello')

你可能有點疑問,我們為什么要再一次declare隊列,在之前的代碼中,我們已經declare了呀。主要是因為我們一定要確認在發送消息前隊列是存在的,否則消息將會被丟棄。

我們可以通過命令行來查看隊列:
$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.

從消息隊列接收消息是相對復雜的一件事,我們通過給隊列綁定一個回調函數,當我們接收到一條消息的時候,回調函數會被pika庫調用,下面這個回調函數將會把消息打印出來。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下來,接下來我們將要告訴RabbitMQ這個回調函數將會從指定的“hello”隊列來接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

no_ack參數將會在后面介紹。

最后,我們進入了一個死循環,監聽消息,當有消息的時候我們將消息打印出來,然后繼續監聽。

print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

下面的完整的代碼:

send.py

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

現在我們可以在終端試運行我們的程序,首先,我們來向隊列發送一條消息:

$ python send.py 
[x] Sent 'Hello World!'

然后來接收消息:

 $ python receive.py 
 [*] Waiting for messages. To exit press CTRL+C 
 [x] Received 'Hello World!'

好了,例子到這里結束,下一節會介紹如何創建一個工作隊列。

 本文由用戶 HugRenner 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!