基于HTTP的消息隊列
之前介紹過基于TCP的消息隊列,這里在寫個基于HTTP的消息隊列。代碼僅僅演示整個程序員的框架。不會去考慮性能和實用性。簡單起見,我們只考慮固定URI的情況。當然,對于不同URI去存取不同的消息隊列也是意見很簡單的事情。
我們還是用腳本來實現。這里采用Mojolicious框架來作為我們的基礎模塊。
Mojolicious是基于EV的perl web框架。性能是非常不錯的。而且,符合PSGI規范,寫出來的程序可以獨立運行,也可以作為FCGI運行。
這里,我們使用一個數組來作為消息隊列的容器。消息隊列入對,對應數組的push操作,出隊列,對應于數組的shift操作(從數組的開頭取出一個元素,這樣才符合消息隊列先進先出的規矩)。
我們用另外一個數組來保存暫時沒有得到消息的用戶請求。
注意,因為使用的是內存數組來存放消息,所以,這個服務只能但進程運行。如果希望能多進程提供服務,需要考慮別的途徑來在進程之間共享消息隊列載體。
代碼很簡單,這主要歸功于Mojo的框架。
#!/usr/bin/env perl use Mojolicious::Lite; use Data::Dumper; my $jobs = []; my $reqs = []; get '/' => sub { my $self = shift; if ( scalar(@{$jobs}) == 0 ) { $self->render_later; my $timeout=$self->param('timeout') || 20; Mojo::IOLoop->stream($self->tx->connection)->timeout($timeout); push(@{$reqs},$self); } else { my $job = shift @{$jobs}; $self->render(text => $job ); } }; post '/' => sub { my $self=shift; my $msg = $self->req->body; $self->render(text => 'ok'); if ( scalar(@{$reqs}) > 0 ) { my $s = shift @{$reqs}; $s->render(text => $msg); } else { push(@{$jobs},$msg); printf STDERR "%d\r",scalar(@{$jobs}); } }; app->start;
運行這個腳本比較簡單,我們可以在命令行如下運行:
$perl ./mq_srv.pl daemon -l "http://*:8080"
這樣,我們就在8080端口啟動了一個HTTP服務。
如果我們這個時候直接去用瀏覽器或者curl去訪問/,大家會發現,服務器不會給你回應。因為此時隊列為空。
如果這個時候有另外用戶恰好扔了一個消息過來,那么剛才等待的請求會得到這個消息。
整個程序的QPS取決于兩個地方,第一個是Mojo本身在處理HTTP的速度。第二個是程序存取內存數組的速度。如果是正經寫MQ,通常我們會使用內存+硬盤的方案。比如說,內存可以存放10萬個消息。對出來的放到磁盤上去。那么這個時候,效率將取決于如何設計這個硬盤文件格式以及如何在文件和內存中來回倒騰。這些問題也沒啥太多的技術含量,純粹是經驗和技巧。
大家看著樂一樂就行了。不用較真我這個程序的效率。
另外,如果真有人希望比較效率的話,可以把上面的程序作為nginx的FCGI來運行。這樣效率會高不少。