RabbitMQ之工作隊列
上一篇博客中我們寫了通過一個命名的隊列發送和接收消息的Hello World示例。這篇中我們將會創建一個工作隊列用來在consumer間分發耗時任務。
工作隊列的主要任務是:避免立刻執行資源消耗密集型任務并且必須要等待其完成。相反地,我們進行任務調度:我們把任務封裝為消息發送給隊列。consumer運行在后臺并不斷的從隊列中取出任務執行。當你運行了多個consumer進程時,任務隊列中的任務將會被consumer進程共享執行。
1. 準備</p>
我們使用Thread.sleep來模擬耗時的任務。我們在發送到隊列的消息的末尾添加一定數量的點,每個點代表在工作線程(consumer)中需要耗時1秒,例如hello...將會需要等待3秒。
發送端:NewTask.java
package cc.openscanner;import java.io.IOException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class NewTask { //隊列名稱 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] args) throws IOException { //創建連接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發送10條消息,依次在消息后面附加1~10個點 for (int i=0; i<10;i++) { StringBuilder dots = new StringBuilder(); for(int j=0;j<=i;j++){ dots.append("."); } String message = "helloworld" + dots.toString(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉通道和資源 channel.close(); connection.close(); } }</pre>
接收端:Work.java
package cc.openscanner;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;
public class Work { //隊列名稱 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] args) throws java.io.IOException,java.lang.InterruptedException{ //區分不同工作進程的輸出 int hashCode = Work.class.hashCode(); //創建連接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(hashCode + " [] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); //指定消費隊列 channel.basicConsume(QUEUE_NAME, true, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(hashCode + " [x] Received '" + message + "'"); doWork(message); System.out.println(hashCode + " [x] done"); } } private static void doWork(String task) throws InterruptedException { int n = 0; for(char ch :task.toCharArray()){ if(ch == '.') n++; } if(n > 0) Thread.sleep(n1000); } }</pre>
Round-robin轉發
使用任務隊列的好處是能夠很容易的并行工作。如果我們積壓了很多工作,我們僅僅通過增加更多的工作者(consumer)就可以解決問題,使系統的伸縮性更加容易。 下面我們先運行3個工作者(Work.java)實例,然后運行NewTask.java,3個工作者實例都會得到信息。但是如何分配呢?讓我們來看看輸出結果:
[x] Sent 'helloworld.'
[x] Sent 'helloworld..'
[x] Sent 'helloworld...'
[x] Sent 'helloworld....'
[x] Sent 'helloworld.....'
[x] Sent 'helloworld......'
[x] Sent 'helloworld.......'
[x] Sent 'helloworld........'
[x] Sent 'helloworld.........'
[x] Sent 'helloworld..........'工作者1:
1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld.'
1028566121 [x] done
1028566121 [x] Received 'helloworld....'
1028566121 [x] done
1028566121 [x] Received 'helloworld.......'
1028566121 [x] done
1028566121 [x] Received 'helloworld..........'
1028566121 [x] done
工作者2:1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld..'
1028566121 [x] done
1028566121 [x] Received 'helloworld.....'
1028566121 [x] done
1028566121 [x] Received 'helloworld........'
1028566121 [x] done工作者3:
1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld...'
1028566121 [x] done
1028566121 [x] Received 'helloworld......'
1028566121 [x] done
1028566121 [x] Received 'helloworld.........'
1028566121 [x] done
可以看到,默認的,RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,并非一個一個分配。平均的每個消費者將會獲得相等數量的消息。這樣分發消息的方式叫做round-robin。2. 消息應答(message acknowledgments)<br />
執行一個任務需要花費幾秒鐘。你可能會擔心當一個工作者在執行任務時發生中斷。我們上面的代碼,一旦RabbitMQ交付了一個信息給消費者,會馬上從內存中移除這個信息。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正處理的信息。我們也會丟失已經轉發給這個工作者且它還未執行的消息。上面的例子,我們首先開啟兩個任務,然后執行發送任務的代碼(NewTask.java),然后立即關閉第二個任務,結果為:</p>
1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld..'
1028566121 [x] done
1028566121 [x] Received 'helloworld....'
1028566121 [x] done
1028566121 [x] Received 'helloworld......'
1028566121 [x] done
1028566121 [x] Received 'helloworld........'
1028566121 [x] done
1028566121 [x] Received 'helloworld..........'
1028566121 [x] done
與:1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld.'
1028566121 [x] done
1028566121 [x] Received 'helloworld...'
1028566121 [x] done
1028566121 [x] Received 'helloworld.....'
可以看到,丟失了一些任務。但是,我們不希望丟失任何任務(信息)。當某個工作者(接收者)被殺死,我們希望將任務傳遞給另一個工作者(consumer)。為了保證消息永遠不會丟失,RabbitMQ支持消息應答(message acknowledgments)。消費者發送應答給RabbitMQ,告訴它信息已經被接收和處理,然后RabbitMQ可以自由的進行信息刪除。如果消費者被殺死而沒有發送應答,RabbitMQ會認為該信息沒有被完全的處理,然后將會重新轉發給別的消費者。通過這種方式,你可以確認信息不會被丟失,即使消費者偶爾被殺死。
這種機制并沒有超時時間這么一說,RabbitMQ只有在消費者連接斷開時重新轉發此信息。如果消費者處理一個信息需要耗費特別長的時間是允許的。消息應答默認是打開的。上面的代碼中我們通過顯示的設置autoAsk=true關閉了這種機制。下面我們修改代碼(Work.java):
boolean ack = false; //打開應答機制 Channel.basicConsume(QUEUE_NAME,ack,consumer); //另外需要在每次處理完成一個消息后,手動發送一次應答。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);完整修改后的Work.java:
package cc.openscanner;import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class NewTask { //隊列名稱 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] args) throws IOException { //創建連接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發送10條消息,依次在消息后面附加1~10個點 for (int i=0; i<10;i++) { StringBuilder dots = new StringBuilder(); for(int j=0;j<=i;j++){ dots.append("."); } String message = "helloworld" + dots.toString(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉通道和資源 channel.close(); connection.close(); } }</pre>
這個就不測試了,大家自己去嘗試下。
3. 消息持久化(Message durability)</p>
我們已經學習了即使消費者被殺死,消息也不會被丟失。但是如果此時RabbitMQ服務被停止,我們的消息仍然會丟失。當RabbitMQ退出或者異常退出,將會丟失所有的隊列和信息,除非你告訴它不要丟失。我們需要做兩件事來確保信息不會被丟:我們需要給所有的隊列和消息設置持久化標志。
第一,我們需要確認RabbitMQ永遠不會丟失我們的隊列。為了這樣,我們需要聲明它為持久化的:
boolean durable = true; channel.queueDeclare("task_queue",durable,false,false,null);注:RabbitMQ不允許使用不同的參數重新定義一個隊列,所以已經存在的隊列,我們無法修改其屬性。
第二,我們需要標識我們的信息為持久化的。通過設置MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN:
channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());現在你可以執行一個發送消息的程序,然后關閉服務,再重新啟動服務,運行消費者程序做下實驗。
4. 公平轉發(Fair dispatch)</p>