ZeroMQ,史上最快的消息隊列 - ZMQ的學習和研究
ZeroMQ,史上最快的消息隊列
—– ZMQ的學習和研究
一、ZeroMQ的背景介紹
引用官方的說法: “ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網絡協議棧的一部分,之后進入Linux內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統”BSD套接字之上的一 層封裝。ZMQ讓編寫高性能網絡應用程序極為簡單和有趣。”
近幾年有關”Message Queue”的項目層出不窮,知名的就有十幾種,這主要是因為后摩爾定律時代,分布式處理逐漸成為主流,業界需要一套標準來解決分布式計算環境中節點之間的消息通信。幾年的競爭下來,Apache基金會旗下的符合AMQP/1.0標準的RabbitMQ已經得到了廣泛的認可,成為領先的MQ項目。
與RabbitMQ相比,ZMQ并不像是一個傳統意義上的消息隊列服務器,事實上,它也根本不是一個服務器,它更像是一個底層的網絡通訊庫,在Socket API之上做了一層封裝,將網絡通訊、進程通訊和線程通訊抽象為統一的API接口。
二、ZMQ是什么?
閱讀了ZMQ的Guide文檔后,我的理解是,這是個類似于Socket的一系列接口,他跟Socket的區別是:普通的socket是端到端的(1:1的關系),而ZMQ卻是可以N:M 的關系,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網絡編程更為簡單。ZMQ用于node與node間的通信,node可以是主機或者是進程。
三、本文的目的
在集群對外提供服務的過程中,我們有很多的配置,需要根據需要隨時更新,那么這個信息如果推動到各個節點?并且保證信息的一致性和可靠性?本文在介紹ZMQ基本理論的基礎上,試圖使用ZMQ實現一個配置分發中心。從一個節點,將信息無誤的分發到各個服務器節點上,并保證信息正確性和一致性。
四、ZMQ的三個基本模型
ZMQ提供了三個基本的通信模型,分別是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”,我們從這三種模式一窺ZMQ的究竟
ZMQ的hello world!
由Client發起請求,并等待Server回應請求。請求端發送一個簡單的hello,服務端則回應一個world。請求端和服務端都可以是 1:N 的模型。通常把 1 認為是 Server ,N 認為是Client 。ZMQ 可以很好的支持路由功能(實現路由功能的組件叫作Device),把 1:N 擴展為N:M (只需要加入若干路由節點)。如圖1所示:

圖1:ZMQ的Request-Reply 通信
服務端的php程序如下:
<?php
/*
* Hello World server
* Binds REP socket to tcp://*:5555
* Expects "Hello" from client, replies with "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext(1);
// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while(true) {
// Wait for next request from client
$request = $responder->recv();
printf ("Received request: [%s]\n", $request);
// Do some 'work'
sleep (1);
// Send reply back to client
$responder->send("World");
} Client程序如下:
<?php
/*
* Hello World client
* Connects REQ socket to tcp://localhost:5555
* Sends "Hello" to server, expects "World" back
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to talk to server
echo "Connecting to hello world server...\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5555");
for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
printf ("Sending request %d...\n", $request_nbr);
$requester->send("Hello");
$reply = $requester->recv();
printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
} 從以上的過程,我們可以了解到使用ZMQ寫基本的程序的方法,需要注意的是:
a) 服務端和客戶端無論誰先啟動,效果是相同的,這點不同于Socket。
b) 在服務端收到信息以前,程序是阻塞的,會一直等待客戶端連接上來。
c) 服務端收到信息以后,會send一個“World”給客戶端。值得注意的是一定是client連接上來以后,send消息給Server,然后Server再rev然后響應client,這種一問一答式的。如果Server先send,client先rev是會報錯的。
d) ZMQ通信通信單元是消息,他除了知道Bytes的大小,他并不關心的消息格式。因此,你可以使用任何你覺得好用的數據格式。Xml、Protocol Buffers、Thrift、json等等。
e) 雖然可以使用ZMQ實現HTTP協議,但是,這絕不是他所擅長的。
ZMQ的Publish-subscribe模式
我們可以想象一下天氣預報的訂閱模式,由一個節點提供信息源,由其他的節點,接受信息源的信息,如圖2所示:

圖2:ZMQ的Publish-subscribe
示例代碼如下 :
Publisher:
<?php
/*
* Weather update server
* Binds PUB socket to tcp://*:5556
* Publishes random weather updates
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
// Prepare our context and publisher
$context = new ZMQContext();
$publisher = $context->getSocket(ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5556");
while (true) {
// Get values that will fool the boss
$zipcode = mt_rand(0, 100000);
$temperature = mt_rand(-80, 135);
$relhumidity = mt_rand(10, 60);
// Send message to all subscribers
$update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);
$publisher->send($update);
}</pre>
Subscriber
<pre><?php
/*
* Weather update client
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds avg temp in zipcode
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to talk to server
echo "Collecting updates from weather server…", PHP_EOL;
$subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
$filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";
$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
// Process 100 updates
$total_temp = 0;
for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {
$string = $subscriber->recv();
sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);
$total_temp += $temperature;
}
printf ("Average temperature for zipcode '%s' was %dF\n",
$filter, (int) ($total_temp / $update_nbr)); 這段代碼講的是,服務器端生成隨機數zipcode、temperature、relhumidity分別代表城市代碼、溫度值和濕度值。然后不斷的廣播信息,而客戶端通過設置過濾參數,接受特定城市代碼的信息,收集完了以后,做一個平均值。
a) 與Hello World不同的是,Socket的類型變成SOCKET_PUB和SOCKET_SUB類型。
b) 客戶端需要$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);設置一個過濾值,相當于設定一個訂閱頻道,否則什么信息也收不到。
c) 服務器端一直不斷的廣播中,如果中途有Subscriber端退出,并不影響他繼續的廣播,當Subscriber再連接上來的時候,收到的就是后來發送的新的信息了。這對比較晚加入的,或者是中途離開的訂閱者,必然會丟失掉一部分信息,這是這個模式的一個問題,所謂的Slow joiner。稍后,會解決這個問題。
d) 但是,如果Publisher中途離開,所有的Subscriber會hold住,等待Publisher再上線的時候,會繼續接受信息。
ZMQ的PipeLine模型
想象一下這樣的場景,如果需要統計各個機器的日志,我們需要將統計任務分發到各個節點機器上,最后收集統計結果,做一個匯總。PipeLine比較適合于這種場景,他的結構圖,如圖3所示。

圖3:ZMQ的PipeLine模型
Parallel task ventilator in PHP
<?php
/*
* Task ventilator
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to send messages on
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->bind("tcp://*:5557");
echo "Press Enter when the workers are ready: ";
$fp = fopen('php://stdin', 'r');
$line = fgets($fp, 512);
fclose($fp);
echo "Sending tasks to workers…", PHP_EOL;
// The first message is "0" and signals start of batch
$sender->send(0);
// Send 100 tasks
$total_msec = 0; // Total expected cost in msecs
for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
// Random workload from 1 to 100msecs
$workload = mt_rand(1, 100);
$total_msec += $workload;
$sender->send($workload);
}
printf ("Total expected cost: %d msec\n", $total_msec);
sleep (1); // Give 0MQ time to deliver Parallel task worker in PHP
<?php
/*
* Task worker
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to receive messages on
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://localhost:5557");
// Socket to send messages to
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("tcp://localhost:5558");
// Process tasks forever
while (true) {
$string = $receiver->recv();
// Simple progress indicator for the viewer
echo $string, PHP_EOL;
// Do the work
usleep($string * 1000);
// Send results to sink
$sender->send("");
} Parallel task sink in PHP
<?php
/*
* Task sink
* Binds PULL socket to tcp://localhost:5558
* Collects results from workers via that socket
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
// Prepare our context and socket
$context = new ZMQContext();
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->bind("tcp://*:5558");
// Wait for start of batch
$string = $receiver->recv();
// Start our clock now
$tstart = microtime(true);
// Process 100 confirmations
$total_msec = 0; // Total calculated cost in msecs
for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
$string = $receiver->recv();
if($task_nbr % 10 == 0) {
echo ":";
} else {
echo ".";
}
}
$tend = microtime(true);
$total_msec = ($tend - $tstart) * 1000;
echo PHP_EOL;
printf ("Total elapsed time: %d msec", $total_msec);
echo PHP_EOL; 從程序中,我們可以看到,task ventilator使用的是SOCKET_PUSH,將任務分發到Worker節點上。而Worker節點上,使用SOCKET_PULL從上游接受任務,并使用SOCKET_PUSH將結果匯集到Slink。值得注意的是,任務的分發的時候也同樣有一個負載均衡的路由功能,worker可以隨時自由加入,task ventilator可以均衡將任務分發出去。
五、其他擴展模式
通常,一個節點,即可以作為Server,同時也能作為Client,通過PipeLine模型中的Worker,他向上連接著任務分發,向下連接著結果搜集的Sink機器。因此,我們可以借助這種特性,豐富的擴展原有的三種模式。例如,一個代理Publisher,作為一個內網的Subscriber接受信息,同時將信息,轉發到外網,其結構圖如圖4所示。

圖4:ZMQ的擴展模式
六、多個服務器
ZMQ和Socket的區別在于,前者支持N:M的連接,而后者則只是1:1的連接,那么一個Client連接多個Server的情況是怎樣的呢,我們通過圖5來說明。

圖5:ZMQ的N:1的連接情況
我們假設Client有R1,R2,R3,R4四個任務,我們只需要一個ZMQ的Socket,就可以連接四個服務,他能夠自動均衡的分配任務。如圖5所示,R1,R4自動分配到了節點A,R2到了B,R3到了C。如果我們是N:M的情況呢?這個擴展起來,也不難,如圖6所示。

圖6:N:M的連接
我們通過一個中間結點(Broker)來進行負載均衡的功能。我們通過代碼了解,其中的Client和我們的Hello World的Client端是一樣的,而Server端的不同是,他不需要監聽端口,而是需要連接Broker的端口,接受需要處理的信息。所以,我們重點閱讀Broker的代碼:
<?php
/*
* Simple request-reply broker
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
// Prepare our context and sockets
$context = new ZMQContext();
$frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
$frontend->bind("tcp://*:5559");
$backend->bind("tcp://*:5560");
// Initialize poll set
$poll = new ZMQPoll();
$poll->add($frontend, ZMQ::POLL_IN);
$poll->add($backend, ZMQ::POLL_IN);
$readable = $writeable = array();
// Switch messages between sockets
while(true) {
$events = $poll->poll($readable, $writeable);
foreach($readable as $socket) {
if($socket === $frontend) {
// Process all parts of the message
while(true) {
$message = $socket->recv();
// Multipart detection
$more = $socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
$backend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
if(!$more) {
break; // Last message part
}
}
}
else if($socket === $backend) {
$message = $socket->recv();
// Multipart detection
$more = $socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
$frontend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
if(!$more) {
break; // Last message part
}
}
}
} Broker監聽了兩個端口,接受從多個Client端發送過來的數據,并將數據,轉發給Server。在Broker中,我們監聽了兩個端口,使用了兩個Socket,那么對于多個Socket的情況,我們是不需要通過輪詢的方式去處理數據的,在之前,我們可以使用libevent實現,異步的信息處理和傳輸。而現在,我們只需要使用ZMQ的$poll->poll以實現多個Socket的異步處理。
七、進程間的通信
ZMQ不僅能通過TCP完成節點間的通信,也可以通過Socket文件完成進程間的通信。如圖7所示,我們fork三個PHP進程,將進程1的數據,通過Socket文件發送到進程3。

圖7:進程間的通信
<?php
function step1() {
$context = new ZMQContext();
// Signal downstream to step 2
$sender = new ZMQSocket($context, ZMQ::SOCKET_PAIR);
$sender->connect("ipc://step2.ipc");
$sender->send("hello ,i am step1");
}
function step2() {
$pid = pcntl_fork();
if($pid == 0) {
step1();
exit();
}
$context = new ZMQContext();
// Bind to ipc: endpoint, then start upstream thread
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PAIR);
$receiver->bind("ipc://step2.ipc");
// Wait for signal
sleep(10);
$strings = $receiver->recv();
echo "step2 receiver is $strings". PHP_EOL;
sleep(10);
// Signal downstream to step 3
$sender = new ZMQSocket($context, ZMQ::SOCKET_PAIR);
$sender->connect("ipc://step3.ipc");
$sender->send($strings);
}
// Start upstream thread then bind to icp: endpoint
$pid = pcntl_fork();
if($pid == 0) {
step2();
exit();
}
$context = new ZMQContext();
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PAIR);
$receiver->bind("ipc://step3.ipc");
// Wait for signal
$sr = $receiver->recv();
echo "the result is {$sr}".PHP_EOL; 在運行中,我們可以看到多了兩個文件,如圖8所示。

圖8:運行過程中生成的文件
八、利用ZeroMQ實現一個配置推送中心
當我們將WEB代碼部署到集群上的時候,如果需要實時的將最新的配置信息,主動的推送到各個機器節點。在此過程中,我們一定要保證,各個節點收到的信息的一致性和正確性,如果使用HTTP,由于他的無狀態性,我們無法保證信息的一致性,當然,你可以使用HTTP來實現,只是更復雜,為什么不用ZMQ?他能讓你更簡單的實現這些功能。
我們使用ZMQ的信息訂閱模式。在那個模式中,我們注意到,對于后來的加入節點,始終會丟失在他加入之前,已經發送的信息(Slow joiner)。我們可以開啟另外一個ZMQ的通信通道,用于報告當前節點的情況(節點的身份、準備狀態等),其結構如圖9所示。

圖9:擴展ZMQ的訂閱者模式
我們通過$context->getSocket(ZMQ::SOCKET_REQ);設置一個新的Request-Reply連接,來用于Subscriber向Publisher報告自己的身份信息,而Publisher則等待所有的Subscriber都連接上的時候,再選擇Publish自己的信息。
Subscriber端的程序如下:
<?php
$hostname = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "s1";
$context = new ZMQContext(2);
$sub = new ZMQSocket($context,ZMQ::SOCKET_SUB);
$sub->connect("tcp://localhost:5561");
//$subscriber->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $hostname);
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,"");
$client = $context->getSocket(ZMQ::SOCKET_REQ);
$client->connect("tcp://localhost:5562");
while(1) {
//$client->connect("tcp://localhost:5562");
$client->send($hostname);
$version = $client->recv();
echo $version."\r\n";
if (!empty($version)) {
$recive = $sub->recv();
$vars = json_decode($recive);
var_dump($vars);
}
} Publisher端的程序如下:
<?php
$CONFIG["TAOKE_BTS"]["ENABLE"] = true;
$CONFIG["QP_BTS"]["ENABLE"] = true;
$CONFIG["QP_BTS"]["TK_TEST"] = 13;
$string = json_encode($CONFIG);
$clients = array("s2","s1","s3");
$context = new ZMQContext(10);
//Socket talk to clients
$publisher = new ZMQSocket($context,ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5561");
//Socket to publish message
$server = new ZMQSocket($context,ZMQ::SOCKET_REP);
$server->bind("tcp://*:5562");
while(count($clients)!=0) {
$client_name = $server->recv();
echo "{$client_name} is connect!\r\n";
if (in_array($client_name, $clients)) { //coming one client
$key = array_search($client_name, $clients);
unset($clients[$key]);
echo "$client_name has come in!\r\n";
$server->send("Version is 2.0");
} else {
$server->send("You are a stranger!");
}
}
$publisher->send($string);
?> 每個節點通過5562端口,使用Rep模式和Publisher連接,通過這個連接告之Publisher自己的機器名,而Publisher端通過白名單的方式,維護一個機器列表,當機器列表中所有的機器連接上來以后,通過5561端口,將最新的配置信息發送出去。
后續的處理,Subscriber可以選擇將配置信息寫入到APC緩存,程序將始終從緩存中讀取部分配置信息,Subscriber并將更新后的狀態信息,實時的通過5562報告給Publisher。
雖然,在本示例中不會出現,但是,如果需要發布的信息量過大,在接受信息的過程中,Subscriber端突然中斷網絡(或者是程序崩潰),那么當他在連接上來的時候,有部分信息就會丟失?ZMQ考慮到這個問題,通過$subscriber->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $hostname);設置一個id,當這個id的Subscriber重新連接上來的時候,他可以從上次中斷的地方,繼續接受信息,當然,節點的中斷,不會影響其他的節點繼續的接受信息。
那么ZMQ是怎么實現斷線重連后,繼續發送信息呢 ?他會將斷開的Subscriber應該接受到的信息發到內存中,等待他重新上線后,將緩存的信息,繼續發送給他。當然,內存必然是有限的,過多就會出現內存溢出。ZMQ通過
SetSockOpt(ZMQ::SOCKOPT_SWAP, 250000)設置Swap空間的大小,來防止out of memory and crash。最終,我們的程序運行結果,如圖10所示。

圖10:配置中心的運行結果
當然,這只是一個大體的思路,如果應用到實際的成產環境中,還需要考慮更多的問題,包含穩定性,容錯等等。然而,ZMQ由于高并發,以及穩定性和易用性,前景不錯,他的目標是進入Linux內核,我們期待那一天的到來。
參考資料 :
http://www.infoq.com/cn/news/2010/09/introduction-zero-mq Infoq對zeromq的簡介
http://zguide.zeromq.org/page:all ZeroMQ的guide文檔
來自: www.searchtb.com