[RabbitMQ]隊列

cbuewxn83 8年前發布 | 23K 次閱讀 RabbitMQ 消息系統

來自: http://www.cnblogs.com/w1991/p/5178701.html

RabbitMQ是一個message broker,本質上就是接收producer的消息,傳遞給consumer,其中可以根據給定的需要設置消息路由、緩存、持久化。

簡單隊列

最簡單的隊列如下圖,producer將消息推送到queue中,queue將消息傳遞給consumer。可以有多個producer向同一個queue推送消息,也可以有多個consumer從queue接收消息。

用Java開發producer和consumer需要下載 Java client library

producer代碼如下:

// Send.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

  private final static String QUEUE_NAME = "SimpleQueue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}

consumer代碼如下:

// Recv.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {

  private final static String QUEUE_NAME = "SimpleQueue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
  }
}

編譯并運行:

javac -cp rabbitmq-client.jar Send.java Recv.java

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

運行結果如下:

任務隊列(task quque)

任務隊列是queue將消息分發給多個consumer進行處理。

// Task.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Task {

  private final static String QUEUE_NAME = "TaskQueue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = argv[0];
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}
// Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private final static String QUEUE_NAME = "TaskQueue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");

        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
        }
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
  }

  private static void doWork(String task) throws InterruptedException {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException _ignored) {
      Thread.currentThread().interrupt();
    }
  }
}

編譯代碼:

javac -cp rabbitmq-client.jar Task.java Worker.java

分別打開三個console,其中兩個運行Worker:

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker

另一個console運行Task發送消息:

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Task msg

運行結果如下:

使用任務隊列可以實現并行任務處理。默認情況下,RabbitMQ會按消息的順序依次分發給consumer,也就是說平均每個consumer會接收相同數量的消息。這種消息分發方式稱為循環調度(round-robin dispatching)。

消息反饋(message acknowledgment)

如果consumer在執行某個消息的任務過程中失敗了,會怎么樣呢?在當前模式下,RabbitMQ一旦將消息發送給了consumer就從內存中刪除該消息,這種情況下,如果殺掉了worker,那么分配給該worker的未處理的和正在處理中的消息就會丟失。但是我們肯定不希望任務丟失,我們希望一個worker掛掉之后,它未處理完成的任務將分配給其他worker處理。

為了確保消息不會丟失,RabbitMQ支持消息反饋。消息反饋就是consumer告訴RabbitMQ某個消息已經被接收并處理完成了,可以從隊列中刪除了。

如果一個consumer掛掉了(channel或connection被關閉,或者TCP連接丟失),沒有發送反饋(ack),那么RabbitMQ就知道消息沒有被處理完成,就把它重新放入隊列。這時如果有其他consumer在線,那么就把這個消息重新分發。這樣就確保在worker掛掉時消息也不會丟失了。

消息處理是沒有超時機制的。RabbitMQ會在consumer掛掉時才重新分發消息。因此,即使消息處理時間很長很長都沒有關系。

默認消息反饋是開啟的。在上面的例子中通過指定 autoAck=true 關閉了反饋。消息反饋的代碼如下:

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

PS:可以使用rabbitmqctl查看未反饋的消息。

rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化

之前的消息反饋可以確保consumer掛掉時消息不會丟失,但是如果RabbitMQ服務器停止或者崩潰了,隊列和其中的消息就都丟失了。要確保服務器上的消息不丟失,需要將隊列和消息都標識成持久化的。

首先,確保RabbitMQ不會丟失隊列:

boolean durable = true;
channel.queueDeclare("TaskQueue", durable, false, false, null);

盡管這代碼是正確的,但是目前的環境下卻不起作用。因為之前已經聲明了一個非持久化的隊列TaskQueue。RabbitMQ不允許用不同的參數重新定義已經存在的隊列。換一個新的隊列名稱就行了。

boolean durable = true;
channel.queueDeclare("DurableTaskQueue", durable, false, false, null);

這樣RabbitMQ重啟后隊列DurableTaskQueue是仍然存在的。接下來需要將消息標識成持久化的,通過設置屬性MessageProperties為PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

PS:將消息標識成持久化不能完全保證消息不會丟失。RabbitMQ在接收到消息寫入磁盤之間仍有很短的時間間隔,而且RabbitMQ不會每次保存消息時都調用fsync,因此可能消息只被寫入了緩存而還沒有寫入磁盤。如果需要更強的持久化方案,可以使用publisher confirm。

公平分配(fair dispatch)

默認的循環調度方案可能會造成負載不平均。例如,有兩個worker,偶數號的任務處理很耗時,而奇數號的任務處理很快,那么就會導致一個worker處理很慢,積壓很多任務,而另一個worker處理很快,比較空閑。這是因為RabbitMQ在消息進入隊列后就立刻分發出去,它不關心consumer中未反饋的消息數量,只是簡單地將第n個消息分發給第n個consumer。

為了讓每個consumer的負載均勻,通過basicQos方法設置參數prefetchCount為1。這樣,consumer中的消息將不超過一條,當consumer處理完并發送反饋后RabbitMQ才會向它分發新的消息。RabbitMQ在接收到新消息后,如果有空閑的consumer就將消息發給它,否則就一直等待有空閑的consumer再分發。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

PS:如果所有的任務都很耗時,worker都很忙,那么可能會導致隊列被塞滿,這時需要增加worker,或者采用其他策略。

最后的完整代碼如下:

// Task.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Task {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = argv[0];

    channel.basicPublish("", TASK_QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
// Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
  }

  private static void doWork(String task) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException _ignored) {
      Thread.currentThread().interrupt();
    }
  }
}

參考資料:

 本文由用戶 cbuewxn83 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!