Dubbo的一次體驗與分析
Dubbo是什么
Dubbo是一個分布式服務框架,致力于提供高性能和透明化的RPC遠程服務調用方案,以及SOA服務治理方案;
其核心部分包含:
遠程通訊:提供對多種基于長連接的NIO框架抽象封裝,包括多種線程模型,序列化,以及“請求-響應”模式的信息交換方式。
集群容錯:提供基于接口方法的透明遠程過程調用,包括多協議支持,以及軟負載均衡,失敗容錯,地址路由,動態配置等集群支持。
自動發現:基于注冊中心目錄服務,使服務消費方能動態的查找服務提供方,使地址透明,使服務提供方可以平滑增加或減少機器。
架構圖如下:來自官網

Provider: 暴露服務的服務提供方。
Consumer: 調用遠程服務的服務消費方。
Registry: 服務注冊與發現的注冊中心。
Monitor: 統計服務的調用次調和調用時間的監控中心。
Container: 服務運行容器。
與Spring和Zookeeper的集成測試
從上面的架構圖中了解到整個系統需要5個部分,不過此處為了方便測試,只提供了Provider,Consumer以及Registry三個部分;代碼結構圖如下所示:

dubboTest是公共的父工程,dubboProvider和dubboConsumer分別是其子工程,分別對應了Provider和Consumer兩個部分,至于Registry由Zookeeper來支持,下面詳細介紹每一部分。
1.公共maven依賴
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
主要是Spring,Zookeeper以及dubbo相關的包
2.Provider相關介紹
提供一個對外的接口類DemoService
public interface DemoService {
String syncSayHello(String name);
String asyncSayHello(String name);
}
提供DemoService的實現類DemoServiceImpl
public class DemoServiceImpl implements DemoService {
@Override
public String syncSayHello(String name) {
return "sync Hello " + name;
}
@Override
public String asyncSayHello(String name) {
return "async Hello " + name;
}
}
提供provider配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 提供方應用信息,用于計算依賴關系 -->
<dubbo:application name="hello-world-app" />
<!-- 使用zookeeper注冊中心暴露服務地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!-- 用dubbo協議在20880端口暴露服務 -->
<dubbo:protocol name="dubbo" port="20880" />
<!-- 聲明需要暴露的服務接口 -->
<dubbo:service interface="org.dubboProvider.DemoService"
ref="demoService" />
<!-- 和本地bean一樣實現服務 -->
<bean id="demoService" class="org.dubboProvider.DemoServiceImpl" />
</beans>
dubbo:registry:提供了注冊中心,為了方便此處配置的是本地的zookeeper
dubbo:service:提供了對外的服務接口
提供Provider啟動類Provider
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "dubbo-provider.xml" });
context.start();
System.in.read(); // 按任意鍵退出
}
}
3.Consumer相關介紹
Consumer配置文件dubbo-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 消費方應用名,用于計算依賴關系,不是匹配條件,不要與提供方一樣 -->
<dubbo:application name="consumer-of-helloworld-app" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!-- 生成遠程服務代理,可以和本地bean一樣使用demoService -->
<dubbo:reference id="demoService" interface="org.dubboProvider.DemoService" >
<dubbo:method name="syncSayHello" async="false" />
<dubbo:method name="asyncSayHello" async="true" />
</dubbo:reference>
</beans>
Consumer測試類Consumer
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dubboProvider.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.dubbo.rpc.RpcContext;
public class Consumer {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"dubbo-consumer.xml");
context.start();
DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理
System.out.println(demoService.syncSayHello("world"));
System.out.println(demoService.asyncSayHello("world"));
Future<String> futrue = RpcContext.getContext().getFuture();
System.out.println(futrue.get());
}
}
運行Consumer類,結果如下:
sync Hello world
null
async Hello world
調用流程簡要分析
1.啟動Provider,讀取配置文件
Provider啟動向Zookeeper中注冊服務器的相關信息,主要的接口類RegistryService,四個實現類分別是:ZookeeperRegistry,RedisRegistry,DubboRegistry以及MulticastRegistry,這里使用的是ZookeeperRegistry,啟動Provider的時候會調用doRegistry()方法,代碼如下:
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
url詳細信息:dubbo://192.168.67.13:20880/org.dubboProvider.DemoService?anyhost=true&application=hello-world-app&dubbo=2.5.3&interface=org.dubboProvider.DemoService&methods=syncSayHello,asyncSayHello&pid=4952&side=provider×tamp=1487671777581
通過zkclient在Zookeeper上創建節點,為Consumer獲取節點做準備。
2.啟動Consumer,讀取配置文件
監聽Zookeeper中注冊的服務器信息節點,通過節點信息建立和遠程服務器的連接,所有的客戶端都繼承于AbstractClient,對應的實現類有NettyClient,MinaClient以及GrizzlyClient;3個都是基于java nio的底層通信框架,默認使用的是NettyClient,在AbstractClient的構造方法中就建立了和服務器的連接,部分代碼如下:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
//...省略...
connect();
//...省略...
}
對應的在NettyClient中進行了doConnect()
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
//...省略...
NettyClient.this.channel = newChannel;
//...省略...
}
NettyClient中保存了建立的連接。
3.生成動態代理
通過簡單的接口調用就實現了遠程方法的調用,其實就是dubbo幫助我們生成了一個動態代理類,所有的關于建立遠程連接,消息封裝編碼,消息的發送以及消息的接收解碼都在動態代理類里面幫我們處理了
dubbo提供的代理工廠類都繼承于AbstractProxyFactory類,對應的實現類有JdkProxyFactory和JavassistProxyFactory,默認情況下使用的JavassistProxyFactory,相應代碼如下:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
4.方法調用觸發InvokerInvocationHandler調用invoke方法
invoke方法代碼如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
RpcInvocation用來封裝了方法調用的相關參數比如:方法名,參數類型,參數列表等,可以查看相關代碼:
public class RpcInvocation implements Invocation, Serializable {
private static final long serialVersionUID = -4355285085441097045L;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments;
private Map<String, String> attachments;
private transient Invoker<?> invoker;
......//以下省略
經過層層調用最后到達了DubboInvoker的doInvoke方法中,也是我們比較關心的類,代碼如下:
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
里面有我們比較關心的同步和異步調用:
同步調用:調用ResponseFuture的get()方法進行等待服務器的返回
異步調用:沒有等待服務器的返回,直接將ResponseFuture放入了RpcContext.getContext()中,這也是我們需要在代碼中使用Future futrue = RpcContext.getContext().getFuture();的原因。
5.發送請求
在第二步中已經建立了和遠程服務器的連接,建立的數量和集群的服務器有關,所以客戶端是一個數組,從數組中獲取一個客戶端連接
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
獲取之后執行currentClient.request(inv, timeout),就是對服務器發送請求,進入到request方法中有如下代碼:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
把要發送的消息封裝成了一個Request對象,并且返回了DefaultFuture(繼承于ResponseFuture實現了同步和異步的調用),可以看一下DefaultFuture的get()方法:
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
while一直在循環,直到isDone為true,或者超時了,isDone其實就是判斷response是否為空:
public boolean isDone() {
return response != null;
}
6.接收消息
DefaultFuture中的received()用來接收消息同時賦值給了Response response,這樣isDone()方法可以為true了
總結
以上只是對dubbo粗淺的使用,以及簡單的了解了一下調用的整個流程,沒有太多更加的深入,主要還是項目沒有實際用到;不過可以看到dubbo底層通信是基于netty,mina這種高性能的通信框架,而且通過長連接減少握手;二進制流壓縮數據,比常規HTTP等短連接協議更快,這個上面沒有提到,更多的可以查看編解碼類DubboCodec;可以認為dubbo的性能還是相當強的。
來自:http://codingo.xyz/index.php/2017/02/21/dubbo/