[RabbitMQ]隊列
來自: http://www.cnblogs.com/w1991/p/5178701.html
RabbitMQ是一個message broker,本質上就是接收producer的消息,傳遞給consumer,其中可以根據給定的需要設置消息路由、緩存、持久化。
簡單隊列
最簡單的隊列如下圖,producer將消息推送到queue中,queue將消息傳遞給consumer。可以有多個producer向同一個queue推送消息,也可以有多個consumer從queue接收消息。
用Java開發producer和consumer需要下載 Java client library 。
producer代碼如下:
// Send.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "SimpleQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
} consumer代碼如下:
// Recv.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "SimpleQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
} 編譯并運行:
javac -cp rabbitmq-client.jar Send.java Recv.java java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
運行結果如下:
任務隊列(task quque)
任務隊列是queue將消息分發給多個consumer進行處理。
// Task.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Task {
private final static String QUEUE_NAME = "TaskQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = argv[0];
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}// Worker.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private final static String QUEUE_NAME = "TaskQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
private static void doWork(String task) throws InterruptedException {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
} 編譯代碼:
javac -cp rabbitmq-client.jar Task.java Worker.java
分別打開三個console,其中兩個運行Worker:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker
另一個console運行Task發送消息:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Task msg
運行結果如下:
使用任務隊列可以實現并行任務處理。默認情況下,RabbitMQ會按消息的順序依次分發給consumer,也就是說平均每個consumer會接收相同數量的消息。這種消息分發方式稱為循環調度(round-robin dispatching)。
消息反饋(message acknowledgment)
如果consumer在執行某個消息的任務過程中失敗了,會怎么樣呢?在當前模式下,RabbitMQ一旦將消息發送給了consumer就從內存中刪除該消息,這種情況下,如果殺掉了worker,那么分配給該worker的未處理的和正在處理中的消息就會丟失。但是我們肯定不希望任務丟失,我們希望一個worker掛掉之后,它未處理完成的任務將分配給其他worker處理。
為了確保消息不會丟失,RabbitMQ支持消息反饋。消息反饋就是consumer告訴RabbitMQ某個消息已經被接收并處理完成了,可以從隊列中刪除了。
如果一個consumer掛掉了(channel或connection被關閉,或者TCP連接丟失),沒有發送反饋(ack),那么RabbitMQ就知道消息沒有被處理完成,就把它重新放入隊列。這時如果有其他consumer在線,那么就把這個消息重新分發。這樣就確保在worker掛掉時消息也不會丟失了。
消息處理是沒有超時機制的。RabbitMQ會在consumer掛掉時才重新分發消息。因此,即使消息處理時間很長很長都沒有關系。
默認消息反饋是開啟的。在上面的例子中通過指定 autoAck=true 關閉了反饋。消息反饋的代碼如下:
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}; PS:可以使用rabbitmqctl查看未反饋的消息。
rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
之前的消息反饋可以確保consumer掛掉時消息不會丟失,但是如果RabbitMQ服務器停止或者崩潰了,隊列和其中的消息就都丟失了。要確保服務器上的消息不丟失,需要將隊列和消息都標識成持久化的。
首先,確保RabbitMQ不會丟失隊列:
boolean durable = true;
channel.queueDeclare("TaskQueue", durable, false, false, null); 盡管這代碼是正確的,但是目前的環境下卻不起作用。因為之前已經聲明了一個非持久化的隊列TaskQueue。RabbitMQ不允許用不同的參數重新定義已經存在的隊列。換一個新的隊列名稱就行了。
boolean durable = true;
channel.queueDeclare("DurableTaskQueue", durable, false, false, null); 這樣RabbitMQ重啟后隊列DurableTaskQueue是仍然存在的。接下來需要將消息標識成持久化的,通過設置屬性MessageProperties為PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes()); PS:將消息標識成持久化不能完全保證消息不會丟失。RabbitMQ在接收到消息寫入磁盤之間仍有很短的時間間隔,而且RabbitMQ不會每次保存消息時都調用fsync,因此可能消息只被寫入了緩存而還沒有寫入磁盤。如果需要更強的持久化方案,可以使用publisher confirm。
公平分配(fair dispatch)
默認的循環調度方案可能會造成負載不平均。例如,有兩個worker,偶數號的任務處理很耗時,而奇數號的任務處理很快,那么就會導致一個worker處理很慢,積壓很多任務,而另一個worker處理很快,比較空閑。這是因為RabbitMQ在消息進入隊列后就立刻分發出去,它不關心consumer中未反饋的消息數量,只是簡單地將第n個消息分發給第n個consumer。
為了讓每個consumer的負載均勻,通過basicQos方法設置參數prefetchCount為1。這樣,consumer中的消息將不超過一條,當consumer處理完并發送反饋后RabbitMQ才會向它分發新的消息。RabbitMQ在接收到新消息后,如果有空閑的consumer就將消息發給它,否則就一直等待有空閑的consumer再分發。
int prefetchCount = 1; channel.basicQos(prefetchCount);
PS:如果所有的任務都很耗時,worker都很忙,那么可能會導致隊列被塞滿,這時需要增加worker,或者采用其他策略。
最后的完整代碼如下:
// Task.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Task {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = argv[0];
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}// Worker.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1;
channel.basicQos(prefetchCount);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
} 參考資料: