使用 RMI + ZooKeeper 實現遠程調用框架

sammuu 8年前發布 | 20K 次閱讀 Java RMI ZooKeeper Java開發

在 Java 世界里, 有一種技術可以實現“跨虛擬機”的調用,它就是 RMI(Remote Method Invocation,遠程方法調用) 。例如,服務A 在 JVM1 中運行,服務B 在 JVM2 中運行,服務A 與 服務B 可相互進行遠程調用,就像調用本地方法一樣,這就是 RMI。在分布式系統中,我們使用 RMI 技術可輕松將 服務提供者(Service Provider)與 服務消費者(Service Consumer)進行分離,充分體現組件之間的弱耦合,系統架構更易于擴展。

本文先從通過一個最簡單的 RMI 服務與調用示例,讓讀者快速掌握 RMI 的使用方法,然后指出 RMI 的局限性,最后筆者對此問題提供了一種簡單的解決方案,即使用 ZooKeeper 輕松解決 RMI 調用過程中所涉及的問題。

1 發布 RMI 服務

發布一個 RMI 服務,我們只需做三件事情:

  1. 定義一個 RMI 接口
  2. 編寫 RMI 接口的實現類
  3. 通過 JNDI 發布 RMI 服務

1.1 定義一個 RMI 接口

RMI 接口實際上還是一個普通的 Java 接口, 只是 RMI 接口必須繼承 java.rmi.Remote ,此外, 每個 RMI 接口的方法必須聲明拋出一個 java.rmi.RemoteException 異常 ,就像下面這樣:

package com.king.zkrmi;

import java.rmi.Remote;
import java.rmi.RemoteException;

/**
 * RMI服務接口
 */
public interface HelloService extends Remote {

    String sayHello(String name) throws RemoteException;
}

繼承了 Remote 接口,實際上是讓 JVM 得知該接口是需要用于遠程調用的 , 拋出了 RemoteException 是為了讓調用 RMI 服務的程序捕獲這個異常 。畢竟遠程調用過程中,什么奇怪的事情都會發生(比如:斷網)。需要說明的是, RemoteException 是一個“受檢異常”,在調用的時候必須使用 try...catch... 自行處理 。

1.2 編寫 RMI 接口的實現類

實現以上的 HelloService 是一件非常簡單的事情, 但需要注意的是,我們必須讓實現類繼承 java.rmi.server.UnicastRemoteObject 類 ,此外, 必須提供一個構造器,并且構造器必須拋出 java.rmi.RemoteException 異常 。我們既然使用 JVM 提供的這套 RMI 框架,那么就必須按照這個要求來實現,否則是無法成功發布 RMI 服務的,一句話:我們得按規矩出牌!

package com.king.zkrmi;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

/**
 * RMI服務實現
 */
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {

    protected HelloServiceImpl() throws RemoteException {
    }

    @Override
    public String sayHello(String name) throws RemoteException {
        return String.format("Hello %s", name);
    }
}

為了滿足 RMI 框架的要求,我們確實做了很多額外的工作( 繼承了 UnicastRemoteObject 類,拋出了 RemoteException 異常 ),但這些工作阻止不了我們發布 RMI 服務的決心!我們可以 通過 JVM 提供的 JNDI(Java Naming and Directory Interface,Java 命名與目錄接口)這個 API 輕松發布 RMI 服務 。

1.3 通過 JNDI 發布 RMI 服務

發布 RMI 服務,我們需要告訴 JNDI 三個基本信息: 1. 域名或 IP 地址(host)、2. 端口號(port)、3. 服務名(service) ,它們構成了 RMI 協議的 URL(或稱為“RMI 地址”):

rmi://<host>:<port>/<service>

如果我們是在本地發布 RMI 服務,那么 host 就是“localhost”。 此外,RMI 默認的 port 是“1099” ,我們也可以自行設置 port 的值(只要不與其它端口沖突即可)。 service 實際上是一個基于同一 host 與 port 下唯一的服務名,我們不妨使用 Java 完全類名來表示吧 ,這樣也比較容易保證 RMI 地址的唯一性。

對于我們的示例而言,RMI 地址為:

rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl

我們只需簡單提供一個 main() 方法就能發布 RMI 服務,就像下面這樣:

package demo.zookeeper.rmi.server;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

public class RmiServer {

    public static void main(String[] args) throws Exception {
        int port = 1099;
        String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
        LocateRegistry.createRegistry(port);
        Naming.rebind(url, new HelloServiceImpl());
    }
}

需要注意的是,我們 通過 LocateRegistry.createRegistry() 方法在 JNDI 中創建一個注冊表,只需提供一個 RMI 端口號即可 。此外, 通過 Naming.rebind() 方法綁定 RMI 地址與 RMI 服務實現類 ,這里使用了 rebind() 方法,它相當于先后調用 Naming 的 unbind() 與 bind() 方法,只是使用 rebind() 方法來得更加痛快而已,所以我們選擇了它。

運行這個 main() 方法,RMI 服務就會自動發布,剩下要做的就是寫一個 RMI 客戶端來調用已發布的 RMI 服務。

2 調用 RMI 服務

同樣我們也使用一個 main() 方法來調用 RMI 服務,相比發布而言,調用會更加簡單,我們只需要知道兩個東西: 1. RMI 請求路徑、2. RMI 接口(一定不需要 RMI 實現類,否則就是本地調用了) 。數行代碼就能調用剛才發布的 RMI 服務,就像下面這樣:

package demo.zookeeper.rmi.client;

import demo.zookeeper.rmi.common.HelloService;
import java.rmi.Naming;

public class RmiClient {

    public static void main(String[] args) throws Exception {
        String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
        HelloService helloService = (HelloService) Naming.lookup(url);
        String result = helloService.sayHello("Jack");
        System.out.println(result);
    }
}

當我們運行以上 main() 方法,在控制臺中看到“Hello Jack”輸出,就表明 RMI 調用成功。

3 RMI 服務的局限性

可見,借助 JNDI 這個所謂的命名與目錄服務,我們成功地發布并調用了 RMI 服務。實際上, JNDI 就是一個注冊表,服務端將服務對象放入到注冊表中,客戶端從注冊表中獲取服務對象。在服務端我們發布了 RMI 服務,并在 JNDI 中進行了注冊,此時就在服務端創建了一個 Skeleton(骨架),當客戶端第一次成功連接 JNDI 并獲取遠程服務對象后,立馬就在本地創建了一個 Stub(存根),遠程通信實際上是通過 Skeleton 與 Stub 來完成的,數據是基于 TCP/IP 協議,在“傳輸層”上發送的 。毋庸置疑,理論上 RMI 一定比 WebService 要快,畢竟 WebService 是基于 HTTP 的,而 HTTP 所攜帶的數據是通過“應用層”來傳輸的,傳輸層較應用層更為底層,越底層越快。

既然 RMI 比 WebService 快,使用起來也方便,那么為什么我們有時候還要用 WebService 呢?

其實原因很簡單, WebService 可以實現跨語言系統之間的調用,而 RMI 只能實現 Java 系統之間的調用 。也就是說,RMI 的跨平臺性不如 WebService 好,假如我們的系統都是用 Java 開發的,那么當然首選就是 RMI 服務了。

貌似 RMI 確實挺優秀的,除了不能跨平臺以外,還有那些問題呢?

筆者認為有兩點局限性:

  1. RMI 使用了 Java 默認的序列化方式,對于性能要求比較高的系統,可能需要使用其它序列化方案來解決(例如:Protobuf)。
  2. RMI 服務在運行時難免會存在出故障,例如,如果 RMI 服務無法連接了,就會導致客戶端無法響應的現象。

在一般的情況下,Java 默認的序列化方式確實已經足以滿足我們的要求了,如果性能方面如果不是問題的話,我們需要解決的實際上是第二點,也就是說, 讓使系統具備 HA(High Availability,高可用性) 。

4 使用 ZooKeeper 提供高可用的 RMI 服務

要想解決 RMI 服務的高可用性問題,我們需要 利用 ZooKeeper 充當一個 服務注冊表(Service Registry),讓多個 服務提供者(Service Provider)形成一個集群,讓 服務消費者(Service Consumer)通過服務注冊表獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者 。如下圖所示:

需要注意的是, 服務注冊表并不是 Load Balancer(負載均衡器),提供的不是“反向代理”服務,而是“服務注冊”與“心跳檢測”功能 。

利用服務注冊表來注冊 RMI 地址,這個很好理解,那么“心跳檢測”又如何理解呢?說白了就是通過服務中心定時向各個服務提供者發送一個請求(實際上建立的是一個 Socket 長連接),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,只會從還“活著”的服務提供者中選出一個做為當前的服務提供者。

也許讀者會考慮到,服務中心可能會出現單點故障,如果服務注冊表都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構, 必須保證服務中心也具備高可用性 。

ZooKeeper 正好能夠滿足我們上面提到的所有需求:

使用 ZooKeeper 的臨時性 ZNode 來存放服務提供者的 RMI 地址,一旦與服務提供者的 Session 中斷,會自動清除相應的 ZNode。

讓服務消費者去監聽這些 ZNode,一旦發現 ZNode 的數據(RMI 地址)有變化,就會重新獲取一份有效數據的拷貝。

ZooKeeper 與生俱來的集群能力(例如:數據同步與領導選舉特性),可以確保服務注冊表的高可用性。

4.1 服務提供者

需要編寫一個 ServiceProvider 類,來發布 RMI 服務,并將 RMI 地址注冊到 ZooKeeper 中(實際存放在 ZNode 上):

package com.king.zkrmi;

import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;

/**
 * RMI服務提供者
 */
public class ServiceProvider {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);

    // 用于等待 SyncConnected 事件觸發后繼續執行當前線程
    private CountDownLatch latch = new CountDownLatch(1);

    // 發布 RMI 服務并注冊 RMI 地址到 ZooKeeper 中
    public void publish(Remote remote, String host, int port) {
        String url = publishService(remote, host, port); // 發布 RMI 服務并返回 RMI 地址
        if (url != null) {
            ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象
            if (zk != null) {
                createNode(zk, url); // 創建 ZNode 并將 RMI 地址放入 ZNode 上
            }
        }
    }

    // 發布 RMI 服務
    private String publishService(Remote remote, String host, int port) {
        String url = null;
        try {
            url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
            LocateRegistry.createRegistry(port);
            Naming.rebind(url, remote);
            LOGGER.debug("publish rmi service (url: {})", url);
        } catch (RemoteException | MalformedURLException e) {
            LOGGER.error("", e);
        }
        return url;
    }

    // 連接 ZooKeeper 服務器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的線程
                    }
                }
            });
            latch.await(); // 使當前線程處于等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    // 創建 ZNode
    private void createNode(ZooKeeper zk, String url) {
        try {
            byte[] data = url.getBytes();
            String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);    // 創建一個臨時性且有序的 ZNode
            LOGGER.debug("create zookeeper node ({} => {})", path, url);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}

涉及到的 Constant 常量,見如下代碼:

package com.king.zkrmi;

/**
 * ZK常量
 */
public interface Constant {

    String ZK_CONNECTION_STRING = "localhost:2181";
    int ZK_SESSION_TIMEOUT = 5000;
    String ZK_REGISTRY_PATH = "/registry";
    String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
}

注意:我們首先需要使用 ZooKeeper 的客戶端工具創建一個持久性 ZNode,名為“/registry”,該節點是不存放任何數據的,可使用如下命令:

create /registry null

4.2 服務消費者

服務消費者需要在創建的時候連接 ZooKeeper,同時監聽 /registry 節點的 NodeChildrenChanged 事件 ,也就是說,一旦該節點的子節點有變化,就需要重新獲取最新的子節點。 這里提到的子節點,就是存放服務提供者發布的 RMI 地址 。需要強調的是, 這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務注冊表的 Session 中斷后,該臨時性節會被自動刪除 。

package com.king.zkrmi;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

/**
 * RMI服務消費者
 */
public class ServiceConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);

    // 用于等待 SyncConnected 事件觸發后繼續執行當前線程
    private CountDownLatch latch = new CountDownLatch(1);

    // 定義一個 volatile 成員變量,用于保存最新的 RMI 地址(考慮到該變量或許會被其它線程所修改,一旦修改后,該變量的值會影響到所有線程)
    private volatile List<String> urlList = new ArrayList<>();

    // 構造器
    public ServiceConsumer() {
        ZooKeeper zk = connectServer(); // 連接 ZooKeeper 服務器并獲取 ZooKeeper 對象
        if (zk != null) {
            watchNode(zk); // 觀察 /registry 節點的所有子節點并更新 urlList 成員變量
        }
    }

    // 查找 RMI 服務
    public <T extends Remote> T lookup() {
        T service = null;
        int size = urlList.size();
        if (size > 0) {
            String url;
            if (size == 1) {
                url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素
                LOGGER.debug("using only url: {}", url);
            } else {
                url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素
                LOGGER.debug("using random url: {}", url);
            }
            service = lookupService(url); // 從 JNDI 中查找 RMI 服務
        }
        return service;
    }

    // 連接 ZooKeeper 服務器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的線程
                    }
                }
            });
            latch.await(); // 使當前線程處于等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    // 觀察 /registry 節點下所有子節點是否有變化
    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk); // 若子節點有變化,則重新調用該方法(為了獲取最新子節點中的數據)
                    }
                }
            });
            List<String> dataList = new ArrayList<>(); // 用于存放 /registry 所有子節點中的數據
            for (String node : nodeList) {
                byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的數據
                dataList.add(new String(data));
            }
            LOGGER.debug("node data: {}", dataList);
            urlList = dataList; // 更新最新的 RMI 地址
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }

    // 在 JNDI 中查找 RMI 遠程服務對象
    @SuppressWarnings("unchecked")
    private <T> T lookupService(String url) {
        T remote = null;
        try {
            remote = (T) Naming.lookup(url);
        } catch (NotBoundException | MalformedURLException | RemoteException e) {
            if (e instanceof ConnectException) {
                // 若連接中斷,則使用 urlList 中第一個 RMI 地址來查找(這是一種簡單的重試方式,確保不會拋出異常)
                LOGGER.error("ConnectException -> url: {}", url);
                if (urlList.size() != 0) {
                    url = urlList.get(0);
                    return lookupService(url);
                }
            }
            LOGGER.error("", e);
        }
        return remote;
    }
}

4.3 發布服務

我們需要調用 ServiceProvider 的 publish() 方法來發布 RMI 服務,發布成功后也會自動在 ZooKeeper 中注冊 RMI 地址:

package com.king.zkrmi;

/**
 * 服務發布
 */
public class Server {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please using command: java Server <rmi_host> <rmi_port>");
            System.exit(-1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);

        ServiceProvider provider = new ServiceProvider();

        HelloService helloService = new HelloServiceImpl();
        provider.publish(helloService, host, port);

        Thread.sleep(Long.MAX_VALUE);
    }
}

注意:在運行 Server 類的 main() 方法時,一定要使用命令行參數來指定 host 與 port,例如:

java Server localhost 1099
java Server localhost 2099

以上兩條 Java 命令可在本地運行兩個 Server 程序,當然也可以同時運行更多的 Server 程序,只要 port 不同就行。

4.4 調用服務

通過調用 ServiceConsumer 的 lookup() 方法來查找 RMI 遠程服務對象。我們使用一個“死循環”來模擬每隔 3 秒鐘調用一次遠程方法。

package com.king.zkrmi;

/**
 * RMI客戶端
 */
public class Client {

    public static void main(String[] args) throws Exception {
        ServiceConsumer consumer = new ServiceConsumer();

        while (true) {
            HelloService helloService = consumer.lookup();
            String result = helloService.sayHello("Jack");
            System.out.println(result);
            Thread.sleep(3000);
        }
    }
}

4.5 使用方法

根據以下步驟驗證 RMI 服務的高可用性:

  1. 運行兩個 Server 程序,一定要確保 port 是不同的。
  2. 運行一個 Client 程序。
  3. 停止其中一個 Server 程序,并觀察 Client 控制臺的變化(停止一個 Server 不會導致 Client 端調用失敗)。
  4. 重新啟動剛才關閉的 Server 程序,繼續觀察 Client 控制臺變化(新啟動的 Server 會加入候選)。
  5. 先后停止所有的 Server 程序,還是觀察 Client 控制臺變化(Client 會重試連接,多次連接失敗后,自動關閉)。

5 總結

通過本文,我們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,通過 ZooKeeper 注冊所有服務提供者發布的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案局限于 RMI 服務,對于任何形式的服務(比如:WebService),也提供了一定參考。

如果再配合 ZooKeeper 自身的集群,那才是一個相對完美的解決方案,對于 ZooKeeper 的集群,請讀者自行實踐。

由于筆者水平有限,對于描述有誤之處,還請各位讀者提出建議,并期待更加優秀的解決方案。

6 Zookeeper + RMI 時序圖

 

來自: http://my.oschina.net/xianggao/blog/645015

 

 本文由用戶 sammuu 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!