RabbitMQ之工作隊列

jopen 9年前發布 | 25K 次閱讀 RabbitMQ 消息系統

上一篇博客中我們寫了通過一個命名的隊列發送和接收消息的Hello World示例。這篇中我們將會創建一個工作隊列用來在consumer間分發耗時任務。

工作隊列的主要任務是:避免立刻執行資源消耗密集型任務并且必須要等待其完成。相反地,我們進行任務調度:我們把任務封裝為消息發送給隊列。consumer運行在后臺并不斷的從隊列中取出任務執行。當你運行了多個consumer進程時,任務隊列中的任務將會被consumer進程共享執行。

1. 準備</p>

我們使用Thread.sleep來模擬耗時的任務。我們在發送到隊列的消息的末尾添加一定數量的點,每個點代表在工作線程(consumer)中需要耗時1秒,例如hello...將會需要等待3秒。

發送端:NewTask.java

package cc.openscanner;

import java.io.IOException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

public class NewTask {     //隊列名稱     private final static String QUEUE_NAME = "workqueue";     public static void main(String[] args) throws IOException {         //創建連接和通道         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("localhost");         Connection connection = factory.newConnection();         Channel channel = connection.createChannel();         //聲明隊列         channel.queueDeclare(QUEUE_NAME, false, false, false, null);         //發送10條消息,依次在消息后面附加1~10個點         for (int i=0; i<10;i++) {             StringBuilder dots = new StringBuilder();             for(int j=0;j<=i;j++){                 dots.append(".");             }             String message = "helloworld" + dots.toString();             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());             System.out.println(" [x] Sent '" + message + "'");         }         //關閉通道和資源         channel.close();         connection.close();     } }</pre>

接收端:Work.java

package cc.openscanner;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;

public class Work {     //隊列名稱     private final static String QUEUE_NAME = "workqueue";     public static void main(String[] args)               throws java.io.IOException,java.lang.InterruptedException{         //區分不同工作進程的輸出         int hashCode = Work.class.hashCode();         //創建連接和通道         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("localhost");         Connection connection = factory.newConnection();         Channel channel = connection.createChannel();         //聲明隊列         channel.queueDeclare(QUEUE_NAME, false, false, false, null);         System.out.println(hashCode + " [] Waiting for messages. To exit press CTRL+C");         QueueingConsumer consumer = new QueueingConsumer(channel);         //指定消費隊列         channel.basicConsume(QUEUE_NAME, true, consumer);         while(true){             QueueingConsumer.Delivery delivery = consumer.nextDelivery();             String message = new String(delivery.getBody());             System.out.println(hashCode + " [x] Received '" + message + "'");             doWork(message);             System.out.println(hashCode + " [x] done");         }     }     private static void doWork(String task) throws InterruptedException {         int n = 0;         for(char ch :task.toCharArray()){             if(ch == '.') n++;         }         if(n > 0) Thread.sleep(n1000);     } }</pre>

Round-robin轉發

使用任務隊列的好處是能夠很容易的并行工作。如果我們積壓了很多工作,我們僅僅通過增加更多的工作者(consumer)就可以解決問題,使系統的伸縮性更加容易。 下面我們先運行3個工作者(Work.java)實例,然后運行NewTask.java,3個工作者實例都會得到信息。但是如何分配呢?讓我們來看看輸出結果:

 [x] Sent 'helloworld.'
 [x] Sent 'helloworld..'
 [x] Sent 'helloworld...'
 [x] Sent 'helloworld....'
 [x] Sent 'helloworld.....'
 [x] Sent 'helloworld......'
 [x] Sent 'helloworld.......'
 [x] Sent 'helloworld........'
 [x] Sent 'helloworld.........'
 [x] Sent 'helloworld..........'

工作者1:

1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld.'
1028566121 [x] done
1028566121 [x] Received 'helloworld....'
1028566121 [x] done
1028566121 [x] Received 'helloworld.......'
1028566121 [x] done
1028566121 [x] Received 'helloworld..........'
1028566121 [x] done
工作者2:

1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld..'
1028566121 [x] done
1028566121 [x] Received 'helloworld.....'
1028566121 [x] done
1028566121 [x] Received 'helloworld........'
1028566121 [x] done

工作者3:

1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld...'
1028566121 [x] done
1028566121 [x] Received 'helloworld......'
1028566121 [x] done
1028566121 [x] Received 'helloworld.........'
1028566121 [x] done
可以看到,默認的,RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,并非一個一個分配。平均的每個消費者將會獲得相等數量的消息。這樣分發消息的方式叫做round-robin。

2. 消息應答(message acknowledgments)<br />

執行一個任務需要花費幾秒鐘。你可能會擔心當一個工作者在執行任務時發生中斷。我們上面的代碼,一旦RabbitMQ交付了一個信息給消費者,會馬上從內存中移除這個信息。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正處理的信息。我們也會丟失已經轉發給這個工作者且它還未執行的消息。上面的例子,我們首先開啟兩個任務,然后執行發送任務的代碼(NewTask.java),然后立即關閉第二個任務,結果為:</p>

1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld..'
1028566121 [x] done
1028566121 [x] Received 'helloworld....'
1028566121 [x] done
1028566121 [x] Received 'helloworld......'
1028566121 [x] done
1028566121 [x] Received 'helloworld........'
1028566121 [x] done
1028566121 [x] Received 'helloworld..........'
1028566121 [x] done
與:

1028566121 [x] Waiting for messages. To exit press CTRL+C
1028566121 [x] Received 'helloworld.'
1028566121 [x] done
1028566121 [x] Received 'helloworld...'
1028566121 [x] done
1028566121 [x] Received 'helloworld.....'
可以看到,丟失了一些任務。但是,我們不希望丟失任何任務(信息)。當某個工作者(接收者)被殺死,我們希望將任務傳遞給另一個工作者(consumer)。為了保證消息永遠不會丟失,RabbitMQ支持消息應答(message acknowledgments)。消費者發送應答給RabbitMQ,告訴它信息已經被接收和處理,然后RabbitMQ可以自由的進行信息刪除。如果消費者被殺死而沒有發送應答,RabbitMQ會認為該信息沒有被完全的處理,然后將會重新轉發給別的消費者。通過這種方式,你可以確認信息不會被丟失,即使消費者偶爾被殺死。
這種機制并沒有超時時間這么一說,RabbitMQ只有在消費者連接斷開時重新轉發此信息。如果消費者處理一個信息需要耗費特別長的時間是允許的。

消息應答默認是打開的。上面的代碼中我們通過顯示的設置autoAsk=true關閉了這種機制。下面我們修改代碼(Work.java):

boolean ack = false;    //打開應答機制
Channel.basicConsume(QUEUE_NAME,ack,consumer);        
//另外需要在每次處理完成一個消息后,手動發送一次應答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

完整修改后的Work.java:

package cc.openscanner;

import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

public class NewTask {     //隊列名稱     private final static String QUEUE_NAME = "workqueue";     public static void main(String[] args) throws IOException {         //創建連接和通道         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("localhost");         Connection connection = factory.newConnection();         Channel channel = connection.createChannel();         //聲明隊列         channel.queueDeclare(QUEUE_NAME, false, false, false, null);         //發送10條消息,依次在消息后面附加1~10個點         for (int i=0; i<10;i++) {             StringBuilder dots = new StringBuilder();             for(int j=0;j<=i;j++){                 dots.append(".");             }             String message = "helloworld" + dots.toString();             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());             System.out.println(" [x] Sent '" + message + "'");         }         //關閉通道和資源         channel.close();         connection.close();     } }</pre>

這個就不測試了,大家自己去嘗試下。

3. 消息持久化(Message durability)</p>

我們已經學習了即使消費者被殺死,消息也不會被丟失。但是如果此時RabbitMQ服務被停止,我們的消息仍然會丟失。當RabbitMQ退出或者異常退出,將會丟失所有的隊列和信息,除非你告訴它不要丟失。我們需要做兩件事來確保信息不會被丟:我們需要給所有的隊列和消息設置持久化標志。

第一,我們需要確認RabbitMQ永遠不會丟失我們的隊列。為了這樣,我們需要聲明它為持久化的:

boolean durable = true;
channel.queueDeclare("task_queue",durable,false,false,null);

注:RabbitMQ不允許使用不同的參數重新定義一個隊列,所以已經存在的隊列,我們無法修改其屬性。

第二,我們需要標識我們的信息為持久化的。通過設置MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN:

channel.basicPublish("","task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

現在你可以執行一個發送消息的程序,然后關閉服務,再重新啟動服務,運行消費者程序做下實驗。

4. 公平轉發(Fair dispatch)</p>
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!