RabbitMQ消息隊列入門篇(環境配置+Java實例+基礎概念)

jopen 8年前發布 | 84K 次閱讀 RabbitMQ 消息系統

一、消息隊列使用場景或者其好處

消息隊列一般是在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。

在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息隊列在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。消息隊列可以解決這樣一個問題,也就是其解耦性。解耦伴隨的好處就是降低冗余,靈活,易于擴展。

峰值處理能力:當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不同尋常的水平。在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。
消息隊列還有可恢復性、異步通信、緩沖………等各種好處,在此不一一介紹,用到自然理解。

二、RabbitMQ來源

RabbitMQ是用Erlang實現的一個高并發高可靠AMQP消息隊列服務器。

顯然,RabbitMQ跟Erlang和AMQP有關。下面簡單介紹一下Erlang和AMQP。

Erlang是一門動態類型的函數式編程語言,它也是一門解釋型語言,由Erlang虛擬機解釋執行。從語言模型上說,Erlang是基于Actor模型的實現。在Actor模型里面,萬物皆Actor,每個Actor都封裝著內部狀態,Actor相互之間只能通過消息傳遞這一種方式來進行通信。對應到Erlang里,每個Actor對應著一個Erlang進程,進程之間通過消息傳遞進行通信。相比共享內存,進程間通過消息傳遞來通信帶來的直接好處就是消除了直接的鎖開銷(不考慮Erlang虛擬機底層實現中的鎖應用)。

AMQP(Advanced Message Queue Protocol)定義了一種消息系統規范。這個規范描述了在一個分布式的系統中各個子系統如何通過消息交互。而RabbitMQ則是AMQP的一種基于erlang的實現。AMQP將分布式系統中各個子系統隔離開來,子系統之間不再有依賴。子系統僅依賴于消息。子系統不關心消息的發送者,也不關心消息的接受者。

這里不必要對Erlang和AMQP作過于深入介紹,畢竟本文RabbitMQ才是主角哦,哈哈。下面直接看主角表演(實例)啦,至于主角的一些不得不深入介紹的點我們放到最后面。

三、RabbitMQ實例(Java)

3.1、環境配置

RabbitMQ的運行需要erlang的支持,因此我們先安裝erlang。
32位下載地址:http://www.erlang.org/download/otp_win64_18.2.1.exe
64位下載地址:http://www.erlang.org/download/otp_win32_18.2.1.exe
雙擊選擇默認安裝就好。

前面我們也講到RabbitMQ就是一個服務器,下面我們就安裝對應服務器。
下載地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe
雙擊選擇默認安裝就好,安裝好之后需要啟動服務,cmd,進入到安裝目錄的sbin文件夾下,命令如下:

cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.4\sbin
rabbitmq-server start

這里寫圖片描述

博主的之前啟動過了,所以報錯,如果你的也啟動了就沒問題了。

接下來自然是jar包依賴,本文工程采用eclipse + maven,maven依賴如下:

<!-- rabbitmq相關依賴 -->
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.0.4</version>
</dependency>
<!-- 序列化相關依賴 -->
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>

因為后續例子里面有用到序列化的,因此加上序列化工具包相關依賴。

3.2、例子一代碼和效果

新建發送者Send.java,代碼如下:

package com.luo.rabbit.test.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    //隊列名稱 
    private final static String QUEUE_NAME = "queue";  

    public static void main(String[] argv) throws java.io.IOException  
    {  
        /** * 創建連接連接到MabbitMQ */  
        ConnectionFactory factory = new ConnectionFactory();  
        //設置MabbitMQ所在主機ip或者主機名 
        factory.setHost("127.0.0.1"); 
        //創建一個連接 
        Connection connection = factory.newConnection();  
        //創建一個頻道 
        Channel channel = connection.createChannel();  
        //指定一個隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //發送的消息 
        String message = "hello world!";  
        //往隊列中發出一條消息 
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println("Sent '" + message + "'");  
        //關閉頻道和連接 
        channel.close();  
        connection.close();  
     }  
}

新建接收者Recv.java,代碼如下:

package com.luo.rabbit.test.one;

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.QueueingConsumer; 

public class Recv  { 
    //隊列名稱  
    private final static String QUEUE_NAME = "queue"; 
    public static void main(String[] argv) throws java.io.IOException,
    java.lang.InterruptedException  
    {  
        //打開連接和創建頻道,與發送端一樣  
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名  
        factory.setHost("127.0.0.1"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
        System.out.println("Waiting for messages. To exit press CTRL+C"); 

        //創建隊列消費者  
        QueueingConsumer consumer = new QueueingConsumer(channel); 
        //指定消費隊列  
        channel.basicConsume(QUEUE_NAME, true, consumer); 
        while (true)  
        {  
            //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
            String message = new String(delivery.getBody()); 
            System.out.println("Received '" + message + "'");
        }  

    }  
} 

分別運行這兩個類,先后順序沒有關系,先運行發送者再運行接收者,效果如下:

這里寫圖片描述

這里寫圖片描述

3.3、例子二代碼和效果

例子一可能通俗易懂,但是并不是很規范,而且有些缺陷,比如我要發送一個對象過去呢?下面看另外一個例子:

首先建一個連接類,因為發送者和接收者的連接代碼都是一樣的,之后讓二者繼承這個連接類即可。連接類代碼BaseConnector.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class BaseConnector {
    protected Channel channel;
    protected Connection connection;
    protected String queueName;
    public BaseConnector(String queueName) throws IOException{
        this.queueName = queueName;
        //打開連接和創建頻道
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名 127.0.0.1即localhost
        factory.setHost("127.0.0.1");
        //創建連接 
        connection = factory.newConnection();
        //創建頻道 
        channel = connection.createChannel();
        //聲明創建隊列
        channel.queueDeclare(queueName, false, false, false, null);
    }
}

發送者Sender.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;

public class Sender extends BaseConnector {
    public Sender(String queueName) throws IOException {
        super(queueName);
    }

    public void sendMessage(Serializable object) throws IOException {
        channel.basicPublish("",queueName, null, SerializationUtils.serialize(object));
    }   
}

前面講過,我們想發送一個對象給接受者,因此,我們先新建一個對象,因為發送過程需要序列化,因此這里需要實現java.io.Serializable接口:

package com.luo.rabbit.test.two;

import java.io.Serializable;

public class MessageInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    //渠道
    private String channel;
    //來源
    private String content;
    public String getChannel() {
        return channel;
    }
    public void setChannel(String channel) {
        this.channel = channel;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

關于序列化,這里小寶鴿就再嘮叨兩句,序列化就是將一個對象的狀態(各個屬性量)保存起來,然后在適當的時候再獲得。序列化分為兩大部分:序列化和反序列化。序列化是這個過程的第一部分,將數據分解成字節流,以便存儲在文件中或在網絡上傳輸。反序列化就是打開字節流并重構對象。對象序列化不僅要將基本數據類型轉換成字節表示,有時還要恢復數據。恢復數據要求有恢復數據的對象實例。

接收者代碼Receiver.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Receiver extends BaseConnector implements Runnable, Consumer {

    public Receiver(String queueName) throws IOException {
        super(queueName);
    }

    //實現Runnable的run方法
    public void run() {
         try {
            channel.basicConsume(queueName, true,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** * 下面這些方法都是實現Consumer接口的 **/    
    //當消費者注冊完成自動調用
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");
    }
    //當消費者接收到消息會自動調用
    public void handleDelivery(String consumerTag, Envelope env,
                BasicProperties props, byte[] body) throws IOException {
        MessageInfo messageInfo = (MessageInfo)SerializationUtils.deserialize(body);
        System.out.println("Message ( "
                + "channel : " + messageInfo.getChannel() 
                + " , content : " + messageInfo.getContent() 
                + " ) received.");

    }
    //下面這些方法可以暫時不用理會
    public void handleCancelOk(String consumerTag) {
    }
    public void handleCancel(String consumerTag) throws IOException {
    }
    public void handleShutdownSignal(String consumerTag,
            ShutdownSignalException sig) {
    }
    public void handleRecoverOk(String consumerTag) {
    }
}

這里,接收者實現了,Runnable接口和com.rabbitmq.client.Consumer接口。

實現Runnable接口的目的是為了實現多線程,java實現多線程的方式有兩種:一種是繼承Thread類,一種是實現Runnable接口。詳情請看這篇文章:http://developer.51cto.com/art/201203/321042.htm

實現Consumer接口的目的是什么呢?猿友們應有看到實例一中的接收者代碼:

//指定消費隊列 
channel.basicConsume(QUEUE_NAME, true, consumer);

最后一個參數是需要傳遞com.rabbitmq.client.Consumer參數的,實現了Consumer接口之后我們只需要傳遞this就好了。另外,Consumer有很多方法,上面代碼除了構造方法和run方法(run是實現Runnable接口的),其他都是實現Consumer接口的,這些方法的具體含義,大家可以直接看com.rabbitmq.client.Consumer源碼。

接下來就是測試類了Test.java:

package com.luo.rabbit.test.two;

public class Test {
    public static void main(String[] args) throws Exception{
        Receiver receiver = new Receiver("testQueue");
        Thread receiverThread = new Thread(receiver);
        receiverThread.start();
        Sender sender = new Sender("testQueue");
        for (int i = 0; i < 5; i++) {
            MessageInfo messageInfo = new MessageInfo();
            messageInfo.setChannel("test");
            messageInfo.setContent("msg" + i);
            sender.sendMessage(messageInfo);
        }
    }
}

運行效果:

這里寫圖片描述

記得運行完成之后一定要把進程關掉,不然你每運行一次Test.java就會開啟一個進程,之后會出現什么問題呢?我是十分建議大家試試,會有驚喜哦,哈哈,驚喜就是,發送的消息會平均(數量平均)的出現到各個接收者的控制臺。不妨將發送的數量改大一點試試。

四、RabbitMQ使用的道具的具體介紹

RabbitMQ是用Erlang實現的一個高并發高可靠AMQP消息隊列服務器。

Erlang就是RabbitMQ的一個依賴環境,這里沒什么好說的。我們更加關注它的一身表演技巧哪里來的,這里就看AMQP吧,看完AMQP之后估計你會對RabbitMQ的理解更加深刻。

開始吧
AMQP當中有四個概念非常重要:虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。如果這就夠了,那現在就可以開始了。

交換機,隊列,還有綁定……天哪!
剛開始我思維的列車就是在這里脫軌的…… 這些鬼東西怎么結合起來的?

隊列(Queues)是你的消息(messages)的終點,可以理解成裝消息的容器。消息就一直在里面,直到有客戶端(也就是消費者,Consumer)連接到這個隊列并且將其取走為止。不過。你可以將一個隊列配置成這樣的:一旦消息進入這個隊列,biu~,它就煙消云散了。這個有點跑題了……

需要記住的是,隊列是由消費者(Consumer)通過程序建立的,不是通過配置文件或者命令行工具。這沒什么問題,如果一個消費者試圖創建一個已經存在的隊列,RabbitMQ就會起來拍拍他的腦袋,笑一笑,然后忽略這個請求。因此你可以將消息隊列的配置寫在應用程序的代碼里面。這個概念不錯。

OK,你已經創建并且連接到了你的隊列,你的消費者程序正在百無聊賴的敲著手指等待消息的到來,敲啊,敲啊…… 沒有消息。發生了什么?你當然需要先把一個消息放進隊列才行。不過要做這個,你需要一個交換機(Exchange)……

交換機可以理解成具有路由表的路由程序,僅此而已。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes),例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列當中去。先不討論這個,我們有點超前了。

你的消費者程序要負責創建你的交換機們(復數)。啥?你是說你可以有多個交換機?是的,這個可以有,不過為啥?很簡單,每個交換機在自己獨立的進程當中執行,因此增加多個交換機就是增加多個進程,可以充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,可以創建5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的集群當中,你可以用類似的思路來擴展交換機一邊獲取更高的吞吐量。

OK,你已經創建了一個交換機。但是他并不知道要把消息送到哪個隊列。你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:將交換機“desert(沙漠)”當中具有路由鍵“阿里巴巴”的消息送到隊列“hideout(山洞)”里面去。換句話說,一個綁定就是一個基于路由鍵將交換機和隊列連接起來的路由規則。例如,具有路由鍵“audit”的消息需要被送到兩個隊列,“log-forever”和“alert-the-big-dude”。要做到這個,就需要創建兩個綁定,每個都連接一個交換機和一個隊列,兩者都是由“audit”路由鍵觸發。在這種情況下,交換機會復制一份消息并且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

現在復雜的東西來了:交換機有多種類型。他們都是做路由的,不過接受不同類型的綁定。為什么不創建一種交換機來處理所有類型的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個“topic”類型的交換機試圖將消息的路由鍵與類似“dogs.*”的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類型的交換機)要消耗更多的CPU。如果你不需要“topic”類型的交換機帶來的靈活性,你可以通過使用“direct”類型的交換機獲取更高的處理效率。那么有哪些類型,他們又是怎么處理的呢?

上面這些都是參考另外一篇文章的,http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/,當然這篇文章的實例是Python的,但是我們不看他的實例,只看他吹水的那部分,哈哈。

五、源碼工程下載

http://download.csdn.net/detail/u013142781/9396830

小寶鴿向來有個壞習慣,即便博客里面已經將全部代碼貼出來了,還是會提供源碼工程供大家下載,哈哈。

有些時候有些猿友經常會問,寫一篇博客很花時間吧,我不能假裝跟你說不花時間。雖然花時間,但是當你看到方向,看到了目標,可以將自己學習的東西分享出來,你就會很有動力了,根本停不下來。

本博客自己查資料,建實例驗證,動手寫博客,約花了8個小時左右吧。不過當我了解到RabbitMQ的博大精深,這些時間都不是事,歡迎關注,雖然剛畢業半年,但小寶鴿會繼續將工作中遇到的技術點分享給大家。

來自: http://blog.csdn.net//u013142781/article/details/50487028

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!