RabbitMQ之RPC實現

qyof1847 7年前發布 | 39K 次閱讀 RPC RabbitMQ 消息系統

什么是RPC?

RPC是指遠程過程調用,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,由于不在一個內存空間,不能直接調用,需要通過網絡來表達調用的語義和傳達調用的數據。

為什么RPC呢?就是無法在一個進程內,甚至一個計算機內通過本地調用的方式完成的需求,比如比如不同的系統間的通訊,甚至不同的組織間的通訊。由于計算能力需要橫向擴展,需要在多臺機器組成的集群上部署應用,

RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。

RabbitMQ怎么實現RPC調用?

Callback Queue

一般在RabbitMQ中做RPC是很簡單的。客戶端發送請求消息,服務器回復響應的消息。為了接受響應的消息,我們需要在請求消息中發送一個回調隊列。可以使用默認的隊列(which is exclusive in the java client.):

callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue",props,message.getBytes());
// then code to read a response message from the callback_queue...

Message properties

AMQP協議為消息預定義了一組14個屬性。

private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;

大部分的屬性是很少使用的。除了以下幾種(其余有興趣可以自行查看):

  • deliveryMode: 標記消息傳遞模式,2-消息持久化,其他值-瞬態。
  • contentType:內容類型,用于描述編碼的mime-type. 例如經常為該屬性設置JSON編碼。
  • replyTo:應答,通用的回調隊列名稱,
  • correlationId:關聯ID,方便RPC相應與請求關聯。

Correlation Id

在上述方法中為每個RPC請求創建一個回調隊列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端創建一個單一的回調隊列。

新的問題被提出,隊列收到一條回復消息,但是不清楚是那條請求的回復。這是就需要使用correlationId屬性了。我們要為每個請求設置唯一的值。然后,在回調隊列中獲取消息,查看這個屬性,關聯response和request就是基于這個屬性值的。如果我們看到一個未知的correlationId屬性值的消息,可以放心的無視它——它不是我們發送的請求。

你可能問道,為什么要忽略回調隊列中未知的信息,而不是當作一個失敗?這是由于在服務器端競爭條件的導致的。雖然不太可能,但是如果RPC服務器在發送給我們結果后,發送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的消息。如果發生了這種情況,重啟RPC服務器將會重新處理該請求。這就是為什么在客戶端必須很好的處理重復響應,RPC應該是冪等的。

Summary

RPC的處理流程:

  1. 當客戶端啟動時,創建一個匿名的回調隊列。
  2. 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。
  3. 請求被發送到rpc_queue隊列中。
  4. RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
  5. 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

Demo Code

這里采用官網的一個例子來說明,RPC客戶端通過RPC調用服務器來計算斐波那契額值。

首先是服務端的代碼:

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String args[]) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(RabbitConfig.ip);
    factory.setPort(RabbitConfig.port);
    factory.setUsername(RabbitConfig.username);
    factory.setPassword(RabbitConfig.password);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    System.out.println(" [x] Awaiting RPC requests");

    while(true){
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
        String message = new String(delivery.getBody());
        int n = Integer.parseInt(message);
        System.out.println(" [.] fib("+message+")");
        String repsonse = ""+fib(n);
        channel.basicPublish("", props.getReplyTo(), replyProps, repsonse.getBytes());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

}</code></pre>

RPC客戶端:

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

public RPCClient() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(RabbitConfig.ip);
    factory.setPort(RabbitConfig.port);
    factory.setUsername(RabbitConfig.username);
    factory.setPassword(RabbitConfig.password);

    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true,consumer);
}

public String call(String message) throws IOException,
        ShutdownSignalException, ConsumerCancelledException,
        InterruptedException {
    String response = null;
    String corrId = UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties.Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while(true){
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if(delivery.getProperties().getCorrelationId().equals(corrId)){
            response = new String(delivery.getBody());
            break;
        }
    }

    return response;
}

public void close() throws Exception{
    connection.close();
}

public static void main(String args[]) throws Exception{
    RPCClient fibRpc = new RPCClient();
    System.out.println(" [x] Requesting fib(30)");
    String response = fibRpc.call("30");
    System.out.println(" [.] Got '"+response+"'");
    fibRpc.close();

}

}</code></pre>

參考資料

  1. Remote procedure call (RPC)
  2. 輕松搞定RabbitMQ(七)——遠程過程調用RPC

 

來自:http://blog.csdn.net/u013256816/article/details/55218595

 

 本文由用戶 qyof1847 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!