nacos注册中心单节点ap架构源码解析(nacos注册中心配置)

  本篇文章为你整理了nacos注册中心单节点ap架构源码解析(nacos注册中心配置)的详细内容,包含有nacos注册中心原理详解 nacos注册中心配置 nacos注册中心优点 nacos 注册中心 nacos注册中心单节点ap架构源码解析,希望能帮助你了解 nacos注册中心单节点ap架构源码解析。

  单nacos节点流程图如下:

  流程图可以知,Nacos注册流程包括客户端的服务注册、服务实例列表拉取、定时心跳任务;以及服务端的定时检查服务实例任务、服务实例更新推送5个功能。

  服务注册:当客户端启动的时候会根据当前微服务的配置信息把微服务注册到nacos服务端。

  服务实例列表拉取:当客户端启动的时候从nacos服务端获取当前服务的名称已经注册的实例数据,并把这些实例数据缓存在客户端的serviceInfoMap 对象中。

  定时心跳任务:当客户端向nacos服务注册临时实例对象的时候,会创建一个延期的任务去往服务端发送心跳信息。如果发送心跳信息成功,则又会创建一个延期任务往服务端注册心跳信息,一直重复该逻辑。nacos服务端接收到客户端的心跳信息就是更新客户端实例的最后心跳时间。该时间用来判断实例是否健康和是否需要删除。

  定时检查服务实例任务:nacos服务端在创建空服务对象的时候会通过线程池来定时执行检查服务,其主要逻辑为判断当前时间和最后心跳时间之差是否大于健康超时时间和删除实例超时时间,如果大于,则更新实例的健康状态和删除当前实例。定时执行的规则为5秒之后执行检查,并且每次执行完检查之后,5秒之后再次执行检查。

  服务实例更新推送:当有客户端更新实例对象时,服务端会先获取该客户端的服务名称下所有已经注册的客户端实例,并会针每一个客户端发送一个更新serviceinfo的udp消息,客户端监听收到nacos服务端发送的udp数据后进行本地缓存的更新。

  二、客户端

  一、服务注册

  根据spring-cloud-starter-alibaba-nacos-discovery的spring.factories文件,找到服务注册启动配置类。

  spring.factories文件内容为如下,

  

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\

 

   com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\

   com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\

   com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\

   com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\

   com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\

   com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\

   com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\

   com.alibaba.cloud.nacos.NacosServiceAutoConfiguration

  org.springframework.cloud.bootstrap.BootstrapConfiguration=\

   com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

  

 

  根据名称判断可以得出 NacosServiceRegistryAutoConfiguration 为服务注册启动配置类,源码如下

  

@Configuration(proxyBeanMethods = false)

 

  @EnableConfigurationProperties

  @ConditionalOnNacosDiscoveryEnabled

  @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",

   matchIfMissing = true)

  @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,

   AutoServiceRegistrationAutoConfiguration.class,

   NacosDiscoveryAutoConfiguration.class })

  public class NacosServiceRegistryAutoConfiguration {

   @Bean

   public NacosServiceRegistry nacosServiceRegistry(

   NacosDiscoveryProperties nacosDiscoveryProperties) {

   return new NacosServiceRegistry(nacosDiscoveryProperties);

   @Bean

   @ConditionalOnBean(AutoServiceRegistrationProperties.class)

   public NacosRegistration nacosRegistration(

   ObjectProvider List NacosRegistrationCustomizer registrationCustomizers,

   NacosDiscoveryProperties nacosDiscoveryProperties,

   ApplicationContext context) {

   return new NacosRegistration(registrationCustomizers.getIfAvailable(),

   nacosDiscoveryProperties, context);

   @Bean

   @ConditionalOnBean(AutoServiceRegistrationProperties.class)

   public NacosAutoServiceRegistration nacosAutoServiceRegistration(

   NacosServiceRegistry registry,

   AutoServiceRegistrationProperties autoServiceRegistrationProperties,

   NacosRegistration registration) {

   return new NacosAutoServiceRegistration(registry,

   autoServiceRegistrationProperties, registration);

  

 

  关键类 NacosAutoServiceRegistration 的类图结构如下

  上图可知,NacosAutoServiceRegistration 实现了 ApplicationListener接口,该监听器会在SpringBoot启动的时候会自动调用 onApplicationEvent方法,onApplicationEvent具体实现方法如下

  

public void onApplicationEvent(WebServerInitializedEvent event) {

 

   this.bind(event);

  @Deprecated

  public void bind(WebServerInitializedEvent event) {

   ApplicationContext context = event.getApplicationContext();

   if (!(context instanceof ConfigurableWebServerApplicationContext) !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {

   this.port.compareAndSet(0, event.getWebServer().getPort());

   // 具体的启动方法

   this.start();

  

 

  具体的启动方法this.start();方法的代码如下,

  

public void start() {

 

   if (!this.isEnabled()) {

   if (logger.isDebugEnabled()) {

   logger.debug("Discovery Lifecycle disabled. Not starting");

   } else {

   if (!this.running.get()) {

   this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));

   // 关键逻辑

   this.register();

   if (this.shouldRegisterManagement()) {

   this.registerManagement();

   this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));

   this.running.compareAndSet(false, true);

  

 

  关键逻辑为this.register();方法代码如下

  

protected void register() {

 

   if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {

   log.debug("Registration disabled.");

   return;

   if (this.registration.getPort() 0) {

   this.registration.setPort(getPort().get());

   super.register();

  

 

  关键逻辑为super.register();方法代码如下,

  

protected void register() {

 

   this.serviceRegistry.register(this.getRegistration());

  

 

  关键逻辑为this.serviceRegistry.register方法代码如下,

  

@Override

 

  public void register(Registration registration) {

   if (StringUtils.isEmpty(registration.getServiceId())) {

   log.warn("No service to register for nacos client...");

   return;

   // 根据配置属性构建NamingService对象

   NamingService namingService = namingService();

   // 获取服务名,默认为 ${spring.application.name}

   String serviceId = registration.getServiceId();

   // 获取组名 ,默认为 DEFAULT_GROUP

   String group = nacosDiscoveryProperties.getGroup();

   // 创建注册实例

   Instance instance = getNacosInstanceFromRegistration(registration);

   try {

   // 发起注册

   namingService.registerInstance(serviceId, group, instance);

   log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,

   instance.getIp(), instance.getPort());

   catch (Exception e) {

   log.error("nacos registry, {} register failed...{},", serviceId,

   registration.toString(), e);

   // rethrow a RuntimeException if the registration is failed.

   // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132

   rethrowRuntimeException(e);

  

 

  先通过getNacosInstanceFromRegistration方法创建实例对象,getNacosInstanceFromRegistration代码如下,

  

private Instance getNacosInstanceFromRegistration(Registration registration) {

 

   Instance instance = new Instance();

   // 获取服务ip

   instance.setIp(registration.getHost());

   // 获取服务

   instance.setPort(registration.getPort());

   // 获取权重

   instance.setWeight(nacosDiscoveryProperties.getWeight());

   // 获取集群名称

   instance.setClusterName(nacosDiscoveryProperties.getClusterName());

   instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());

   // 获取元数据

   instance.setMetadata(registration.getMetadata());

   // 获取是否为临时实例

   instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());

   return instance;

  

 

  然后通过namingService.registerInstance方法发起注册,registerInstance方法的代码如下,

  

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

 

   // 检查 实例是否合法

   // heart beat timeout must(默认15秒) heart beat interval (默认5秒)抛异常

   // ip delete timeout must(默认30 秒) heart beat interval(默认5秒)抛异常

   NamingUtils.checkInstanceIsLegal(instance);

   // 构建 groupName@@serviceName

   String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);

   // 如果是临时实例,则创建心跳信息,定时给nacos服务发送

   if (instance.isEphemeral()) {

   BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);

   this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);

   // 向 nacos-service 注册实例

   this.serverProxy.registerService(groupedServiceName, groupName, instance);

  

 

  先检查实例是否合法,然后构建服务名称,规则为groupName@@serviceName。通过this.serverProxy.registerService方法向 nacos-service 注册实例,代码如下,

  

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

 

   NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);

   final Map String, String params = new HashMap String, String (16);

   //设置 namespaceId

   params.put(CommonParams.NAMESPACE_ID, namespaceId);

   //设置 serviceName

   params.put(CommonParams.SERVICE_NAME, serviceName);

   //设置 groupName

   params.put(CommonParams.GROUP_NAME, groupName);

   //设置 clusterName

   params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());

   params.put("ip", instance.getIp());

   params.put("port", String.valueOf(instance.getPort()));

   params.put("weight", String.valueOf(instance.getWeight()));

   params.put("enable", String.valueOf(instance.isEnabled()));

   params.put("healthy", String.valueOf(instance.isHealthy()));

   params.put("ephemeral", String.valueOf(instance.isEphemeral()));

   params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

   // 调用 nacos-service 的nacosUrlInstance接口注册实例

   reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

  

 

  通过向reqApi方法向nacos服务端注册当前实例数据,其实就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 发送POST请求。该请求地址对应的nacos服务端的源码的naming工程中InstanceController的register方法,代码如下,

  

public String register(HttpServletRequest request) throws Exception {

 

   final String namespaceId = WebUtils

   .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

   final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

   NamingUtils.checkServiceNameFormat(serviceName);

   //根据请求构建 Instance 对象

   final Instance instance = parseInstance(request);

   //注册 Instance 对象,serviceManager对象中保存了所有的服务对象。

   serviceManager.registerInstance(namespaceId, serviceName, instance);

   return "ok";

  

 

  先根据请求对象构建Instance对象,然后通过serviceManager.registerInstance方法用来注册Instance对象,registerInstance代码如下

  

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

 

   // 如果 namespaceId 为 key 的数据为空,则创建 service ,并初始化service

   createEmptyService(namespaceId, serviceName, instance.isEphemeral());

   // 获取 service 对象

   Service service = getService(namespaceId, serviceName);

   // 如果 service为空 则报错

   if (service == null) {

   throw new NacosException(NacosException.INVALID_PARAM,

   "service not found, namespace: " + namespaceId + ", service: " + serviceName);

   // 添加实例

   addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

  

 

  如果 namespaceId为key的数据为空,则创建 service,并初始化service。然后调用addInstance添加实例对象,addInstance方法代码如下,

  

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)

 

   throws NacosException {

   // 根据 命名空间 和 服务名称 构建 key

   String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

   // 获取 service

   Service service = getService(namespaceId, serviceName);

   // 同步锁

   synchronized (service) {

   // 获取服务下的实例集合(服务已有 + 新增的实例)

   List Instance instanceList = addIpAddresses(service, ephemeral, ips);

   Instances instances = new Instances();

   instances.setInstanceList(instanceList);

   // 根据KEY添加服务的实例

   consistencyService.put(key, instances);

  

 

  addIpAddresses方法中会调用updateIpAddresses方法,且action为 add。该方法根据action的值来获取该服务下的最新实例集合(新增实例或删除实例加上目前服务已有的实例数据合集)。如果action为add表示新增,则方法最后返回的集合对象中会把该服务中已有的实例集合加上新增的实例集合数据一起返回 ;如果action为 remove表示删除,则方法最后返回的集合对象中会把该服务中已有的实例集合删除掉需要删除的实例集合数据。后面通过调用consistencyService.put(key, instances)方法来把updateIpAddresses方法返回的值直接添加consistencyService的实例中。updateIpAddresses方法的代码如下,

  

public List Instance updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)

 

   throws NacosException {

   // 从本地缓存中获取服务的实例数据

   Datum datum = consistencyService

   .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

   // 获取 当前服务下所有的 实例

   List Instance currentIPs = service.allIPs(ephemeral);

   // 创建当前实例数据map

   Map String, Instance currentInstances = new HashMap (currentIPs.size());

   // 创建 当前实例Id set

   Set String currentInstanceIds = Sets.newHashSet();

   // 遍历当前服务的所有实例,添加到 创建当前实例数据 map 和 当前实例Id集合

   for (Instance instance : currentIPs) {

   currentInstances.put(instance.toIpAddr(), instance);

   currentInstanceIds.add(instance.getInstanceId());

   // 构造 实例集合对象的 map

   Map String, Instance instanceMap;

   // 如果有缓存数据

   if (datum != null null != datum.value) {

   // 从本地缓存中以及当前服务的内存数据获取最新服务的实例数据

   instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);

   // 如果没有缓存数据

   else {

   // 创建 instanceMap

   instanceMap = new HashMap (ips.length);

   // 遍历参数传过来的实例对象

   for (Instance instance : ips) {

   // 如果 service 不包括 实例的 ClusterName 则创建 实例 Cluster,并初始化

   if (!service.getClusterMap().containsKey(instance.getClusterName())) {

   Cluster cluster = new Cluster(instance.getClusterName(), service);

   cluster.init();

   service.getClusterMap().put(instance.getClusterName(), cluster);

   Loggers.SRV_LOG

   .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",

   instance.getClusterName(), instance.toJson());

   // 如果是删除,则从 instanceMap 中 删除 该实例

   if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {

   instanceMap.remove(instance.getDatumKey());

   // 如果是新增

   else {

   //获取已存在的 实例

   Instance oldInstance = instanceMap.get(instance.getDatumKey());

   if (oldInstance != null) {

   instance.setInstanceId(oldInstance.getInstanceId());

   } else {

   // 生成 实例 id

   instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));

   // instanceMap 添加instance实例

   instanceMap.put(instance.getDatumKey(), instance);

   // 如果集合小于0 ,并且是新增操作则抛异常

   if (instanceMap.size() = 0 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {

   throw new IllegalArgumentException(

   "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils

   .toJson(instanceMap.values()));

   // 返回 服务中最新的实例数据

   return new CopyOnWriteArrayList (instanceMap.values());

  

 

  通过updateIpAddresses方法拿到需要更新的实例集合对象后,再通过consistencyService.put(key, instances)把拿到的实例集合对象添加到实现了PersistentConsistencyServiceDelegateImpl或者EphemeralConsistencyService接口的实例对象中,consistencyService.put(key, instances)的源码如下,

  

@Override

 

  public void put(String key, Record value) throws NacosException {

   // 根据key获取具体的 consistencyService ,并且向其中添加具体的 key 和 value

   mapConsistencyService(key).put(key, value);

  

 

  根据key获取具体的 consistencyService ,并且向其中添加具体的 key 和 value。consistencyService中根据key获取集群的实例对象(临时服务对象EphemeralConsistencyService和持久服务对象PersistentConsistencyServiceDelegateImpl)

  

private ConsistencyService mapConsistencyService(String key) {

 

   // 根据key返回具体的服务对象

   return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;

  

 

  如果是注册的临时实例节点,这里取到的是实现了ephemeralConsistencyService接口的DistroConsistencyServiceImpl 对象,它的put源码如下:

  

@Override

 

  public void put(String key, Record value) throws NacosException {

   // 添加key 和 value

   onPut(key, value);

   distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,

   globalConfig.getTaskDispatchPeriod() / 2);

  

 

  通过onPut方法添加key 和 value,opPut方法的代码如下,

  

public void onPut(String key, Record value) {

 

   // 如果是临时节点实例,则创建 Datum 并保存在 dataStore 中

   if (KeyBuilder.matchEphemeralInstanceListKey(key)) {

   Datum Instances datum = new Datum ();

   datum.value = (Instances) value;

   datum.key = key;

   datum.timestamp.incrementAndGet();

   dataStore.put(key, datum);

   // 如果 监听对象不包括 key 则返回

   if (!listeners.containsKey(key)) {

   return;

   // 向notifier对象添加通知任务

   notifier.addTask(key, DataOperation.CHANGE);

  

 

  如果是临时实例节点,则创建 Datum 并保存在 dataStore 中,然后通过notifier.addTask用来向notifier对象添加通知任务,且操作类型为DataOperation.CHANGE,addTask方法的代码如下:

  

public void addTask(String datumKey, DataOperation action) {

 

   // 如果services包括了当前的 datumKey ,并且是修改操作 则直接返回

   if (services.containsKey(datumKey) action == DataOperation.CHANGE) {

   return;

   // 如果是修改操作,则向 services 添加 datumKey

   if (action == DataOperation.CHANGE) {

   services.put(datumKey, StringUtils.EMPTY);

   // 向 tasks中添加 Pair 对象

   tasks.offer(Pair.with(datumKey, action));

  

 

  以上代码的tasks是用来存放具体实例key和动作类型的对象,它是一个ArrayBlockingQueue对象,DistroConsistencyServiceImpl 对象的init方法代码如下,

  

@PostConstruct

 

  public void init() {

   GlobalExecutor.submitDistroNotifyTask(notifier);

  

 

  根据以上代码可知,在DistroConsistencyServiceImpl 实例对象初始化之后会往GlobalExecutor线程池对象中添加了一个notifier对象。notifier对象为一个实现了Runnable 的实例。上面的代码会执行notifier对象的run方法,notifier的run方法代码如下:

  

public void run() {

 

   Loggers.DISTRO.info("distro notifier started");

   // 死循环遍历

   for (; ; ) {

   try {

   // 获取 tasks的数据,如果没有数据会阻塞当前线程,直到tasks有数据为止。

   Pair String, DataOperation pair = tasks.take();

   // 处理数据

   handle(pair);

   } catch (Throwable e) {

   Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);

  

 

  上面是一个死循环,tasks.take()是一个阻塞式获取数据的方法,如果tasks没有数据则会阻塞当前线程直到tasks.take()拿到数据,拿到数据之后会调用handle方法处理,handle代码如下,

  

private void handle(Pair String, DataOperation pair) {

 

   try {

   String datumKey = pair.getValue0();

   DataOperation action = pair.getValue1();

   // 先从 services 中删除 key

   services.remove(datumKey);

   int count = 0;

   // 根据 key 获取 服务对象数据

   ConcurrentLinkedQueue RecordListener recordListeners = listeners.get(datumKey);

   if (recordListeners == null) {

   Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);

   return;

   for (RecordListener listener : recordListeners) {

   count++;

   try {

   // 如果是新增

   if (action == DataOperation.CHANGE) {

   Datum datum = dataStore.get(datumKey);

   if (datum != null) {

   // 更新 serivce 的实例数据

   listener.onChange(datumKey, datum.value);

   } else {

   Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);

   continue;

   // 如果是删除

   if (action == DataOperation.DELETE) {

   listener.onDelete(datumKey);

   continue;

   } catch (Throwable e) {

   Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);

   if (Loggers.DISTRO.isDebugEnabled()) {

   Loggers.DISTRO

   .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",

   datumKey, count, action.name());

   } catch (Throwable e) {

   Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);

  

 

  根据action 为 DataOperation.CHANGE,代码中执行的代码分支为listener.onChange(datumKey, datum.value),该方法的逻辑为修改服务的实例数据,源码如下

  

public void onChange(String key, Instances value) throws Exception {

 

   Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

   for (Instance instance : value.getInstanceList()) {

   if (instance == null) {

   // Reject this abnormal instance list:

   throw new RuntimeException("got null instance " + key);

   if (instance.getWeight() 10000.0D) {

   instance.setWeight(10000.0D);

   if (instance.getWeight() 0.01D instance.getWeight() 0.0D) {

   instance.setWeight(0.01D);

   // 更新 service 的 实例集合

   updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

   recalculateChecksum();

  

 

  以上代码先遍历所有的实例数据设置权值,再通过updateIPs方法更新服务实例,updateIPs方法的代码如下:

  

public void updateIPs(Collection Instance instances, boolean ephemeral) {

 

   // 根据 clusterMap 创建 ipMap对象

   Map String, List Instance ipMap = new HashMap (clusterMap.size());

   // 根据 clusterMap 初始化 ipMap对象

   for (String clusterName : clusterMap.keySet()) {

   ipMap.put(clusterName, new ArrayList ());

   // 遍历最新的实例集合数据

   for (Instance instance : instances) {

   try {

   if (instance == null) {

   Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");

   continue;

   // 如果集群名称为null ,则设置默认的集群名称 DEFAULT

   if (StringUtils.isEmpty(instance.getClusterName())) {

   instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);

   // 如果当前 service 的clusterMap不包括 实例的 集群名称,则需要创建新的集群对象

   if (!clusterMap.containsKey(instance.getClusterName())) {

   Loggers.SRV_LOG

   .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",

   instance.getClusterName(), instance.toJson());

   Cluster cluster = new Cluster(instance.getClusterName(), this);

   cluster.init();

   getClusterMap().put(instance.getClusterName(), cluster);

   // 如果当前 ipMap 不包括 当前实例的 集群名称,则需要创建新的集群对象

   List Instance clusterIPs = ipMap.get(instance.getClusterName());

   if (clusterIPs == null) {

   clusterIPs = new LinkedList ();

   ipMap.put(instance.getClusterName(), clusterIPs);

   // 给当前的 集群对象赋值 实例数据。

   clusterIPs.add(instance);

   } catch (Exception e) {

   Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);

   // 遍历 ipMap对象,给 clusterMap 替换最新的 entryIPs

   for (Map.Entry String, List Instance entry : ipMap.entrySet()) {

   //make every ip mine

   List Instance entryIPs = entry.getValue();

   // 给 clusterMap 替换最新的 entryIPs

   clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);

   setLastModifiedMillis(System.currentTimeMillis());

   // 发布

   getPushService().serviceChanged(this);

   StringBuilder stringBuilder = new StringBuilder();

   for (Instance instance : allIPs()) {

   stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");

   Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),

   stringBuilder.toString());

  

 

  以上代码先根据当前服务下的集群信息构造构造ipMap对象,然后遍历最新的实例集合数据更新ipMap对象,最后循环调用clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral)方法来更新当前集群中的实例列表数据。updateIps方法代码如下:

  

public void updateIps(List Instance ips, boolean ephemeral) {

 

   // 获取 本集群中的 实例集合

   Set Instance toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

   // 根据old的实例数据 构建 hashmap

   HashMap String, Instance oldIpMap = new HashMap (toUpdateInstances.size());

   // 根据实例的 key 添加到 oldIpMap中

   for (Instance ip : toUpdateInstances) {

   oldIpMap.put(ip.getDatumKey(), ip);

   // 获取更新的 实例数据 List

   List Instance updatedIPs = updatedIps(ips, oldIpMap.values());

   if (updatedIPs.size() 0) {

   for (Instance ip : updatedIPs) {

   Instance oldIP = oldIpMap.get(ip.getDatumKey());

   // do not update the ip validation status of updated ips

   // because the checker has the most precise result

   // Only when ip is not marked, dont we update the health status of IP:

   if (!ip.isMarked()) {

   ip.setHealthy(oldIP.isHealthy());

   if (ip.isHealthy() != oldIP.isHealthy()) {

   // ip validation status updated

   Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),

   (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());

   if (ip.getWeight() != oldIP.getWeight()) {

   // ip validation status updated

   Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}- {}", getService().getName(), oldIP.toString(),

   ip.toString());

   // 获取新增的 实例数据

   List Instance newIPs = subtract(ips, oldIpMap.values());

   if (newIPs.size() 0) {

   Loggers.EVT_LOG

   .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),

   getName(), newIPs.size(), newIPs.toString());

   for (Instance ip : newIPs) {

   HealthCheckStatus.reset(ip);

   // 获取删除的 实例数据

   List Instance deadIPs = subtract(oldIpMap.values(), ips);

   if (deadIPs.size() 0) {

   Loggers.EVT_LOG

   .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),

   getName(), deadIPs.size(), deadIPs.toString());

   for (Instance ip : deadIPs) {

   HealthCheckStatus.remv(ip);

   // 根据传进来的 实例集合 创建需要更新的实例set

   toUpdateInstances = new HashSet (ips);

   // 直接替换

   if (ephemeral) {

   ephemeralInstances = toUpdateInstances;

   } else {

   persistentInstances = toUpdateInstances;

  

 

  以上代码就是更新cluster对象下的实例数据逻辑,根据代码可知在cluster对象中更新实例数据就是拿传进来的实例列表创建set集合直接替换的。

  二、服务实例列表拉取

  客户端程序启动之后,会执行com.alibaba.cloud.nacos.discovery.NacosWatch类的start()方法,此方法中会执行以下语句,

  

namingService.subscribe(properties.getService(), properties.getGroup(),

 

   Arrays.asList(properties.getClusterName()), eventListener);

  

 

  此方法用来获取当前服务的实例数据,subscribe方法代码如下,

  

public void subscribe(String serviceName, String groupName, List String clusters, EventListener listener)

 

   throws NacosException {

   // 获取服务列表数据

   hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),

   listener);

  

 

  通过hostReactor.subscribe方法获取服务列表数据,subscribe方法的代码如下,

  

public void subscribe(String serviceName, String clusters, EventListener eventListener) {

 

   notifier.registerListener(serviceName, clusters, eventListener);

   // 获取服务列表数据

   getServiceInfo(serviceName, clusters);

  

 

  通过getServiceInfo方法获取服务列表数据,getServiceInfo的代码如下:

  

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());

 

  String key = ServiceInfo.getKey(serviceName, clusters);

  if (failoverReactor.isFailoverSwitch()) {

   return failoverReactor.getService(key);

  // 根据服务名称和集群名称获取本地的服务列表数据

  ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

  if (null == serviceObj) {

   serviceObj = new ServiceInfo(serviceName, clusters);

   serviceInfoMap.put(serviceObj.getKey(), serviceObj);

   updatingMap.put(serviceName, new Object());

   // 如果本地服务实例数据为null,则去获取最新的服务实例列表

   updateServiceNow(serviceName, clusters);

   updatingMap.remove(serviceName);

  } else if (updatingMap.containsKey(serviceName)) {

   if (UPDATE_HOLD_INTERVAL 0) {

   // hold a moment waiting for update finish

   synchronized (serviceObj) {

   try {

   serviceObj.wait(UPDATE_HOLD_INTERVAL);

   } catch (InterruptedException e) {

   NAMING_LOGGER

   .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);

  scheduleUpdateIfAbsent(serviceName, clusters);

  return serviceInfoMap.get(serviceObj.getKey());

  

 

  以上代码可知,会根据服务名称和clusters名称获取本地缓存serviceInfoMap对象中的服务列表数据。如果本地服务实例数据为null,则通过updateServiceNow方法去nacos服务端获取最新的服务实例列表。updateServiceNow方法代码如下:

  

try {

 

   // 更新本地服务方法

   updateService(serviceName, clusters);

  } catch (NacosException e) {

   NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);

  

 

  updateService的代码如下:

  

public void updateService(String serviceName, String clusters) throws NacosException {

 

   ServiceInfo oldService = getServiceInfo0(serviceName, clusters);

   try {

   // 调用服务代理类获取服务实例列表,pushReceiver.getUdpPort()会随机生成一个udp端口

   String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

   if (StringUtils.isNotEmpty(result)) {

   // 如果 result不为空,则向本地缓存 serviceInfoMap 添加服务实例列表

   processServiceJson(result);

   } finally {

   if (oldService != null) {

   synchronized (oldService) {

   oldService.notifyAll();

  

 

  通过调用服务代理类serverProxy的queryList方法获取服务实例列。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: