RabbitMQ簡介與使用

jopen 9年前發布 | 54K 次閱讀 RabbitMQ ActiveMQ 消息系統

RabbitMQ簡介與使用

 2013年3月23日  小白  學習筆記

1. AMQP簡介

在了解RabbitMQ之前,首先要了解AMQP協議。AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。

當前各種應用大量使用異步消息模型,并隨之產生眾多消息中間件產品及協議,標準的不一致使應用與中間件之間的耦合限制產品的選擇,并增加維護成本。AMQP是一個提供統一消息服務的應用層標準協議,基于此協議的客戶端與消息中間件可傳遞消息,并不受不同客戶端/中間件產品,不同開發語言等條件的限制。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。

AMQP的實現有:

  • OpenAMQ

    </li>

  • AMQP的開源實現,用C語言編寫,運行于Linux、AIX、Solaris、Windows、OpenVMS

    </li>

  • Apache Qpid

    </li>

  • Apache的開源項目,支持C++、Ruby、Java、JMS、Python和.NET

    </li>

  • Redhat Enterprise MRG

    </li>

  • 實現了AMQP的最新版本0-10,提供了豐富的特征集,比如完全管理、聯合、Active-Active集群,有Web控制臺,還有許多企業級特征,客戶端支持C++、Ruby、Java、JMS、Python和.NET

    </li>

  • RabbitMQ

    </li>

  • 一個獨立的開源實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ發布在Ubuntu、FreeBSD平臺

    </li>

  • AMQP Infrastructure

    </li>

  • Linux下,包括Broker、管理工具、Agent和客戶端

    </li>

  • ?MQ

    </li>

  • 一個高性能的消息平臺,在分布式消息網絡可作為兼容AMQP的Broker節點,綁定了多種語言,包括Python、C、C++、Lisp、Ruby等

    </li>

  • Zyre

    </li>

  • 是一個Broker,實現了RestMS協議和AMQP協議,提供了RESTful HTTP訪問網絡AMQP的能力

    </li> </ul>

    以上是AMQP中的核心概念:

    • Broker

      </li>

    • 消息服務器的實體

      </li>

    • 虛擬主機(Virtual Host)

      </li>

    • 一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。客戶端應用程序在登錄到服務器之后,可以選擇一個虛擬主機。每個連接(包括所有channel)都必須關聯至一個虛擬主機

      </li>

    • 交換器(Exchange)

      </li>

    • 服務器中的實體,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列

      </li>

    • 消息隊列(Message Queue)

      </li>

    • 服務器中的實體,用來保存消息直到發送給消費者

      </li>

    • 生產者(Producer)

      </li>

    • 一個向交換器發布消息的客戶端應用程序

      </li>

    • 消費者(Consumer)

      </li>

    • 一個從消息隊列中請求消息的客戶端應用程序

      </li>

    • 綁定器(Binding)

      </li>

    • 將交換器和隊列連接起來,并且封裝消息的路由信息

      </li> </ul>

      所有這些組件的屬性各不相同,但是只有交換器和隊列被命名。客戶端可以通過交換器的名字來發送消息,也可以通過隊列的名字收取信息。因為AMQ 協議沒有一個通用的標準方法來獲得所有組件的名稱,所以客戶端對隊列和交換器的訪問被限制在僅能使用熟知的或者只有自己知道的名字。

      綁定器沒有名字,它們的生命期依賴于所緊密連接的交換器和隊列。如果這兩者任意一個被刪除掉,那么綁定器便失效了。這就說明,若要知道交換器和隊列的名字,還需要設置消息路由。

      消息是一個不透明的數據包,這些包有如下性質:

      •  元數據,例如內容的編碼或者表明來源的字段

        </li>

      •  標志位,標記消息投遞時候的一些保障機制

        </li>

      •  一個特殊的字段叫做routing key

        </li> </ul>

        發送消息是一個非常簡單的過程。客戶端聲明一個它想要發送消息的目的交換器,然后將消息傳遞給交換器。

        接受消息的最簡單辦法是設置一個訂閱。客戶端需要聲明一個隊列,并且使用一個綁定器將之前的交換器和隊列綁定起來,這樣的話,訂閱就設置完畢。

        交換器的類型:

        • fanout交換器

          </li>

        • 不會解釋任何東西:它只是將消息投遞到所有綁定到它的隊列中

          </li>

        • direct交換器

          </li>

        • 將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上

          </li>

        • topic交換器

          </li>

        • 模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分成單詞。這些單詞之間用點隔開。它同樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,但是不匹配stock.nasdaq

          </li>

        • header交換器

          </li>

        • 根據應用程序消息的特定屬性進行匹配

          </li>

        • failover和system交換器

          </li>

        • 當前RabbitMQ版本中均未實現

          </li> </ul>

          沒有綁定器,哪怕是最簡單的消息,交換器也不能將其投遞到隊列中,只能拋棄它。通過訂閱一個隊列,消費者能夠從隊列中獲取消息,然后在使用過后將其從隊列中刪除。

          不同于隊列的是,交換器有相應的類型,表明它們的投遞方式(通常是在和綁定器協作的時候)。因為交換器是命名實體,所以聲明一個已經存在的交換器, 但是試圖賦予不同類型是會導致錯誤。客戶端需要刪除這個已經存在的交換器,然后重新聲明并且賦予新的類型。

          交換器也有一些性質:

          •  持久性:如果啟用,交換器將會在Broker重啟前都有效

            </li>

          •  自動刪除:如果啟用,那么交換器將會在其綁定的隊列都被刪除掉之后自動刪除掉自身

            </li>

          •  惰性:如果沒有聲明交換器,那么在執行到使用的時候會導致異常,并不會主動聲明

            </li> </ul>

            AMQP Broker都會對其支持的每種交換器類型(為每一個虛擬主機)聲明一個實例。這些交換器的命名規則是amq.前綴加上類型名。例如 amq.fanout。空的交換器名稱等于amq.direct。對這個默認的direct交換器(也僅僅是對這個交換器),Broker將會聲明一個綁定了系統中所有隊列的綁定器。

            這個特點告訴我們,在系統中,任意隊列都可以和默認的direct交換器綁定在一起,只要其routing-key等于隊列名字。

            默認綁定器的行為揭示了多綁定器的存在,將一個或者多個隊列和一個或者多個交換器綁定起來。這使得可以將發送到不同交換器的具有不同routing key(或者其他屬性)的消息發送到同一個隊列中。

            隊列也有以下屬性,這些屬性和交換器所具有的屬性類似。

            •  持久性:如果啟用,隊列將會在Broker重啟前都有效

              </li>

            •  自動刪除:如果啟用,那么隊列將會在所有的消費者停止使用之后自動刪除掉自身

              </li>

            •  惰性:如果沒有聲明隊列,那么在執行到使用的時候會導致異常,并不會主動聲明

              </li>

            •  排他性:如果啟用,隊列只能被聲明它的消費者使用

              </li> </ul>

              這些性質可以用來創建例如排他和自刪除的transient或者私有隊列。這種隊列將會在所有鏈接到它的客戶端斷開連接之后被自動刪除掉 – 它們只是短暫地連接到Broker,但是可以用于實現例如RPC或者在AMQ上的對等通信。

              AMQP上的RPC是這樣的:RPC客戶端聲明一個回復隊列,唯一命名(例如用UUID19), 并且是自刪除和排他的。然后它發送請求給一些交換器,在消息的reply-to字段中包含了之前聲明的回復隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to作為routing key(之前提到過默認綁定器會綁定所有的隊列到默認交換器)發送到默認交換器。注意僅僅是慣例而已。根據和RPC服務器的約定,它可以解釋消息的任何屬性(甚至數據體)來決定回復給誰。

              隊列也可以是持久的,可共享,非自動刪除以及非排他的。使用同一個隊列的多個用戶接收到的并不是發送到這個隊列的消息的一份拷貝,而是這些用戶共享這隊列中的一份數據,然后在使用完之后刪除掉。

              2. RabbitMQ簡介

              RabbitMQ是一個遵循AMQP協議的消息中間件,它從生產者接收消息并遞送給消費者,在這個過程中,根據規則進行路由,緩存與持久化。

              幾個概念說明(完全遵循AMQP中的概念):

              •  Broker:簡單來說就是消息隊列服務器實體

                </li>

              •  Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列

                </li>

              •  Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列

                </li>

              •  Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來

                </li>

              •  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞

                </li>

              •  vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離

                </li>

              •  producer:消息生產者,就是投遞消息的程序

                </li>

              •  consumer:消息消費者,就是接受消息的程序

                </li>

              •  channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

                </li> </ul>

                消息隊列的使用過程大概如下:

                1. 客戶端連接到消息隊列服務器,打開一個channel

                  </li>

                2. 客戶端聲明一個exchange,并設置相關屬性

                  </li>

                3. 客戶端聲明一個queue,并設置相關屬性

                  </li>

                4. 客戶端使用routing key,在exchange和queue之間建立好綁定關系

                  </li>

                5. 客戶端投遞消息到exchange

                  </li> </ol>

                  exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

                  exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

                  RabbitMQ支持消息的持久化,消息隊列持久化包括3個部分:

                  •  exchange持久化,在聲明時指定durable為true

                    </li>

                  •  queue持久化,在聲明時指定durable為true

                    </li>

                  •  消息持久化,在投遞時指定delivery_mode 為2(1是非持久化)

                    </li> </ul>

                    如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

                    RabbitMQ的特性:

                    •  可靠性:包括消息持久化,消費者和生產者的消息確認

                      </li>

                    •  靈活路由:遵循AMQP協議,支持多種Exchange類型實現不同路由策略

                      </li>

                    •  分布式:集群的支持,包括本地網絡與遠程網絡

                      </li>

                    •  高可用性:支持主從備份與鏡像隊列

                      </li>

                    •  多語言支持:支持多語言的客戶端

                      </li>

                    •  WEB界面管理:可以管理用戶權限,exhange,queue,binding,與實時監控

                      </li>

                    •  訪問控制:基于vhosts實現訪問控制

                      </li>

                    •  調試追蹤:支持tracing,方便調試

                      </li> </ul>

                      3. RabbitMQ使用向導

                      3.1. RabbitMQ的安裝

                      3.1.1. 準備工作與安裝

                      因為RabbitMQ由ERLANG實現,安裝RabbitMQ之前要先安裝ERLANG

                      安裝包:otp_src_R15B03-1.tar.gz ERLANG安裝包

                                       rabbitmq-server-generic-unix-3.0.0.tar.gz RabbitMQ服務端

                                       rabbitmq-java-client-bin-3.0.0.tar.gz  RabbitMQ客戶端,包含性能測試腳本

                      以下是上述版本為例的安裝步驟,后續章節描述的內容都對應此版本ERLANG的安裝步驟:

                      tar -zxf otp_src_R15B03-1.tar.gzz
                      cd otp_src_R15B03
                      ./configure
                      make
                      make install

                      RabbitMQ客戶端與服務端的安裝直接解壓安裝包即可,客戶端的目錄中,rabbitmq-client.jar為JAVA版的客戶端,編寫客戶端程序時需要引用,腳本文件為性能測試腳本

                      $RABBIT_MQ_HOME/sbin目錄中的文件說明及命令使用參考http://www.rabbitmq.com/manpages.html

                      RabbitMQ的啟停:

                      rabbitmq-server啟動服務,如要以后臺方式啟動,增加-detached參數

                      rabbitmqctl stop停止服務

                      rabbitmq-plugins enable rabbitmq_management打開WEB管理界面插件,默認訪問地址:

                      http://服務器IP:15672

                      3.1.2. 單臺RabbitMQ的配置

                      通過配置環境變量或者配置文件,修改諸如端口,綁定IP,broker的名稱等,參考配置管理章節

                      例如:

                      修改$RABBIT_MQ_HOME/sbin/rabbitmq-env文件,增加配置:

                      HOSTNAME=broker_138 如果是集群,每臺機器的名稱要不同

                      RABBITMQ_NODE_IP_ADDRESS=192.168.100.138 綁定機器IP

                      3.1.3. RabbitMQ集群的配置

                      RabbitMQ集群的運行需要集群中的所有節點共享erlang.cookie,以其中一臺RabbitMQ中用戶目錄下~/.erlang.cookie文件為準,復制文件內容,將所有節點的erlang.cookie文件都修改為此值。

                      先啟動所有節點的RabbitMQ,然后依次在每臺RabbitMQ中執行命令:

                      ./rabbitmqctl stop_app
                      ./rabbitmqctl join_cluster rabbit@broker_138
                      ./rabbitmqctl start_app

                      rabbit@broker_138為其中一臺RabbitMQ的實例名稱,所有RabbitMQ節點都添加同一節點即可。

                      3.2. 使用客戶端程序發送與接收消息

                      3.2.1. Hello World

                      一個簡單的示例,P是生產者,C是消費者。P發送消息到隊列,C從隊列取消息。代碼如下:

                      生產者:

                      首先建立連接,在連接上建立channel,通常一個連接會建立多個channel,可以提高消息的發送速度。這里只建立了一個連接,建立多個channel時,客戶端可使用多線程,每個線程里使用一個channel

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      //然后聲明隊列,如果隊列沒有預先創建,會創建隊列。消息以字節碼的形式發送,所以在客戶端可以使用任何編碼格式。
                      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                      String message = "Hello World!";
                      channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                      System.out.println(" [x] Sent '" + message + "'");
                      //最后別忘了關閉channel和connection
                      channel.close();
                      connection.close();

                      消費者:

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
                      QueueingConsumer consumer = new QueueingConsumer(channel);
                      channel.basicConsume(QUEUE_NAME, true, consumer);
                       
                      while (true) {
                          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                          String message = new String(delivery.getBody());
                          System.out.println(" [x] Received '" + message + "'");
                      }

                      3.2.2. 工作隊列

                      一個隊列可以有多個消費者,隊列發送消息采用Round-robin方式,即順序發送給每個消費者,在單位時間內,每個消費者收到的消息數量是相同的。

                      以上是假設每個消費者處理消息的速度是一樣的,如果每個消費者處理消息的速度不同,那么Round-robin方式的效率就不高了,這時可以設置prefetch參數。prefetch的值表示在消費者為返回上一條消息的確認信息時,隊列最多發送給此消費者的消息數目。如果消息數目達到prefetch值,隊列就停止發送消息給這個消費者,并隨之發送給不忙的消費者。prefetch通過以下代碼設置:

                      channel.basicQos(prefetchCount);

                      3.2.3. 訂閱/發布

                      在上一個示例中,隊列保證每條消息發送給其中一個消費者,即每個消息只被處理一次。在實際應用中,經常會有這樣的需求,每條消息要同時發送給多個消費者或者更復雜的情況。也就是說消息需要根據一定的規則發送給不同的消費者。

                      為實現消息路由,需要引入Exchange,圖中用X表示。生產者不再直接發送消息給隊列,而是先發送到Exchange。然后Exchange與隊列綁定。這樣消息會根據不同規則發送給不同隊列,最終到達不同的消費者。

                      實現代碼如下:

                      生產者:

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                      String message = getMessage(argv);
                      channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                      System.out.println(" [x] Sent '" + message + "'");
                      channel.close();
                      connection.close();

                      消費者:

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                      String queueName = channel.queueDeclare().getQueue();
                      channel.queueBind(queueName, EXCHANGE_NAME, "");
                      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
                      QueueingConsumer consumer = new QueueingConsumer(channel);
                      channel.basicConsume(queueName, true, consumer);
                       
                      while (true) {
                          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                          String message = new String(delivery.getBody());
                          System.out.println(" [x] Received '" + message + "'");
                      }

                      很多時候,隊列是非持久并且是自動刪除的,這時隊列名稱也就不重要了,可以通過以下代碼,由服務器自動生成隊列名稱。自動生成的隊列以amq開頭

                      String queueName = channel.queueDeclare().getQueue();

                      3.2.4. 消息路由

                      Exchange的類型不同,消息的路由規則也不同,Exchange的類型介紹請參考RabbitMQ簡介。以下是以direct類型的Exchange為例的生產者代碼實現, 最重要的兩步就是聲明Exhange類型與發送時指定routeKey

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                      String severity = getSeverity(argv); //返回info,error,warning作為routeKey
                      String message = getMessage(argv);
                      channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                      System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
                      channel.close();
                      connection.close();

                      3.2.5. 主題訂閱

                      以下是以topic類型的Exchange為例的生產者代碼實現:

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                      String routingKey = getRouting(argv);
                      String message = getMessage(argv);
                      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                      connection.close();

                      一條以quick.orange.rabbit為routeKey的消息,Q1和Q2都會收到,lazy.orange.elephant也是。quick.orange.fox只能發送到Q1,lazy.brown.fox只能到Q2. lazy.pink.rabbit雖然符合兩個匹配規則,但只發送到Q2,因為先匹配的lasy.#規則。quick.brown.fox則Q1和Q2都收不到,會被直接丟棄。

                      3.2.6. RPC

                      以上示例都是異步的,即生產者不需要等待消費者的反饋。在實際情況中,有些時候在消息處理比較快,且需要及時反饋時,則需要同步的方式,生產者發送消息,在收到消費者的反饋前一直處于阻塞狀態。因為等待的返回來自遠程主機,這種方式也被稱為RPC(Remote procedure call)。RPC的實現有很多,比如JAVA平臺下的RMI,JMX。

                      以下是在RabbitMQ中的實現:

                      RPCClient:

                      private Connection connection;
                      private Channel channel;
                      private String requestQueueName = "rpc_queue";
                      private String replyQueueName;
                      private QueueingConsumer consumer;
                       
                      public RPCClient() throws Exception {
                         ConnectionFactory factory = new ConnectionFactory();
                         factory.setHost("localhost");
                         connection = factory.newConnection();
                         channel = connection.createChannel();
                         replyQueueName = channel.queueDeclare().getQueue();
                         consumer = new QueueingConsumer(channel);
                         channel.basicConsume(replyQueueName, true, consumer);
                      }
                       
                      public String call(String message) throws Exception {
                         String response = null;
                         String corrId = java.util.UUID.randomUUID().toString();
                         BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
                         channel.basicPublish("", requestQueueName, props, message.getBytes());
                       
                         while (true) {
                             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                             if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                                 response = new String(delivery.getBody());
                                 break;
                             }
                         }
                         return response;
                      }
                       
                      public void close() throws Exception {
                         connection.close();
                      }

                      RPCServer:

                      private static final String RPC_QUEUE_NAME = "rpc_queue";
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();
                      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                      channel.basicQos(1);
                      QueueingConsumer consumer = new QueueingConsumer(channel);
                      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                      System.out.println(" [x] Awaiting RPC requests");
                       
                      while (true) {
                          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                          BasicProperties props = delivery.getProperties();
                          BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
                          String message = new String(delivery.getBody());
                          int n = Integer.parseInt(message);
                          System.out.println(" [.] fib(" + message + ")");
                          String response = "" + fib(n);
                          channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
                          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                      }

                      工作流程如下:

                      1. 客戶端啟動,建立發送隊列與反饋隊列

                        </li>

                      2. 當RPC客戶端發送消息時,設置replyTo和correlationId參數。replyTo參數為反饋隊列的名稱,correlationId作為一次請求的唯一標識,要每次請求都不同,用于關聯服務端的反饋消息

                        </li>

                      3. 請求發送到rpc_queue

                        </li>

                      4. 服務端等待請求,當收到請求后,處理請求,并將反饋通過replyTo指定的反饋隊列發送回去

                        </li>

                      5. 客戶端收到反饋,并校驗correlationId的值是否與發送的一致,如果一致,則一次請求完成

                        </li> </ol>

                        4. 消息的可靠傳遞

                        4.1. 連接失敗的處理

                        RabbitMQ不支持連接的failover,所以需要客戶端自己實現失敗重連。

                        4.2. 服務器的可靠性

                        為保證消息的可靠傳遞,服務器使用持久化保證消息不丟失。包括exchange與queue必須定義為持久的,同時發送消息時,也要設置消息為持久消息。

                        在代碼中可以通過以下語句設置發送持久消息:

                        channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)

                        或者:

                        BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
                         // deliveryMode為1是非持久
                        channel.basicPublish(exchangeName, routeKey, basicProperties, msg)

                        4.3. 生產者的可靠性

                        生產者的消息確認叫做confirm,confirm確保消息已經發送到MQ中。當connection或channel異常時,會重新發送消息,如果消息是持久的,并不能一定保證消息持久化到磁盤中,因為消息可能存在與磁盤的緩存中。為進一步提高可靠性,可以使用事務。Confirm與事務不能同時使用。

                        當生產者收不到confirm時,消息可能會重復,所以如果消息不允許重復,則消費者需要自己實現消息去重。

                        使用以下代碼打開confirm,默認是關閉的

                        channel.confirmSelect();

                        4.4. 消費者的可靠性

                        消費者的消息確認叫做Acknowledgements,Acknowledgements確保消費者已經處理了消息,如果收不到消費者的Acknowledgements,MQ會重新發送消息。

                        默認Acknowledgements是自動確認,如需客戶端控制,在消費者的代碼中設置:

                        channel.basicConsume(queueName,false,consumer);//聲明隊列時,設置autoack為false
                        。。。
                        //消息處理代碼
                        。。。
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //發送確認

                        同樣,MQ也可能收不到消費者的Acknowledgements,就會重復發送消息,若要避免,消費者需要自己實現消息去重。

                        4.5. 分布式的RabbitMQ

                        RabbitMQ提供了3中分布式的解決方案,cluster,federation,shovel。cluster用于可靠的本地局域網,后兩種用于不可靠的網絡。

                        5. 分布式

                        5.1. Cluster

                        Cluster將多臺機器連接為一個邏輯broker,各機器之間使用Erlang消息通信,所以cluster中各機器必須有一樣的Erlang cookie,并且機器之間的網絡要是可靠的,并且都運行相同版本的Erlang。

                        Virtual hosts,exchanges,用戶及權限都在所有節點同步,queues可以位于本機,也可以作為鏡像隊列,在各個機器之間同步。

                        通常使用cluster來提高可靠性與增加吞吐量。

                        5.2. Federation

                        Federation允許一個exchange從另外一臺機器或者cluster的exchange中接收消息,因為是兩個exchange聯合起來,所以必須有相同的用戶權限。

                        聯合起來的exchange是單向的點對點的連接。

                        通常應該在通過internet連接broker的時候使用Federation

                        5.3. The Shovel

                        Shovel與Federation的概念類似,只是工作在更低的層次。

                        Federation是從一個exchange到另一個exchange,而Shovel是從一邊的queue中取走消息并發送到另一個exchange。

                        通常在通過internet連接broker的時,并且需要獲得比Federation更多控制權的時候使用Shovel。

                        以下是三種分布式模式的簡要對比:

                        </tr>

                        </tr>

                        </tr>

                        </tr>

                        </tr>

                        </tr>

                        </tr>

                        </tr> </tbody> </table>

                        6. 流量控制

                        6.1. 基于連接的流量控制

                        當生產者發送消息的速率大于消息被路由到queue的速率時,會觸發流量控制,發送速率受到限制,但不會完全阻塞。

                        6.2. 基于內存的流量控制

                        當內存使用達到vm_memory_high_watermark的值時,會觸發流量控制,生產者被阻塞。vm_memory_high_watermark的默認值是系統內存的40%,這個值可以在配置文件中修改。

                        [{rabbit, [{vm_memory_high_watermark, 0.4}]}].

                        或者在運行時通過命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改立即生效,但下次重啟后恢復。所以要永久修改,必須同時修改配置文件。

                        6.3. 基于磁盤的流量控制

                        當磁盤剩余空間小于disk_free_limit的值時,觸發流量控制,生產者被阻塞。disk_free_limit的默認值是1GB,可在配置文件中修改。

                        [{rabbit, [{disk_free_limit, 25000000000}]}].

                        7. 內存使用

                        通過命令rabbitmqctl status可以查看內存使用狀態,或者在WEB管理界面中點擊節點后查看。

                        其中Queues表示隊列中消息占用的內存

                        Mnesia表示MQ中定義的exchange,queue,bindings,用戶及權限占用的內存

                        詳細說明請參考http://www.rabbitmq.com/memory-use.html

                        8. 配置管理

                        RabbitMQ的默認配置在大部分情況下是最佳配置,如果服務運行良好,不需要修改。RabbitMQ支持3種方式修改配置:環境變量、配置文件、運行時參數與策略。

                        環境變量可以配置到shell環境變量中,也可以在RabbitMQ的環境變量中配置。例如:配置服務綁定IP,可以在shell環境變量里配置RABBITMQ_NODE_IP_ADDRESS的值,也可以在RabbitMQ的環境變量中配置NODE_IP_ADDRESS的值,即RabbitMQ的環境變量中變量名稱要去掉RABBITMQ_。RabbitMQ的環境變量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的優先級為shell環境變量優先于RabbitMQ的環境變量,RabbitMQ的環境變量優先于RabbitMQ默認的環境變量。

                        通過配置文件配置,要先在環境變量中指定配置文件路徑,例如:

                        CONFIG_FILE=/etc/rabbitmq/rabbitmq.config

                        然后添加配置,例如:

                        [
                         
                            {mnesia, [{dump_log_write_threshold, 1000}]},
                         
                            {rabbit, [{tcp_listeners, [5673]}]}
                         
                        ].

                        通過rabbitmqctl命令可以在運行時修改配置,例如修改vm_memory_high_watermark。還有些配置,比如鏡像隊列,是通過管理界面或命令配置策略實現的。

                        詳細的配置項請參考http://www.rabbitmq.com/configure.html

                        9. 高可用性

                        9.1. 主從備份

                        RabbitMQ支持主從備份,當主服務器不可用時,存在磁盤中的消息可以由從服務器恢復。

                        也可以在集群的基礎上配置主從備份。主從備份依賴Pacemaker來管理資源,主從備份的方式已不推薦使用,而鏡像隊列則更容易使用,且可靠性更高。

                        9.2. 鏡像隊列

                        雖然使用cluster可以提高可靠性,exchange,binding在各個機器是共享的,但是queue中的消息實際上還是存在單獨的機器,如果一臺機器不可用,那么在這臺機器恢復前,這臺機器中存儲的消息也是不可用的。

                        為解決這樣的問題,引入了鏡像隊列,鏡像隊列是在集群中為隊列建立的一個或多個物理鏡像,這些鏡像分別存儲在主節點之外的其他節點,所有節點中的隊列共同組成一個邏輯隊列。將一個隊列做鏡像后,即使此機器不可用,RabbitMQ會自動從鏡像中選擇一個繼續使用,不會導致隊列中的消息不可用。

                        如果為一個隊列建立多個鏡像,前者稱為主節點,后者稱為從節點。如果主節點有問題,那么RabbitMQ會從從節點中選擇最早同步的一個作為新的主節點,以保證盡量不丟失消息,然而原主節點中同步之前的消息還是會丟失。

                        鏡像隊列運行在cluster中,不建議通過WAN使用,也就是不建議在Federation和Shovel中使用。

                        鏡像隊列是通過策略配置的,添加一個策略,匹配相應的隊列,然后指定一個key為ha-mode的參數,例如:

                        rabbitmqctl set_policy ha-all "^ha." '{"ha-mode":"all"}'

                        這個策略設置所有的節點都為ha.開頭的隊列做鏡像。這個設置也可以在管理界面中添加,詳細信息請參考http://www.rabbitmq.com/ha.html

                        10. 性能

                        10.1. 性能測試

                        RabbitMQ的JAVA客戶端中附帶了性能測試腳本,以下數據都由此腳本測試得到。

                        硬件環境:CPU::Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz

                        內存:4G

                        磁盤:500G    10000轉/分

                        軟件環境:otp_src_R15B03-1.tar.gz

                              rabbitmq-server-generic-unix-3.0.0.tar.gz (單臺)

                              rabbitmq-java-client-bin-3.0.0.tar.gz

                               Red Hat 4.1.2-48 (Linux version 2.6.18)

                        以下是發送0.5KB大小消息的測試結果:

                        producer  consumer  confirm(max unconfirmed publishes 100)  ack  persistent  throughput (msg/s)

                        1 1 N     N  N      17650

                        1 1 Y     N      N 15640

                        1 1 N     Y      N 17100

                        1 1 N     N      Y 17368

                        1 1 Y     N      Y 15635

                        1 1 N     Y      Y 9154

                        1 1 Y     Y      N 15266

                        1 1 Y     Y      Y 6111

                        max unconfirmed publishes的值對于吞吐量的影響較大.

                        在發送持久消息與打開消費者的acknowledgements時,吞吐量變化明顯。

                        關于性能,請參考以下文章:

                        http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/

                        http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

                        10.2. 隊列的性能

                        RabbitMQ中的隊列性能是一個值得關注的地方。在設計方案時就應該考慮到。隊列只有在保持隊列中不積壓消息時,性能才是最佳的,隊列中積壓的消息越多,性能下降越多。

                        例如生產者發送消息的速度是600msg/s,消費者接收的速度是1200msg/s,正常情況下,是沒有性能問題的。這時如果停止消費者一段時間,讓消息在隊列中積壓,然后在打開消費者。按理消費者的速度大于生產者速度,可以轉發新消息,并把老消息也取走,最終隊列又回到為空的狀態。但實際情況則不是,隊列中的消息會繼續積壓,而且會繼續變多,而這時消費者的速度就不如之前的了。

                        RabbitMQ中的隊列,在實現上又分為多個小的隊列,每個隊列里存儲著不同狀態的消息。當消息不積壓時,消息由交換器到達隊列,就會被直接發送給消費者。而當消息堆積時,由于占用較多內存,RabbitMQ會把消息放入更深層次的隊列,例如將內存中的消息換出到磁盤上(不管消息是否持久化),而這些操作會消耗更多的CPU等系統資源,從而導致影響隊列中消息的發送。

                        為了不使消息積壓,可以采取兩種方法:

                        1. 停止向隊列發送消息

                          </li> </ol>

                          停止發送消息,讓系統資源都集中到向消費者發送消息,隊列中的消息逐漸減少,隊列最終會恢復至為空狀態。

                          1. 轉移負載

                            </li> </ol>

                            有些時候不能停止生產者,這時可以改變綁定,讓新消息發送到新的隊列,新隊列必須位于新的機器上。當然也需要新的消費者來連接。這樣可以讓老隊列中的消息慢慢取走,也不影響新消息的發送。

                            11. 生產實例


                            默認的集群模式下,雖然消息可以發送到一臺機器,然后從另一臺機器取出,但是因為每臺機器的queue實際上消息是本地存儲,所以消息發到A的queue,從B中取,首先需要從A再次發送到B中,這樣會導致取消息的效率不高。

                            如果使用鏡像模式,A中的消息會同步到B中,消費者從B中取消息,消息是從本地取了,但是隊列做鏡像依然對性能影響很大,尤其是鏡像的數目增加,性能會成倍下降。鏡像隊列優于普通模式的地方在于可靠性,普通模式中,A如果有故障,那么A中的消息就無法取出。鏡像模式中,A有故障,消息依然可以從B中取出。

                            以下是我們生產環境的集群配置方案,因為對于吞吐量要求很高,單臺RabbitMQ無法滿足性能要求,所以選擇使用cluster,而鏡像模式對于性能影響很大,只能采取其他方案:假設3臺RabbitMQ組成一個集群。然后建立多個queue,exchange使用direct類型,并綁定所有queue,routeKey為0到2(和MQ的數量一致)中隨機發送。生產者發送消息到exchange,并路由到各個queue,消費者也有多個,同時從各個queue獲取消息。生產者與消費者使用多channel提高速度,同時消費者使用異步接收方式。

                            使用多個隊列,可以顯著提高集群的吞吐量,每個隊列要位于不同的物理機器上。考慮性能優先,也取消了消息持久化。但是在可靠性方面,如果某個隊列不可用,那么發送給這個隊列的消息就會被丟棄。為避免這種情況,采用備用綁定與備用隊列的方式,即建立多個綁定,默認情況exchange通過routeKey 0,1,2綁定隊列a,b,c(橙色線路) ,備用綁定是exchange通過routeKey 0,1,2 綁定隊列d(紫色線路)。比如當隊列a不可用時,默認的綁定routeKey為0的消息就無法發送到a隊列,這時備用策略自動生效,routeKey為0的消息會被發送到隊列d上(走紫色線路),routeKey為1和2的消息照常發到b和c(還是橙色線路)。這樣就可以確保消息不丟失。若要進一步提高可靠性,降低備用隊列的壓力,可以建立多個備用隊列,然后將綁定分散開來。

                            12. 類似產品對比

                            12.1. 功能特性對比

                            12.2. 性能對比

                            1百萬條1K的消息

                            附錄:參考資源

                            [1] http://www.rabbitmq.com/documentation.html

                            [2] http://www.infoq.com/cn/articles/AMQP-RabbitMQ#ftn.26

                            [3] http://langyu.iteye.com/blog/759663/

                            [4] http://mysql.taobao.org/index.php/Rabbitmq

                            [5] http://blog.163.com/clevertanglei900@126/blog/static/111352259201011121041853/

                            [6] http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/

                            [7] http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

                            [8] http://www.rabbitmq.com/blog/2011/10/27/performance-of-queues-when-less-is-more/

                            [9] http://www.rabbitmq.com/blog/2011/09/24/sizing-your-rabbits/

                            [10] http://www.oschina.net/news/17973/message-queue-shootout

                            轉自:http://changmengnan.com/284.html

                             本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
                             轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
                             本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
                        Federation
                        / Shovel
                        Clustering
                        Brokers
                        are logically separate and may have different owners.
                        A
                        cluster forms a single logical broker.
                        Brokers
                        can run different versions of RabbitMQ and Erlang.
                        Nodes
                        must run the same version of RabbitMQ, and frequently Erlang.
                        Brokers
                        can run different versions of RabbitMQ and Erlang.
                        Brokers
                        must be connected via reliable LAN links. Communication is via Erlang
                        internode messaging, requiring a shared Erlang cookie.
                        Brokers
                        can be connected in whatever topology you arrange. Links can be one- or two-way.
                        All
                        nodes connect to all other nodes in both directions.
                        Brokers
                        can be connected in whatever topology you arrange. Links can be one- or two-way.
                        Chooses
                        Consistency and Availability from the CAP theorem.
                        Some
                        exchanges in a broker may be federated while some may be local.
                        Clustering
                        is all-or-nothing.
                        A
                        client connecting to any broker can only see queues in that broker.
                        A
                        client connecting to any node can see queues on all nodes.
sesese色