RabbitMQ消息隊列:發布/訂閱(Publish/Subscribe)

jopen 8年前發布 | 39K 次閱讀 RabbitMQ 消息系統

        前面我們把每個Message都是deliver到某個單一的Consumer。今天我們將了解如何把同一個Message deliver到多個Consumer中。這個模式也被稱作 "publish / subscribe"。
    首先我們將創建一個日志系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到并打印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。

 

1. (轉發器)Exchanges

    關于exchange的概念在在這里做一下簡單的介紹。

    RabbitMQ 的Messaging Model就是Producer并不會直接發送Message到queue。實際上,Producer并不知道它發送的Message是否已經到達queue。

   RabbitMQ消息模型的核心理念是生產者永遠不會直接發送給任何的消息隊列,一般情況下Producer是不知道消息應該發送到那個隊列的。Producer發送的Message實際上是發到了Exchange中。它的功能也很簡單:從Producer接收Message,然后投遞到queue中。Exchange需要知道如何處理Message,是把它放到某個queue中,還是放到多個queue中?這個rule是通過Exchange 的類型定義的。

   我們知道有三種類型的Exchange:direct, topic ,Headers和fanout。fanout就是廣播模式,會將所有的Message都放到它所知道的queue中。創建一個名字為logs,類型為fanout的Exchange:

channel.exchange_declare(exchange='logs',type='fanout');

fanout類型轉發器特別簡單,吧所有他接受到的消息,廣播多有的他知道的隊列。

前面說到的生產者只能發送詳細給轉發器(Exchange),但是我們之前的例子中并沒有使用到轉發器啊,我們仍然可以發送和接收消息,這是為什么呢?是匿名轉發器(nameless exchange)搞的鬼。因為我們使用了一個默認轉發器。他的標識為" ".

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一個參數為轉發器名,第二個為消息隊列名,如果不為空由其決定發送到那個隊列中。

現在我們可以指定消息發送到轉發器中。

channel.basicPublish( "logs","", null, message.getBytes());

Listing exchanges

通過rabbitmqctl可以列出當前所有的Exchange:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默認創建的。

現在我們可以通過exchange,而不是routing_key來publish Message了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

2. 臨時隊列(Temporary queues)

    截至現在,我們用的queue都是有名字的,能夠為隊列命名對我們來說很關鍵。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。

    但是對于我們將要構建的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
    1) 每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。

    2)當Consumer關閉連接時,這個queue要被deleted。 

 

String queueName = channel.queueDeclare().getQueue();

通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj1wLg

3. 綁定Bindings

現在我們已經創建了fanout類型的exchange和沒有名字的queue(實際上是RabbitMQ幫我們取了名字)。那exchange怎么樣知道它的Message發送到哪個queue呢?答案就是通過bindings:綁定。

channel.queueBind(queueName, “logs”, ””)參數1:隊列名稱 ;參數2:轉發器名稱

現在logs的exchange就將它的Message附加到我們創建的queue了。Listing bindings

使用命令rabbitmqctl list_bindings。

4. 最終版本

    我們最終實現的數據流圖如下:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] args) throws IOException
 {
  // 創建連接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 聲明轉發器和類型
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
  
  String message = new Date().toLocaleString()+" : log something";
  // 往轉發器上發送消息
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

  System.out.println(" [x] Sent '" + message + "'");

  channel.close();
  connection.close();

 }

}

 

 接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToSave
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 創建連接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 創建一個非持久的、唯一的且自動刪除的隊列
  String queueName = channel.queueDeclare().getQueue();
  // 為轉發器指定隊列,設置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二個參數為自動應答,無需手動應答
  channel.basicConsume(queueName, true, consumer);

  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

   print2File(message);
  }

 }

 private static void print2File(String msg)
 {
  try
  {
   String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
   String logFileName = new SimpleDateFormat("yyyy-MM-dd")
     .format(new Date());
   File file = new File(dir, logFileName+".txt");
   FileOutputStream fos = new FileOutputStream(file, true);
   fos.write((msg + "\r\n").getBytes());
   fos.flush();
   fos.close();
  } catch (FileNotFoundException e)
  {
   e.printStackTrace();
  } catch (IOException e)
  {
   e.printStackTrace();
  }
 }
}

 

接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToConsole
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 創建連接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 創建一個非持久的、唯一的且自動刪除的隊列
  String queueName = channel.queueDeclare().getQueue();
  // 為轉發器指定隊列,設置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二個參數為自動應答,無需手動應答
  channel.basicConsume(queueName, true, consumer);

  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   System.out.println(" [x] Received '" + message + "'");

  }

 }

}

來自: http://my.oschina.net/u/267665/blog/547369

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!