RabbitMQ的使用

jopen 9年前發布 | 42K 次閱讀 RabbitMQ 消息系統

1.首先我從官方下載了rabbitmq-server

http://www.rabbitmq.com/download.html

當然公司還是以java開發為主,因此下載的版本是基于linux下面的,不過我再這里下載一個ubuntu版本的,因為我開發的機子就是ubuntu.

下載好后,安裝很快,ubuntu下直接dpkg -i rabbitmq-server_3.5.0-1_all.deb

當然其他linux系統可以下載tar.gz包解壓,但是還需要安裝erlang.解壓后可以執行以下命令就可以啟動rabbitmq了

解壓的目錄sbin下找到rabbitmq-server運行即可,當然這里不需要詳細說安裝了,可以參考官方文檔進行安裝.

http://www.rabbitmq.com/install-generic-unix.html

當然,為了我們能可視化查看rabbitmq-server的運行情況和滿足其他開發需求我們可以啟用插件

rabbitmq-plugins enables,這樣我們可以打開http://127.0.0.1:15672/

使用guest,guest登陸后可以看到http管理界面.

2.理解rabbitmq的工作原理

第一步安裝好服務后,接下來我們就要進行生產消息和消費消息了.

但是要開發必須要理解rabbitmq的工作原理.當然如果我長篇大論記下來估計要記錄幾十頁,因此如果英語比較好的可以先看看官方文檔.

http://www.rabbitmq.com/getstarted.html

當然如果英語實在是渣的話,不妨參考下面的鏈接:

http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

看完這些文檔之后,估計會對rabbitmq理解的差不多了.

當然網上也有很多人寫的博客,都可以參照的.

3.java客戶端開發

當然按照上述文檔我們可以開發出自己需要的客戶端,但是這里我主要介紹如何使用spring-amqp來進行開發.

首先我們需要引入spring-amqp包,為了能查看調試信息,我們也引入log4j

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring-rabbit.version}</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

引入后就要進行開發了,當然spring-amqp如何配置這里不想長篇大論,因此這里只介紹兩種發送方式

異步發送消息

其實這種方式是很容易的,大多數網上文章都是基于配置文件的,但是如果我們使用代碼來創建,那么更加會理解spring-amqp的設計原理的,能使用代碼創建也就更加能使用配置文件了

CachingConnectionFactory factory=new CachingConnectionFactory(); //創建鏈接工廠,使用帶緩存功能的工廠可以基于Channel創建多個Channel來處理

//設置rabbitmq-server鏈接地址
factory.setAddresses("localhost:5672");

//設置登陸用戶名,可以在rabbit-server http頁面創建用戶
factory.setUsername("guest");

//設置登陸密碼(在本機上可以不用設置)
factory.setPassword("guest");

//啟動channel的個數并緩存
factory.setChannelCacheSize(channelSize); //創建admin進行隊列綁定,exchange綁定 RabbitAdmin admin=new RabbitAdmin(factory);

//創建名字為qingting-queue的隊列,durable是否是持久化消息到服務端
Queue queue=new Queue("qingting-queue",durable);

//此時會在rabbitmq-servers上創建一個queue
admin.declareQueue(queue);

//創建exchange并進行訂閱到服務端,這里創建的是topic類型的exchange,也就是路由key可以是正則表達式
Exchange exchange=new TopicExchange("qingting-exchange",durable,autoDelete);

//rabbitmq-server上會創建一個名稱交exchangeName的exchange,同時類型為topic,而且特征有D表示durable
admin.declareExchange(exchange); /**
* 使用routeKey將隊列綁定到exchange上面,這個時候,rabbitmq-server上的queue和exchange會有一個通過routeKey的映射關系
*/ admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("qingting.*").noargs());

//創建一個Template來進行發消息,也可以收消息等操作
template = new RabbitTemplate(factory);

//這里討論發消息,我們通過exchange來進行發消息,同時設置routeKey,這時候因為是topic類型的exchange,通過
比對設置的routeKey跟上面綁定的bindRouteKey做正則匹配,如果匹配上了就會把消息推送到queueName的這個隊列
template.setExchange("qingting-exchange");
template.setRoutingKey("qingting.route.key");

//設置將消息轉換為json類型,當然自己可以設置其他的轉換器或者自己實現
template.setMessageConverter(new Jackson2JsonMessageConverter()); 

有了上面配置后,那么我們可以進行發送同步消息了

template.convertAndSend();

因為上述設置了消息轉換器,因此最好使用改方法,當然還有template.send()方法其實使用該方法確實結果Jackson2JsonMessageConverter將消息內容體處理成json并轉換為字節數組,

同時還會將消息的屬性配置中conentType設置為application/json 并且head中設置一些消息內容的對象類型等信息,之所以這樣做是因為,消費者接收消息,也可以設置json轉換器直接將拿接收的消息屬性中head的值將消息體內容進行轉換成對象,可以在rabbitmq管理頁面看到發送的消息會附帶如下信息

Exchange:        qingting-exchange  //上述template設置的exchange
Routing Key:    qingting.route.key       //template設置的routeKey
Properties
    headers:
        __KeyTypeId__:java.lang.Object     //轉json附帶的信息,方便轉換
        __TypeId__:java.util.HashMap
        __ContentTypeId__:java.lang.Object
    content_encoding:UTF-8
    content_type:application/json       
Payload
    {"username":"81a40c08-5a01-46fa-93c3-981fcccb3c8c","index":0}

    因此可以看到http頁面上在綁定的隊列會有一條消息到綁定的隊列,因為這時候沒有消費者訂閱隊列,我們設置的是持久化durable,因此會一一直留在server中

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!