RabbitMQ入門之安裝配置與簡單實例

jopen 10年前發布 | 23K 次閱讀 RabbitMQ 消息系統

簡介


   由于某些原因,今天接觸了一下一個新的東西RabbitMQ( http://www.rabbitmq.com/)總的來說給人的感覺就是安裝簡單方便,同時功能強大。而且官網也給出了幾個相當實用的例子, 不管關于消息隊列的持久化卻并沒有提及,關于持久化的問題我會在后面的文章中再詳細說明。不過在天朝想要直接訪問RabbitMQ官網有些困難,所以建議 還是安裝一下fangqiang工具goagent 在Windows和Linux下都可以使用,具體信息還是自己看官網https://code.google.com/p/goagent/


RabbitMQ的主要特點

    支持一對多方式

        多個Queue綁定到一個Exchange后,通過向Exchange發送消息,就可將信息轉發到多個綁定到Exchange的Queue中,

    消息持久化

        如果對消息進行了持久話處理,那么消息隊列都將保存到服務器中,即使RabbitMQ服務器停止,下一次啟動消息依然存在

    消息序列一致性

        每個消費者對自己的Queue操作,由于Queue是消息是隊列形式保存,所以可以保證綁定到同一Exchange的消息隊列的信息序列是一致的

    狀態一致性

        對于消費者可以通過設置信息分發方式,讓消費者每次只從隊列種取出一條信息,操作完成并確認后才發送下一條信息,當操作出現異常如宕機,未完成的消息依然保存在服務器,可以保證在下一次消費者程序啟動后可以從上一次操作未完成的位置繼續執行。

 

 

Linux下安裝RabbitMQ



   rabbitMQ是一個消息中間件,負責消息的接受和傳遞。openstack中貌似也是使用rabbitMQ作為消息中間件,這也是我們選擇rabbitMQ的主要原因。保持一致性嘛。

   關于rabbitMQ的安裝這里使用APT的安裝方式,只要網絡比較好,安裝起來還是很快的

   首先添加一下內容到 /etc/apt/sources.list中


1
</td>

deb http: //www .rabbitmq.com /debian/ testing main
</div> </td> </tr> </tbody> </table> </div> </div>

為了避免安裝時出現錯誤我們需要將rabbitMQ的公鑰添加到我們的信任列表中


  • 1

    2
    </td>

    wget http: //www .rabbitmq.com /rabbitmq-signing-key-public .asc

    sudo apt-key add rabbitmq-signing-key-public.asc
    </div> </td> </tr> </tbody> </table> </div> </div> 直接運行 apt-get update </div>

    最后直接安裝即可


    1
    </td>

    sudo apt-get install rabbitmq-server
    </div> </td> </tr> </tbody> </table> </div> </div> 通過這種方式安裝可以避免繁瑣的配置方式,對于需要快速了解rabbitMQ的人,可以避免安裝過程產生的各種麻煩事情。 </div>

    安裝完成后Rabbit服務器將自動啟動,RabbitMQ提供了一些簡單實用的命令用于管理服務器運行狀態,以及查詢服務器狀態信息

    下面說幾個比較常用的,熟悉這些命令對之后分析服務器狀態,以及持久化內容都有一定幫組。RabbitMQ的命令都需要在管理員權限下執行

    查看服務器運行狀態: rabbitmq-server status

    啟動服務器:rabbitmq-server start

    停止服務器:rabbitmq-server stop


    查看服務器中所有的消息隊列信息 :rabbitmqctl list_queues

    查看服務器種所有的路由信息: rabbitmqctl list_exchanges

    查看服務器種所有的路由與消息隊列綁定信息 :rabbitmq list_bindings




    RabbitMQ 之Hello World

         

        rabbitMQ作為一個消息服務中間件負責接受消息和轉發,下面給出一個簡單的例子用來說明這種工作方式。

    一個簡單的實現主要包含3種角色生產者(Producing),消息隊列(queue),消費者(consuming)。當然rabbitMQ 的核心并不在于此,rabbitMQ核心主要包含Exchange和queue,消息持久化操作也是通過這兩個核心來完成的。后面文章會有詳細說明,現在 先看一下簡單的消息隊列實現的例子。

    RabbitMQ入門之安裝配置與簡單實例


        生產者 Send.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Send {

     //消息隊列名稱
     private final static String QUEUE_NAME= "hello" ;
    
     public static void main(String[] args)  throws java.io.IOException{
    
         //創建鏈接工程
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost( "localhost" );
         //創建鏈接
         Connection connection = factory.newConnection();
    
         //創建消息通道
         Channel channel = connection.createChannel();
    
         //生命一個消息隊列
         channel.queueDeclare(QUEUE_NAME, false , false , false , null );
    
         String message = "Hello World" ;
    
         //發布消息,第一個參數表示路由(Exchange名稱),未""則表示使用默認消息路由
         channel.basicPublish( "" , QUEUE_NAME, null , message.getBytes());
    
         System.out.println( " [x] Sent '" +message+ "'" );
    
        //關閉消息通道和鏈接
         channel.close();
         connection.close();
    
     }
    
    

    }</pre>
    </div> </div>     消費者Recv.java </div>

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class Recv {

     //消息隊列名稱
     private final static String QUEUE_NAME= "hello" ;
    
     public static void main(String[] args)  throws java.io.IOException,java.lang.InterruptedException{
    
         //創建鏈接工廠
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost( "localhost" );
         //創建鏈接
         Connection connection = factory.newConnection();
    
         //創建消息信道
         Channel channel = connection.createChannel();
    
         //生命消息隊列
         channel.queueDeclare(QUEUE_NAME, false , false , false , null );
         System.out.println( "[*] Waiting for message. To exist press CTRL+C" );
    
         //消費者用于獲取消息信道綁定的消息隊列中的信息
         QueueingConsumer consumer = new QueueingConsumer(channel);
    
         channel.basicConsume(QUEUE_NAME, true ,consumer);
    
         while ( true ){
    
             //循環獲取消息隊列中的信息
             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
             String message = new String(delivery.getBody());
             System.out.println( "[x] Received '" +message+ "'" );
    
         }
    
     }
    
    

    }</pre>
    </span></div> </div> </div>     首先運行Send.java 發送消息到服務器種,再運行Recv.java即可獲得消息隊列中的信息。 </div>     這里并沒有做消息的持久化操作,由于某些原因在程序運行過程中可能出現服務器宕機,以及其他外在因素使程序運行出現出錯,消息的持久化就是使服務 器即使停止后下一次啟動依然能保持上一次操作消息隊列的狀態。可以保證程序依然能從異常開始的地方重新執行。也就是為什么要對消息進行持久化的主要原因

     本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
     轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
     本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
  • sesese色