消息隊列——RabbitMQ

25r9n4qy8 8年前發布 | 34K 次閱讀 RabbitMQ 消息系統

1. 寫在前面

昨天簡單學習了一個 消息隊列 項目—— RabbitMQ ,今天趁熱打鐵,將學到的東西記錄下來。

學習的資料主要是官網給出的6個基本的消息發送/接收 模型 ,或者稱為6種不同的使用場景,本文便是對這6種模型加以敘述。

2. Tutorials

在學習6種模型之前,我們首先需要安裝RabbitMQ。RabbitMQ支持多種系統平臺,各平臺的安裝方法。安裝好之后,我們使用如下命令啟用Web端的管理插件: rabbitmq-plugins enable rabbitmq_management ,然后啟動RabbitMQ。接著用瀏覽器訪問 http://localhost:15672/ ,若能看到RabbitMQ相關Web頁面,說明啟動成功。

2.1 Hello World

正所謂萬事開頭難,我們先從最簡單的 Hello World 開始。首先當然是新建一個項目,導入RabiitMQ相關jar。我采用Maven來構建項目,因此只需要在pom文件中添加如下依賴:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

接下來學習最簡單的消息隊列模型,如下圖:

在圖中,P代表 producer,它是消息的 生產者;C代表 consumer ,它是消息的 消費者 ;而紅色的矩形正是我們所謂的 消息隊列 ,它位于 RabbitMQ 中( RabbitMQ 中可以有很多這樣的隊列,并且每個隊列都有一個唯一的名字)。生產者(們)可以將消息發送到消息隊列中,消費者(們)可以從消息隊列中取出消息。

這種模型是不是很簡單呢?下面我們使用 Java ,借助于RabbitMQ來實現這種模型的消息通信。

首先我們介紹如何 send 消息到消息隊列。 send 之前,當然是和RabbitMQ服務器建立連接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

接下來我們創建一個 channel ,大多數API都是通過這個對象來調用的:

Channel channel = connection.createChannel();

之后,我們便可以調用 channel 的如下方法去聲明一個隊列:

channel.queueDeclare("hello", false, false, false, null);

該方法的第一個參數是隊列的名稱,其余的參數先不管,之后會介紹。我們可以嘗試著去執行以上的5行代碼,然后打開Web端,可以看到新建了一個叫作 hello 的隊列:

有了隊列,我們便可以向其中發送消息了,同樣還是調用 channel 對象的API:

channel.basicPublish("", "hello", null, "Hello World".getBytes());

以上代碼所做的事情就是發送了一條字符串消息“Hello World”(第4個參數)到消息隊列。你可能注意到我們調用了String對象的 getBytes 方法,沒錯,我們發送的實際上二進制數據。因此,理論上你能夠發送任何數據到消息隊列中,而不僅僅是文本信息。

第2個參數叫做 路由鍵(routingKey) ,在該模型下必須與隊列名相同,至于為什么,和其他參數一樣,之后會了解到。

我們可以修改發送的文本,再次執行上述代碼,然后打開Web端查看,便可以查看到我們發送的消息:

點擊上圖的 name 字段下的 hello ,可以查看 hello 隊列中的具體信息:

接下來,我們去嘗試著去獲取 生產者 發送的消息,和 send 方法一樣,我們同樣需要連接服務器,創建 channel ,聲明隊列:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);

之后我們可以調用 channel 的相關方法去監聽隊列,接收消息:

channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }
});

以上 basicConsume 方法中,第一個參數是隊列的名字;第二個參數表示是否自動確認消息的接收情況,我們使用true,自動確認;第三個參數需要傳入一個實現了 Consumer 接口的對象,我們簡單的 new 一個默認的 Consumer 的實現類 DefaultConsumer ,然后在 handleDelivery 方法中去處理接收到的消息( handleDelivery 方法會在接收到消息時被回調)。

運行以上代碼,我們可以打印出之前向隊列中 send 的數據:

Hello World
Hello World2

下面是 Hello World 的完整代碼:

public class App {

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    channel.basicPublish("", "hello", null, "Hello World2".getBytes());

    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }

    });
    synchronized (this){
        // 因為以上接收消息的方法是異步的(非阻塞),當采用單元測試方式執行該方法時,程序會在打印消息前結束,因此使用wait來防止程序提前終止。若使用main方法執行,則不需要擔心該問題。
        wait();
    }
}

}</code></pre>

2.2 Work queues

接下來我們學習第二種模型—— Work Queues 。顧名思義,這種模型描述的是一個生產者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務),如下圖所示:

下面我們用代碼去實現。先是生產者 send 消息到隊列,這次我們多發送些數據:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

for (int i = 0; i < 9; i++) {
    channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
}

channel.close();
connection.close();

}</code></pre>

然后是消費者接收數據:

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

channel.basicConsume("hello", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
        try {
        //  Thread.sleep(1000);
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
synchronized (this) {
    wait();
}

}</code></pre>

代碼基本上和 Hello World 的代碼一樣,只是加上句 sleep 來模擬消費者(worker)處理消息所花的時間。

我們可以先執行三次 receive 方法(修改 sleep 的時間,其中消費者1 sleep 10s,消費者2,3 sleep 1s),讓三個消費者(worker)一起等待消息的到來,然后執行 send 方法發送9條消息,觀察三個消費者收到的消息情況。

若不出意外,你會看到如下的打印結果:

// 消費者1
0
// 10s 后
3
// 10s 后
6
// 消費者2
1
// 1s 后
4
// 1s 后
7
// 消費者3
2
// 1s 后
5
// 1s 后
8

通過打印結果,我們可以總結出 Work queues 的幾個特點:

  1. 一條消息只會被一個消費者接收;
  2. 消息是平均分配給消費者的;
  3. 消費者只有在處理完某條消息后,才會收到下一條消息。

Work queues(Task Queuess)的概念在一些Web場景的應用中是很有用的,比如我們能夠用它來構建一個master-slave結構的分布式爬蟲系統:系統中有一個master節點和多個slave節點,master節點負責向各個slave節點分配爬取任務。

2.3 Publish/Subscribe

但有些時候,我們可能希望一條消息能夠被多個消費者接受到,比如一些公告信息等,這時候用 Work Queue 模型顯然不合適,而 Publish/Subscribe 模型正是對應這種使用場景的。

在介紹Publish/Subscribe之前,我們快速回顧之前的兩個模型,它們好像都是生產者將消息直接發送到消息隊列,但其實不是這樣的,甚至有可能生產者根本就不知道消息發送到了哪一個消息隊列。

先別著急,下面我們完整地介紹RabbitMQ消息發送/接受的方式。

事實上,生產者是把消息發送到了 交換機(exchange) 中,然后交換機負責(決定)將消息發送到(哪一個)消息隊列中。其模型如下圖:

這時候你可能會疑惑:既然消息是被發送到了交換機中,那我們之前發送的消息是被發送到了哪一個交換機中了?它有沒有機制能夠讓特定的消息發送到指定的隊列?

先回答第一個問題。還記得我們在 Hello World 中寫的發送消息的代碼嗎?

channel.basicPublish("", "hello", null, message.getBytes());

事實上第一個參數便是指定交換機的名字,即指定消息被發送到哪一個交換機。空字符串表示 默認交換機(Default Exchange) ,即我們之前發送的消息都是先發送到 默認交換機 ,然后它再路由到相應的隊列中。其實我們可以通過Web頁面去查看所有存在的交換機:

接著回答第二個問題。路由的依據便是通過第二個參數—— 路由鍵(routing key) 指定的,之前已經提到過。在之前代碼中,我們指定第二個參數為"hello",便是指定消息應該被交換機路由到路由鍵為hello的隊列中。而 默認交換機(Default Exchange) 有一個非常有用的性質:

每一個被創建的隊列都會被自動的綁定到默認交換機上,并且路由鍵就是隊列的名字。

交換機還有4種不同的類型,分別是 direct , fanout , topic , headers ,每種類型路由的策略不同。

direct 類型的交換機要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會將此消息路由到路由鍵K = R的隊列。默認交換機便是該類型。因此,在下圖中,消息會沿著綠色箭頭路由:

fanout 類型的交換機會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。

剩下的兩種類型之后再做介紹。

在以上概念基礎上,我們來看第3種消息模型: Publish/Subscribe 。如下圖:

該模型是要讓所有的消費者都能夠接收到每一條消息。顯然, fanout 類型的交換機更符合我們當前的需求。為此,先創建一個 fanout 類型的交換機。

channel.exchangeDeclare("notice", "fanout");

其中,第一個參數是交換機的名稱;第二個參數是交換機的類型。

然后我們可以 send 消息了:

channel.basicPublish( "notice", "", null, message.getBytes());

對于消費者,我們需要為每一個消費者創建一個獨立的隊列,然后將隊列綁定到剛才指定的交換機上即可:

// 該方法會創建一個名稱隨機的臨時隊列
String queueName = channel.queueDeclare().getQueue();
// 將隊列綁定到指定的交換機("notice")上
channel.queueBind(queueName, "notice", "");

以下完整的代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice", "fanout");
    channel.basicPublish( "notice", "", null, "Hello China".getBytes());
    channel.close();
    connection.close();
}

@Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "notice", "");

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
    }

});
synchronized (this) {
    wait();
}

}</code></pre>

首先運行兩次 receive 方法,讓兩個消費者等待接收消息,然后可以在Web端查看此時的隊列情況,如下圖所示:

可以看到圖中有兩個名稱隨機的隊列。接著運行 send 方法發送一條消息,最終我們會看到兩個消費者都打印出了 Hello China 。然后停止虛擬機讓消費者斷開連接,再次在Web端查看隊列情況,會發現剛才的兩個隊列被自動刪除了。

2.4 Routing

以上是Publish/Subscribe模式,它已經能讓我們的通知(notice)系統正常運轉了。現在再考慮這樣一個新需求:對于一些機密通知,我們只想讓部分人看到。這就要求交換機對綁定在其上的隊列進行篩選,于是引出了又一個新的模型: Routing

之前我們說過,對于 direct 類型的交換機,它會根據 routing key 進行路由,因此我們可以借助它來實現我們的需求,模型結構如下圖:

下面用代碼來實現。先看生產者。

首先要聲明一個 direct 類型的交換機:

// 這里名稱改為notice2
channel.exchangeDeclare("notice2", "direct");

需要注意的是, 因為我們之前聲明了一個 fanout 類型的名叫 notice 的交換機,因此不能再聲明一個同名的類型卻不一樣的交換機。

然后可以發送消息了,我們發送10條消息,其中偶數條消息是秘密消息,只能被routing key 為s的隊列接受,其余的消息所有的隊列都能接受。

for (int i = 0; i < 10; i++) {
            String routingKey = "n"; // normal
            if (i % 2 == 0) {
                routingKey = "s"; // secret
            }
            channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
        }

接下來是消費者:

// 聲明一個名稱隨機的臨時的隊列
String queueName = channel.queueDeclare().getQueue();
// 綁定交換機,同時帶上routing key
channel.queueBind(queueName, "notice2", "n");
// 消費者2號運行時,打開以下注釋
// channel.queueBind(queueName, "notice2", "s");

注意,我們可以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。

以下是完整代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

channel.exchangeDeclare("notice2", "direct");
for (int i = 0; i < 10; i++) {
    String routingKey = "n"; // normal
    if (i % 2 == 0) {
        routingKey = "s"; // secret
    }
    channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();

}

@Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice2", "direct");

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice2", "n");
// channel.queueBind(queueName, "notice2", "s");

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
    }
});
synchronized (this) {
    wait();
}

}</code></pre>

測試時,我們可以先運行一個 receive ,然后打開 channel.queueBind(queueName, "notice2", "s") 注釋,再運行一次 receive ,這樣就有兩個消費者綁定到notice2交換機上,其中消費者1只能收到normal類型的消息,而消費者2既能收到normal類型的消息,又能收到secret類型的消息。接著可以運行send方法。如不出意外,可以看到如下打印結果:

// 消費者1
1
3
5
7
9
// 消費者2
0
1
2
3
4
5
6
7
8
9

2.5 Topics

有了以上的改進,我們的 notice 系統基本ok了。但有些時候,我們還需要更加靈活的消息刷選方式。比如我們對于電影信息,我們可能需要對它的地區,類型,限制級進行篩選。這時候就要借助 Topics 模型了。

Topics 模型中,我們“升級”了 routing key ,它可以由多個關鍵詞組成,詞與詞之間由點號( . )隔開。特別地,規定 * 表示任意的一個詞; # 號表示任意的0個或多個詞

假設我們現在需要接收電影信息,每條電影消息附帶的 routingKey 有地區、類型、限制級3個關鍵字,即: district.type.age 。現在想實現的功能如下圖:

如上圖所示,隊列Q1只關心美國適合13歲以上的電影信息,隊列Q2對動作片感興趣,而隊列Q3喜歡中國電影。

下面用 Java 代碼去實現上述功能,相較于之前基本上沒有什么改變,下面直接給出代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

channel.exchangeDeclare("movie", "topic");

channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());

channel.basicPublish("movie", "Chinese.action.13", null, "臥虎藏龍".getBytes());
channel.basicPublish("movie", "Chinese.comedy.13", null, "大話西游".getBytes());
channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯與祝英臺".getBytes());

channel.close();
connection.close();

}

@Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("movie", "topic"); // 隊列1 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "movie", "American..13"); // 隊列2 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", ".action.*"); // 隊列3 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", "Chinese.#");

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
    }

});
synchronized (this) {
    wait();
}

}</code></pre>

運行3次 receive 方法,注意打開或關閉相應的注釋;再運行 send 方法,可以看到控制臺輸出如下內容:

// 消費者1
The Bourne Ultimatum
Titanic
// 消費者2
The Bourne Ultimatum
臥虎藏龍
// 消費者3
臥虎藏龍
大話西游
梁山伯與祝英臺

2.6 RPC

第6種模型是用來做RPC(Remote procedure call, 遠程程序調用)的。這里直接貼上代碼,就不做解釋了 。代碼演示的是,客戶端調用服務端的 fib 方法,得到返回結果。

RPCServer.java

import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**

  • Description: *
  • @author derker
  • @Time 2016-10-26 18:24 */ public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n - 1) + fib(n - 2);
}

public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            String response = null;
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            AMQP.BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(message);
                System.out.println(" [.] fib(" + message + ")");
                response = "" + fib(n);
            } catch (Exception e) {
                System.out.println(" [.] " + e.toString());
                response = "";
            } finally {
                channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignore) {
            }
        }
    }
}

}</code></pre>

RPCClient.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

/**

  • Description: *
  • @author derker
  • @Time 2016-10-26 18:36 */ public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer;

    public RPCClient() throws Exception {

     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     connection = factory.newConnection();
     channel = connection.createChannel();
    
     replyQueueName = channel.queueDeclare().getQueue();
     consumer = new QueueingConsumer(channel);
     channel.basicConsume(replyQueueName, true, consumer);
    

    }

    public String call(String message) throws Exception {

     String response = null;
     String corrId = UUID.randomUUID().toString();
    
     BasicProperties props = new BasicProperties
             .Builder()
             .correlationId(corrId)
             .replyTo(replyQueueName)
             .build();
    
     channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    
     while (true) {
         QueueingConsumer.Delivery delivery = consumer.nextDelivery();
         if (delivery.getProperties().getCorrelationId().equals(corrId)) {
             response = new String(delivery.getBody(), "UTF-8");
             break;
         }
     }
    
     return response;
    

    }

    public void close() throws Exception {

     connection.close();
    

    }

    public static void main(String[] argv) {

     RPCClient fibonacciRpc = null;
     String response = null;
     try {
         fibonacciRpc = new RPCClient();
         System.out.println(" [x] Requesting fib(10)");
         response = fibonacciRpc.call("10");
         System.out.println(" [.] Got '" + response + "'");
     } catch (Exception e) {
         e.printStackTrace();
     } finally {
         if (fibonacciRpc != null) {
             try {
                 fibonacciRpc.close();
             } catch (Exception ignore) {
             }
         }
     }
    

    } }</code></pre>

     

    來自:http://www.cnblogs.com/dongkuo/p/6001791.html

     

 本文由用戶 25r9n4qy8 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!