消息隊列RabbitMQ入門介紹

jopen 11年前發布 | 51K 次閱讀 RabbitMQ 消息系統

(一)基本概念

RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,后來沒堅持。RabbitMQ是 AMQP(高級消息隊列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這里簡單介紹。

RabbitMQ的結構圖如下:

消息隊列RabbitMQ入門介紹

幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,并設置相關屬性。
(3)客戶端聲明一個queue,并設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符 號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還 有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

(二)應用實際

我使用Linux服務器(ubuntu 9.10 64位),安裝RabbitMQ非常方便。

先運行如下命令安裝erlang:

apt-get install erlang-nox

再到rabbitmq.com上下載RabbitMQ的安裝包,如下安裝:

dpkg -i rabbitmq-server_2.6.1-1_all.deb

安裝完后,使用

/etc/init.d/rabbitmq-server start|stop|restart

來啟動、停止、重啟rabbitmq。

在正式應用之前,我們先在RabbitMQ里創建一個vhost,加一個用戶,并設置該用戶的權限。

使用rabbitmqctl客戶端工具,在根目錄下創建”/pyhtest”這個vhost:

rabbitmqctl add_vhost /pyhtest

創建一個用戶名”pyh”,設置密碼”pyh1234″:

rabbitmqctl add_user pyh pyh1234

設置pyh用戶對/pyhtest這個vhost擁有全部權限:

rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”

后面三個”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部權限

設置好后,開始編程,我用Perl寫一個消息投遞程序(producer):

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;
use UUID::Tiny;

my $channel = 1000; # channel ID,可以隨意指定,只要不沖突
my $queuename = “pyh_queue”; # 隊列名
my $exchange = “pyh_exchange”; # 交換機名
my $routing_key = “test”; # routing key

my $mq = Net::RabbitMQ->new(); # 創建一個RabbitMQ對象

$mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連接
$mq->channel_open($channel); # 打開一個channel
$mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個持久化的交換機
$mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個持久化的隊列
$mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機和隊列間建立綁定

for (my $i=0;$i<10000000;$i++) { # 循環1000萬次
my $string = create_UUID_as_string(UUID_V1); # 產生一條UUID作為消息主體
$mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將消息結合key以持久化模式投遞到交換機
}

$mq->disconnect(); # 斷開連接

消息接受程序(consumer)大概如下:

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;

my $channel = 1001;
my $queuename = “pyh_queue”;
my $mq = Net::RabbitMQ->new();

$mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ });
$mq->channel_open($channel);

while (1) {
my $hashref = $mq->get($channel, $queuename);
last unless defined $hashref;
print $hashref->{message_count}, “: “, $hashref->{body},”\n”;
}

$mq->disconnect();

consumer連接后只要指定隊列就可獲取到消息。

上述程序共投遞1000萬條消息,每條消息36字節(UUID),打開持久化,共耗時17分多鐘(包括產生UUID的時間),每秒投遞消息約9500條。測試機器是8G內存、8核志強CPU。

投遞完后,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目錄,產生2G多的持久化消息數據。在運行consumer程序后,這些數據都會消失,因為消息已經被消費了。

原文地址:http://www.nsbeta.info/archives/200

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!