使用 RMI + ZooKeeper 實現遠程調用框架
在 Java 世界里,有一種技術可以實現“跨虛擬機”的調用,它就是 RMI
(Remote Method Invocation,遠程方法調用)。例如,服務A 在 JVM1 中運行,服務B 在 JVM2 中運行,服務A 與 服務B 可相互進行遠程調用,就像調用本地方法一樣,這就是 RMI。在分布式系統中,我們使用 RMI 技術可輕松將 服務提供者
(Service Provider)與 服務消費者
(Service Consumer)進行分離,充分體現組件之間的弱耦合,系統架構更易于擴展。
本文先從通過一個最簡單的 RMI 服務與調用示例,讓讀者快速掌握 RMI 的使用方法,然后指出 RMI 的局限性,最后筆者對此問題提供了一種簡單的解決方案,即使用 ZooKeeper 輕松解決 RMI 調用過程中所涉及的問題。
下面我們就從一個最簡單的 RMI 示例開始吧!
1 發布 RMI 服務
發布一個 RMI 服務,我們只需做三件事情:
- 定義一個 RMI 接口
- 編寫 RMI 接口的實現類
- 通過 JNDI 發布 RMI 服務
1.1 定義一個 RMI 接口
RMI 接口實際上還是一個普通的 Java 接口,只是 RMI 接口必須繼承 java.rmi.Remote
,此外,每個 RMI 接口的方法必須聲明拋出一個 java.rmi.RemoteException
異常,就像下面這樣:
package demo.zookeeper.remoting.common;import java.rmi.Remote; import java.rmi.RemoteException;
public interface HelloService extends Remote {
String sayHello(String name) throws RemoteException;
}</pre>
繼承了
Remote
接口,實際上是讓 JVM 得知該接口是需要用于遠程調用的,拋出了RemoteException
是為了讓調用 RMI 服務的程序捕獲這個異常。畢竟遠程調用過程中,什么奇怪的事情都會發生(比如:斷網)。需要說明的是,RemoteException 是一個“受檢異常”,在調用的時候必須使用try...catch...
自行處理。1.2 編寫 RMI 接口的實現類
實現以上的
HelloService
是一件非常簡單的事情,但需要注意的是,我們必須讓實現類繼承java.rmi.server.UnicastRemoteObject
類,此外,必須提供一個構造器,并且構造器必須拋出java.rmi.RemoteException
異常。我們既然使用 JVM 提供的這套 RMI 框架,那么就必須按照這個要求來實現,否則是無法成功發布 RMI 服務的,一句話:我們得按規矩出牌!package demo.zookeeper.remoting.server;import demo.zookeeper.remoting.common.HelloService; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject;
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
protected HelloServiceImpl() throws RemoteException { } @Override public String sayHello(String name) throws RemoteException { return String.format("Hello %s", name); }
}</pre>
為了滿足 RMI 框架的要求,我們確實做了很多額外的工作(繼承了
UnicastRemoteObject
類,拋出了RemoteException
異常),但這些工作阻止不了我們發布 RMI 服務的決心!我們可以通過 JVM 提供的 JNDI(Java Naming and Directory Interface,Java 命名與目錄接口)這個 API 輕松發布 RMI 服務。1.3 通過 JNDI 發布 RMI 服務
發布 RMI 服務,我們需要告訴 JNDI 三個基本信息:1. 域名或 IP 地址(host)、2. 端口號(port)、3. 服務名(service),它們構成了 RMI 協議的 URL(或稱為“RMI 地址”):
rmi://<host>:<port>/<service>如果我們是在本地發布 RMI 服務,那么
host
就是“localhost”。此外,RMI 默認的port
是“1099”,我們也可以自行設置 port 的值(只要不與其它端口沖突即可)。service
實際上是一個基于同一 host 與 port 下唯一的服務名,我們不妨使用 Java 完全類名來表示吧,這樣也比較容易保證 RMI 地址的唯一性。對于我們的示例而言,RMI 地址為:
rmi://localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl我們只需簡單提供一個 main() 方法就能發布 RMI 服務,就像下面這樣:
package demo.zookeeper.remoting.server;import java.rmi.Naming; import java.rmi.registry.LocateRegistry;
public class RmiServer {
public static void main(String[] args) throws Exception { int port = 1099; String url = "rmi://localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl"; LocateRegistry.createRegistry(port); Naming.rebind(url, new HelloServiceImpl()); }
}</pre>
需要注意的是,我們通過
LocateRegistry.createRegistry()
方法在 JNDI 中創建一個注冊表,只需提供一個 RMI 端口號即可。此外,通過Naming.rebind()
方法綁定 RMI 地址與 RMI 服務實現類,這里使用了rebind()
方法,它相當于先后調用 Naming 的unbind()
與bind()
方法,只是使用 rebind() 方法來得更加痛快而已,所以我們選擇了它。運行這個 main() 方法,RMI 服務就會自動發布,剩下要做的就是寫一個 RMI 客戶端來調用已發布的 RMI 服務。
2 調用 RMI 服務
同樣我們也使用一個 main() 方法來調用 RMI 服務,相比發布而言,調用會更加簡單,我們只需要知道兩個東西:1. RMI 請求路徑、2. RMI 接口(一定不需要 RMI 實現類,否則就是本地調用了)。數行代碼就能調用剛才發布的 RMI 服務,就像下面這樣:
package demo.zookeeper.remoting.client;import demo.zookeeper.remoting.common.HelloService; import java.rmi.Naming;
public class RmiClient {
public static void main(String[] args) throws Exception { String url = "rmi://localhost:1099/demo.zookeeper.remoting.server.HelloServiceImpl"; HelloService helloService = (HelloService) Naming.lookup(url); String result = helloService.sayHello("Jack"); System.out.println(result); }
}</pre>
當我們運行以上 main() 方法,在控制臺中看到“Hello Jack”輸出,就表明 RMI 調用成功。
3 RMI 服務的局限性
可見,借助 JNDI 這個所謂的命名與目錄服務,我們成功地發布并調用了 RMI 服務。實際上,JNDI 就是一個注冊表,服務端將服務對象放入到注冊表中,客戶端從注冊表中獲取服務對象。在服務端我們發布了 RMI 服務,并在 JNDI 中進行了注冊,此時就在服務端創建了一個
Skeleton
(骨架),當客戶端第一次成功連接 JNDI 并獲取遠程服務對象后,立馬就在本地創建了一個Stub
(存根),遠程通信實際上是通過 Skeleton 與 Stub 來完成的,數據是基于 TCP/IP 協議,在“傳輸層”上發送的。毋庸置疑,理論上 RMI 一定比 WebService 要快,畢竟 WebService 是基于 HTTP 的,而 HTTP 所攜帶的數據是通過“應用層”來傳輸的,傳輸層較應用層更為底層,越底層越快。既然 RMI 比 WebService 快,使用起來也方便,那么為什么我們有時候還要用 WebService 呢?
其實原因很簡單,WebService 可以實現跨語言系統之間的調用,而 RMI 只能實現 Java 系統之間的調用。也就是說,RMI 的跨平臺性不如 WebService 好,假如我們的系統都是用 Java 開發的,那么當然首選就是 RMI 服務了。
貌似 RMI 確實挺優秀的,除了不能跨平臺以外,還有那些問題呢?
筆者認為有兩點局限性:
- RMI 使用了 Java 默認的序列化方式,對于性能要求比較高的系統,可能需要使用其它序列化方案來解決(例如:Proto Buffer)。
- RMI 服務在運行時難免會存在出故障,例如,如果 RMI 服務無法連接了,就會導致客戶端無法響應的現象。
在一般的情況下,Java 默認的序列化方式確實已經足以滿足我們的要求了,如果性能方面如果不是問題的話,我們需要解決的實際上是第二點,也就是說,讓使系統具備 HA(High Availability,高可用性)。
4 使用 ZooKeeper 提供高可用的 RMI 服務
ZooKeeper 是 Hadoop 的一個子項目,用于解決分布式系統之間的數據一致性問題。如果讀者尚不了解 ZooKeeper 的工作原理與使用方法,可以通過以下鏈接來了解:
本文假設讀者已經對 ZooKeeper 有一定了解的前提下,對 RMI 的高可用性問題提供一個簡單的解決方案。
要想解決 RMI 服務的高可用性問題,我們需要利用 ZooKeeper 充當一個 服務注冊表
(Service Registry),讓多個 服務提供者
(Service Provider)形成一個集群,讓 服務消費者
(Service Consumer)通過服務注冊表獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者。如下圖所示:
需要注意的是,服務注冊表并不是 Load Balancer(負載均衡器),提供的不是“反向代理”服務,而是“服務注冊”與“心跳檢測”功能。
利用服務注冊表來注冊 RMI 地址,這個很好理解,那么“心跳檢測”又如何理解呢?說白了就是通過服務中心定時向各個服務提供者發送一個請求(實際上建立的是一個 Socket 長連接),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,只會從還“活著”的服務提供者中選出一個做為當前的服務提供者。
也許讀者會考慮到,服務中心可能會出現單點故障,如果服務注冊表都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構,必須保證服務中心也具備高可用性。
ZooKeeper 正好能夠滿足我們上面提到的所有需求。
- 使用 ZooKeeper 的臨時性 ZNode 來存放服務提供者的 RMI 地址,一旦與服務提供者的 Session 中斷,會自動清除相應的 ZNode。
- 讓服務消費者去監聽這些 ZNode,一旦發現 ZNode 的數據(RMI 地址)有變化,就會重新獲取一份有效數據的拷貝。
- ZooKeeper 與生俱來的集群能力(例如:數據同步與領導選舉特性),可以確保服務注冊表的高可用性。
4.1 服務提供者
需要編寫一個 ServiceProvider
類,來發布 RMI 服務,并將 RMI 地址注冊到 ZooKeeper 中(實際存放在 ZNode 上)。
package demo.zookeeper.remoting.server;import demo.zookeeper.remoting.common.Constant; import java.io.IOException; import java.net.MalformedURLException; import java.rmi.Naming; import java.rmi.Remote; import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class ServiceProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class); // 用于等待 SyncConnected 事件觸發后繼續執行當前線程 private CountDownLatch latch = new CountDownLatch(1); // 發布 RMI 服務并注冊 RMI 地址到 ZooKeeper 中 public void publish(Remote remote, String host, int port) { String url = publishService(remote, host, port); // 發布 RMI 服務并返回 RMI 地址 if (url != null) { ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象 if (zk != null) { createNode(zk, url); // 創建 ZNode 并將 RMI 地址放入 ZNode 上 } } } // 發布 RMI 服務 private String publishService(Remote remote, String host, int port) { String url = null; try { url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName()); LocateRegistry.createRegistry(port); Naming.rebind(url, remote); LOGGER.debug("publish rmi service (url: {})", url); } catch (RemoteException | MalformedURLException e) { LOGGER.error("", e); } return url; } // 連接 ZooKeeper 服務器 private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); // 喚醒當前正在執行的線程 } } }); latch.await(); // 使當前線程處于等待狀態 } catch (IOException | InterruptedException e) { LOGGER.error("", e); } return zk; } // 創建 ZNode private void createNode(ZooKeeper zk, String url) { try { byte[] data = url.getBytes(); String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 創建一個臨時性且有序的 ZNode LOGGER.debug("create zookeeper node ({} => {})", path, url); } catch (KeeperException | InterruptedException e) { LOGGER.error("", e); } }
}</pre>
涉及到的
Constant
常量,見如下代碼:package demo.zookeeper.remoting.common;public interface Constant {
String ZK_CONNECTION_STRING = "localhost:2181"; int ZK_SESSION_TIMEOUT = 5000; String ZK_REGISTRY_PATH = "/registry"; String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
}</pre>
注意:我們首先需要使用 ZooKeeper 的客戶端工具創建一個持久性 ZNode,名為“/registry”,該節點是不存放任何數據的,可使用如下命令:
create /registry null4.2 服務消費者
服務消費者需要在創建的時候連接 ZooKeeper,同時監聽
/registry
節點的NodeChildrenChanged
事件,也就是說,一旦該節點的子節點有變化,就需要重新獲取最新的子節點。這里提到的子節點,就是存放服務提供者發布的 RMI 地址。需要強調的是,這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務注冊表的 Session 中斷后,該臨時性節會被自動刪除。package demo.zookeeper.remoting.client;import demo.zookeeper.remoting.common.Constant; import java.io.IOException; import java.net.MalformedURLException; import java.rmi.ConnectException; import java.rmi.Naming; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class ServiceConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); // 用于等待 SyncConnected 事件觸發后繼續執行當前線程 private CountDownLatch latch = new CountDownLatch(1); // 定義一個 volatile 成員變量,用于保存最新的 RMI 地址(考慮到該變量或許會被其它線程所修改,一旦修改后,該變量的值會影響到所有線程) private volatile List<String> urlList = new ArrayList<>(); // 構造器 public ServiceConsumer() { ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象 if (zk != null) { watchNode(zk); // 觀察 /registry 節點的所有子節點并更新 urlList 成員變量 } } // 查找 RMI 服務 public <T extends Remote> T lookup() { T service = null; int size = urlList.size(); if (size > 0) { String url; if (size == 1) { url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素 LOGGER.debug("using only url: {}", url); } else { url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素 LOGGER.debug("using random url: {}", url); } service = lookupService(url); // 從 JNDI 中查找 RMI 服務 } return service; } // 連接 ZooKeeper 服務器 private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); // 喚醒當前正在執行的線程 } } }); latch.await(); // 使當前線程處于等待狀態 } catch (IOException | InterruptedException e) { LOGGER.error("", e); } return zk; } // 觀察 /registry 節點下所有子節點是否有變化 private void watchNode(final ZooKeeper zk) { try { List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk); // 若子節點有變化,則重新調用該方法(為了獲取最新子節點中的數據) } } }); List<String> dataList = new ArrayList<>(); // 用于存放 /registry 所有子節點中的數據 for (String node : nodeList) { byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的數據 dataList.add(new String(data)); } LOGGER.debug("node data: {}", dataList); urlList = dataList; // 更新最新的 RMI 地址 } catch (KeeperException | InterruptedException e) { LOGGER.error("", e); } } // 在 JNDI 中查找 RMI 遠程服務對象 @SuppressWarnings("unchecked") private <T> T lookupService(String url) { T remote = null; try { remote = (T) Naming.lookup(url); } catch (NotBoundException | MalformedURLException | RemoteException e) { if (e instanceof ConnectException) { // 若連接中斷,則使用 urlList 中第一個 RMI 地址來查找(這是一種簡單的重試方式,確保不會拋出異常) LOGGER.error("ConnectException -> url: {}", url); if (urlList.size() != 0) { url = urlList.get(0); return lookupService(url); } } LOGGER.error("", e); } return remote; }
}</pre>
4.3 發布服務
我們需要調用 ServiceProvider 的 publish() 方法來發布 RMI 服務,發布成功后也會自動在 ZooKeeper 中注冊 RMI 地址。
package demo.zookeeper.remoting.server;import demo.zookeeper.remoting.common.HelloService;
public class Server {
public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("please using command: java Server <rmi_host> <rmi_port>"); System.exit(-1); } String host = args[0]; int port = Integer.parseInt(args[1]); ServiceProvider provider = new ServiceProvider(); HelloService helloService = new HelloServiceImpl(); provider.publish(helloService, host, port); Thread.sleep(Long.MAX_VALUE); }
}</pre>
注意:在運行 Server 類的 main() 方法時,一定要使用命令行參數來指定 host 與 port,例如:
java Server localhost 1099 java Server localhost 2099以上兩條 Java 命令可在本地運行兩個 Server 程序,當然也可以同時運行更多的 Server 程序,只要 port 不同就行。
4.4 調用服務
通過調用 ServiceConsumer 的 lookup() 方法來查找 RMI 遠程服務對象。我們使用一個“死循環”來模擬每隔 3 秒鐘調用一次遠程方法。
package demo.zookeeper.remoting.client;import demo.zookeeper.remoting.common.HelloService;
public class Client {
public static void main(String[] args) throws Exception { ServiceConsumer consumer = new ServiceConsumer(); while (true) { HelloService helloService = consumer.lookup(); String result = helloService.sayHello("Jack"); System.out.println(result); Thread.sleep(3000); } }
}</pre>
4.5 使用方法
根據以下步驟驗證 RMI 服務的高可用性:
- 運行兩個 Server 程序,一定要確保 port 是不同的。
- 運行一個 Client 程序。
- 停止其中一個 Server 程序,并觀察 Client 控制臺的變化(停止一個 Server 不會導致 Client 端調用失敗)。
- 重新啟動剛才關閉的 Server 程序,繼續觀察 Client 控制臺變化(新啟動的 Server 會加入候選)。
- 先后停止所有的 Server 程序,還是觀察 Client 控制臺變化(Client 會重試連接,多次連接失敗后,自動關閉)。
5 總結
通過本文,我們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,通過 ZooKeeper 注冊所有服務提供者發布的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案局限于 RMI 服務,對于任何形式的服務(比如:WebService),也提供了一定參考。
如果再配合 ZooKeeper 自身的集群,那才是一個相對完美的解決方案,對于 ZooKeeper 的集群,請讀者自行實踐。
由于筆者水品有限,對于描述有誤之處,還請各位讀者提出建議,并期待更加優秀的解決方案。
來自:http://my.oschina.net/huangyong/blog/345164