nacos源码编译,nacos 读法

  nacos源码编译,nacos 读法

  为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的。

  这篇文章将会带大家阅读高洛源码以及教大家阅读源码的技巧,我们正式开始吧!

  先给大家献上一张我梳理的高清源码图,方便大家对高洛的源码有一个整体上的认识。

  有了这张图,我们就很容易去看高洛源码了。

  

如何找切入点

首先我们得要找一个切入点进入到高洛源码中,那么就从高洛依赖入手

 

  依赖groupIdcom.alibaba.cloud/groupId artifactId spring-cloud-starter-Ali-nacos-discovery/artifactId/dependency进入这个依赖文件,会发现它又依赖了一个组件:

  依赖性groupIdcom.alibaba.cloud/groupId artifactId spring-cloud-Alibaba-nacos-发现/artifactId/依赖进入依赖之后,我们发现它长这样:

  从这张图中,我们发现了一个熟悉的配置文件春天。工厂,这是sringboot自动装配的必备文件

  org。spring框架。靴子。自动配置。启用自动配置= com.alibaba.cloud。玉米片。发现。玉米片。ribbonnacosautoconfiguration。阿里巴巴。云。玉米片。端点。nacosdiscovery端点自动配置。阿里巴巴。云。玉米片。注册表。nacos服务注册表自动配置。阿里巴巴。云。玉米片。发现客户端配置。阿里巴巴。云因为这张主要说的是服务注册源码,所以我们可以只用关注(NacosServiceRegistryAutoConfiguration)自动装配文件

  公众的类NacosServiceRegistryAutoConfiguration { @ bean public NacosServiceRegistry NacosServiceRegistry(nacosdiscovery properties){ return new NacosServiceRegistry(nacosdiscovery properties);} @ ConditionalOnBean(autoserviceregistrationproperties。class)public nacos registration nacos registration(nacosdiscovery属性nacos discovery属性,应用程序上下文){ return new nacos registration(nacos discovery属性,上下文);公共NacosAutoServiceRegistration

  sServiceRegistry registry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {return new NacosAutoServiceRegistration(registry,autoServiceRegistrationProperties, registration);}我们看到的是三个bean注入,这里给大家介绍一个看源码的小技巧:自动装配的文件中申明的bean类,我们只需要看带有auto的bean,这个往往是入口;NacosAutoServiceRegistration 带有auto,我们点进去看看里面都有什么:

  

@Overrideprotected void register() {if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {log.debug("Registration disabled.");return;}if (this.registration.getPort() < 0) {this.registration.setPort(getPort().get());}super.register();}

里面有一个register()方法,我在这里打个断点,因为我猜测这个就是注册的入口,我现在使用debug模式,启动一个服务,看它会不会调用这个方法:

 

  

客户端注册

这里贴上我debug后,进入register方法的调用链截图

 

  

 

  看到这个调用链,看到一个onApplicationEvent的回调方法,找到这个方法所在的类AbstractAutoServiceRegistration这个类继承了ApplicationListener这个多播器监听器,spring启动之后,会发布多播器事件,然后回调实现多播器组件的onApplicationEvent方法,我们从这个方法开始分析:

  

public void onApplicationEvent(WebServerInitializedEvent event) {bind(event); // 绑定端口,并启动}@Deprecatedpublic void bind(WebServerInitializedEvent event) {// 设置端口 this.port.compareAndSet(0, event.getWebServer().getPort()); // 启动客户端注册组件this.start();}public void start() { // 省略分支代码 // 调用注册register();}

因为springcloud提供了多种注册中心扩展,但是我们这里只引用了nacos注册中心,所以这里直接调用的是NacosServiceRegistry的register方法:

 

  

public void register(Registration registration) { // 省略分支代码 // 获取服务idString serviceId = registration.getServiceId();// 获取组配置String group = nacosDiscoveryProperties.getGroup(); // 封装服务实例Instance instance = getNacosInstanceFromRegistration(registration);// 调用 命名服务的 registerInstance方法 注册实例namingService.registerInstance(serviceId, group, instance);}

进入到registerInstance方法

 

  

 public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { // 省略分支代码 // 与服务端建立心跳,默认每隔5秒定时发送新跳包 this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } // 通过http方式向服务端发送注册请求 this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }

serverproxy通过调用对http进行封装的reapi方法,向服务端接口("/nacos/v1/ns/instance")发送请求,

 

  

 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); Map<String, String> params = new HashMap(9); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST"); }

我们知道nacos经常是以集群形式部署的,那客户端是如何选择其中一个节点发送呢,肯定得实现负载均衡的逻辑,我们点击reqAPI,看它是如何实现的

 

  

 if (servers != null && !servers.isEmpty()) { Random random = new Random(System.currentTimeMillis()); // 随机获取一个索引,servers保存的是所有nacos节点地址 int index = random.nextInt(servers.size()); // 遍历所有节点,根据index值,从servers中找到对应位置的server,进行请求调用,如果调用成功则返回,否则依次往后遍历,直到请求成功 for(int i = 0; i < servers.size(); ++i) { String server = (String)servers.get(index); try { return this.callServer(api, params, server, method); } catch (NacosException var11) { exception = var11; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11); } catch (Exception var12) { exception = var12; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12); } // index+1 然后取模 是保证index不会越界 index = (index + 1) % servers.size(); } throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage()); }

到这里,客户端注册的代码已经分析完了,不过这还不是本篇的结束,我们还得继续分析服务端是如何处理客户端发送过来的注册请求:

 

  

服务端处理客户端注册请求

如果需要查看服务端源码的话,则需要将nacos源码下下来 下载地址

 

  我们从服务注册api接口地址(/nacos/v1/ns/instance),可以找到对应的controller为(com.alibaba.nacos.naming.controllers.InstanceController)

  因为注册实例发送的是post请求,所以直接找被postmapping注解的register方法

  

 @CanDistro @PostMapping public String register(HttpServletRequest request) throws Exception {// 获取服务名 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);// 获取命名空间id String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);// 注册实例serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); return "ok"; }

我们点击进入到registerInstance方法:

 

  

 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); }// 执行添加实例的操作 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }

分析

在nacos中,注册实例后,还需要将注册信息同步到其他节点,所有在nacos中存在两种同步模式AP和CP,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。com.alibaba.nacos.naming.core.ServiceManager.addInstance()

 

  

 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 生成服务的key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 获取服务 Service service = getService(namespaceId, serviceName); // 使用同步锁处理 synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 调用consistencyService.put 处理同步过来的服务 consistencyService.put(key, instances); } }

我们在进入到consistencyService.put方法中

 

  

 

  点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

  

 @Override public void put(String key, Record value) throws NacosException { // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步 mapConsistencyService(key).put(key, value); }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

 

  

 private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }

AP 方式同步的流程(ephemeralConsistencyService) 本地服务器处理注册信息&将注册信息同步到其它节点

 

  

 @Override public void put(String key, Record value) throws NacosException { // 处理本地注册列表 onPut(key, value); // 添加阻塞任务,同步信息到其他集群节点 taskDispatcher.addTask(key); }

处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

 

  这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

  线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

  

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

 

  

 @Override public void run() { List<String> keys = new ArrayList<>(); while (true) { String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS); // 省略判断代码 // 添加同步的key keys.add(key); // 计数 dataSize++; // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步 if (dataSize == partitionConfig.getBatchSyncKeyCount() (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) { // 遍历所有集群节点,直接调用http进行同步 for (Server member : dataSyncer.getServers()) { if (NetUtils.localServer().equals(member.getKey())) { continue; } SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask)); } dataSyncer.submit(syncTask, 0); } // 记录本次同步时间 lastDispatchTime = System.currentTimeMillis(); // 计数清零 dataSize = 0; } } } }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

 

  同步少量的key解决方案: 只有积累到指定数量的key,才发起批量同步距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步 CP 方式同步的流程(RaftConsistencyServiceImpl)

  cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

  nacos使用raft协议来进行选举leader,来实现cp模式。

  同样进入到 RaftConsistencyServiceImpl的put方法

  

 @Override public void put(String key, Record value) throws NacosException { try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

 

  

// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走if (!isLeader()) { JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; }

同样采用同样队列的方式,去处理本地注册列表

 

  

onPublish(datum, peers.local());public void onPublish(Datum datum, RaftPeer source) throws Exception { // 添加同步key任务到阻塞队列中 notifier.addTask(datum.key, ApplyAction.CHANGE); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); }

遍历所有集群节点,发送http同步请求

 

  

 for (final String server : peers.allServersIncludeMyself()) { // 如果是leader,则不进行同步 if (isLeader(server)) { latch.countDown(); continue; } // 组装url 发送同步请求到其它集群节点 final String url = buildURL(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); }

到此,nacos服务注册及服务实例同步的主干源码已经分析完了。

 

  

总结

对于刚开始接触nacos源码的同学,可以先把头上的图多看几遍,然后对照着源码找到对应的位置 ,最后结合图再结合本文,整体连贯的看下来,相信会有很大收获的;虽然阅读源码的过程很痛苦,但是你只要坚持下来了,掌握到了阅读源码的技巧,你就会发现再难的源码,你都能把它啃下来;后面我会专门写一篇教你如何高效阅读源码的文章,希望对于刚接触源码的同学能有所帮助。

 

  到此这篇关于Nacos源码阅读方法的文章就介绍到这了,更多相关Nacos源码阅读内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: