RabbitMQ的使用
1.首先我從官方下載了rabbitmq-server
http://www.rabbitmq.com/download.html
當然公司還是以java開發為主,因此下載的版本是基于linux下面的,不過我再這里下載一個ubuntu版本的,因為我開發的機子就是ubuntu.
下載好后,安裝很快,ubuntu下直接dpkg -i rabbitmq-server_3.5.0-1_all.deb
當然其他linux系統可以下載tar.gz包解壓,但是還需要安裝erlang.解壓后可以執行以下命令就可以啟動rabbitmq了
解壓的目錄sbin下找到rabbitmq-server運行即可,當然這里不需要詳細說安裝了,可以參考官方文檔進行安裝.
http://www.rabbitmq.com/install-generic-unix.html
當然,為了我們能可視化查看rabbitmq-server的運行情況和滿足其他開發需求我們可以啟用插件
rabbitmq-plugins enables,這樣我們可以打開http://127.0.0.1:15672/
使用guest,guest登陸后可以看到http管理界面.
2.理解rabbitmq的工作原理
第一步安裝好服務后,接下來我們就要進行生產消息和消費消息了.
但是要開發必須要理解rabbitmq的工作原理.當然如果我長篇大論記下來估計要記錄幾十頁,因此如果英語比較好的可以先看看官方文檔.
http://www.rabbitmq.com/getstarted.html
當然如果英語實在是渣的話,不妨參考下面的鏈接:
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
看完這些文檔之后,估計會對rabbitmq理解的差不多了.
當然網上也有很多人寫的博客,都可以參照的.
3.java客戶端開發
當然按照上述文檔我們可以開發出自己需要的客戶端,但是這里我主要介紹如何使用spring-amqp來進行開發.
首先我們需要引入spring-amqp包,為了能查看調試信息,我們也引入log4j
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring-rabbit.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
引入后就要進行開發了,當然spring-amqp如何配置這里不想長篇大論,因此這里只介紹兩種發送方式
異步發送消息
其實這種方式是很容易的,大多數網上文章都是基于配置文件的,但是如果我們使用代碼來創建,那么更加會理解spring-amqp的設計原理的,能使用代碼創建也就更加能使用配置文件了
CachingConnectionFactory factory=new CachingConnectionFactory(); //創建鏈接工廠,使用帶緩存功能的工廠可以基于Channel創建多個Channel來處理 //設置rabbitmq-server鏈接地址 factory.setAddresses("localhost:5672"); //設置登陸用戶名,可以在rabbit-server http頁面創建用戶 factory.setUsername("guest"); //設置登陸密碼(在本機上可以不用設置) factory.setPassword("guest"); //啟動channel的個數并緩存 factory.setChannelCacheSize(channelSize); //創建admin進行隊列綁定,exchange綁定 RabbitAdmin admin=new RabbitAdmin(factory); //創建名字為qingting-queue的隊列,durable是否是持久化消息到服務端 Queue queue=new Queue("qingting-queue",durable); //此時會在rabbitmq-servers上創建一個queue admin.declareQueue(queue); //創建exchange并進行訂閱到服務端,這里創建的是topic類型的exchange,也就是路由key可以是正則表達式 Exchange exchange=new TopicExchange("qingting-exchange",durable,autoDelete); //rabbitmq-server上會創建一個名稱交exchangeName的exchange,同時類型為topic,而且特征有D表示durable admin.declareExchange(exchange); /** * 使用routeKey將隊列綁定到exchange上面,這個時候,rabbitmq-server上的queue和exchange會有一個通過routeKey的映射關系 */ admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("qingting.*").noargs()); //創建一個Template來進行發消息,也可以收消息等操作 template = new RabbitTemplate(factory); //這里討論發消息,我們通過exchange來進行發消息,同時設置routeKey,這時候因為是topic類型的exchange,通過 比對設置的routeKey跟上面綁定的bindRouteKey做正則匹配,如果匹配上了就會把消息推送到queueName的這個隊列 template.setExchange("qingting-exchange"); template.setRoutingKey("qingting.route.key"); //設置將消息轉換為json類型,當然自己可以設置其他的轉換器或者自己實現 template.setMessageConverter(new Jackson2JsonMessageConverter());
有了上面配置后,那么我們可以進行發送同步消息了
template.convertAndSend();
因為上述設置了消息轉換器,因此最好使用改方法,當然還有template.send()方法其實使用該方法確實結果Jackson2JsonMessageConverter將消息內容體處理成json并轉換為字節數組,
同時還會將消息的屬性配置中conentType設置為application/json 并且head中設置一些消息內容的對象類型等信息,之所以這樣做是因為,消費者接收消息,也可以設置json轉換器直接將拿接收的消息屬性中head的值將消息體內容進行轉換成對象,可以在rabbitmq管理頁面看到發送的消息會附帶如下信息
Exchange: qingting-exchange //上述template設置的exchange Routing Key: qingting.route.key //template設置的routeKey Properties headers: __KeyTypeId__:java.lang.Object //轉json附帶的信息,方便轉換 __TypeId__:java.util.HashMap __ContentTypeId__:java.lang.Object content_encoding:UTF-8 content_type:application/json Payload {"username":"81a40c08-5a01-46fa-93c3-981fcccb3c8c","index":0}
因此可以看到http頁面上在綁定的隊列會有一條消息到綁定的隊列,因為這時候沒有消費者訂閱隊列,我們設置的是持久化durable,因此會一一直留在server中