RabbitMQ之路由選擇

ymc4 9年前發布 | 25K 次閱讀 RabbitMQ 消息系統

上篇博客我們建立了一個簡單的日志系統,我們能夠廣播日志消息給所有你的接收者。本篇博客我們準備給日志系統添加新的特性,讓日志接收者能夠訂閱部分消息。例如,我們可以僅僅將致命的錯誤寫入日志文件,然而仍然在控制臺上打印出所有的其他類型的日志消息。

1. 綁定(Bindings)

在上一篇博客中我們已經使用過綁定。類似下面的代碼:

channel.queueBind(queueName,EXCHANGE_NAME,"");

綁定表示轉發器與隊列之間的關系。我們也可以簡單的認為:隊列對該轉發器上的消息感興趣。

綁定可以附帶一個額外的參數routingKey。為了避免與basicPublish方法(發布消息的方法)的參數混淆,我們準備把它稱作綁定鍵(Binding key)。下面展示如何使用綁定鍵(binding key)來創建一個綁定:

channel.queueBind(queueName,EXCHANGE_NAME,"black");

綁定鍵的意義依賴于轉發器的類型。對于fanout類型,忽略此參數

2. 直接轉發(Direct exchange)

上一篇的日志系統廣播所有的消息給所有的消費者。我們希望可以對其擴展,來允許根據日志的嚴重性進行過濾日志。例如:我們可能希望把致命類型的錯誤寫入硬盤,而不把硬盤空間浪費在警告或者消息類型的日志上。之前我們使用fanout類型的轉發器,但是并沒有給我們帶來更多的靈活性,僅僅可以愚蠢的轉發。

我們將會使用direct類型轉發器進行替代。direct類型的轉發器背后的路由轉發算法很簡單:消息會被推送至綁定鍵(binding key)和消息發布附帶的選擇鍵(routing key)完全匹配的隊列。圖解:

RabbitMQ之路由選擇

上圖,我們可以看到direct類型的轉發器與兩個隊列綁定。第一個隊列與綁定鍵orange綁定,第二個隊列與轉發器間有兩個綁定,一個與綁定鍵black綁定,另一個與green綁定鍵綁定。

這樣的話,當一個消息附帶一個選擇鍵(routing key)orange發布至轉發器將會被導向到隊列Q1。消息附帶一個選擇器(routing key)black或者green將會被導向到Q2,其余所有的其他消息將會被丟棄。

3. 多重綁定(multiple bindings)

RabbitMQ之路由選擇

使用一個綁定鍵(binding key)綁定多個隊列是完全合法的。如上圖,一個附帶選擇鍵(routing key)的消息將會被轉發到Q1和Q2。

4. 發送日志(Emittinglogs)

我們準備將這種模式用于我們的日志系統。我們將消息發送到direct類型的轉發器而不是fanout類型。我們將把日志的嚴重性作為選擇鍵(routing key)。這樣的話,接收程序可以根據嚴重性來選擇接收。我們首先關注發送日志的代碼。

像以前一樣,我們需要先創建一個轉發器:

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

然后,我們準備發送一條消息:

channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());

為了簡化代碼,我們假定'severity'是"info",“warning”,“error”中的一個。

5. 訂閱

接收消息的代碼和前面的博客中的類似,只有一點不同:我們給我們所感興趣的嚴重性類型的日志創建一個綁定。

String queueName = channel.queueDeclare().getQueue();
for(String severity : args){
    channel.queueBind(queueName,EXCHANGE_NAME,severity);
}

6. 完整的實例

RabbitMQ之路由選擇

發送端:EmitLogDirect.java:

package cc.openscanner;

import java.util.Random;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "ex_logs_direct";
    private static final String[] SEVERITIES = {"info","warning","error"};
    public static void main(String[] args) throws java.io.IOException {
        //創建連接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明轉發器的類型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //發送6條消息
        for(int i=0;i<6;i++){
            String severity = getSeverity();
            String message = severity + "_log : " + UUID.randomUUID().toString();
            //發布消息至轉發器,指定routing key
            channel.basicPublish(EXCHANGE_NAME, severity, null,message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");  
        }
        channel.close();
        connection.close();
    }
    private static String getSeverity(){
        Random random = new Random();
        int ranVal = random.nextInt(3);
        return SEVERITIES[ranVal];
    }
}

隨機發送6條隨機類型(routing key)的日志給轉發器。

接收端:ReceivelLogsDirect.java:

package cc.openscanner;

import java.util.Random;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "ex_logs_direct";
    private static final String[] SEVERITIES = {"info","warning","error"};
    public static void main(String[] args) 
            throws java.io.IOException,java.lang.InterruptedException{
        //創建連接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明direct類型轉發器
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        String queueName = channel.queueDeclare().getQueue();
        String severity = getSeverity();
        //指定binding key
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
        System.out.println(" [*] Waiting for " + severity + 
            " logs. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
    private static String getSeverity(){
        Random random = new Random();
        int ranVal = random.nextInt(3);
        return SEVERITIES[ranVal];
    }
}

接收端隨機設置一個日志嚴重級別(binding key)。

我開啟了3個接收端程序,兩個準備接收error類型日志,一個接收info類型日志,然后運行發送端程序運行結果:

[x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
 [x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'

------------------------------------------------------------------------------------

 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'

------------------------------------------------------------------------------------

 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'

------------------------------------------------------------------------------------

 [*] Waiting for info logs. To exit press CTRL+C
 [x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'

注意,使用同一個binding key綁定多個隊列,多個隊列都會收到該消息(message),如上例!可以看到,我們實現了博客開頭所描述的特性,接收者可以自定義自己感興趣類型的日志。

其實文章這么長就在說:發送消息時可以設置routing key,接收隊列與轉發器間可以設置binding key,接收者接收routing key與binding key相同的消息

 本文由用戶 ymc4 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!