RabbitMQ與java、Spring結合實例詳細講解

80301983 8年前發布 | 460K 次閱讀 RabbitMQ 消息系統

         林炳文Evankaka原創作品。轉載請注明出處http://blog.csdn.net/evankaka

         摘要:本文介紹了rabbitMq,提供了如何在Ubuntu下安裝RabbitMQ 服務的方法。最好以RabbitMQ與java、Spring結合的兩個實例來演示如何使用RabbitMQ。

本文工程免費下載


一、rabbitMQ簡介

1.1、rabbitMQ的優點(適用范圍)

  1. 基于erlang語言開發具有高可用高并發的優點,適合集群服務器。
  2. 健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全。
  3. 有消息確認機制和持久化機制,可靠性高。
  4. 開源
    其他MQ的優勢:
  5. Apache ActiveMQ曝光率最高,但是可能會丟消息。
  6. ZeroMQ延遲很低、支持靈活拓撲,但是不支持消息持久化和崩潰恢復。</p>

    1.2、幾個概念說明
    producer&Consumer
    producer指的是消息生產者,consumer消息的消費者。
    Queue
    消息隊列,提供了FIFO的處理機制,具有緩存消息的能力。rabbitmq中,隊列消息可以設置為持久化,臨時或者自動刪除。
    設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
    設置為臨時隊列,queue中的數據在系統重啟之后就會丟失
    設置為自動刪除的隊列,當不存在用戶連接到server,隊列中的數據會被自動刪除Exchange

    Exchange類似于數據通信網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
    Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別:
    Direct
    直接交換器,工作方式類似于單播,Exchange會將消息發送完全匹配ROUTING_KEY的Queue
    fanout
    廣播是式交換器,不管消息的ROUTING_KEY設置為什么,Exchange都會將消息轉發給所有綁定的Queue。
    topic
    主題交換器,工作方式類似于組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列,比如,ROUTING_KEY為user.stock的Message會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
    headers
    消息體的header匹配(ignore)
    Binding
    所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
    virtual host
    在rabbitmq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當于物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。producer和consumer連接rabbit server需要指定一個vhost。

    1.3、消息隊列的使用過程

  7. 客戶端連接到消息隊列服務器,打開一個channel。
  8. 客戶端聲明一個exchange,并設置相關屬性。
  9. 客戶端聲明一個queue,并設置相關屬性。
  10. 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
  11. 客戶端投遞消息到exchange。
  12. exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里
    </p>


    二、環境配置與安裝

    1、Erlang環境安裝
    RabbitMQ是基于Erlang的,所以首先必須配置Erlang環境。
    從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。然后:

    $ tar xvzf otp_src_R14B03.tar.gz
    $ cd otp_src_R14B03
    $ ./configure
    編譯后的輸出
    如下圖:


    注:
    可能會報錯 configure: error: No curses library functions found
    configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts


    原因是缺少ncurses包

    解決:在ubuntu系統下
    apt-cache search ncurses
    apt-get install libncurses5-dev

    然后重新執行

    ./configure

    提示沒有wxWidgets和fop、ssh、odbc、ssl,但是問題不大。繼續:

    make

    然后:

    sudo make install


    配置erlang環境變量
    修改/etc/profile文件,增加下面的環境變量:(vim profile i插入 編輯完畢ESC退出 wq!強制修改)

    #set erlang environment
    export PATH=$PATH:/usr/erlang/bin:$PATH
    source profile使得文件生效

    下面是我的


    2、RabbitMQ-Server安裝

    安裝完Erlang,開始安裝RabbitMQ-Server。安裝方法有三種,這里筆者三者都試過了,就只有以下這個方法成功了。

    直接使用:

    apt-get  install rabbitmq-server

    安裝完成后會自動打開:

    使用命令查看rabbitmq運行狀態:

    rabbitmqctl status


    停止

    rabbitmqctl stop

    開啟

    rabbitmq-server start

    3、rabbitmq web管理頁面插件安裝

    輸入以下命令

    cd /usr/lib/rabbitmq/bin/
    rabbitmq-plugins enable rabbitmq_management

    這里筆者一直安裝不成功。


    如果安裝成功打開瀏覽器,輸入 http://[server-name]:15672/ 如 http://localhost:15672/ ,會要求輸入用戶名和密碼,用默認的guest/guest即可(guest/guest用戶只能從localhost地址登錄,如果要配置遠程登錄,必須另創建用戶)。
    如果要從遠程登錄怎么做呢?處于安全考慮,guest這個默認的用戶只能通過http://localhost:15672來登錄,其他的IP無法直接用這個guest帳號。這里我們可以通過配置文件來實現從遠程登錄管理界面,只要編輯/etc/rabbitmq/rabbitmq.config文件(沒有就新增),添加以下配置就可以了。

    4、添加用戶

    vim /etc/rabbitmq/rabbitmq.config
    然后添加
    [ 
    {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} 
    ].
    注意上面有個點號

    現在添加了一個新授權用戶asdf,可以遠程使用這個用戶名。記得要先用命令添加這個命令才行:
    cd /usr/lib/rabbitmq/bin/
    #用戶名與密碼</span>
    sudo rabbitmqctl add_user asdf 123456
    用戶設置為administrator才能遠程訪問</span>
    sudo rabbitmqctl set_user_tags asdf administrator 
    sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"

    其實也可以通過管理平臺頁面直接添加用戶和密碼等信息。如果還不能遠程訪問或遠程登錄檢查是不是5672, 15672端口沒有開放!!!!!!</span>

    5、開放端口

    ufw allow 5672


    三、簡單Java實例

    </div>
    下面來演示一個使用java的簡單實例:
    1、首先是消息生產者和提供者的基類
    package com.lin;

import java.io.IOException;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

/**

  • 功能概要: EndPoint類型的隊列
  • @author linbingwen
  • @since 2016年1月11日 */ public abstract class EndPoint{

    protected Channel channel; protected Connection connection; protected String endPointName;

    public EndPoint(String endpointName) throws IOException{

      this.endPointName = endpointName;
    
      //Create a connection factory
      ConnectionFactory factory = new ConnectionFactory();
    
      //hostname of your rabbitmq server
      factory.setHost("10.75.4.25");
      factory.setPort(5672);
      factory.setUsername("asdf");
      factory.setPassword("123456");
    
      //getting a connection
      connection = factory.newConnection();
    
      //creating a channel
      channel = connection.createChannel();
    
      //declaring a queue for this channel. If queue does not exist,
      //it will be created on the server.
      channel.queueDeclare(endpointName, false, false, false, null);
    

    }

/**
 * 關閉channel和connection。并非必須,因為隱含是自動調用的。
 * @throws IOException
 */
 public void close() throws IOException{
     this.channel.close();
     this.connection.close();
 }

}</pre>
2、消息提供者 </div>

package com.lin.producer;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

import com.lin.EndPoint;


/**
 * 
 * 功能概要:消息生產者
 * 
 * @author linbingwen
 * @since  2016年1月11日
 */
public class Producer extends EndPoint{

    public Producer(String endPointName) throws IOException{
        super(endPointName);
    }

    public void sendMessage(Serializable object) throws IOException {
        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
    }  
}

3、消息消費者
package com.lin.consumer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.lin.EndPoint;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;


/**
 * 
 * 功能概要:讀取隊列的程序端,實現了Runnable接口
 * 
 * @author linbingwen
 * @since  2016年1月11日
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer{

    public QueueConsumer(String endPointName) throws IOException{
        super(endPointName);       
    }

    public void run() {
        try {
            //start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");    
    }

    /**
     * Called when new message is available.
     */
    public void handleDelivery(String consumerTag, Envelope env,
            BasicProperties props, byte[] body) throws IOException {
        Map map = (HashMap)SerializationUtils.deserialize(body);
        System.out.println("Message Number "+ map.get("message number") + " received.");

    }

    public void handleCancel(String consumerTag) {}
    public void handleCancelOk(String consumerTag) {}
    public void handleRecoverOk(String consumerTag) {}
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

4、測試
package com.lin.test;

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;

import com.lin.consumer.QueueConsumer;
import com.lin.producer.Producer;

public class Test {
    public Test() throws Exception{

        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        Producer producer = new Producer("queue");

        for (int i = 0; i < 1000000; i++) {
            HashMap message = new HashMap();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number "+ i +" sent.");
        }
    }

    /**
     * @param args
     * @throws SQLException
     * @throws IOException
     */
    public static void main(String[] args) throws Exception{
      new Test();
    }
}
其中引入的jar包:
<!-- rabbitmq客戶端 -->
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
        </dependency>

    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.1</version>
    </dependency>
    </dependencies>

測試結果:
在提供消息
在消費消息
然后同時打開rabbitmq的服務端,輸入如下:
rabbitmqctl list_queues
這個命令是用來查看服務端中有多處個消息隊列的。
可以看到有個名為queue的消息隊列(更好的方法是安裝好web監控插件,筆者一直安裝失敗,所以這里就不展示了)


四、Rbbitmq與Spring結合使用

首先建立一個maven工程,整個項目的結構如下:

下面將具體來講講整個過程
1、jar包的引入
pom.xml配置即可,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lin</groupId>
    <artifactId>rabbit_c2</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <!-- spring版本號 -->
        <spring.version>3.2.8.RELEASE</spring.version>
        <!-- log4j日志文件管理包版本 -->
        <slf4j.version>1.6.6</slf4j.version>
        <log4j.version>1.2.12</log4j.version>
        <!-- junit版本號 -->
        <junit.version>4.10</junit.version>
    </properties>

    <dependencies>
        <!-- 添加Spring依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!--單元測試依賴 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- 日志文件管理包 -->
        <!-- log start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- log end -->

        <!--spring單元測試依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>

        <!--rabbitmq依賴 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
            <version>1.1.0.Final</version>
        </dependency>

        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>5.0.1.Final</version>
        </dependency>

    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <targetPath>${basedir}/target/classes</targetPath>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <targetPath>${basedir}/target/resources</targetPath>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>2.1.1</version>
                <configuration>
                    <warSourceExcludes>${warExcludes}</warSourceExcludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <testFailureIgnore>true</testFailureIgnore>
                </configuration>
            </plugin>
            <plugin>
                <inherited>true</inherited>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2、消息生產者
package com.lin.producer;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

/**
 * 功能概要:消息產生,提交到隊列中去
 * 
 * @author linbingwen
 * @since  2016年1月15日 
 */
@Service
public class MessageProducer {

    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message){
      logger.info("to send message:{}",message);
      amqpTemplate.convertAndSend("queueTestKey",message);
    }
}
3、消息消費者
package com.lin.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * 功能概要:消費接收
 * 
 * @author linbingwen
 * @since  2016年1月15日 
 */
public class MessageConsumer implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @Override
    public void onMessage(Message message) {
        logger.info("receive message:{}",message);
    }

}

4、rabbitMq.xml配置信息
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <!--配置connection-factory,指定連接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory"
        username="asdf" password="123456" host="10.75.4.25" port="5672" />

    <!--定義rabbit template用于數據的接收和發送 -->
    <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory" 
        exchange="exchangeTest" />

    <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定義queue -->
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />

    <!-- 定義direct exchange,綁定queueTest -->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="com.lin.consumer.MessageConsumer"></bean>

    <!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
    <rabbit:listener-container connection-factory="connectionFactory">
             <rabbit:listener queues="queueTest" ref="messageReceiver"/>
    </rabbit:listener-container>

</beans>

5、spring集成rabbiqMq。application.xml內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">


    <import resource="classpath*:rabbitmq.xml" />


    <!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods并把所注釋的注冊為Spring Beans -->
    <context:component-scan base-package="com.lin.consumer,com.lin.producer" />


    <!-- 激活annotation功能 -->
    <context:annotation-config />
    <!-- 激活annotation功能 -->
    <context:spring-configured />

</beans>

6、最后,為了方便,打印了日志,log4j.properties配置如下
log4j.rootLogger=DEBUG,Console,Stdout

#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG 

log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender  
log4j.appender.Stdout.File = E://logs/log.log  
log4j.appender.Stdout.Append = true  
log4j.appender.Stdout.Threshold = DEBUG   
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout  
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

接著運行整個工程即可:
下面是運行的結果:

一會發一會收:因為在同一工程,所以發消息和接消息是交替出現的


我們出可以去rabbitMq 服務器上看:
可以看到,我們配置的隊列已存在了:

到此,整個工程結束。

 本文由用戶 80301983 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!