Dubbo源码(七)(深度剖析dubbo源码)

  本篇文章为你整理了Dubbo源码(七)(深度剖析dubbo源码)的详细内容,包含有dubbo源码解析20pdf 深度剖析dubbo源码 dubbo spi源码 dubbo示例代码 Dubbo源码(七),希望能帮助你了解 Dubbo源码(七)。

  本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

  集群(cluster)就是一组计算机,它们作为一个总体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。

  在Dubbo中,为了避免单点故障,同一个服务允许有多个服务提供者,也允许同时连接多个注册中心。那么,服务消费者引用服务时,该请求哪个注册中心的服务提供者以及调用失败之后该如何处理呢?这些就是Dubbo集群所做的事。

  在分析集群源码之前,先看看集群容错的所有组件,下图是官方文档的组件图

  Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker:

  Cluster 是接口,其只有一个方法,负责生成Cluster Invoker

  Cluster Invoker继承了Invoker接口,是一个 Invoker,是主要逻辑实现的地方

  将上图从中间切分,可将集群工作过程分为两个阶段,左边为第一阶段。

  
第一个阶段是在服务消费者初始化期间。

  集群 Cluster 实现类为服务消费者创建 Cluster Invoker,即图上的 merge 操作,也就是将多个服务提供者合并为一个 Cluster Invoker

  
第二个阶段是在服务消费者进行远程调用时。

  步骤大体上就如图所示:list → route → select → invoke

  list:从服务目录拿到 invoker 集合

  route:通过路由过滤出符合规则的 invoker 集合

  select:通过负载均衡从 invoker 集合中选择一个

  invoke:执行 invoker 的 invoke 方法,进行真正的远程调用

  其中,list、route操作在之前文章讲过了,传送门:《服务目录》、《服务路由》

  select 不是本文重点,后续负载均衡时讲解。

  
以上就是集群工作的整个流程,这里并没有介绍集群是如何容错的,也就是 invoke 步骤调用失败的处理。Dubbo提供了多种容错方式:集群容错示例

  下面的源码我们以默认的 Failover Cluster - 失败自动切换 进行分析

  Cluster

  首先来看看 Cluster 接口,这是一个自适应拓展类,默认实现为FailoverCluster

  

public class FailoverCluster implements Cluster {

 

   public final static String NAME = "failover";

   @Override

   public T Invoker T join(Directory T directory) throws RpcException {

   return new FailoverClusterInvoker T (directory);

  

 

  前面讲了,Cluster 的作用就是将多个服务提供者合并为一个 Cluster Invoker

  多个服务提供者合并也就是 服务目录(Directory) 中的 invoker 集合。join 方法返回了一个 Cluster Invoker

  接下来,我们看看调用路径。 Cluster 接口在多个地方被调用,我们看服务消费者初始化期间的调用。

  

// 调用路径如下:

 

  // ReferenceBean#getObject()

  // ReferenceConfig#get()

  // ReferenceConfig#init()

  // ReferenceConfig#createProxy(Map String, String map)

  // RegistryProtocol#refer(Class T type, URL url)

  // RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class T type, URL url)

  private T Invoker T doRefer(Cluster cluster, Registry registry, Class T type, URL url) {

   // 创建 RegistryDirectory 实例

   RegistryDirectory T directory = new RegistryDirectory T (type, url);

   // 设置注册中心和协议

   directory.setRegistry(registry);

   directory.setProtocol(protocol);

   // all attributes of REFER_KEY

   Map String, String parameters = new HashMap String, String (directory.getUrl().getParameters());

   // 生成服务消费者链接

   URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

   // 注册服务消费者,在 consumers 目录下新节点

   if (!Constants.ANY_VALUE.equals(url.getServiceInterface())

   url.getParameter(Constants.REGISTER_KEY, true)) {

   URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);

   registry.register(registeredConsumerUrl);

   directory.setRegisteredConsumerUrl(registeredConsumerUrl);

   // 订阅 providers、configurators、routers 等节点数据

   directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,

   Constants.PROVIDERS_CATEGORY

   + "," + Constants.CONFIGURATORS_CATEGORY

   + "," + Constants.ROUTERS_CATEGORY));

   // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个

   Invoker invoker = cluster.join(directory);

   ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);

   return invoker;

  

 

  调用在倒数第三行代码。

  如果看过我之前写的《服务引用》那篇文章,想必对 doRefer 方法不陌生了。在服务目录订阅完注册中心的数据后,就调用 join 方法生成 Cluster Invoker

  啰嗦多一句:

  可以这么理解,实际负责远程调用的,是服务目录中的 invoker 集合中的 invoker,而 Cluster Invoker 则对服务目录中的 invoker 集合进行处理。

  Cluster Invoker

  默认的 Cluster Invoker 是FailoverClusterInvoker,既然是一个 Invoker,我们就从它的 invoke 方法入手。

  AbstractClusterInvoker

  invoke 方法在它的父类AbstractClusterInvoker中

  

public Result invoke(final Invocation invocation) throws RpcException {

 

   checkWhetherDestroyed();

   LoadBalance loadbalance = null;

   // 绑定 attachments 到 invocation 中.

   Map String, String contextAttachments = RpcContext.getContext().getAttachments();

   if (contextAttachments != null contextAttachments.size() != 0) {

   ((RpcInvocation) invocation).addAttachments(contextAttachments);

   // 列举 Invoker

   List Invoker T invokers = list(invocation);

   if (invokers != null !invokers.isEmpty()) {

   // 加载 LoadBalance

   loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

   .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

   RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

   // 调用 doInvoke 进行后续操作

   return doInvoke(invocation, invokers, loadbalance);

  

 

  invoke 方法逻辑也很简单:

  列举 invoker

  加载 LoadBalance(自适应拓展类)

  调用 doInvoke 进行后续操作

  其中列举 invoker 如下

  

protected List Invoker T list(Invocation invocation) throws RpcException {

 

   List Invoker T invokers = directory.list(invocation);

   return invokers;

  

 

  list 方法就是调用服务目录的 list 方法,里面做了两件事(结合前面的组件图):

  list:从服务目录拿到 invoker 集合

  route:通过路由过滤出符合规则的 invoker 集合

  FailoverClusterInvoker

  doInvoke 方法具体实现在FailoverClusterInvoker中,此 invoker 的容错方式为失败自动切换。

  

public Result doInvoke(Invocation invocation, final List Invoker T invokers, LoadBalance loadbalance) throws RpcException {

 

   List Invoker T copyinvokers = invokers;

   checkInvokers(copyinvokers, invocation);

   String methodName = RpcUtils.getMethodName(invocation);

   // 获取重试次数

   int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

   if (len = 0) {

   len = 1;

   // 循环调用,失败重试

   RpcException le = null; // last exception.

   List Invoker T invoked = new ArrayList Invoker T (copyinvokers.size()); // invoked invokers.

   Set String providers = new HashSet String (len);

   for (int i = 0; i len; i++) {

   if (i 0) {

   checkWhetherDestroyed();

   // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,

   // 通过调用 list 可得到最新可用的 Invoker 列表

   copyinvokers = list(invocation);

   // check again

   // 对 copyinvokers 进行判空检查

   checkInvokers(copyinvokers, invocation);

   // 通过负载均衡选择 Invoker

   Invoker T invoker = select(loadbalance, invocation, copyinvokers, invoked);

   // 添加到 invoker 到 invoked 列表中

   invoked.add(invoker);

   // 设置 invoked 到 RPC 上下文中

   RpcContext.getContext().setInvokers((List) invoked);

   try {

   // 调用目标 Invoker 的 invoke 方法

   Result result = invoker.invoke(invocation);

   return result;

   } catch (RpcException e) {

   if (e.isBiz()) { // biz exception.

   throw e;

   le = e;

   } catch (Throwable e) {

   le = new RpcException(e.getMessage(), e);

   } finally {

   providers.add(invoker.getUrl().getAddress());

   // 若重试失败,则抛出异常

   throw new RpcException(xxx);

  

 

  doInvoke 方法代码量不少,但是逻辑简化之后也很简单,就是根据重试次数,在 for 循环中进行远程调用,成功则返回,失败就重试。如果重试次数耗尽还无法调用成功,则抛出异常。

  从这里可以知道,Dubbo的默认失败重试次数是3次。

  此方法中我们关注下 select 方法,它负责从 invoker 集合中选出一个 infoker

  

protected Invoker T select(LoadBalance loadbalance, Invocation invocation, List Invoker T invokers, List Invoker T selected) throws RpcException {

 

   if (invokers == null invokers.isEmpty())

   return null;

   // 获取调用方法名

   String methodName = invocation == null ? "" : invocation.getMethodName();

   // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的

   // 调用同一个服务提供者,除非该提供者挂了再进行切换

   boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

   // 检测 invokers 列表是否包含 stickyInvoker,如果不包含,

   // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空

   if (stickyInvoker != null !invokers.contains(stickyInvoker)) {

   stickyInvoker = null;

   // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含

   // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。

   // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。

   if (sticky stickyInvoker != null (selected == null !selected.contains(stickyInvoker))) {

   // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的

   // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。

   if (availablecheck stickyInvoker.isAvailable()) {

   return stickyInvoker;

   // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。

   // 此时继续调用 doSelect 选择 Invoker

   Invoker T invoker = doSelect(loadbalance, invocation, invokers, selected);

   // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker

   if (sticky) {

   stickyInvoker = invoker;

   return invoker;

  

 

  select 方法主要处理对粘滞连接特性的支持。注释写的很清楚了。选择 invoker 的操作在 doSelect 方法

  

private Invoker T doSelect(LoadBalance loadbalance, Invocation invocation, List Invoker T invokers, List Invoker T selected) throws RpcException {

 

   if (invokers == null invokers.isEmpty())

   return null;

   if (invokers.size() == 1)

   return invokers.get(0);

   if (loadbalance == null) {

   // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance

   loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);

   // 通过负载均衡组件选择 Invoker

   Invoker T invoker = loadbalance.select(invokers, getUrl(), invocation);

   // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选

   if ((selected != null selected.contains(invoker))

   (!invoker.isAvailable() getUrl() != null availablecheck)) {

   try {

   // 进行重选

   Invoker T rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);

   if (rinvoker != null) {

   invoker = rinvoker;

   } else {

   // rinvoker 为空,定位 invoker 在 invokers 中的位置

   int index = invokers.indexOf(invoker);

   try {

   // 获取 index + 1 位置处的 Invoker,以下代码等价于:

   // invoker = invokers.get((index + 1) % invokers.size());

   invoker = index invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);

   } catch (Exception e) {

   logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);

   } catch (Throwable t) {

   logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);

   return invoker;

  

 

  这里通过负载均衡选出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中调用失败的invoker)或者不可用,则调用 reselect 方法进行重选。如果重选还是选不出 invoker,则返回 invoker 集合中的下一个元素。这里的繁琐判断,就是为了尽量保证拿到可用的 invoker

  我们继续看看 reselect 方法

  

private Invoker T reselect(LoadBalance loadbalance, Invocation invocation,

 

   List Invoker T invokers, List Invoker T selected, boolean availablecheck)

   throws RpcException {

   List Invoker T reselectInvokers = new ArrayList Invoker T (invokers.size() 1 ? (invokers.size() - 1) : invokers.size());

   // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下

   // 根据 availablecheck 进行不同的处理

   if (availablecheck) { // invoker.isAvailable() should be checked

   for (Invoker T invoker : invokers) {

   if (invoker.isAvailable()) {

   if (selected == null !selected.contains(invoker)) {

   reselectInvokers.add(invoker);

   if (!reselectInvokers.isEmpty()) {

   return loadbalance.select(reselectInvokers, getUrl(), invocation);

   } else { // do not check invoker.isAvailable()

   for (Invoker T invoker : invokers) {

   if (selected == null !selected.contains(invoker)) {

   reselectInvokers.add(invoker);

   if (!reselectInvokers.isEmpty()) {

   return loadbalance.select(reselectInvokers, getUrl(), invocation);

   // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。

   // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中

   if (selected != null) {

   for (Invoker T invoker : selected) {

   if ((invoker.isAvailable()) // available first

   !reselectInvokers.contains(invoker)) {

   reselectInvokers.add(invoker);

   if (!reselectInvokers.isEmpty()) {

   return loadbalance.select(reselectInvokers, getUrl(), invocation);

   return null;

  

 

  这个方法可以分成两部分:

  在非 selected 的 invoker 集合中,调用负载均衡选择一个 invoker

  在步骤1无法选出 invoker 时,在 selected 中选出 invoker

  至此,Dubbo的集群就讲完了。负载均衡有空再说。

  再论Cluster

  前面我们提到 Cluster 接口在多个地方被调用,也讲了同一个服务有多个服务提供者时的处理。那么,有多个注册中心呢,该如何处理?

  

// 类ReferenceConfig

 

  private T createProxy(Map String, String map) {

   ......

   // 本地引用

   if (isJvmRefer) {

   ......

   // 远程引用

   } else {

   ......

   // 单个注册中心或服务提供者(服务直连,下同)

   if (urls.size() == 1) {

   // 调用 RegistryProtocol 的 refer 构建 Invoker 实例

   invoker = refprotocol.refer(interfaceClass, urls.get(0));

   // 多个注册中心或多个服务提供者,或者两者混合

   } else {

   List Invoker ? invokers = new ArrayList Invoker ? ();

   URL registryURL = null;

   // 获取所有的 Invoker

   for (URL url : urls) {

   // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时

   // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法

   invokers.add(refprotocol.refer(interfaceClass, url));

   if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

   registryURL = url; // use last registry url

   if (registryURL != null) { // registry url is available

   // use AvailableCluster only when registers cluster is available

   // 如果注册中心链接不为空,则将使用 AvailableCluster

   URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);

   // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并

   invoker = cluster.join(new StaticDirectory(u, invokers));

   } else { // not a registry url

   invoker = cluster.join(new StaticDirectory(invokers));

   // 生成代理类

   return (T) proxyFactory.getProxy(invoker);

  

 

  createProxy 是服务引用时,生成服务代理对象的方法。这里会判断,如果有多个注册中心,会再封装一层集群,也就是先选择注册中心,再选择服务提供者。

  这里一般情况 registryURL 不为空,cluster 使用的是AvailableCluster

  

public class AvailableCluster implements Cluster {

 

   public static final String NAME = "available";

   @Override

   public T Invoker T join(Directory T directory) throws RpcException {

   return new AbstractClusterInvoker T (directory) {

   @Override

   public Result doInvoke(Invocation invocation, List Invoker T invokers, LoadBalance loadbalance) throws RpcException {

   for (Invoker T invoker : invokers) {

   if (invoker.isAvailable()) {

   return invoker.invoke(invocation);

   throw new RpcException("No provider available in " + invokers);

  

 

  AvailableCluster的逻辑很简单,按顺序选择可使用的 invoker (这里的invoker其实就是每个注册中心)

  本篇文章介绍了Dubbo集群容错的整体工作过程和调用逻辑。Dubbo提供了多种集群实现,本文只介绍了Failover Cluster,其余实现感兴趣的可以自行查看源码。

  参考资料

  Dubbo开发指南

  以上就是Dubbo源码(七)(深度剖析dubbo源码)的详细内容,想要了解更多 Dubbo源码(七)的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: