RabbitMQ與java、Spring結合實例詳細講解
林炳文Evankaka原創作品。轉載請注明出處http://blog.csdn.net/evankaka
摘要:本文介紹了rabbitMq,提供了如何在Ubuntu下安裝RabbitMQ 服務的方法。最好以RabbitMQ與java、Spring結合的兩個實例來演示如何使用RabbitMQ。
一、rabbitMQ簡介
1.1、rabbitMQ的優點(適用范圍)
- 基于erlang語言開發具有高可用高并發的優點,適合集群服務器。
- 健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全。
- 有消息確認機制和持久化機制,可靠性高。
- 開源
其他MQ的優勢: - Apache ActiveMQ曝光率最高,但是可能會丟消息。
- ZeroMQ延遲很低、支持靈活拓撲,但是不支持消息持久化和崩潰恢復。</p>
1.2、幾個概念說明
producer&Consumer
producer指的是消息生產者,consumer消息的消費者。
Queue
消息隊列,提供了FIFO的處理機制,具有緩存消息的能力。rabbitmq中,隊列消息可以設置為持久化,臨時或者自動刪除。
設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
設置為臨時隊列,queue中的數據在系統重啟之后就會丟失
設置為自動刪除的隊列,當不存在用戶連接到server,隊列中的數據會被自動刪除ExchangeExchange類似于數據通信網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別:
Direct
直接交換器,工作方式類似于單播,Exchange會將消息發送完全匹配ROUTING_KEY的Queue
fanout
廣播是式交換器,不管消息的ROUTING_KEY設置為什么,Exchange都會將消息轉發給所有綁定的Queue。
topic
主題交換器,工作方式類似于組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列,比如,ROUTING_KEY為user.stock的Message會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
headers
消息體的header匹配(ignore)
Binding
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
virtual host
在rabbitmq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當于物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。producer和consumer連接rabbit server需要指定一個vhost。1.3、消息隊列的使用過程
- 客戶端連接到消息隊列服務器,打開一個channel。
- 客戶端聲明一個exchange,并設置相關屬性。
- 客戶端聲明一個queue,并設置相關屬性。
- 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
- 客戶端投遞消息到exchange。
- exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里
</p>
二、環境配置與安裝
1、Erlang環境安裝
RabbitMQ是基于Erlang的,所以首先必須配置Erlang環境。
從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。然后:
$ tar xvzf otp_src_R14B03.tar.gz $ cd otp_src_R14B03 $ ./configure
編譯后的輸出
如下圖:
注:
可能會報錯 configure: error: No curses library functions found
configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts
原因是缺少ncurses包
解決:在ubuntu系統下
apt-cache search ncurses apt-get install libncurses5-dev
然后重新執行
./configure
提示沒有wxWidgets和fop、ssh、odbc、ssl,但是問題不大。繼續:
make
然后:
sudo make install
配置erlang環境變量
修改/etc/profile文件,增加下面的環境變量:(vim profile i插入 編輯完畢ESC退出 wq!強制修改)
#set erlang environment export PATH=$PATH:/usr/erlang/bin:$PATH source profile使得文件生效
下面是我的
2、RabbitMQ-Server安裝
安裝完Erlang,開始安裝RabbitMQ-Server。安裝方法有三種,這里筆者三者都試過了,就只有以下這個方法成功了。
直接使用:
apt-get install rabbitmq-server
安裝完成后會自動打開:
使用命令查看rabbitmq運行狀態:
rabbitmqctl status
停止
rabbitmqctl stop
開啟
rabbitmq-server start
3、rabbitmq web管理頁面插件安裝
輸入以下命令
cd /usr/lib/rabbitmq/bin/ rabbitmq-plugins enable rabbitmq_management
這里筆者一直安裝不成功。
如果安裝成功打開瀏覽器,輸入 http://[server-name]:15672/ 如 http://localhost:15672/ ,會要求輸入用戶名和密碼,用默認的guest/guest即可(guest/guest用戶只能從localhost地址登錄,如果要配置遠程登錄,必須另創建用戶)。
如果要從遠程登錄怎么做呢?處于安全考慮,guest這個默認的用戶只能通過http://localhost:15672來登錄,其他的IP無法直接用這個guest帳號。這里我們可以通過配置文件來實現從遠程登錄管理界面,只要編輯/etc/rabbitmq/rabbitmq.config文件(沒有就新增),添加以下配置就可以了。4、添加用戶
vim /etc/rabbitmq/rabbitmq.config
然后添加
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ].
注意上面有個點號
現在添加了一個新授權用戶asdf,可以遠程使用這個用戶名。記得要先用命令添加這個命令才行:
cd /usr/lib/rabbitmq/bin/
#用戶名與密碼</span>
sudo rabbitmqctl add_user asdf 123456
用戶設置為administrator才能遠程訪問</span>
sudo rabbitmqctl set_user_tags asdf administrator sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"
其實也可以通過管理平臺頁面直接添加用戶和密碼等信息。如果還不能遠程訪問或遠程登錄檢查是不是5672, 15672端口沒有開放!!!!!!</span>
5、開放端口
ufw allow 5672
三、簡單Java實例
</div>下面來演示一個使用java的簡單實例:1、首先是消息生產者和提供者的基類package com.lin;
import java.io.IOException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
/**
- 功能概要: EndPoint類型的隊列
- @author linbingwen
@since 2016年1月11日 */ public abstract class EndPoint{
protected Channel channel; protected Connection connection; protected String endPointName;
public EndPoint(String endpointName) throws IOException{
this.endPointName = endpointName; //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("10.75.4.25"); factory.setPort(5672); factory.setUsername("asdf"); factory.setPassword("123456"); //getting a connection connection = factory.newConnection(); //creating a channel channel = connection.createChannel(); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null);
}
/** * 關閉channel和connection。并非必須,因為隱含是自動調用的。 * @throws IOException */ public void close() throws IOException{ this.channel.close(); this.connection.close(); }
}</pre>
2、消息提供者 </div>package com.lin.producer; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; import com.lin.EndPoint; /** * * 功能概要:消息生產者 * * @author linbingwen * @since 2016年1月11日 */ public class Producer extends EndPoint{ public Producer(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
3、消息消費者package com.lin.consumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.lin.EndPoint; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * * 功能概要:讀取隊列的程序端,實現了Runnable接口 * * @author linbingwen * @since 2016年1月11日 */ public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} }
4、測試package com.lin.test; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import com.lin.consumer.QueueConsumer; import com.lin.producer.Producer; public class Test { public Test() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 1000000; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception{ new Test(); } }
其中引入的jar包:<!-- rabbitmq客戶端 --> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> </dependencies>
測試結果:在提供消息在消費消息
然后同時打開rabbitmq的服務端,輸入如下:rabbitmqctl list_queues
這個命令是用來查看服務端中有多處個消息隊列的。可以看到有個名為queue的消息隊列(更好的方法是安裝好web監控插件,筆者一直安裝失敗,所以這里就不展示了)
四、Rbbitmq與Spring結合使用
首先建立一個maven工程,整個項目的結構如下:
下面將具體來講講整個過程1、jar包的引入pom.xml配置即可,如下:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.lin</groupId> <artifactId>rabbit_c2</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <!-- spring版本號 --> <spring.version>3.2.8.RELEASE</spring.version> <!-- log4j日志文件管理包版本 --> <slf4j.version>1.6.6</slf4j.version> <log4j.version>1.2.12</log4j.version> <!-- junit版本號 --> <junit.version>4.10</junit.version> </properties> <dependencies> <!-- 添加Spring依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <!--單元測試依賴 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <!-- 日志文件管理包 --> <!-- log start --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <!-- log end --> <!--spring單元測試依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <scope>test</scope> </dependency> <!--rabbitmq依賴 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>javax.validation</groupId> <artifactId>validation-api</artifactId> <version>1.1.0.Final</version> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>5.0.1.Final</version> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/resources</directory> <targetPath>${basedir}/target/classes</targetPath> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>true</filtering> </resource> <resource> <directory>src/main/resources</directory> <targetPath>${basedir}/target/resources</targetPath> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>true</filtering> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <version>2.1.1</version> <configuration> <warSourceExcludes>${warExcludes}</warSourceExcludes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.4.3</version> <configuration> <testFailureIgnore>true</testFailureIgnore> </configuration> </plugin> <plugin> <inherited>true</inherited> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <executions> <execution> <id>attach-sources</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <configuration> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
2、消息生產者package com.lin.producer; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Service; /** * 功能概要:消息產生,提交到隊列中去 * * @author linbingwen * @since 2016年1月15日 */ @Service public class MessageProducer { private Logger logger = LoggerFactory.getLogger(MessageProducer.class); @Resource private AmqpTemplate amqpTemplate; public void sendMessage(Object message){ logger.info("to send message:{}",message); amqpTemplate.convertAndSend("queueTestKey",message); } }
3、消息消費者package com.lin.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * 功能概要:消費接收 * * @author linbingwen * @since 2016年1月15日 */ public class MessageConsumer implements MessageListener { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @Override public void onMessage(Message message) { logger.info("receive message:{}",message); } }
4、rabbitMq.xml配置信息<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!--配置connection-factory,指定連接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" username="asdf" password="123456" host="10.75.4.25" port="5672" /> <!--定義rabbit template用于數據的接收和發送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" /> <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定義queue --> <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" /> <!-- 定義direct exchange,綁定queueTest --> <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="messageReceiver" class="com.lin.consumer.MessageConsumer"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queueTest" ref="messageReceiver"/> </rabbit:listener-container> </beans>
5、spring集成rabbiqMq。application.xml內容如下:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitmq.xml" /> <!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods并把所注釋的注冊為Spring Beans --> <context:component-scan base-package="com.lin.consumer,com.lin.producer" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
6、最后,為了方便,打印了日志,log4j.properties配置如下log4j.rootLogger=DEBUG,Console,Stdout #Console log4j.appender.Console=org.apache.log4j.ConsoleAppender log4j.appender.Console.layout=org.apache.log4j.PatternLayout log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n log4j.logger.java.sql.ResultSet=INFO log4j.logger.org.apache=INFO log4j.logger.java.sql.Connection=DEBUG log4j.logger.java.sql.Statement=DEBUG log4j.logger.java.sql.PreparedStatement=DEBUG log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender log4j.appender.Stdout.File = E://logs/log.log log4j.appender.Stdout.Append = true log4j.appender.Stdout.Threshold = DEBUG log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
接著運行整個工程即可:
下面是運行的結果:
一會發一會收:因為在同一工程,所以發消息和接消息是交替出現的
我們出可以去rabbitMq 服務器上看:可以看到,我們配置的隊列已存在了:
到此,整個工程結束。
本文由用戶 80301983 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!相關經驗
相關資訊