[RabbitMQ]隊列
來自: http://www.cnblogs.com/w1991/p/5178701.html
RabbitMQ是一個message broker,本質上就是接收producer的消息,傳遞給consumer,其中可以根據給定的需要設置消息路由、緩存、持久化。
簡單隊列
最簡單的隊列如下圖,producer將消息推送到queue中,queue將消息傳遞給consumer。可以有多個producer向同一個queue推送消息,也可以有多個consumer從queue接收消息。
用Java開發producer和consumer需要下載 Java client library 。
producer代碼如下:
// Send.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "SimpleQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
consumer代碼如下:
// Recv.java import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private final static String QUEUE_NAME = "SimpleQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
編譯并運行:
javac -cp rabbitmq-client.jar Send.java Recv.java java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
運行結果如下:
任務隊列(task quque)
任務隊列是queue將消息分發給多個consumer進行處理。
// Task.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Task { private final static String QUEUE_NAME = "TaskQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = argv[0]; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
// Worker.java import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private final static String QUEUE_NAME = "TaskQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } private static void doWork(String task) throws InterruptedException { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
編譯代碼:
javac -cp rabbitmq-client.jar Task.java Worker.java
分別打開三個console,其中兩個運行Worker:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker
另一個console運行Task發送消息:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Task msg
運行結果如下:
使用任務隊列可以實現并行任務處理。默認情況下,RabbitMQ會按消息的順序依次分發給consumer,也就是說平均每個consumer會接收相同數量的消息。這種消息分發方式稱為循環調度(round-robin dispatching)。
消息反饋(message acknowledgment)
如果consumer在執行某個消息的任務過程中失敗了,會怎么樣呢?在當前模式下,RabbitMQ一旦將消息發送給了consumer就從內存中刪除該消息,這種情況下,如果殺掉了worker,那么分配給該worker的未處理的和正在處理中的消息就會丟失。但是我們肯定不希望任務丟失,我們希望一個worker掛掉之后,它未處理完成的任務將分配給其他worker處理。
為了確保消息不會丟失,RabbitMQ支持消息反饋。消息反饋就是consumer告訴RabbitMQ某個消息已經被接收并處理完成了,可以從隊列中刪除了。
如果一個consumer掛掉了(channel或connection被關閉,或者TCP連接丟失),沒有發送反饋(ack),那么RabbitMQ就知道消息沒有被處理完成,就把它重新放入隊列。這時如果有其他consumer在線,那么就把這個消息重新分發。這樣就確保在worker掛掉時消息也不會丟失了。
消息處理是沒有超時機制的。RabbitMQ會在consumer掛掉時才重新分發消息。因此,即使消息處理時間很長很長都沒有關系。
默認消息反饋是開啟的。在上面的例子中通過指定 autoAck=true 關閉了反饋。消息反饋的代碼如下:
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } };
PS:可以使用rabbitmqctl查看未反饋的消息。
rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
之前的消息反饋可以確保consumer掛掉時消息不會丟失,但是如果RabbitMQ服務器停止或者崩潰了,隊列和其中的消息就都丟失了。要確保服務器上的消息不丟失,需要將隊列和消息都標識成持久化的。
首先,確保RabbitMQ不會丟失隊列:
boolean durable = true; channel.queueDeclare("TaskQueue", durable, false, false, null);
盡管這代碼是正確的,但是目前的環境下卻不起作用。因為之前已經聲明了一個非持久化的隊列TaskQueue。RabbitMQ不允許用不同的參數重新定義已經存在的隊列。換一個新的隊列名稱就行了。
boolean durable = true; channel.queueDeclare("DurableTaskQueue", durable, false, false, null);
這樣RabbitMQ重啟后隊列DurableTaskQueue是仍然存在的。接下來需要將消息標識成持久化的,通過設置屬性MessageProperties為PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
PS:將消息標識成持久化不能完全保證消息不會丟失。RabbitMQ在接收到消息寫入磁盤之間仍有很短的時間間隔,而且RabbitMQ不會每次保存消息時都調用fsync,因此可能消息只被寫入了緩存而還沒有寫入磁盤。如果需要更強的持久化方案,可以使用publisher confirm。
公平分配(fair dispatch)
默認的循環調度方案可能會造成負載不平均。例如,有兩個worker,偶數號的任務處理很耗時,而奇數號的任務處理很快,那么就會導致一個worker處理很慢,積壓很多任務,而另一個worker處理很快,比較空閑。這是因為RabbitMQ在消息進入隊列后就立刻分發出去,它不關心consumer中未反饋的消息數量,只是簡單地將第n個消息分發給第n個consumer。
為了讓每個consumer的負載均勻,通過basicQos方法設置參數prefetchCount為1。這樣,consumer中的消息將不超過一條,當consumer處理完并發送反饋后RabbitMQ才會向它分發新的消息。RabbitMQ在接收到新消息后,如果有空閑的consumer就將消息發給它,否則就一直等待有空閑的consumer再分發。
int prefetchCount = 1; channel.basicQos(prefetchCount);
PS:如果所有的任務都很耗時,worker都很忙,那么可能會導致隊列被塞滿,這時需要增加worker,或者采用其他策略。
最后的完整代碼如下:
// Task.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Task { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = argv[0]; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
// Worker.java import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); int prefetchCount = 1; channel.basicQos(prefetchCount); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } private static void doWork(String task) { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
參考資料: