Dubbo源码(五)(深度剖析dubbo源码)

  本篇文章为你整理了Dubbo源码(五)(深度剖析dubbo源码)的详细内容,包含有dubbo源码解析20pdf 深度剖析dubbo源码 dubbo spi源码 dubbo示例代码 Dubbo源码(五),希望能帮助你了解 Dubbo源码(五)。

  本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

  今天,来聊聊Dubbo的服务目录(Directory)。下面是官方文档对服务目录的定义:

  服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

  服务目录持有Invoker对象集合,Dubbo的服务调用均由Invoker发起。

  当服务提供者信息发生变化时(比如某一个服务挂了),服务目录也需要动态调整。

  服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory。它们均继承自AbstractDirectory,而 AbstractDirectory 实现了 Directory 接口。Directory 接口提供了list(Invocation invocation) 方法,这个方法就是用来获取 invoker 集合的。

  再看 RegistryDirectory 实现了 NotifyListener 接口,这个接口中只有一个方法,notify(List urls),当注册中心节点信息发生变化后,触发此方法调整服务目录中的配置信息以及 invoker 集合。

  上面我们讲了,服务调用需求用到 invoker,而服务目录持有 invoker 集合,并通过 list 方法提供 invoker。下面放上服务消费者Demo中DemoService#sayHello 方法的调用路径

  AbstractDirectory 实现了 Directory 接口的 list 方法

  

public List Invoker T list(Invocation invocation) throws RpcException {

 

   if (destroyed) {

   throw new RpcException("Directory already destroyed .url: " + getUrl());

   // 调用 doList 方法列举 Invoker,doList 是模板方法,由子类实现

   List Invoker T invokers = doList(invocation);

   // 获取路由 Router 列表

   List Router localRouters = this.routers; // local reference

   if (localRouters != null !localRouters.isEmpty()) {

   for (Router router : localRouters) {

   try {

   // 获取 runtime 参数,并根据参数决定是否进行路由

   if (router.getUrl() == null router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {

   // 进行服务路由

   invokers = router.route(invokers, getConsumerUrl(), invocation);

   } catch (Throwable t) {

   logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);

   return invokers;

  

 

  此方法就两段逻辑:

  通过 doList 获取 invoker 集合

  通过路由选择合适的 invoker

  路由非本文重点,略过。

  doList 是模板方法,由子类实现。

  StaticDirectory

  StaticDirectory 是一个静态服务目录,其 invokers 集合通过构造方法注入,不应被改变。

  

// StaticDirectory的doList啥都没做,直接返回持有的invokers

 

  protected List Invoker T doList(Invocation invocation) throws RpcException {

   // 列举 Inovker,也就是直接返回 invokers 成员变量

   return invokers;

  

 

  StaticDirectory 的其它方法就不分析了,同样很简单。

  RegistryDirectory

  RegistryDirectory 是动态调整的服务目录,其持有的 invokers 有内部方法生成。

  在上篇博文《Dubbo源码(四) - 服务引用(消费者)》中,我留了一个坑,也就是服务引用过程中,创建了注册中心之后,如何订阅节点数据。在RegistryProtocol#doRefer方法中。

  其中调用了RegistryDirectory#subscribe(URL url)方法

  

public void subscribe(URL url) {

 

   setConsumerUrl(url);

   registry.subscribe(url, this);

  

 

  我们用的注册中心是 zookeeper,所以 registry 是 ZookeeperRegistry,而 subscribe 方法的实现在其父类FailbackRegistry中

  

public void subscribe(URL url, NotifyListener listener) {

 

   super.subscribe(url, listener);

   removeFailedSubscribed(url, listener);

   try {

   // Sending a subscription request to the server side

   doSubscribe(url, listener);

   } catch (Exception e) {

   ......

   // 订阅失败处理

   addFailedSubscribed(url, listener);

  

 

  模板方法,调用子类的 doSubscribe 方法

  

protected void doSubscribe(final URL url, final NotifyListener listener) {

 

   try {

   if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {

   } else {

   List URL urls = new ArrayList URL

   // 切割路径(providers、configurators、routers等)

   for (String path : toCategoriesPath(url)) {

   ConcurrentMap NotifyListener, ChildListener listeners = zkListeners.get(url);

   if (listeners == null) {

   zkListeners.putIfAbsent(url, new ConcurrentHashMap NotifyListener, ChildListener

   listeners = zkListeners.get(url);

   // 缓存操作,获取节点监听器

   ChildListener zkListener = listeners.get(listener);

   if (zkListener == null) {

   listeners.putIfAbsent(listener, new ChildListener() {

   @Override

   public void childChanged(String parentPath, List String currentChilds) {

   // 这里和方法末尾的 notify(url, listener, urls); 是调用的同一个方法

   // 节点变更时触发变更操作

   ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));

   zkListener = listeners.get(listener);

   zkClient.create(path, false);

   // 注册节点监听器

   List String children = zkClient.addChildListener(path, zkListener);

   if (children != null) {

   urls.addAll(toUrlsWithEmpty(url, path, children));

   // 触发节点变更操作

   notify(url, listener, urls);

   } catch (Throwable e) {

   throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  

 

  订阅方法做了3个操作:

  切割url,拆分订阅路径

  创建节点监听器

  触发节点变更操作

  这里注意下,订阅时节点数据并没有发生变更,所以需要手动触发 notify 方法。

  下面继续看节点变更操作做了什么,调用路径有点深,就不一步一步调试了,直接把路径写在注释上。

  

// FailbackRegistry#notify(URL url, NotifyListener listener, List URL urls) - 

 

  // FailbackRegistry#doNotify(URL url, NotifyListener listener, List URL urls) -

  // AbstractRegistry#notify(URL url, NotifyListener listener, List URL urls)

  protected void notify(URL url, NotifyListener listener, List URL urls) {

   ......

   Map String, List URL result = new HashMap String, List URL ();

   // 将urls按分类分组转成map

   ......

   for (Map.Entry String, List URL entry : result.entrySet()) {

   String category = entry.getKey();

   List URL categoryList = entry.getValue();

   categoryNotified.put(category, categoryList);

   saveProperties(url);

   listener.notify(categoryList);

  

 

  此处的listener变量,就是本节的主角RegistryDirectory,下面来分析 listener.notify(categoryList)

  

public synchronized void notify(List URL urls) {

 

   // 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url

   List URL invokerUrls = new ArrayList URL

   List URL routerUrls = new ArrayList URL

   List URL configuratorUrls = new ArrayList URL

   // 根据 category 参数分别对3种url进行处理

   ......

   // 刷新 Invoker 列表

   refreshInvoker(invokerUrls);

  

 

  此方法分别对服务提供者 url,路由 url,配置器 url各自进行了处理,这里我省略了对路由 url 和配置器 url 的处理,感兴趣的自行去看源码。咱们聚焦在 Invoker 的处理中

  

private void refreshInvoker(List URL invokerUrls) {

 

   // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务

   if (invokerUrls != null invokerUrls.size() == 1 invokerUrls.get(0) != null

   Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

   // 设置 forbidden 为 true

   this.forbidden = true; // Forbid to access

   this.methodInvokerMap = null; // Set the method invoker map to null

   // 销毁所有 Invoker

   destroyAllInvokers(); // Close all invokers

   } else {

   this.forbidden = false; // Allow to access

   Map String, Invoker T oldUrlInvokerMap = this.urlInvokerMap; // local reference

   if (invokerUrls.isEmpty() this.cachedInvokerUrls != null) {

   // 添加缓存 url 到 invokerUrls 中

   invokerUrls.addAll(this.cachedInvokerUrls);

   } else {

   this.cachedInvokerUrls = new HashSet URL

   // 缓存 invokerUrls

   this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison

   if (invokerUrls.isEmpty()) {

   return;

   // 将 url 转成 Invoker

   Map String, Invoker T newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

   // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射

   Map String, List Invoker T newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

   // state change

   // If the calculation is wrong, it is not processed.

   // 转换出错,直接打印异常,并返回

   if (newUrlInvokerMap == null newUrlInvokerMap.size() == 0) {

   logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));

   return;

   // 合并多个组的 Invoker

   this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;

   this.urlInvokerMap = newUrlInvokerMap;

   try {

   // 销毁无用 Invoker

   destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

   } catch (Exception e) {

   logger.warn("destroyUnusedInvokers error. ", e);

  

 

  此方法中的逻辑有点多,

  判断是否要销毁所有 invoker

  创建 invoker

  销毁无用 invoker

  我们关注下 invoker 的创建,toInvokers(invokerUrls)

  

private Map String, Invoker T toInvokers(List URL urls) {

 

   ......

   // 获取服务消费端配置的协议

   String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);

   for (URL providerUrl : urls) {

   ......

   // 将本地 Invoker 缓存赋值给 localUrlInvokerMap

   Map String, Invoker T localUrlInvokerMap = this.urlInvokerMap; // local reference

   Invoker T invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

   if (invoker == null) { // Not in the cache, refer again

   try {

   boolean enabled = true;

   if (url.hasParameter(Constants.DISABLED_KEY)) {

   // 获取 disable 配置,取反,然后赋值给 enable 变量

   enabled = !url.getParameter(Constants.DISABLED_KEY, false);

   } else {

   // 获取 enable 配置,并赋值给 enable 变量

   enabled = url.getParameter(Constants.ENABLED_KEY, true);

   if (enabled) {

   // 调用 refer 获取 Invoker

   invoker = new InvokerDelegate T (protocol.refer(serviceType, url), url, providerUrl);

   } catch (Throwable t) {

   logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);

   if (invoker != null) { // Put new invoker in cache

   // 缓存 Invoker 实例

   newUrlInvokerMap.put(key, invoker);

   // 缓存命中

   } else {

   // 将 invoker 存储到 newUrlInvokerMap 中

   newUrlInvokerMap.put(key, invoker);

   keys.clear();

   return newUrlInvokerMap;

  

 

  这里的判断有点复杂,会对协议各种判断(是否支持、是否为empty)等,然后如果缓存未命中,则需要创建invoker,也就是protocol.refer(serviceType, url)这一段代码。

  此时,我们上一篇文章留下的另一个坑也填上了,也就是DubboProtocol#refer的调用时机。

  获取invoker集合

  

public List Invoker T doList(Invocation invocation) {

 

   ......

   List Invoker T invokers = null;

   // 获取 Invoker 本地缓存

   Map String, List Invoker T localMethodInvokerMap = this.methodInvokerMap; // local reference

   if (localMethodInvokerMap != null localMethodInvokerMap.size() 0) {

   // 获取方法名和参数列表

   String methodName = RpcUtils.getMethodName(invocation);

   Object[] args = RpcUtils.getArguments(invocation);

   // 检测参数列表的第一个参数是否为 String 或 enum 类型

   if (args != null args.length 0 args[0] != null

   (args[0] instanceof String args[0].getClass().isEnum())) {

   // 通过 方法名 + 第一个参数名称 查询 Invoker 列表,具体的使用场景暂时没想到

   invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter

   if (invokers == null) {

   // 通过方法名获取 Invoker 列表

   invokers = localMethodInvokerMap.get(methodName);

   if (invokers == null) {

   // 通过星号 * 获取 Invoker 列表

   invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);

   // 冗余逻辑,pull request #2861 移除了下面的 if 分支代码

   if (invokers == null) {

   Iterator List Invoker T iterator = localMethodInvokerMap.values().iterator();

   if (iterator.hasNext()) {

   invokers = iterator.next();

   // 返回 Invoker 列表

   return invokers == null ? new ArrayList Invoker T (0) : invokers;

  

 

  这里的逻辑也很简单,就是从类变量 methodInvokerMap 中获取invoker,所有我们需要去看看 methodInvokerMap 的赋值。

  我们在上一小节的 refreshInvoker 方法中,讲了 invoker 的生成。refreshInvoker 方法中还有对methodInvokerMap 的处理。也就是 toMethodInvokers(newUrlInvokerMap) 方法

  这里面会将 url-invoker 的映射转成 方法名-invoker 的映射。

  Dubbo的服务调用,需要通过服务目录拿到 invoker 才能发起。当注册中心发生变化时,服务目录同样需要动态调整,并刷新持有的 invoker 集合。服务目录是 Dubbo 集群容错的一部分,也是比较基础的部分。

  PS:以上讲的不包含本地服务调用,别杠

  参考资料

  Dubbo开发指南

  以上就是Dubbo源码(五)(深度剖析dubbo源码)的详细内容,想要了解更多 Dubbo源码(五)的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: