Dubbo源码(三)(深度剖析dubbo源码)

  本篇文章为你整理了Dubbo源码(三)(深度剖析dubbo源码)的详细内容,包含有dubbo源码解析20pdf 深度剖析dubbo源码 dubbo spi源码 dubbo示例代码 Dubbo源码(三),希望能帮助你了解 Dubbo源码(三)。

  本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

  在了解了Dubbo SPI后,我们来了解下Dubbo服务导出的过程。

  Dubbo的配置是通过DubboNamespaceHandler读取解析的,其中会将Dubbo服务提供者封装成ServiceBean注入Spring容器中。而服务导出就是在ServiceBean的onApplicationEvent开始的。

  想了解DubboNamespaceHandler的工作原理,请自行去了解Spring自定义标签,本文略。

  服务导出的入口方法是 ServiceBean 的 onApplicationEvent。因为代码过多,接下来会忽略部分代码,提供调用链条,专注于主要部分。

  

// ServiceBean#onApplicationEvent(ContextRefreshedEvent event) - 

 

  // ServiceBean#export() -

  // ServiceConfig#export() -

  // ServiceConfig#doExport() -

  // ServiceConfig#doExportUrls()

  private void doExportUrls() {

   // 加载注册中心链接

   List URL registryURLs = loadRegistries(true);

   // 遍历 protocols,并在每个协议下导出服务

   for (ProtocolConfig protocolConfig : protocols) {

   doExportUrlsFor1Protocol(protocolConfig, registryURLs);

  

 

  Dubbo是支持多注册中心多协议的。下面继续看doExportUrlsFor1Protocol方法。

  

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List URL registryURLs) {

 

   ......

   // 判断协议名是否为 injvm(本地调用)

   if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {

   protocolConfig.setRegister(false);

   map.put("notify", "false");

   ......

   String scope = url.getParameter(Constants.SCOPE_KEY);

   // 如果 scope = none,则什么都不做

   if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

   // scope != remote,导出到本地

   if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {

   exportLocal(url);

   // scope != local,导出到远程

   if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {

   if (logger.isInfoEnabled()) {

   logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);

   if (registryURLs != null !registryURLs.isEmpty()) {

   for (URL registryURL : registryURLs) {

   ......

   // 为服务提供类(ref)生成 Invoker

   Invoker ? invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

   // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig

   DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

   // 导出服务,并生成 Exporter

   Exporter ? exporter = protocol.export(wrapperInvoker);

   exporters.add(exporter);

   } else {

   ......

   this.urls.add(url);

  

 

  这里我们只保留主要代码,完整的注释可以去看我fork的注释版源码或者官方开发指南。

  这里主要逻辑有3部分:

  
invoker的生成

  Invoker ? invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

  
导出服务,并生成 Exporter

  Exporter ? exporter = protocol.export(wrapperInvoker);

  导出服务包含了远程服务暴露和注册中心处理。

  
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

  Invoker 是由 ProxyFactory 创建而来,ProxyFactory 是方法级别的自适应拓展接口,其生成的自适应拓展类如下:

  

package com.alibaba.dubbo.rpc;

 

  import com.alibaba.dubbo.common.extension.ExtensionLoader;

  public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {

   public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {

   if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

   if (arg0.getUrl() == null)

   throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");

   com.alibaba.dubbo.common.URL url = arg0.getUrl();

   String extName = url.getParameter("proxy", "javassist");

   if (extName == null)

   throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

   com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

   return extension.getProxy(arg0);

   public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0, boolean arg1) throws com.alibaba.dubbo.rpc.RpcException {

   if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

   if (arg0.getUrl() == null)

   throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");

   com.alibaba.dubbo.common.URL url = arg0.getUrl();

   String extName = url.getParameter("proxy", "javassist");

   if (extName == null)

   throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

   com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

   return extension.getProxy(arg0, arg1);

   public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {

   if (arg2 == null) throw new IllegalArgumentException("url == null");

   com.alibaba.dubbo.common.URL url = arg2;

   String extName = url.getParameter("proxy", "javassist");

   if (extName == null)

   throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

   com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

   return extension.getInvoker(arg0, arg1, arg2);

  

 

  从上面可以看到,默认的实现类是JavassistProxyFactory,进去看看是如何创建invoker的

  

public T Invoker T getInvoker(T proxy, Class T type, URL url) {

 

   // 为目标类创建 Wrapper

   // 此处是动态生成的Wrapper的实现类,会重写invokeMethod方法

   final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf($) 0 ? proxy.getClass() : type);

   // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。

   return new AbstractProxyInvoker T (proxy, type, url) {

   @Override

   protected Object doInvoke(T proxy, String methodName,

   Class ? [] parameterTypes,

   Object[] arguments) throws Throwable {

   return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

  

 

  这里创建的是抽象类AbstractProxyInvoker,使用的是经典的模板模式,具体的逻辑由子类实现,去看看 AbstractProxyInvoker 是怎么封装 invoke 方法的

  

public Result invoke(Invocation invocation) throws RpcException {

 

   try {

   // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中

   return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));

   } catch (InvocationTargetException e) {

   return new RpcResult(e.getTargetException());

   } catch (Throwable e) {

   throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);

  

 

  别问我wrapper是怎么生成的,我也看不懂。。。

  本地服务暴露

  当一个应用既是一个服务的提供者,同时也是这个服务的消费者的时候,可以直接对本机提供的服务发起本地调用。一般情况下也用不到,所以不感兴趣的可以略过此节。

  当不指定范围为远程时,Dubbo默认支持本地调用的,参见前面的doExportUrlsFor1Protocol方法

  

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List URL registryURLs) {

 

   ......

   String scope = url.getParameter(Constants.SCOPE_KEY);

   // 如果 scope = none,则什么都不做

   if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

   // scope != remote,导出到本地

   if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {

   exportLocal(url);

   .....

  private void exportLocal(URL url) {

   // 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出

   if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {

   // 将协议设置为injvm

   URL local = URL.valueOf(url.toFullString())

   .setProtocol(Constants.LOCAL_PROTOCOL)

   .setHost(LOCALHOST)

   .setPort(0);

   StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));

   // 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法

   Exporter ? exporter = protocol.export(

   proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

   exporters.add(exporter);

   logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");

  

 

  重点是protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));这句,前面已经讲了invoker的生成,略过proxyFactory.getInvoker,直接看protocol.export,因为protocol也是方法级别的自适应拓展,下面照旧放上自适应拓展生成的类

  

package com.alibaba.dubbo.rpc;

 

  import com.alibaba.dubbo.common.extension.ExtensionLoader;

  public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {

   public void destroy() {

   throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");

   public int getDefaultPort() {

   throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");

   public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {

   if (arg1 == null) throw new IllegalArgumentException("url == null");

   com.alibaba.dubbo.common.URL url = arg1;

   String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());

   if (extName == null)

   throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

   com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

   return extension.refer(arg0, arg1);

   public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {

   if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

   if (arg0.getUrl() == null)

   throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");

   com.alibaba.dubbo.common.URL url = arg0.getUrl();

   String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());

   if (extName == null)

   throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

   com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

   return extension.export(arg0);

  

 

  默认协议使用的是dubbo,但是 exportLocal 方法中,已经将协议转成LOCAL_PROTOCOL,也就是InjvmProtocol,下面去看看他的 export 方法做了什么

  

public T Exporter T export(Invoker T invoker) throws RpcException {

 

   return new InjvmExporter T (invoker, invoker.getUrl().getServiceKey(), exporterMap);

  

 

  这里只创建了一个 InjvmExporter,无其他逻辑。具体使用结合服务的引出和调用过程才能分析清除,此处不多赘述。

  远程服务暴露

  让我们回到 前置准备 小节的末尾,前面说了 protocol.export(wrapperInvoker) 方法包含了远程服务暴露和注册中心处理。参数 wrapperInvoker 中的url的协议是 registry ,所以实际调用的是RegistryProtocol的export方法

  

public T Exporter T export(final Invoker T originInvoker) throws RpcException {

 

   // 远程服务暴露

   final ExporterChangeableWrapper T exporter = doLocalExport(originInvoker);

   // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:

   // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider dubbo=2.0.2 export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider

   URL registryUrl = getRegistryUrl(originInvoker);

   // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry,这里已经连接上注册中心,但是还没有将服务注册上去

   final Registry registry = getRegistry(originInvoker);

   // 获取已注册的服务提供者 URL,比如:

   // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true application=demo-provider dubbo=2.0.2 generic=false interface=com.alibaba.dubbo.demo.DemoService methods=sayHello

   final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

   // 获取 register 参数

   boolean register = registeredProviderUrl.getParameter("register", true);

   // 向服务提供者与消费者注册表中注册服务提供者

   ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

   // 根据 register 的值决定是否注册服务

   if (register) {

   // 向注册中心注册服务

   register(registryUrl, registeredProviderUrl);

   ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);

   ......

   return new DestroyableExporter T (exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);

  

 

  这里有3个重点:

  
本节重点在远程服务暴露,所以继续看doLocalExport方法

  

private T ExporterChangeableWrapper T doLocalExport(final Invoker T originInvoker) {

 

   String key = getCacheKey(originInvoker);

   // 访问缓存

   ExporterChangeableWrapper T exporter = (ExporterChangeableWrapper T ) bounds.get(key);

   if (exporter == null) {

   synchronized (bounds) {

   exporter = (ExporterChangeableWrapper T ) bounds.get(key);

   if (exporter == null) {

   // 创建 Invoker 为委托类对象

   final Invoker ? invokerDelegete = new InvokerDelegete T (originInvoker, getProviderUrl(originInvoker));

   // 调用 protocol 的 export 方法导出服务

   exporter = new ExporterChangeableWrapper T ((Exporter T ) protocol.export(invokerDelegete), originInvoker);

   bounds.put(key, exporter);

   return exporter;

  

 

  这里就是一些缓存操作,ExporterChangeableWrapper就是对unexport方法做了一下包装,逻辑很简单。我们重点关注protocol.export(invokerDelegete),而protocol是自适应拓展类,根据参数invokerDelegete中的url,实际调用的是DubboProtocol#export(invokerDelegete)

  

public T Exporter T export(Invoker T invoker) throws RpcException {

 

   URL url = invoker.getUrl();

   // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:

   // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880

   String key = serviceKey(url);

   // 创建 DubboExporter

   DubboExporter T exporter = new DubboExporter T (invoker, key, exporterMap);

   // 将 key, exporter 键值对放入缓存中

   exporterMap.put(key, exporter);

   // 本地存根相关代码

   ......

   // 启动服务器

   openServer(url);

   // 优化序列化

   optimizeSerialization(url);

   return exporter;

  

 

  Dubbo的网络传输层默认使用的是Netty,openServer方法就是启动netty服务,进去看看

  

private void openServer(URL url) {

 

   // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例

   String key = url.getAddress();

   boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);

   if (isServer) {

   ExchangeServer server = serverMap.get(key);

   if (server == null) {

   // 创建服务器实例

   serverMap.put(key, createServer(url));

   } else {

   // 服务器已创建,则根据 url 中的配置重置服务器

   server.reset(url);

  

 

  根据ip+端口判断是否已经创建,已经创建就根据 url 中的配置重置服务器(例如修改线程池配置,这个涉及线程派发模型,此处不多赘述)。我们重点关注服务器实例的创建。

  

private ExchangeServer createServer(URL url) {

 

   url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());

   // 添加心跳检测配置到 url 中

   url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

   // 获取 server 参数,默认为 netty

   String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

   // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常

   if (str != null str.length() 0 !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))

   throw new RpcException("Unsupported server type: " + str + ", url: " + url);

   // 添加编码解码器参数

   url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

   ExchangeServer server;

   try {

   // 创建 ExchangeServer

   server = Exchangers.bind(url, requestHandler);

   } catch (RemotingException e) {

   throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);

   // 获取 client 参数,可指定 netty,mina

   str = url.getParameter(Constants.CLIENT_KEY);

   if (str != null str.length() 0) {

   // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]

   Set String supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();

   // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,

   // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常

   if (!supportedTypes.contains(str)) {

   throw new RpcException("Unsupported client type: " + str);

   return server;

  

 

  这里我们关注Exchangers.bind(url, requestHandler)方法,其前面逻辑是检测是否支持server所需的协议,后面的逻辑是检测是否支持client所需的协议。

  

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

 

   ......

   url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

   // 获取 Exchanger,默认为 HeaderExchanger。

   // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例

   return getExchanger(url).bind(url, handler);

  

 

  不多bb,直接跳到HeaderExchanger.bind(url, handler)

  

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

 

   // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:

   // 1. new HeaderExchangeHandler(handler)

   // 2. new DecodeHandler(new HeaderExchangeHandler(handler))

   // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))

   return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

  

 

  HeaderExchangeHandler 和 DecodeHandler 都是handler的装饰类。HeaderExchangeServer内部持有Server,并封装了心跳的功能。不多赘述,我们的重点在 Transporters.bind ,也就是如何生成Server

  

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {

 

   ......

   ChannelHandler handler;

   if (handlers.length == 1) {

   handler = handlers[0];

   } else {

   // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器

   handler = new ChannelHandlerDispatcher(handlers);

   // 获取自适应 Transporter 实例,并调用实例方法

   return getTransporter().bind(url, handler);

  public static Transporter getTransporter() {

   return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();

  

 

  getTransporter() 获取的是自适应拓展类,默认是NettyTransporter, 去看看

  

public Server bind(URL url, ChannelHandler listener) throws RemotingException {

 

   return new NettyServer(url, listener);

  

 

  这里就创建了一个NettyServer对象,很明显,启动服务的相关逻辑在NettyServer的构造函数中

  

public class NettyServer extends AbstractServer implements Server {

 

   // 构造方法

   public NettyServer(URL url, ChannelHandler handler) throws RemotingException {

   super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));

  public abstract class AbstractServer extends AbstractEndpoint implements Server {

   public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {

   ......

   try {

   // 调用模板方法 doOpen 启动服务器

   doOpen();

   if (logger.isInfoEnabled()) {

   logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

   } catch (Throwable t) {

   throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()

   + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);

   ......

  

 

  NettyServer 调用的是父类的构造,而在父类 AbstractServer 中,又调用了子类的 doOpen() ,明显的模板模式。重新回到 NettyServer 看 doOpen 方法

  

protected void doOpen() throws Throwable {

 

   bootstrap = new ServerBootstrap();

   // 创建 boss 和 worker 线程池

   bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));

   workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),

   new DefaultThreadFactory("NettyServerWorker", true));

   final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

   channels = nettyServerHandler.getChannels();

   bootstrap.group(bossGroup, workerGroup)

   .channel(NioServerSocketChannel.class)

   .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)

   .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)

   .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

   .childHandler(new ChannelInitializer NioSocketChannel () {

   @Override

   protected void initChannel(NioSocketChannel ch) throws Exception {

   NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);

   ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug

   .addLast("decoder", adapter.getDecoder())

   .addLast("encoder", adapter.getEncoder())

   .addLast("handler", nettyServerHandler);

   // 绑定到指定的 ip 和端口上

   ChannelFuture channelFuture = bootstrap.bind(getBindAddress());

   channelFuture.syncUninterruptibly();

   channel = channelFuture.channel();

  

 

  这段是Netty启动服务的代码,就不多解释了。

  至此,远程服务暴露的过程就分析完了。

  注册中心处理

  OK,让我们把视线放回RegistryProtocol的export方法中,上节说了远程服务暴露,本节就来说说剩下的创建注册中心连接以及向注册中心注册服务

  创建注册中心连接

  创建注册中心连接,是在getRegistry(originInvoker)方法中

  

private Registry getRegistry(final Invoker ? originInvoker) {

 

   URL registryUrl = getRegistryUrl(originInvoker);

   return registryFactory.getRegistry(registryUrl);

  

 

  registryFactory 是自适应拓展类,根据参数 registryUrl 的协议protocol字段,可知实际调用的是ZookeeperRegistryFactory,getRegistry 方法在ZookeeperRegistryFactory的父类AbstractRegistryFactory中

  

// AbstractRegistryFactory#getRegistry(URL url)

 

  public Registry getRegistry(URL url) {

   url = url.setPath(RegistryService.class.getName())

   .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())

   .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);

   String key = url.toServiceStringWithoutResolving();

   LOCK.lock();

   try {

   Registry registry = REGISTRIES.get(key);

   if (registry != null) {

   return registry;

   // 缓存未命中,创建 Registry 实例

   registry = createRegistry(url);

   if (registry == null) {

   throw new IllegalStateException("Can not create registry " + url);

   REGISTRIES.put(key, registry);

   return registry;

   } finally {

   // Release the lock

   LOCK.unlock();

  

 

  这里就是一些URL的参数处理以及缓存操作,主要看createRegistry(url),此方法由子类ZookeeperRegistryFactory实现

  

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

 

   private ZookeeperTransporter zookeeperTransporter;

   // zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive

   public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {

   this.zookeeperTransporter = zookeeperTransporter;

   @Override

   public Registry createRegistry(URL url) {

   return new ZookeeperRegistry(url, zookeeperTransporter);

  

 

  这里注意一下,zookeeperTransporter 是由Dubbo SPI机制自动注入的。createRegistry 方法就创建了 ZookeeperRegistry 对象,所以处理逻辑应该就在它的构造方法中。

  

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {

 

   ......

   // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporte

   zkClient = zookeeperTransporter.connect(url);

   // 添加状态监听器

   zkClient.addStateListener(new StateListener() {

   @Override

   public void stateChanged(int state) {

   if (state == RECONNECTED) {

   try {

   recover();

   } catch (Exception e) {

   logger.error(e.getMessage(), e);

  

 

  这里,我们关注的是 zookeeperTransporter.connect(url),zookeeperTransporter的默认实现类是CuratorZookeeperTransporte,它的 connect 方法就是创建了CuratorZookeeperClient,所以直接去看构造方法

  

public CuratorZookeeperClient(URL url) {

 

   super(url);

   try {

   int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);

   // 创建 CuratorFramework 构造器

   CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()

   .connectString(url.getBackupAddress())

   .retryPolicy(new RetryNTimes(1, 1000))

   .connectionTimeoutMs(timeout);

   String authority = url.getAuthority();

   if (authority != null authority.length() 0) {

   builder = builder.authorization("digest", authority.getBytes());

   // 构建 CuratorFramework 实例

   client = builder.build();

   // 添加监听器

   client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

   @Override

   public void stateChanged(CuratorFramework client, ConnectionState state) {

   if (state == ConnectionState.LOST) {

   CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);

   } else if (state == ConnectionState.CONNECTED) {

   CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);

   } else if (state == ConnectionState.RECONNECTED) {

   CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);

   // 启动客户端

   client.start();

   } catch (Exception e) {

   throw new IllegalStateException(e.getMessage(), e);

  

 

  这里就是 Apache Curator 操作 zookeeper 的一些相关操作了,不多赘述。至此,注册中心创建过程结束,已连接 zookeeper

  向注册中心注册服务

  注册中心已创建,下一步就是向注册中心注册服务。

  回到RegistryProtocol的export方法中,register(registryUrl, registeredProviderUrl) 方法就是向注册中心注册服务

  

public void register(URL registryUrl, URL registedProviderUrl) {

 

   // 创建注册中心(此时会从缓存获取)

   Registry registry = registryFactory.getRegistry(registryUrl);

   // 将服务注册到注册中心

   registry.register(registedProviderUrl);

  

 

  registryFactory.getRegistry(registryUrl) 在上一节讲过,已经创建了注册中心(类型为ZookeeperRegistry),此时会命中缓存,直接返回。

  ZookeeperRegistry并没有实现 register 方法,在其父类FailbackRegistry中。

  

public void register(URL url) {

 

   super.register(url);

   failedRegistered.remove(url);

   failedUnregistered.remove(url);

   try {

   // 模板方法,由子类实现

   doRegister(url);

   } catch (Exception e) {

   Throwable t = e;

   // 获取 check 参数,若 check = true 将会直接抛出异常

   boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)

   url.getParameter(Constants.CHECK_KEY, true)

   !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());

   boolean skipFailback = t instanceof SkipFailbackWrapperException;

   if (check skipFailback) {

   if (skipFailback) {

   t = t.getCause();

   throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);

   } else {

   logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);

   // 记录注册失败的链接

   failedRegistered.add(url);

  

 

  如果注册失败,则记录在failedRegistered中,用于重试。我们重点关注doRegister(url),其实现在子类ZookeeperRegistry中

  

protected void doRegister(URL url) {

 

   try {

   // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:

   // /${group}/${serviceInterface}/providers/${url}

   // 比如

   // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......

   zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

   } catch (Throwable e) {

   throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  

 

  继续看 zkClient.create 方法

  

public void create(String path, boolean ephemeral) {

 

   if (!ephemeral) {

   // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在

   if(persistentExistNodePath.contains(path)){

   return;

   if (checkExists(path)) {

   persistentExistNodePath.add(path);

   return;

   int i = path.lastIndexOf(/);

   if (i 0) {

   // 递归创建上一级路径

   create(path.substring(0, i), false);

   if (ephemeral) {

   createEphemeral(path);

   } else {

   createPersistent(path);

   persistentExistNodePath.add(path);

  

 

  根据 ephemeral 参数判断,是创建临时节点还是永久节点。这里注意一下,path 的切割,是从后往前的。下面放上zookeeper的节点结构(可视化工具ZooInspector)

  到这里,向注册中心注册服务也讲完了

  在Dubbo中,大量使用了Dubbo SPI机制(自动注入、自适应拓展),且很多地方都使用了模板模式。

  参考资料

  Dubbo开发指南

  以上就是Dubbo源码(三)(深度剖析dubbo源码)的详细内。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: