RabbitMQ之RPC實現
什么是RPC?
RPC是指遠程過程調用,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,由于不在一個內存空間,不能直接調用,需要通過網絡來表達調用的語義和傳達調用的數據。
為什么RPC呢?就是無法在一個進程內,甚至一個計算機內通過本地調用的方式完成的需求,比如比如不同的系統間的通訊,甚至不同的組織間的通訊。由于計算能力需要橫向擴展,需要在多臺機器組成的集群上部署應用,
RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。
RabbitMQ怎么實現RPC調用?
Callback Queue
一般在RabbitMQ中做RPC是很簡單的。客戶端發送請求消息,服務器回復響應的消息。為了接受響應的消息,我們需要在請求消息中發送一個回調隊列。可以使用默認的隊列(which is exclusive in the java client.):
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue",props,message.getBytes());
// then code to read a response message from the callback_queue...
Message properties
AMQP協議為消息預定義了一組14個屬性。
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
大部分的屬性是很少使用的。除了以下幾種(其余有興趣可以自行查看):
- deliveryMode: 標記消息傳遞模式,2-消息持久化,其他值-瞬態。
- contentType:內容類型,用于描述編碼的mime-type. 例如經常為該屬性設置JSON編碼。
- replyTo:應答,通用的回調隊列名稱,
- correlationId:關聯ID,方便RPC相應與請求關聯。
Correlation Id
在上述方法中為每個RPC請求創建一個回調隊列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端創建一個單一的回調隊列。
新的問題被提出,隊列收到一條回復消息,但是不清楚是那條請求的回復。這是就需要使用correlationId屬性了。我們要為每個請求設置唯一的值。然后,在回調隊列中獲取消息,查看這個屬性,關聯response和request就是基于這個屬性值的。如果我們看到一個未知的correlationId屬性值的消息,可以放心的無視它——它不是我們發送的請求。
你可能問道,為什么要忽略回調隊列中未知的信息,而不是當作一個失敗?這是由于在服務器端競爭條件的導致的。雖然不太可能,但是如果RPC服務器在發送給我們結果后,發送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的消息。如果發生了這種情況,重啟RPC服務器將會重新處理該請求。這就是為什么在客戶端必須很好的處理重復響應,RPC應該是冪等的。
Summary
RPC的處理流程:
- 當客戶端啟動時,創建一個匿名的回調隊列。
- 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。
- 請求被發送到rpc_queue隊列中。
- RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
- 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。
Demo Code
這里采用官網的一個例子來說明,RPC客戶端通過RPC調用服務器來計算斐波那契額值。
首先是服務端的代碼:
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String args[]) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConfig.ip);
factory.setPort(RabbitConfig.port);
factory.setUsername(RabbitConfig.username);
factory.setPassword(RabbitConfig.password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib("+message+")");
String repsonse = ""+fib(n);
channel.basicPublish("", props.getReplyTo(), replyProps, repsonse.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
}</code></pre>
RPC客戶端:
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConfig.ip);
factory.setPort(RabbitConfig.port);
factory.setUsername(RabbitConfig.username);
factory.setPassword(RabbitConfig.password);
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true,consumer);
}
public String call(String message) throws IOException,
ShutdownSignalException, ConsumerCancelledException,
InterruptedException {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if(delivery.getProperties().getCorrelationId().equals(corrId)){
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception{
connection.close();
}
public static void main(String args[]) throws Exception{
RPCClient fibRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibRpc.call("30");
System.out.println(" [.] Got '"+response+"'");
fibRpc.close();
}
}</code></pre>
參考資料
來自:http://blog.csdn.net/u013256816/article/details/55218595