RabbitMQ的使用
1.首先我從官方下載了rabbitmq-server
http://www.rabbitmq.com/download.html
當然公司還是以java開發為主,因此下載的版本是基于linux下面的,不過我再這里下載一個ubuntu版本的,因為我開發的機子就是ubuntu.
下載好后,安裝很快,ubuntu下直接dpkg -i rabbitmq-server_3.5.0-1_all.deb
當然其他linux系統可以下載tar.gz包解壓,但是還需要安裝erlang.解壓后可以執行以下命令就可以啟動rabbitmq了
解壓的目錄sbin下找到rabbitmq-server運行即可,當然這里不需要詳細說安裝了,可以參考官方文檔進行安裝.
http://www.rabbitmq.com/install-generic-unix.html
當然,為了我們能可視化查看rabbitmq-server的運行情況和滿足其他開發需求我們可以啟用插件
rabbitmq-plugins enables,這樣我們可以打開http://127.0.0.1:15672/
使用guest,guest登陸后可以看到http管理界面.
2.理解rabbitmq的工作原理
第一步安裝好服務后,接下來我們就要進行生產消息和消費消息了.
但是要開發必須要理解rabbitmq的工作原理.當然如果我長篇大論記下來估計要記錄幾十頁,因此如果英語比較好的可以先看看官方文檔.
http://www.rabbitmq.com/getstarted.html
當然如果英語實在是渣的話,不妨參考下面的鏈接:
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
看完這些文檔之后,估計會對rabbitmq理解的差不多了.
當然網上也有很多人寫的博客,都可以參照的.
3.java客戶端開發
當然按照上述文檔我們可以開發出自己需要的客戶端,但是這里我主要介紹如何使用spring-amqp來進行開發.
首先我們需要引入spring-amqp包,為了能查看調試信息,我們也引入log4j
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency> 引入后就要進行開發了,當然spring-amqp如何配置這里不想長篇大論,因此這里只介紹兩種發送方式
異步發送消息
其實這種方式是很容易的,大多數網上文章都是基于配置文件的,但是如果我們使用代碼來創建,那么更加會理解spring-amqp的設計原理的,能使用代碼創建也就更加能使用配置文件了
CachingConnectionFactory factory=new CachingConnectionFactory(); //創建鏈接工廠,使用帶緩存功能的工廠可以基于Channel創建多個Channel來處理
//設置rabbitmq-server鏈接地址
factory.setAddresses("localhost:5672");
//設置登陸用戶名,可以在rabbit-server http頁面創建用戶
factory.setUsername("guest");
//設置登陸密碼(在本機上可以不用設置)
factory.setPassword("guest");
//啟動channel的個數并緩存
factory.setChannelCacheSize(channelSize); //創建admin進行隊列綁定,exchange綁定 RabbitAdmin admin=new RabbitAdmin(factory);
//創建名字為qingting-queue的隊列,durable是否是持久化消息到服務端
Queue queue=new Queue("qingting-queue",durable);
//此時會在rabbitmq-servers上創建一個queue
admin.declareQueue(queue);
//創建exchange并進行訂閱到服務端,這里創建的是topic類型的exchange,也就是路由key可以是正則表達式
Exchange exchange=new TopicExchange("qingting-exchange",durable,autoDelete);
//rabbitmq-server上會創建一個名稱交exchangeName的exchange,同時類型為topic,而且特征有D表示durable
admin.declareExchange(exchange); /**
* 使用routeKey將隊列綁定到exchange上面,這個時候,rabbitmq-server上的queue和exchange會有一個通過routeKey的映射關系
*/ admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("qingting.*").noargs());
//創建一個Template來進行發消息,也可以收消息等操作
template = new RabbitTemplate(factory);
//這里討論發消息,我們通過exchange來進行發消息,同時設置routeKey,這時候因為是topic類型的exchange,通過
比對設置的routeKey跟上面綁定的bindRouteKey做正則匹配,如果匹配上了就會把消息推送到queueName的這個隊列
template.setExchange("qingting-exchange");
template.setRoutingKey("qingting.route.key");
//設置將消息轉換為json類型,當然自己可以設置其他的轉換器或者自己實現
template.setMessageConverter(new Jackson2JsonMessageConverter()); 有了上面配置后,那么我們可以進行發送同步消息了
template.convertAndSend();
因為上述設置了消息轉換器,因此最好使用改方法,當然還有template.send()方法其實使用該方法確實結果Jackson2JsonMessageConverter將消息內容體處理成json并轉換為字節數組,
同時還會將消息的屬性配置中conentType設置為application/json 并且head中設置一些消息內容的對象類型等信息,之所以這樣做是因為,消費者接收消息,也可以設置json轉換器直接將拿接收的消息屬性中head的值將消息體內容進行轉換成對象,可以在rabbitmq管理頁面看到發送的消息會附帶如下信息
Exchange: qingting-exchange //上述template設置的exchange
Routing Key: qingting.route.key //template設置的routeKey
Properties
headers:
__KeyTypeId__:java.lang.Object //轉json附帶的信息,方便轉換
__TypeId__:java.util.HashMap
__ContentTypeId__:java.lang.Object
content_encoding:UTF-8
content_type:application/json
Payload
{"username":"81a40c08-5a01-46fa-93c3-981fcccb3c8c","index":0} 因此可以看到http頁面上在綁定的隊列會有一條消息到綁定的隊列,因為這時候沒有消費者訂閱隊列,我們設置的是持久化durable,因此會一一直留在server中