消息隊列——RabbitMQ
1. 寫在前面
昨天簡單學習了一個 消息隊列 項目—— RabbitMQ ,今天趁熱打鐵,將學到的東西記錄下來。
學習的資料主要是官網給出的6個基本的消息發送/接收 模型 ,或者稱為6種不同的使用場景,本文便是對這6種模型加以敘述。
2. Tutorials
在學習6種模型之前,我們首先需要安裝RabbitMQ。RabbitMQ支持多種系統平臺,各平臺的安裝方法。安裝好之后,我們使用如下命令啟用Web端的管理插件: rabbitmq-plugins enable rabbitmq_management ,然后啟動RabbitMQ。接著用瀏覽器訪問 http://localhost:15672/ ,若能看到RabbitMQ相關Web頁面,說明啟動成功。
2.1 Hello World
正所謂萬事開頭難,我們先從最簡單的 Hello World 開始。首先當然是新建一個項目,導入RabiitMQ相關jar。我采用Maven來構建項目,因此只需要在pom文件中添加如下依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
接下來學習最簡單的消息隊列模型,如下圖:
在圖中,P代表 producer,它是消息的 生產者;C代表 consumer ,它是消息的 消費者 ;而紅色的矩形正是我們所謂的 消息隊列 ,它位于 RabbitMQ 中( RabbitMQ 中可以有很多這樣的隊列,并且每個隊列都有一個唯一的名字)。生產者(們)可以將消息發送到消息隊列中,消費者(們)可以從消息隊列中取出消息。
這種模型是不是很簡單呢?下面我們使用 Java ,借助于RabbitMQ來實現這種模型的消息通信。
首先我們介紹如何 send 消息到消息隊列。 send 之前,當然是和RabbitMQ服務器建立連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
接下來我們創建一個 channel ,大多數API都是通過這個對象來調用的:
Channel channel = connection.createChannel();
之后,我們便可以調用 channel 的如下方法去聲明一個隊列:
channel.queueDeclare("hello", false, false, false, null);
該方法的第一個參數是隊列的名稱,其余的參數先不管,之后會介紹。我們可以嘗試著去執行以上的5行代碼,然后打開Web端,可以看到新建了一個叫作 hello 的隊列:
有了隊列,我們便可以向其中發送消息了,同樣還是調用 channel 對象的API:
channel.basicPublish("", "hello", null, "Hello World".getBytes());
以上代碼所做的事情就是發送了一條字符串消息“Hello World”(第4個參數)到消息隊列。你可能注意到我們調用了String對象的 getBytes 方法,沒錯,我們發送的實際上二進制數據。因此,理論上你能夠發送任何數據到消息隊列中,而不僅僅是文本信息。
第2個參數叫做 路由鍵(routingKey) ,在該模型下必須與隊列名相同,至于為什么,和其他參數一樣,之后會了解到。
我們可以修改發送的文本,再次執行上述代碼,然后打開Web端查看,便可以查看到我們發送的消息:
點擊上圖的 name 字段下的 hello ,可以查看 hello 隊列中的具體信息:
接下來,我們去嘗試著去獲取 生產者 發送的消息,和 send 方法一樣,我們同樣需要連接服務器,創建 channel ,聲明隊列:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
之后我們可以調用 channel 的相關方法去監聽隊列,接收消息:
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
以上 basicConsume 方法中,第一個參數是隊列的名字;第二個參數表示是否自動確認消息的接收情況,我們使用true,自動確認;第三個參數需要傳入一個實現了 Consumer 接口的對象,我們簡單的 new 一個默認的 Consumer 的實現類 DefaultConsumer ,然后在 handleDelivery 方法中去處理接收到的消息( handleDelivery 方法會在接收到消息時被回調)。
運行以上代碼,我們可以打印出之前向隊列中 send 的數據:
Hello World
Hello World2
下面是 Hello World 的完整代碼:
public class App {
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("", "hello", null, "Hello World2".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this){
// 因為以上接收消息的方法是異步的(非阻塞),當采用單元測試方式執行該方法時,程序會在打印消息前結束,因此使用wait來防止程序提前終止。若使用main方法執行,則不需要擔心該問題。
wait();
}
}
}</code></pre>
2.2 Work queues
接下來我們學習第二種模型—— Work Queues 。顧名思義,這種模型描述的是一個生產者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務),如下圖所示:

下面我們用代碼去實現。先是生產者 send 消息到隊列,這次我們多發送些數據:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
for (int i = 0; i < 9; i++) {
channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}</code></pre>
然后是消費者接收數據:
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
try {
// Thread.sleep(1000);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
synchronized (this) {
wait();
}
}</code></pre>
代碼基本上和 Hello World 的代碼一樣,只是加上句 sleep 來模擬消費者(worker)處理消息所花的時間。
我們可以先執行三次 receive 方法(修改 sleep 的時間,其中消費者1 sleep 10s,消費者2,3 sleep 1s),讓三個消費者(worker)一起等待消息的到來,然后執行 send 方法發送9條消息,觀察三個消費者收到的消息情況。
若不出意外,你會看到如下的打印結果:
// 消費者1
0
// 10s 后
3
// 10s 后
6
// 消費者2
1
// 1s 后
4
// 1s 后
7
// 消費者3
2
// 1s 后
5
// 1s 后
8
通過打印結果,我們可以總結出 Work queues 的幾個特點:
- 一條消息只會被一個消費者接收;
- 消息是平均分配給消費者的;
- 消費者只有在處理完某條消息后,才會收到下一條消息。
Work queues(Task Queuess)的概念在一些Web場景的應用中是很有用的,比如我們能夠用它來構建一個master-slave結構的分布式爬蟲系統:系統中有一個master節點和多個slave節點,master節點負責向各個slave節點分配爬取任務。
2.3 Publish/Subscribe
但有些時候,我們可能希望一條消息能夠被多個消費者接受到,比如一些公告信息等,這時候用 Work Queue 模型顯然不合適,而 Publish/Subscribe 模型正是對應這種使用場景的。
在介紹Publish/Subscribe之前,我們快速回顧之前的兩個模型,它們好像都是生產者將消息直接發送到消息隊列,但其實不是這樣的,甚至有可能生產者根本就不知道消息發送到了哪一個消息隊列。
先別著急,下面我們完整地介紹RabbitMQ消息發送/接受的方式。
事實上,生產者是把消息發送到了 交換機(exchange) 中,然后交換機負責(決定)將消息發送到(哪一個)消息隊列中。其模型如下圖:

這時候你可能會疑惑:既然消息是被發送到了交換機中,那我們之前發送的消息是被發送到了哪一個交換機中了?它有沒有機制能夠讓特定的消息發送到指定的隊列?
先回答第一個問題。還記得我們在 Hello World 中寫的發送消息的代碼嗎?
channel.basicPublish("", "hello", null, message.getBytes());
事實上第一個參數便是指定交換機的名字,即指定消息被發送到哪一個交換機。空字符串表示 默認交換機(Default Exchange) ,即我們之前發送的消息都是先發送到 默認交換機 ,然后它再路由到相應的隊列中。其實我們可以通過Web頁面去查看所有存在的交換機:

接著回答第二個問題。路由的依據便是通過第二個參數—— 路由鍵(routing key) 指定的,之前已經提到過。在之前代碼中,我們指定第二個參數為"hello",便是指定消息應該被交換機路由到路由鍵為hello的隊列中。而 默認交換機(Default Exchange) 有一個非常有用的性質:
每一個被創建的隊列都會被自動的綁定到默認交換機上,并且路由鍵就是隊列的名字。
交換機還有4種不同的類型,分別是 direct , fanout , topic , headers ,每種類型路由的策略不同。
direct 類型的交換機要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會將此消息路由到路由鍵K = R的隊列。默認交換機便是該類型。因此,在下圖中,消息會沿著綠色箭頭路由:

fanout 類型的交換機會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。
剩下的兩種類型之后再做介紹。
在以上概念基礎上,我們來看第3種消息模型: Publish/Subscribe 。如下圖:

該模型是要讓所有的消費者都能夠接收到每一條消息。顯然, fanout 類型的交換機更符合我們當前的需求。為此,先創建一個 fanout 類型的交換機。
channel.exchangeDeclare("notice", "fanout");
其中,第一個參數是交換機的名稱;第二個參數是交換機的類型。
然后我們可以 send 消息了:
channel.basicPublish( "notice", "", null, message.getBytes());
對于消費者,我們需要為每一個消費者創建一個獨立的隊列,然后將隊列綁定到剛才指定的交換機上即可:
// 該方法會創建一個名稱隨機的臨時隊列
String queueName = channel.queueDeclare().getQueue();
// 將隊列綁定到指定的交換機("notice")上
channel.queueBind(queueName, "notice", "");
以下完整的代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
channel.basicPublish( "notice", "", null, "Hello China".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}</code></pre>
首先運行兩次 receive 方法,讓兩個消費者等待接收消息,然后可以在Web端查看此時的隊列情況,如下圖所示:

可以看到圖中有兩個名稱隨機的隊列。接著運行 send 方法發送一條消息,最終我們會看到兩個消費者都打印出了 Hello China 。然后停止虛擬機讓消費者斷開連接,再次在Web端查看隊列情況,會發現剛才的兩個隊列被自動刪除了。
2.4 Routing
以上是Publish/Subscribe模式,它已經能讓我們的通知(notice)系統正常運轉了。現在再考慮這樣一個新需求:對于一些機密通知,我們只想讓部分人看到。這就要求交換機對綁定在其上的隊列進行篩選,于是引出了又一個新的模型: Routing 。
之前我們說過,對于 direct 類型的交換機,它會根據 routing key 進行路由,因此我們可以借助它來實現我們的需求,模型結構如下圖:

下面用代碼來實現。先看生產者。
首先要聲明一個 direct 類型的交換機:
// 這里名稱改為notice2
channel.exchangeDeclare("notice2", "direct");
需要注意的是, 因為我們之前聲明了一個 fanout 類型的名叫 notice 的交換機,因此不能再聲明一個同名的類型卻不一樣的交換機。
然后可以發送消息了,我們發送10條消息,其中偶數條消息是秘密消息,只能被routing key 為s的隊列接受,其余的消息所有的隊列都能接受。
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
接下來是消費者:
// 聲明一個名稱隨機的臨時的隊列
String queueName = channel.queueDeclare().getQueue();
// 綁定交換機,同時帶上routing key
channel.queueBind(queueName, "notice2", "n");
// 消費者2號運行時,打開以下注釋
// channel.queueBind(queueName, "notice2", "s");
注意,我們可以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。
以下是完整代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice2", "n");
// channel.queueBind(queueName, "notice2", "s");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}</code></pre>
測試時,我們可以先運行一個 receive ,然后打開 channel.queueBind(queueName, "notice2", "s") 注釋,再運行一次 receive ,這樣就有兩個消費者綁定到notice2交換機上,其中消費者1只能收到normal類型的消息,而消費者2既能收到normal類型的消息,又能收到secret類型的消息。接著可以運行send方法。如不出意外,可以看到如下打印結果:
// 消費者1
1
3
5
7
9
// 消費者2
0
1
2
3
4
5
6
7
8
9
2.5 Topics
有了以上的改進,我們的 notice 系統基本ok了。但有些時候,我們還需要更加靈活的消息刷選方式。比如我們對于電影信息,我們可能需要對它的地區,類型,限制級進行篩選。這時候就要借助 Topics 模型了。
在 Topics 模型中,我們“升級”了 routing key ,它可以由多個關鍵詞組成,詞與詞之間由點號( . )隔開。特別地,規定 * 表示任意的一個詞; # 號表示任意的0個或多個詞 。
假設我們現在需要接收電影信息,每條電影消息附帶的 routingKey 有地區、類型、限制級3個關鍵字,即: district.type.age 。現在想實現的功能如下圖:

如上圖所示,隊列Q1只關心美國適合13歲以上的電影信息,隊列Q2對動作片感興趣,而隊列Q3喜歡中國電影。
下面用 Java 代碼去實現上述功能,相較于之前基本上沒有什么改變,下面直接給出代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());
channel.basicPublish("movie", "Chinese.action.13", null, "臥虎藏龍".getBytes());
channel.basicPublish("movie", "Chinese.comedy.13", null, "大話西游".getBytes());
channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯與祝英臺".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
// 隊列1
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "movie", "American..13");
// 隊列2
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", ".action.*");
// 隊列3
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", "Chinese.#");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}</code></pre>
運行3次 receive 方法,注意打開或關閉相應的注釋;再運行 send 方法,可以看到控制臺輸出如下內容:
// 消費者1
The Bourne Ultimatum
Titanic
// 消費者2
The Bourne Ultimatum
臥虎藏龍
// 消費者3
臥虎藏龍
大話西游
梁山伯與祝英臺
2.6 RPC
第6種模型是用來做RPC(Remote procedure call, 遠程程序調用)的。這里直接貼上代碼,就不做解釋了 。代碼演示的是,客戶端調用服務端的 fib 方法,得到返回結果。
RPCServer.java
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
- Description:
*
- @author derker
- @Time 2016-10-26 18:24
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
AMQP.BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}</code></pre>
RPCClient.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
/**
- Description:
*
- @author derker
@Time 2016-10-26 18:36
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(10)");
response = fibonacciRpc.call("10");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}</code></pre>
來自:http://www.cnblogs.com/dongkuo/p/6001791.html