使用 RMI + ZooKeeper 實現遠程調用框架
在 Java 世界里, 有一種技術可以實現“跨虛擬機”的調用,它就是 RMI(Remote Method Invocation,遠程方法調用) 。例如,服務A 在 JVM1 中運行,服務B 在 JVM2 中運行,服務A 與 服務B 可相互進行遠程調用,就像調用本地方法一樣,這就是 RMI。在分布式系統中,我們使用 RMI 技術可輕松將 服務提供者(Service Provider)與 服務消費者(Service Consumer)進行分離,充分體現組件之間的弱耦合,系統架構更易于擴展。
本文先從通過一個最簡單的 RMI 服務與調用示例,讓讀者快速掌握 RMI 的使用方法,然后指出 RMI 的局限性,最后筆者對此問題提供了一種簡單的解決方案,即使用 ZooKeeper 輕松解決 RMI 調用過程中所涉及的問題。
1 發布 RMI 服務
發布一個 RMI 服務,我們只需做三件事情:
- 定義一個 RMI 接口
- 編寫 RMI 接口的實現類
- 通過 JNDI 發布 RMI 服務
1.1 定義一個 RMI 接口
RMI 接口實際上還是一個普通的 Java 接口, 只是 RMI 接口必須繼承 java.rmi.Remote ,此外, 每個 RMI 接口的方法必須聲明拋出一個 java.rmi.RemoteException 異常 ,就像下面這樣:
package com.king.zkrmi;
import java.rmi.Remote;
import java.rmi.RemoteException;
/**
* RMI服務接口
*/
public interface HelloService extends Remote {
String sayHello(String name) throws RemoteException;
}
繼承了 Remote 接口,實際上是讓 JVM 得知該接口是需要用于遠程調用的 , 拋出了 RemoteException 是為了讓調用 RMI 服務的程序捕獲這個異常 。畢竟遠程調用過程中,什么奇怪的事情都會發生(比如:斷網)。需要說明的是, RemoteException 是一個“受檢異常”,在調用的時候必須使用 try...catch... 自行處理 。
1.2 編寫 RMI 接口的實現類
實現以上的 HelloService 是一件非常簡單的事情, 但需要注意的是,我們必須讓實現類繼承 java.rmi.server.UnicastRemoteObject 類 ,此外, 必須提供一個構造器,并且構造器必須拋出 java.rmi.RemoteException 異常 。我們既然使用 JVM 提供的這套 RMI 框架,那么就必須按照這個要求來實現,否則是無法成功發布 RMI 服務的,一句話:我們得按規矩出牌!
package com.king.zkrmi;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
/**
* RMI服務實現
*/
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
protected HelloServiceImpl() throws RemoteException {
}
@Override
public String sayHello(String name) throws RemoteException {
return String.format("Hello %s", name);
}
}
為了滿足 RMI 框架的要求,我們確實做了很多額外的工作( 繼承了 UnicastRemoteObject 類,拋出了 RemoteException 異常 ),但這些工作阻止不了我們發布 RMI 服務的決心!我們可以 通過 JVM 提供的 JNDI(Java Naming and Directory Interface,Java 命名與目錄接口)這個 API 輕松發布 RMI 服務 。
1.3 通過 JNDI 發布 RMI 服務
發布 RMI 服務,我們需要告訴 JNDI 三個基本信息: 1. 域名或 IP 地址(host)、2. 端口號(port)、3. 服務名(service) ,它們構成了 RMI 協議的 URL(或稱為“RMI 地址”):
rmi://<host>:<port>/<service>
如果我們是在本地發布 RMI 服務,那么 host 就是“localhost”。 此外,RMI 默認的 port 是“1099” ,我們也可以自行設置 port 的值(只要不與其它端口沖突即可)。 service 實際上是一個基于同一 host 與 port 下唯一的服務名,我們不妨使用 Java 完全類名來表示吧 ,這樣也比較容易保證 RMI 地址的唯一性。
對于我們的示例而言,RMI 地址為:
rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl
我們只需簡單提供一個 main() 方法就能發布 RMI 服務,就像下面這樣:
package demo.zookeeper.rmi.server;
import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;
public class RmiServer {
public static void main(String[] args) throws Exception {
int port = 1099;
String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
LocateRegistry.createRegistry(port);
Naming.rebind(url, new HelloServiceImpl());
}
}
需要注意的是,我們 通過 LocateRegistry.createRegistry() 方法在 JNDI 中創建一個注冊表,只需提供一個 RMI 端口號即可 。此外, 通過 Naming.rebind() 方法綁定 RMI 地址與 RMI 服務實現類 ,這里使用了 rebind() 方法,它相當于先后調用 Naming 的 unbind() 與 bind() 方法,只是使用 rebind() 方法來得更加痛快而已,所以我們選擇了它。
運行這個 main() 方法,RMI 服務就會自動發布,剩下要做的就是寫一個 RMI 客戶端來調用已發布的 RMI 服務。
2 調用 RMI 服務
同樣我們也使用一個 main() 方法來調用 RMI 服務,相比發布而言,調用會更加簡單,我們只需要知道兩個東西: 1. RMI 請求路徑、2. RMI 接口(一定不需要 RMI 實現類,否則就是本地調用了) 。數行代碼就能調用剛才發布的 RMI 服務,就像下面這樣:
package demo.zookeeper.rmi.client;
import demo.zookeeper.rmi.common.HelloService;
import java.rmi.Naming;
public class RmiClient {
public static void main(String[] args) throws Exception {
String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
HelloService helloService = (HelloService) Naming.lookup(url);
String result = helloService.sayHello("Jack");
System.out.println(result);
}
}
當我們運行以上 main() 方法,在控制臺中看到“Hello Jack”輸出,就表明 RMI 調用成功。
3 RMI 服務的局限性
可見,借助 JNDI 這個所謂的命名與目錄服務,我們成功地發布并調用了 RMI 服務。實際上, JNDI 就是一個注冊表,服務端將服務對象放入到注冊表中,客戶端從注冊表中獲取服務對象。在服務端我們發布了 RMI 服務,并在 JNDI 中進行了注冊,此時就在服務端創建了一個 Skeleton(骨架),當客戶端第一次成功連接 JNDI 并獲取遠程服務對象后,立馬就在本地創建了一個 Stub(存根),遠程通信實際上是通過 Skeleton 與 Stub 來完成的,數據是基于 TCP/IP 協議,在“傳輸層”上發送的 。毋庸置疑,理論上 RMI 一定比 WebService 要快,畢竟 WebService 是基于 HTTP 的,而 HTTP 所攜帶的數據是通過“應用層”來傳輸的,傳輸層較應用層更為底層,越底層越快。
既然 RMI 比 WebService 快,使用起來也方便,那么為什么我們有時候還要用 WebService 呢?
其實原因很簡單, WebService 可以實現跨語言系統之間的調用,而 RMI 只能實現 Java 系統之間的調用 。也就是說,RMI 的跨平臺性不如 WebService 好,假如我們的系統都是用 Java 開發的,那么當然首選就是 RMI 服務了。
貌似 RMI 確實挺優秀的,除了不能跨平臺以外,還有那些問題呢?
筆者認為有兩點局限性:
- RMI 使用了 Java 默認的序列化方式,對于性能要求比較高的系統,可能需要使用其它序列化方案來解決(例如:Protobuf)。
- RMI 服務在運行時難免會存在出故障,例如,如果 RMI 服務無法連接了,就會導致客戶端無法響應的現象。
在一般的情況下,Java 默認的序列化方式確實已經足以滿足我們的要求了,如果性能方面如果不是問題的話,我們需要解決的實際上是第二點,也就是說, 讓使系統具備 HA(High Availability,高可用性) 。
4 使用 ZooKeeper 提供高可用的 RMI 服務
要想解決 RMI 服務的高可用性問題,我們需要 利用 ZooKeeper 充當一個 服務注冊表(Service Registry),讓多個 服務提供者(Service Provider)形成一個集群,讓 服務消費者(Service Consumer)通過服務注冊表獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者 。如下圖所示:
需要注意的是, 服務注冊表并不是 Load Balancer(負載均衡器),提供的不是“反向代理”服務,而是“服務注冊”與“心跳檢測”功能 。
利用服務注冊表來注冊 RMI 地址,這個很好理解,那么“心跳檢測”又如何理解呢?說白了就是通過服務中心定時向各個服務提供者發送一個請求(實際上建立的是一個 Socket 長連接),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,只會從還“活著”的服務提供者中選出一個做為當前的服務提供者。
也許讀者會考慮到,服務中心可能會出現單點故障,如果服務注冊表都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構, 必須保證服務中心也具備高可用性 。
ZooKeeper 正好能夠滿足我們上面提到的所有需求:
使用 ZooKeeper 的臨時性 ZNode 來存放服務提供者的 RMI 地址,一旦與服務提供者的 Session 中斷,會自動清除相應的 ZNode。
讓服務消費者去監聽這些 ZNode,一旦發現 ZNode 的數據(RMI 地址)有變化,就會重新獲取一份有效數據的拷貝。
ZooKeeper 與生俱來的集群能力(例如:數據同步與領導選舉特性),可以確保服務注冊表的高可用性。
4.1 服務提供者
需要編寫一個 ServiceProvider 類,來發布 RMI 服務,并將 RMI 地址注冊到 ZooKeeper 中(實際存放在 ZNode 上):
package com.king.zkrmi;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;
/**
* RMI服務提供者
*/
public class ServiceProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
// 用于等待 SyncConnected 事件觸發后繼續執行當前線程
private CountDownLatch latch = new CountDownLatch(1);
// 發布 RMI 服務并注冊 RMI 地址到 ZooKeeper 中
public void publish(Remote remote, String host, int port) {
String url = publishService(remote, host, port); // 發布 RMI 服務并返回 RMI 地址
if (url != null) {
ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象
if (zk != null) {
createNode(zk, url); // 創建 ZNode 并將 RMI 地址放入 ZNode 上
}
}
}
// 發布 RMI 服務
private String publishService(Remote remote, String host, int port) {
String url = null;
try {
url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
LocateRegistry.createRegistry(port);
Naming.rebind(url, remote);
LOGGER.debug("publish rmi service (url: {})", url);
} catch (RemoteException | MalformedURLException e) {
LOGGER.error("", e);
}
return url;
}
// 連接 ZooKeeper 服務器
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown(); // 喚醒當前正在執行的線程
}
}
});
latch.await(); // 使當前線程處于等待狀態
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}
// 創建 ZNode
private void createNode(ZooKeeper zk, String url) {
try {
byte[] data = url.getBytes();
String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 創建一個臨時性且有序的 ZNode
LOGGER.debug("create zookeeper node ({} => {})", path, url);
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
}
}
}
涉及到的 Constant 常量,見如下代碼:
package com.king.zkrmi;
/**
* ZK常量
*/
public interface Constant {
String ZK_CONNECTION_STRING = "localhost:2181";
int ZK_SESSION_TIMEOUT = 5000;
String ZK_REGISTRY_PATH = "/registry";
String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
}
注意:我們首先需要使用 ZooKeeper 的客戶端工具創建一個持久性 ZNode,名為“/registry”,該節點是不存放任何數據的,可使用如下命令:
create /registry null
4.2 服務消費者
服務消費者需要在創建的時候連接 ZooKeeper,同時監聽 /registry 節點的 NodeChildrenChanged 事件 ,也就是說,一旦該節點的子節點有變化,就需要重新獲取最新的子節點。 這里提到的子節點,就是存放服務提供者發布的 RMI 地址 。需要強調的是, 這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務注冊表的 Session 中斷后,該臨時性節會被自動刪除 。
package com.king.zkrmi;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
/**
* RMI服務消費者
*/
public class ServiceConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
// 用于等待 SyncConnected 事件觸發后繼續執行當前線程
private CountDownLatch latch = new CountDownLatch(1);
// 定義一個 volatile 成員變量,用于保存最新的 RMI 地址(考慮到該變量或許會被其它線程所修改,一旦修改后,該變量的值會影響到所有線程)
private volatile List<String> urlList = new ArrayList<>();
// 構造器
public ServiceConsumer() {
ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象
if (zk != null) {
watchNode(zk); // 觀察 /registry 節點的所有子節點并更新 urlList 成員變量
}
}
// 查找 RMI 服務
public <T extends Remote> T lookup() {
T service = null;
int size = urlList.size();
if (size > 0) {
String url;
if (size == 1) {
url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素
LOGGER.debug("using only url: {}", url);
} else {
url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素
LOGGER.debug("using random url: {}", url);
}
service = lookupService(url); // 從 JNDI 中查找 RMI 服務
}
return service;
}
// 連接 ZooKeeper 服務器
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown(); // 喚醒當前正在執行的線程
}
}
});
latch.await(); // 使當前線程處于等待狀態
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}
// 觀察 /registry 節點下所有子節點是否有變化
private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk); // 若子節點有變化,則重新調用該方法(為了獲取最新子節點中的數據)
}
}
});
List<String> dataList = new ArrayList<>(); // 用于存放 /registry 所有子節點中的數據
for (String node : nodeList) {
byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的數據
dataList.add(new String(data));
}
LOGGER.debug("node data: {}", dataList);
urlList = dataList; // 更新最新的 RMI 地址
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
}
}
// 在 JNDI 中查找 RMI 遠程服務對象
@SuppressWarnings("unchecked")
private <T> T lookupService(String url) {
T remote = null;
try {
remote = (T) Naming.lookup(url);
} catch (NotBoundException | MalformedURLException | RemoteException e) {
if (e instanceof ConnectException) {
// 若連接中斷,則使用 urlList 中第一個 RMI 地址來查找(這是一種簡單的重試方式,確保不會拋出異常)
LOGGER.error("ConnectException -> url: {}", url);
if (urlList.size() != 0) {
url = urlList.get(0);
return lookupService(url);
}
}
LOGGER.error("", e);
}
return remote;
}
}
4.3 發布服務
我們需要調用 ServiceProvider 的 publish() 方法來發布 RMI 服務,發布成功后也會自動在 ZooKeeper 中注冊 RMI 地址:
package com.king.zkrmi;
/**
* 服務發布
*/
public class Server {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("please using command: java Server <rmi_host> <rmi_port>");
System.exit(-1);
}
String host = args[0];
int port = Integer.parseInt(args[1]);
ServiceProvider provider = new ServiceProvider();
HelloService helloService = new HelloServiceImpl();
provider.publish(helloService, host, port);
Thread.sleep(Long.MAX_VALUE);
}
}
注意:在運行 Server 類的 main() 方法時,一定要使用命令行參數來指定 host 與 port,例如:
java Server localhost 1099
java Server localhost 2099
以上兩條 Java 命令可在本地運行兩個 Server 程序,當然也可以同時運行更多的 Server 程序,只要 port 不同就行。
4.4 調用服務
通過調用 ServiceConsumer 的 lookup() 方法來查找 RMI 遠程服務對象。我們使用一個“死循環”來模擬每隔 3 秒鐘調用一次遠程方法。
package com.king.zkrmi;
/**
* RMI客戶端
*/
public class Client {
public static void main(String[] args) throws Exception {
ServiceConsumer consumer = new ServiceConsumer();
while (true) {
HelloService helloService = consumer.lookup();
String result = helloService.sayHello("Jack");
System.out.println(result);
Thread.sleep(3000);
}
}
}
4.5 使用方法
根據以下步驟驗證 RMI 服務的高可用性:
- 運行兩個 Server 程序,一定要確保 port 是不同的。
- 運行一個 Client 程序。
- 停止其中一個 Server 程序,并觀察 Client 控制臺的變化(停止一個 Server 不會導致 Client 端調用失敗)。
- 重新啟動剛才關閉的 Server 程序,繼續觀察 Client 控制臺變化(新啟動的 Server 會加入候選)。
- 先后停止所有的 Server 程序,還是觀察 Client 控制臺變化(Client 會重試連接,多次連接失敗后,自動關閉)。
5 總結
通過本文,我們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,通過 ZooKeeper 注冊所有服務提供者發布的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案局限于 RMI 服務,對于任何形式的服務(比如:WebService),也提供了一定參考。
如果再配合 ZooKeeper 自身的集群,那才是一個相對完美的解決方案,對于 ZooKeeper 的集群,請讀者自行實踐。
由于筆者水平有限,對于描述有誤之處,還請各位讀者提出建議,并期待更加優秀的解決方案。
6 Zookeeper + RMI 時序圖
來自: http://my.oschina.net/xianggao/blog/645015