Dubbo的一次體驗與分析

KathrinSuar 8年前發布 | 16K 次閱讀 Dubbo 分布式/云計算/大數據

Dubbo是什么

Dubbo是一個分布式服務框架,致力于提供高性能和透明化的RPC遠程服務調用方案,以及SOA服務治理方案;

其核心部分包含:

遠程通訊:提供對多種基于長連接的NIO框架抽象封裝,包括多種線程模型,序列化,以及“請求-響應”模式的信息交換方式。

集群容錯:提供基于接口方法的透明遠程過程調用,包括多協議支持,以及軟負載均衡,失敗容錯,地址路由,動態配置等集群支持。

自動發現:基于注冊中心目錄服務,使服務消費方能動態的查找服務提供方,使地址透明,使服務提供方可以平滑增加或減少機器。

架構圖如下:來自官網

Provider: 暴露服務的服務提供方。

Consumer: 調用遠程服務的服務消費方。

Registry: 服務注冊與發現的注冊中心。

Monitor: 統計服務的調用次調和調用時間的監控中心。

Container: 服務運行容器。

與Spring和Zookeeper的集成測試

從上面的架構圖中了解到整個系統需要5個部分,不過此處為了方便測試,只提供了Provider,Consumer以及Registry三個部分;代碼結構圖如下所示:

dubboTest是公共的父工程,dubboProvider和dubboConsumer分別是其子工程,分別對應了Provider和Consumer兩個部分,至于Registry由Zookeeper來支持,下面詳細介紹每一部分。

1.公共maven依賴

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.3.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo</artifactId>
    <version>2.5.3</version>
    <exclusions>
        <exclusion>
            <artifactId>spring</artifactId>
            <groupId>org.springframework</groupId>
        </exclusion>
    </exclusions>
</dependency>

主要是Spring,Zookeeper以及dubbo相關的包

2.Provider相關介紹

提供一個對外的接口類DemoService

public interface DemoService {
    String syncSayHello(String name);

    String asyncSayHello(String name);
}

提供DemoService的實現類DemoServiceImpl

public class DemoServiceImpl implements DemoService {

    @Override
    public String syncSayHello(String name) {
        return "sync Hello " + name;
    }

    @Override
    public String asyncSayHello(String name) {
        return "async Hello " + name;
    }
}

提供provider配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans          
    http://www.springframework.org/schema/beans/spring-beans.xsd          
    http://code.alibabatech.com/schema/dubbo          
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方應用信息,用于計算依賴關系 -->
    <dubbo:application name="hello-world-app" />

    <!-- 使用zookeeper注冊中心暴露服務地址 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />

    <!-- 用dubbo協議在20880端口暴露服務 -->
    <dubbo:protocol name="dubbo" port="20880" />

    <!-- 聲明需要暴露的服務接口 -->
    <dubbo:service interface="org.dubboProvider.DemoService"
        ref="demoService" />

    <!-- 和本地bean一樣實現服務 -->
    <bean id="demoService" class="org.dubboProvider.DemoServiceImpl" />
</beans>

dubbo:registry:提供了注冊中心,為了方便此處配置的是本地的zookeeper

dubbo:service:提供了對外的服務接口

提供Provider啟動類Provider

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Provider {

    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "dubbo-provider.xml" });
        context.start();

        System.in.read(); // 按任意鍵退出
    }
}

3.Consumer相關介紹

Consumer配置文件dubbo-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans          
    http://www.springframework.org/schema/beans/spring-beans.xsd          
    http://code.alibabatech.com/schema/dubbo          
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <!-- 消費方應用名,用于計算依賴關系,不是匹配條件,不要與提供方一樣 -->
    <dubbo:application name="consumer-of-helloworld-app" />
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <!-- 生成遠程服務代理,可以和本地bean一樣使用demoService -->
    <dubbo:reference id="demoService" interface="org.dubboProvider.DemoService" >
        <dubbo:method name="syncSayHello" async="false" />
        <dubbo:method name="asyncSayHello" async="true" />
    </dubbo:reference>
</beans>

Consumer測試類Consumer

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.dubboProvider.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.dubbo.rpc.RpcContext;

public class Consumer {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "dubbo-consumer.xml");
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理

        System.out.println(demoService.syncSayHello("world"));
        System.out.println(demoService.asyncSayHello("world"));
        Future<String> futrue = RpcContext.getContext().getFuture();
        System.out.println(futrue.get());
    }
}

運行Consumer類,結果如下:

sync Hello world
null
async Hello world

調用流程簡要分析

1.啟動Provider,讀取配置文件

Provider啟動向Zookeeper中注冊服務器的相關信息,主要的接口類RegistryService,四個實現類分別是:ZookeeperRegistry,RedisRegistry,DubboRegistry以及MulticastRegistry,這里使用的是ZookeeperRegistry,啟動Provider的時候會調用doRegistry()方法,代碼如下:

protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

url詳細信息:dubbo://192.168.67.13:20880/org.dubboProvider.DemoService?anyhost=true&application=hello-world-app&dubbo=2.5.3&interface=org.dubboProvider.DemoService&methods=syncSayHello,asyncSayHello&pid=4952&side=provider&timestamp=1487671777581

通過zkclient在Zookeeper上創建節點,為Consumer獲取節點做準備。

2.啟動Consumer,讀取配置文件

監聽Zookeeper中注冊的服務器信息節點,通過節點信息建立和遠程服務器的連接,所有的客戶端都繼承于AbstractClient,對應的實現類有NettyClient,MinaClient以及GrizzlyClient;3個都是基于java nio的底層通信框架,默認使用的是NettyClient,在AbstractClient的構造方法中就建立了和服務器的連接,部分代碼如下:

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    //...省略...
    connect();
    //...省略...
}

對應的在NettyClient中進行了doConnect()

protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        //...省略...
        NettyClient.this.channel = newChannel;
        //...省略...
}

NettyClient中保存了建立的連接。

3.生成動態代理

通過簡單的接口調用就實現了遠程方法的調用,其實就是dubbo幫助我們生成了一個動態代理類,所有的關于建立遠程連接,消息封裝編碼,消息的發送以及消息的接收解碼都在動態代理類里面幫我們處理了

dubbo提供的代理工廠類都繼承于AbstractProxyFactory類,對應的實現類有JdkProxyFactory和JavassistProxyFactory,默認情況下使用的JavassistProxyFactory,相應代碼如下:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
     return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

4.方法調用觸發InvokerInvocationHandler調用invoke方法

invoke方法代碼如下:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

RpcInvocation用來封裝了方法調用的相關參數比如:方法名,參數類型,參數列表等,可以查看相關代碼:

public class RpcInvocation implements Invocation, Serializable {

    private static final long serialVersionUID = -4355285085441097045L;

    private String               methodName;

    private Class<?>[]           parameterTypes;

    private Object[]             arguments;

    private Map<String, String>  attachments;

    private transient Invoker<?> invoker;

    ......//以下省略

經過層層調用最后到達了DubboInvoker的doInvoke方法中,也是我們比較關心的類,代碼如下:

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

里面有我們比較關心的同步和異步調用:

同步調用:調用ResponseFuture的get()方法進行等待服務器的返回

異步調用:沒有等待服務器的返回,直接將ResponseFuture放入了RpcContext.getContext()中,這也是我們需要在代碼中使用Future futrue = RpcContext.getContext().getFuture();的原因。

5.發送請求

在第二步中已經建立了和遠程服務器的連接,建立的數量和集群的服務器有關,所以客戶端是一個數組,從數組中獲取一個客戶端連接

if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

獲取之后執行currentClient.request(inv, timeout),就是對服務器發送請求,進入到request方法中有如下代碼:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

把要發送的消息封裝成了一個Request對象,并且返回了DefaultFuture(繼承于ResponseFuture實現了同步和異步的調用),可以看一下DefaultFuture的get()方法:

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

while一直在循環,直到isDone為true,或者超時了,isDone其實就是判斷response是否為空:

public boolean isDone() {
        return response != null;
    }

6.接收消息

DefaultFuture中的received()用來接收消息同時賦值給了Response response,這樣isDone()方法可以為true了

總結

以上只是對dubbo粗淺的使用,以及簡單的了解了一下調用的整個流程,沒有太多更加的深入,主要還是項目沒有實際用到;不過可以看到dubbo底層通信是基于netty,mina這種高性能的通信框架,而且通過長連接減少握手;二進制流壓縮數據,比常規HTTP等短連接協議更快,這個上面沒有提到,更多的可以查看編解碼類DubboCodec;可以認為dubbo的性能還是相當強的。

 

來自:http://codingo.xyz/index.php/2017/02/21/dubbo/

 

 本文由用戶 KathrinSuar 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!