手撕Nacos源码,今日撕服务户端源码(手撕svm)

  本篇文章为你整理了手撕Nacos源码,今日撕服务户端源码(手撕svm)的详细内容,包含有什么叫手撕代码 手撕svm 手撕算法是写伪代码吗 手撕cp 手撕Nacos源码,今日撕服务户端源码,希望能帮助你了解 手撕Nacos源码,今日撕服务户端源码。

  紧接上文,我们分析了Nacos的客户端代码,

  今天我们再来试一下服务端 ,至此就可以Nacos源码就告一段落,欢迎大家品鉴。

  nacos服务端

  注册中心服务端的主要功能包括,接收客户端的服务注册,服务发现,服务下线的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP使用的自己实现的Distro协议,而CP是采用raft协议实现的,这个过程中牵涉到心跳、选主等操作。

  我们来学习一下注册中心服务端接收客户端服务注册的功能。

  我们先来学习一下Nacos的工具类WebUtils,该工具类在nacos-core工程下,该工具类是用于处理请求参数转化的,里面提供了2个常被用到的方法required()和optional():

  

required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。

 

  optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。

  

 

  代码如下:

  

/**

 

   * required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。

  public static String required(final HttpServletRequest req, final String key) {

   String value = req.getParameter(key);

   if (StringUtils.isEmpty(value)) {

   throw new IllegalArgumentException("Param " + key + " is required.");

   String encoding = req.getParameter("encoding");

   return resolveValue(value, encoding);

   * optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。

  public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {

   if (!req.getParameterMap().containsKey(key) req.getParameterMap().get(key)[0] == null) {

   return defaultValue;

   String value = req.getParameter(key);

   if (StringUtils.isBlank(value)) {

   return defaultValue;

   String encoding = req.getParameter("encoding");

   return resolveValue(value, encoding);

  

 

  nacos 的 server 与 client使用了http协议来交互,那么在server端必定提供了http接口的入口,并且在core模块看到其依赖了spring boot starter,所以它的http接口由集成了Spring的web服务器支持,简单地说就是像我们平时写的业务服务一样,有controller层和service层。

  以OpenAPI作为入口来学习,我们找到/nacos/v1/ns/instance服务注册接口,在nacos-naming工程中我们可以看到InstanceController正是我们要找的对象,如下图:

  处理服务注册,我们直接找对应的POST方法即可,代码如下:

  

/**

 

   * Register new instance.

   * 接收客户端注册信息

   * @param request http request

   * @return ok if success

   * @throws Exception any error during register

  @CanDistro

  @PostMapping

  @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)

  public String register(HttpServletRequest request) throws Exception {

   //获取namespaceid,该参数是可选参数

   final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

   //获取服务名字

   final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

   //校验服务的名字,服务的名字格式为groupName@@serviceName

   NamingUtils.checkServiceNameFormat(serviceName);

   //创建实例

   final Instance instance = parseInstance(request);

   //注册服务

   serviceManager.registerInstance(namespaceId, serviceName, instance);

   return "ok";

  

 

  如上图,该方法主要用于接收客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到Nacos中,注册的方法如下:

  

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

 

   //判断本地缓存中是否存在该命名空间,如果不存在就创建,之后判断该命名空间下是否

   //存在该服务,如果不存在就创建空的服务

   //如果实例为空,则创建实例,并且会将创建的实例存入到serviceMap集合中

   createEmptyService(namespaceId, serviceName, instance.isEphemeral());

   //从serviceMap集合中获取创建的实例

   Service service = getService(namespaceId, serviceName);

   if (service == null) {

   throw new NacosException(NacosException.INVALID_PARAM,

   "service not found, namespace: " + namespaceId + ", service: " + serviceName);

   //服务注册,这一步才会把服务的实例信息和服务绑定起来

   addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

  

 

  注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起。

  Distro协议介绍

  Distro是阿里巴巴的私有协议, 是一种分布式一致性算法,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议:该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失 。

  Distro 协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。

  Distro 协议具有以下特点:

  

1:专门为了注册中心而创造出的协议;

 

  2:客户端与服务端有两个重要的交互,服务注册与心跳发送;

  3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;

  4:客户端请求失败则换一个节点重新发送请求;

  5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;

  6:每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;

  7:服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;

  8:服务端如果长时间未收到客户端心跳,则下线该服务;

  9:负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他节点;

  10:节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。

  

 

  Distro寻址

  Distro协议主要用于nacos 服务端节点之间的相互发现,nacos使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:

  

单机模式(StandaloneMemberLookup)

 

  文件模式(FileConfigMemberLookup)

  服务器模式(AddressServerMemberLookup)

  

 

  三种寻址模式如下图:

  1.2.3.1 单机模式

  在com.alibaba.nacos.core.cluster.lookup.LookupFactory中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式,如果是集群启动,

  

/**

 

   * Create the target addressing pattern.

   * 创建寻址模式

   * @param memberManager {@link ServerMemberManager}

   * @return {@link MemberLookup}

   * @throws NacosException NacosException

  public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {

   //NacosServer 集群方式启动

   if (!EnvUtil.getStandaloneMode()) {

   String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);

   //由参数中传入的寻址方式得到LookupType对象

   LookupType type = chooseLookup(lookupType);

   //选择寻址方式

   LOOK_UP = find(type);

   //设置当前寻址方式

   currentLookupType = type;

   } else {

   //NacosServer单机启动

   LOOK_UP = new StandaloneMemberLookup();

   LOOK_UP.injectMemberManager(memberManager);

   Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());

   return LOOK_UP;

  
private static MemberLookup find(LookupType type) {

   //文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,

   // 通过配置文件寻找其他节点,以达到和其他节点通信的目的

   if (LookupType.FILE_CONFIG.equals(type)) {

   LOOK_UP = new FileConfigMemberLookup();

   return LOOK_UP;

   //服务器模式

   if (LookupType.ADDRESS_SERVER.equals(type)) {

   LOOK_UP = new AddressServerMemberLookup();

   return LOOK_UP;

   // unpossible to run here

   throw new IllegalArgumentException();

  

 

  单节点寻址模式会直接创建StandaloneMemberLookup对象,而文件寻址模式会创建FileConfigMemberLookup对象,服务器寻址模式会创建AddressServerMemberLookup;

  1.2.3.2 文件寻址模式

  文件寻址模式主要在创建集群的时候,通过cluster.conf来配置集群,程序可以通过监听cluster.conf文件变化实现动态管理节点,FileConfigMemberLookup源码如下:

  

public class FileConfigMemberLookup extends AbstractMemberLookup {

 

   //创建文件监听器

   private FileWatcher watcher = new FileWatcher() {

   //文件发生变更事件

   @Override

   public void onChange(FileChangeEvent event) {

   readClusterConfFromDisk();

   //检查context是否包含cluster.conf

   @Override

   public boolean interest(String context) {

   return StringUtils.contains(context, "cluster.conf");

   @Override

   public void start() throws NacosException {

   if (start.compareAndSet(false, true)) {

   readClusterConfFromDisk();

   // 使用inotify机制来监视文件更改,并自动触发对cluster.conf的读取

   try {

   WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);

   } catch (Throwable e) {

   Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());

   @Override

   public void destroy() throws NacosException {

   WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);

   private void readClusterConfFromDisk() {

   Collection Member tmpMembers = new ArrayList ();

   try {

   List String tmp = EnvUtil.readClusterConf();

   tmpMembers = MemberUtil.readServerConf(tmp);

   } catch (Throwable e) {

   Loggers.CLUSTER

   .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());

   afterLookup(tmpMembers);

  

 

  1.2.3.3 服务器寻址模式

  使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;

  

public class AddressServerMemberLookup extends AbstractMemberLookup {

 

   private final GenericType RestResult String genericType = new GenericType RestResult String () {

   public String domainName;

   public String addressPort;

   public String addressUrl;

   public String envIdUrl;

   public String addressServerUrl;

   private volatile boolean isAddressServerHealth = true;

   private int addressServerFailCount = 0;

   private int maxFailCount = 12;

   private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);

   private volatile boolean shutdown = false;

   @Override

   public void start() throws NacosException {

   if (start.compareAndSet(false, true)) {

   this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));

   initAddressSys();

   run();

   /***

   * 获取服务器地址

   private void initAddressSys() {

   String envDomainName = System.getenv("address_server_domain");

   if (StringUtils.isBlank(envDomainName)) {

   domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");

   } else {

   domainName = envDomainName;

   String envAddressPort = System.getenv("address_server_port");

   if (StringUtils.isBlank(envAddressPort)) {

   addressPort = EnvUtil.getProperty("address.server.port", "8080");

   } else {

   addressPort = envAddressPort;

   String envAddressUrl = System.getenv("address_server_url");

   if (StringUtils.isBlank(envAddressUrl)) {

   addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");

   } else {

   addressUrl = envAddressUrl;

   addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;

   envIdUrl = "http://" + domainName + ":" + addressPort + "/env";

   Loggers.CORE.info("ServerListService address-server port:" + addressPort);

   Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);

   @SuppressWarnings("PMD.UndefineMagicConstantRule")

   private void run() throws NacosException {

   // With the address server, you need to perform a synchronous member node pull at startup

   // Repeat three times, successfully jump out

   boolean success = false;

   Throwable ex = null;

   int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);

   for (int i = 0; i maxRetry; i++) {

   try {

   //拉取集群节点信息

   syncFromAddressUrl();

   success = true;

   break;

   } catch (Throwable e) {

   ex = e;

   Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));

   if (!success) {

   throw new NacosException(NacosException.SERVER_ERROR, ex);

   //创建定时任务

   GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);

   @Override

   public void destroy() throws NacosException {

   shutdown = true;

   @Override

   public Map String, Object info() {

   Map String, Object info = new HashMap (4);

   info.put("addressServerHealth", isAddressServerHealth);

   info.put("addressServerUrl", addressServerUrl);

   info.put("envIdUrl", envIdUrl);

   info.put("addressServerFailCount", addressServerFailCount);

   return info;

   private void syncFromAddressUrl() throws Exception {

   RestResult String result = restTemplate

   .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());

   if (result.ok()) {

   isAddressServerHealth = true;

   Reader reader = new StringReader(result.getData());

   try {

   afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));

   } catch (Throwable e) {

   Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",

   ExceptionUtil.getAllExceptionMsg(e));

   addressServerFailCount = 0;

   } else {

   addressServerFailCount++;

   if (addressServerFailCount = maxFailCount) {

   isAddressServerHealth = false;

   Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());

   // 定时任务

   class AddressServerSyncTask implements Runnable {

   @Override

   public void run() {

   if (shutdown) {

   return;

   try {

   //拉取服务列表

   syncFromAddressUrl();

   } catch (Throwable ex) {

   addressServerFailCount++;

   if (addressServerFailCount = maxFailCount) {

   isAddressServerHealth = false;

   Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));

   } finally {

   GlobalExecutor.scheduleByCommon(this, 5_000L);

  

 

  Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据增加的时候,只同步增加的数据。

  全量同步流程比较复杂,流程如上图:

  

1:启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据

 

  2:调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据

  3:从namingProxy代理获取所有的数据data

  4:构造http请求,调用httpGet方法从指定的server获取数据

  5:从获取的结果result中获取数据bytes

  6:处理数据processData

  7:从data反序列化出datumMap

  8:把数据存储到dataStore,也就是本地缓存dataMap

  9:监听器不包括key,就创建一个空的service,并且绑定监听器

  10:监听器listener执行成功后,就更新data store

  

 

  在com.alibaba.nacos.core.distributed.distro.DistroProtocol的构造函数中调用startDistroTask()方法,该方法会执行startVerifyTask()和startLoadTask(),我们重点关注startLoadTask(),该方法代码如下:

  

/***

 

   * 启动DistroTask

  private void startDistroTask() {

   if (EnvUtil.getStandaloneMode()) {

   isInitialized = true;

   return;

   //启动startVerifyTask,做数据同步校验

   startVerifyTask();

   //启动DistroLoadDataTask,批量加载数据

   startLoadTask();

  //启动DistroLoadDataTask

  private void startLoadTask() {

   //处理状态回调对象

   DistroCallback loadCallback = new DistroCallback() {

   //处理成功

   @Override

   public void onSuccess() {

   isInitialized = true;

   //处理失败

   @Override

   public void onFailed(Throwable throwable) {

   isInitialized = false;

   //执行DistroLoadDataTask,是一个多线程

   GlobalExecutor.submitLoadDataTask(

   new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));

   * 启动startVerifyTask

   * 数据校验

  private void startVerifyTask() {

   GlobalExecutor.schedulePartitionDataTimedSync(

   new DistroVerifyTask(

   memberManager,

   distroComponentHolder),

   distroConfig.getVerifyIntervalMillis());

  

 

  数据如何执行加载

  上面方法会调用DistroLoadDataTask对象,而该对象其实是个线程,因此会执行它的run方法,run方法会调用load()方法实现数据全量加载,代码如下:

  

/***

 

   * 数据加载过程

  @Override

  public void run() {

   try {

   //加载数据

   load();

   if (!checkCompleted()) {

   GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());

   } else {

   loadCallback.onSuccess();

   Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");

   } catch (Exception e) {

   loadCallback.onFailed(e);

   Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);

  
private void load() throws Exception {

   while (memberManager.allMembersWithoutSelf().isEmpty()) {

   Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");

   TimeUnit.SECONDS.sleep(1);

   while (distroComponentHolder.getDataStorageTypes().isEmpty()) {

   Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");

   TimeUnit.SECONDS.sleep(1);

   //同步数据

   for (String each : distroComponentHolder.getDataStorageTypes()) {

   if (!loadCompletedMap.containsKey(each) !loadCompletedMap.get(each)) {

   //从远程机器上同步所有数据

   loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));

  

 

  数据同步会通过Http请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:

  

1:loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步到本机

 

  2:transportAgent.getDatumSnapshot()远程加载数据,通过Http请求执行远程加载

  3:dataProcessor.processSnapshot()处理数据同步到本地

  

 

  数据处理完整逻辑代码如下:loadAllDataSnapshotFromRemote()方法

  

/***

 

   * 从远程机器上同步所有数据

  private boolean loadAllDataSnapshotFromRemote(String resourceType) {

   DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);

   DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);

   if (null == transportAgent null == dataProcessor) {

   Loggers.DISTRO.warn("[DISTRO-INIT] Cant find component for type {}, transportAgent: {}, dataProcessor: {}",

   resourceType, transportAgent, dataProcessor);

   return false;

   //遍历集群成员节点,不包括自己

   for (Member each : memberManager.allMembersWithoutSelf()) {

   try {

   Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());

   //从远程节点加载数据,调用http请求接口: distro/datums;

   DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());

   //处理数据

   boolean result = dataProcessor.processSnapshot(distroData);

   Loggers.DISTRO

   .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),

   result);

   if (result) {

   return true;

   } catch (Exception e) {

   Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);

   return false;

  

 

  远程加载数据代码如下:transportAgent.getDatumSnapshot()方法

  

/***

 

   * 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;

   * @param targetServer target server.

   * @return

  @Override

  public DistroData getDatumSnapshot(String targetServer) {

   try {

   //从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;

   byte[] allDatum = NamingProxy.getAllData(targetServer);

   //将数据封装成DistroData

   return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);

   } catch (Exception e) {

   throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);

  
//组装URL,并执行HttpGet请求,获取结果集

   RestResult String result = HttpClient.httpGet(

   "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,

   new ArrayList (), params);

   //返回数据

   if (result.ok()) {

   return result.getData().getBytes();

   throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()

   + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "

   + result.getMessage());

  

 

  处理数据同步到本地代码如下:dataProcessor.processSnapshot()

  

/**

 

   * 数据处理并更新本地缓存

   * @param data

   * @return

   * @throws Exception

  private boolean processData(byte[] data) throws Exception {

   if (data.length 0) {

   //从data反序列化出datumMap

   Map String, Datum Instances datumMap = serializer.deserializeMap(data, Instances.class);

   // 把数据存储到dataStore,也就是本地缓存dataMap

   for (Map.Entry String, Datum Instances entry : datumMap.entrySet()) {

   dataStore.put(entry.getKey(), entry.getValue());

   //监听器不包括key,就创建一个空的service,并且绑定监听器

   if (!listeners.containsKey(entry.getKey())) {

   // pretty sure the service not exist:

   if (switchDomain.isDefaultInstanceEphemeral()) {

   // create empty service

   //创建一个空的service

   Loggers.DISTRO.info("creating service {}", entry.getKey());

   Service service = new Service();

   String serviceName = KeyBuilder.getServiceName(entry.getKey());

   String namespaceId = KeyBuilder.getNamespace(entry.getKey());

   service.setName(serviceName);

   service.setNamespaceId(namespaceId);

   service.setGroupName(Constants.DEFAULT_GROUP);

   // now validate the service. if failed, exception will be thrown

   service.setLastModifiedMillis(System.currentTimeMillis());

   service.recalculateChecksum();

   // The Listener corresponding to the key value must not be empty

   // 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager

   RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();

   if (Objects.isNull(listener)) {

   return false;

   //为空的绑定监听器

   listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);

   //循环所有datumMap

   for (Map.Entry String, Datum Instances entry : datumMap.entrySet()) {

   if (!listeners.containsKey(entry.getKey())) {

   // Should not happen:

   Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());

   continue;

   try {

   //执行监听器的onChange监听方法

   for (RecordListener listener : listeners.get(entry.getKey())) {

   listener.onChange(entry.getKey(), entry.getValue().value);

   } catch (Exception e) {

   Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);

   continue;

   // Update data store if listener executed successfully:

   // 监听器listener执行成功后,就更新dataStore

   dataStore.put(entry.getKey(), entry.getValue());

   return true;

  

 

  到此实现数据全量同步,其实全量同步最终封装的协议还是Http。

  新增数据使用异步广播同步:

  

1:DistroProtocol 使用 sync() 方法接收增量数据

 

  2:向其他节点发布广播任务

   调用 distroTaskEngineHolder 发布延迟任务

  3:调用 DistroDelayTaskProcessor.process() 方法进行任务投递:将延迟任务转换为异步变更任务

  4:执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送消息

   调用 DistroHttpAgent.syncData() 方法发送数据

   调用 NamingProxy.syncData() 方法发送数据

  5:异常任务调用 handleFailedTask() 方法进行处理

   调用 DistroFailedTaskHandler 处理失败任务

   调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新投递成延迟任务。

  

 

  增量数据入口

  我们回到服务注册,服务注册的InstanceController.register()就是数据入口,它会调用ServiceManager.registerInstance(),执行数据同步的时候,调用addInstance(),在该方法中会执行DistroConsistencyServiceImpl.put(),该方法是增量同步的入口,会调用distroProtocol.sync()方法,代码如下:

  

/***

 

   * 数据保存

   * @param key key of data, this key should be globally unique

   * @param value value of data

   * @throws NacosException

  @Override

  public void put(String key, Record value) throws NacosException {

   //将数据存入到dataStore中

   onPut(key, value);

   //使用distroProtocol同步数据

   distroProtocol.sync(

   new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),

   DataOperation.CHANGE,

   globalConfig.getTaskDispatchPeriod() / 2);

  

 

  sync()方法会执行任务发布,代码如下:

  

public void sync(DistroKey distroKey, DataOperation action, long delay) {

 

   //向除了自己外的所有节点广播

   for (Member each : memberManager.allMembersWithoutSelf()) {

   DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),

   each.getAddress());

   DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);

   //从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来

   //执行延时任务发布

   distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);

   if (Loggers.DISTRO.isDebugEnabled()) {

   Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());

  

 

  增量同步操作

  延迟任务对象我们可以从DistroTaskEngineHolder构造函数中得知是DistroDelayTaskProcessor,代码如下:

  

/***

 

   * 构造函数指定任务处理器

   * @param distroComponentHolder

  public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {

   DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);

   //指定任务处理器defaultDelayTaskProcessor

   delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);

  

 

  它延迟执行的时候会执行process方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:

  

/***

 

   * 任务处理过程

   * @param task task.

   * @return

  @Override

  public boolean process(NacosTask task) {

   if (!(task instanceof DistroDelayTask)) {

   return true;

   DistroDelayTask distroDelayTask = (DistroDelayTask) task;

   DistroKey distroKey = distroDelayTask.getDistroKey();

   if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {

   //将延迟任务变更成异步任务,异步任务对象是一个线程

   DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);

   //将任务添加到NacosExecuteTaskExecuteEngine中,并执行

   distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);

   return true;

   return false;

  

 

  DistroSyncChangeTask实质上是任务的开始,它自身是一个线程,所以会执行它的run方法,而run方法这是数据同步操作,代码如下:

  

/***

 

   * 执行数据同步

  @Override

  public void run() {

   Loggers.DISTRO.info("[DISTRO-START] {}", toString());

   try {

   //获取本地缓存数据

   String type = getDistroKey().getResourceType();

   DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());

   distroData.setType(DataOperation.CHANGE);

   //向其他节点同步数据

   boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());

   if (!result) {

   handleFailedTask();

   Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);

   } catch (Exception e) {

   Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);

   handleFailedTask();

  

 

  数据同步会执行调用syncData,该方法其实就是通过Http协议将数据发送到其他节点实现数据同步,代码如下:

  

/***

 

   * 向其他节点同步数据

   * @param data data

   * @param targetServer target server

   * @return

  @Override

  public boolean syncData(DistroData data, String targetServer) {

   if (!memberManager.hasMember(targetServer)) {

   return true;

   //获取数据字节数组

   byte[] dataContent = data.getContent();

   //通过Http协议同步数据

   return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());

  

 

  最后:一定要跟着讲师所给的源码自行走一遍!!!

  本文由传智教育博学谷 - 狂野架构师教研团队发布
 

  如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
 

  转载请注明出处!

  以上就是手撕Nacos源码,今日撕服务户端源码(手撕svm)的详细内容,想要了解更多 手撕Nacos源码,今日撕服务户端源码的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: