Spring Cloud GateWay基于nacos如何去做灰度发布(springboot 灰度发布)

  本篇文章为你整理了Spring Cloud GateWay基于nacos如何去做灰度发布(springboot 灰度发布)的详细内容,包含有springcloud灰度发布功能实现 springboot 灰度发布 spring cloud nacos gateway 灰度发布数据库如何处理 Spring Cloud GateWay基于nacos如何去做灰度发布,希望能帮助你了解 Spring Cloud GateWay基于nacos如何去做灰度发布。

  如果想直接查看修改部分请跳转 动手-点击跳转

  本文基于 ReactiveLoadBalancerClientFilter使用RoundRobinLoadBalancer

  灰度发布

  灰度发布,又称为金丝雀发布,是一种新旧版本平滑发布的方式。在上面可以对同一个API进行两个版本 的内容,由一部分用户先行体验,如无问题,逐步扩大发布范围
 

  本文将讲述如何基于基于nacos的matedata与Ribbon如何去做灰度发布

  重点知识

  Spring Cloud Gateway两种负载均衡器

  官网说明两种负载均衡器
 

  Gateway有两种客户端负载均衡器,LoadBalancerClientFilter和ReactiveLoadBalancerClientFilter。
 

  LoadBalancerClientFilter使用一个Ribbon的阻塞式LoadBalancerClient,Gateway建议使用ReactiveLoadBalancerClientFilter。
 

  可以通过设置spring.cloud.loadbalancer.ribbon.enabled=false,切换到ReactiveLoadBalancerClientFilter。无论使用Ribbon还是LoadBalancer,在Route中配置的lb是一样的

  本节采用 ReactiveLoadBalancerClientFilter 进行设置

  采用ReactiveLoadBalancerClientFilter使用RoundRobinLoadBalancer

  灰度发布服务器选择 简单示意图

  Client —- gateway —- GlobalFilter 拦截 选择一个灰度发布服务器 如果没有灰度服务则选取正常服务器 —- 转发到服务

  nacos的matedata

  我们在向 Nacos Server 进行服务注册的时候往往会附加一些 metadata ,可以参考官方文档中 Dubbo 融合 Nacos 成为注册中心 章节。
 

  充分利用好服务实例的 metadata ,可以衍生出许多有意思的实践。
 

  完全可以把相关内容放进 metadata 中,好比说版本号,特性名等等

  然后再根据负载均衡路由到不同的服务

  

1
2
spring.cloud.nacos.discovery.metadata.version=1.15
spring.cloud.nacos.discovery.metadata.advance=true

 

  准备工作

  nacos 部署
 

  gateway 部署 -可以参考
 

  部署两台服务A

  开始

  跟踪代码

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Mono Void filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
 String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
 if (url != null ("lb".equals(url.getScheme()) "lb".equals(schemePrefix))) {
 ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
 if (log.isTraceEnabled()) {
 log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
 }
 return this.choose(exchange).doOnNext((response) - {
 if (!response.hasServer()) {
 throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
 } else {
 ServiceInstance retrievedInstance = (ServiceInstance)response.getServer();
 URI uri = exchange.getRequest().getURI();
 String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
 if (schemePrefix != null) {
 overrideScheme = url.getScheme();
 }
 DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
 URI requestUrl = this.reconstructURI(serviceInstance, uri);
 if (log.isTraceEnabled()) {
 log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
 }
 exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
 }
 }).then(chain.filter(exchange));
 } else {
 return chain.filter(exchange);
 }
}
protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
 return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
private Mono Response ServiceInstance choose(ServerWebExchange exchange) {
 URI uri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
 ReactorLoadBalancer ServiceInstance loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
 if (loadBalancer == null) {
 throw new NotFoundException("No loadbalancer available for " + uri.getHost());
 } else {
 return loadBalancer.choose(this.createRequest());
 }
}

 

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@SuppressWarnings("rawtypes")
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono Response ServiceInstance choose(Request request) {
 // TODO: move supplier to Request?
 // Temporary conditional logic till deprecated members are removed.
 if (serviceInstanceListSupplierProvider != null) {
 ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
 .getIfAvailable(NoopServiceInstanceListSupplier::new);
 return supplier.get().next().map(this::getInstanceResponse);
 }
 ServiceInstanceSupplier supplier = this.serviceInstanceSupplier
 .getIfAvailable(NoopServiceInstanceSupplier::new);
 return supplier.get().collectList().map(this::getInstanceResponse);
}
private Response ServiceInstance getInstanceResponse(
 List ServiceInstance instances) {
 if (instances.isEmpty()) {
 log.warn("No servers available for service: " + this.serviceId);
 return new EmptyResponse();
 }
 // TODO: enforce order?
 int pos = Math.abs(this.position.incrementAndGet());
 ServiceInstance instance = instances.get(pos % instances.size());
 return new DefaultResponse(instance);
}

 

  通过代码跟踪 ReactiveLoadBalancerClientFilter 与 RoundRobinLoadBalancer 可以发现,最终 我们只需要对 getInstanceResponse 进行改造 即可满足所有需要

  动手!

  开始修改代码

  我们只需要新增一个 GlobalFilter 在 AdvanceReactiveLoadBalancerClientFilter 执行之前 ,并且对LoadBalancer 的getInstanceResponse 做一下稍微改造就OK了

  复制 RoundRobinLoadBalancer 内容 并修改 getInstanceResponse() 逻辑

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package top.lingma.gateway.loadbalancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.core.*;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class AdvanceRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
 private static final Log log = LogFactory.getLog(AdvanceRoundRobinLoadBalancer.class);
 private final AtomicInteger position;
 private final AtomicInteger positionAdvance;
 @Deprecated
 private ObjectProvider ServiceInstanceSupplier serviceInstanceSupplier;
 private ObjectProvider ServiceInstanceListSupplier serviceInstanceListSupplierProvider;
 private final String serviceId;
 @Deprecated
 public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider ServiceInstanceSupplier serviceInstanceSupplier) {
 this(serviceId, serviceInstanceSupplier, new Random().nextInt(1000));
 }
 public AdvanceRoundRobinLoadBalancer(ObjectProvider ServiceInstanceListSupplier serviceInstanceListSupplierProvider, String serviceId) {
 this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
 }
 public AdvanceRoundRobinLoadBalancer(ObjectProvider ServiceInstanceListSupplier serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
 this.serviceId = serviceId;
 this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
 this.position = new AtomicInteger(seedPosition);
 this.positionAdvance = new AtomicInteger(seedPosition);
 }
 @Deprecated
 public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider ServiceInstanceSupplier serviceInstanceSupplier, int seedPosition) {
 this.serviceId = serviceId;
 this.serviceInstanceSupplier = serviceInstanceSupplier;
 this.position = new AtomicInteger(seedPosition);
 this.positionAdvance = new AtomicInteger(seedPosition);
 }
 @Override
 public Mono Response ServiceInstance choose(Request request) {
 if (serviceInstanceListSupplierProvider != null) {
 ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
 return supplier.get().next().map((instances) - {
 // 此处做了选择逻辑的修改
 if (request instanceof AdvanceRequestContext) {
 List ServiceInstance advanceInstance = instances.stream().filter(s - s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
 return getInstanceResponse(advanceInstance, request);
 } else {
 List ServiceInstance routineInstance = instances.stream().filter(s - !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
 return getInstanceResponse(routineInstance, request);
 }
 });
 }
 ServiceInstanceSupplier supplier = this.serviceInstanceSupplier.getIfAvailable(NoopServiceInstanceSupplier::new);
 return supplier.get().collectList().map((instances) - {
 if (request instanceof AdvanceRequestContext) {
 // 此处做了选择逻辑的修改
 List ServiceInstance advanceInstance = instances.stream().filter(s - s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
 return getInstanceResponse(advanceInstance, request);
 } else {
 List ServiceInstance instance = instances.stream().filter(s - !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
 return getInstanceResponse(instance, request);
 }
 });
 }
 private Response ServiceInstance getInstanceResponse(List ServiceInstance instances, Request request) {
 if (instances.isEmpty()) {
 if (request instanceof AdvanceRequestContext) {
 return new AdvanceEmptyResponse();
 }
 log.warn("No servers available for service: " + this.serviceId);
 return new EmptyResponse();
 }
 int pos = 1;
 //灰度发布选择逻辑
 if (request instanceof AdvanceRequestContext) {
 pos = Math.abs(this.positionAdvance.incrementAndGet());
 } else {
 pos = Math.abs(this.position.incrementAndGet());
 }
 ServiceInstance instance = instances.get(pos % instances.size());
 return new DefaultResponse(instance);
 }
}

 

  AdvanceEmptyResponse 类是为了标识无灰度发布服务器,此时可以走正常服务器

  

1
2
3
4
5
6
7
8
9
10
11
12
13
package top.lingma.gateway.loadbalancer;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.CompletionContext;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
public class AdvanceEmptyResponse extends org.springframework.cloud.client.loadbalancer.EmptyResponse implements Response ServiceInstance {
 public AdvanceEmptyResponse() {
 }
 public void onComplete(CompletionContext completionContext) {
 }
}

 

  AdvanceRequestContext 是为了能从 GlobalFilter 传递信息到 LoadBalancer

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package top.lingma.gateway.loadbalancer;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.web.server.ServerWebExchange;
public class AdvanceRequestContext T implements Request {
 private T exchange;
 public AdvanceRequestContext(T exchange) {
 this.exchange = exchange;
 }
 @Override
 public T getContext() {
 return exchange;
 }
}

 

  AdvanceReactiveLoadBalancerClientFilter 复制于 ReactiveLoadBalancerClientFilter

  注意两点
 

  第一灰度服务器选择在ReactiveLoadBalancerClientFilter 之前 LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package top.lingma.gateway.loadbalancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.net.URI;
import java.util.List;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
@Component
public class AdvanceReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
 private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);
 private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;
 private final LoadBalancerClientFactory clientFactory;
 private LoadBalancerProperties properties;
 public AdvanceReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
 this.clientFactory = clientFactory;
 this.properties = properties;
 }
 @Override
 public int getOrder() {
 return LOAD_BALANCER_CLIENT_FILTER_ORDER;
 }
 @Override
 @SuppressWarnings("Duplicates")
 public Mono Void filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 // 灰度用户专属服务器 判定是否是灰度用户,是否拥有灰度权限 不然直接进行下一步
 List String secChUa = exchange.getRequest().getHeaders().get("sec-ch-ua");
 if (secChUa == null secChUa.isEmpty() !secChUa.stream().findFirst().map(r - r.contains("Edge")).orElse(false)) {
 return chain.filter(exchange);
 }
 URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
 String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
 if (url == null (!"lb".equals(url.getScheme()) !"lb".equals(schemePrefix))) {
 return chain.filter(exchange);
 }
 // preserve the original url
 addOriginalRequestUrl(exchange, url);
 if (log.isTraceEnabled()) {
 log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
 }
 return choose(exchange).doOnNext(response - {
 if (response instanceof AdvanceEmptyResponse) {
 return;
 }
 if (!response.hasServer()) {
 throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
 }
 ServiceInstance retrievedInstance = response.getServer();
 URI uri = exchange.getRequest().getURI();
 // if the `lb: scheme ` mechanism was used, use ` scheme ` as the default,
 // if the loadbalancer doesnt provide one.
 String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
 if (schemePrefix != null) {
 overrideScheme = url.getScheme();
 }
 DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
 URI requestUrl = reconstructURI(serviceInstance, uri);
 if (log.isTraceEnabled()) {
 log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
 }
 exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
 }).then(chain.filter(exchange));
 }
 protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
 return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
 }
 @SuppressWarnings("deprecation")
 private Mono Response ServiceInstance choose(ServerWebExchange exchange) {
 URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
 ReactorLoadBalancer ServiceInstance loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
 if (loadBalancer == null) {
 throw new NotFoundException("No loadbalancer available for " + uri.getHost());
 }
 return loadBalancer.choose(createRequest(exchange));
 }
 /***
 * 此处进行了改造 传入了内容 方便后续 LoadBalancer 处理信息
 * @param exchange
 * @return
 */
 @SuppressWarnings("deprecation")
 private AdvanceRequestContext ServerWebExchange createRequest(ServerWebExchange exchange) {
 return new AdvanceRequestContext(exchange);
 }
}

 

  以上已经完成了灰度发布的必要部分,再进行一下AutoConfiguration 注意,这里不能被Spring 扫描

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package top.lingma.gateway.loadbalancer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
@ConditionalOnDiscoveryEnabled
public class AdvanceLoadBalancerAutoConfiguration {
 @Bean
 @ConditionalOnMissingBean
 public ReactorLoadBalancer ServiceInstance reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
 String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
 return new AdvanceRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
 }
}

 

  最后 启动类配置 @LoadBalancerClients 的 defaultConfiguration

  

1
2
3
4
5
6
7
8
9
10
11

@SpringBootApplication()
@LoadBalancerClients(defaultConfiguration = AdvanceLoadBalancerAutoConfiguration.class)
public class LingmaGatewayApplication {
 public static void main(String[] args) {
 SpringApplication.run(LingmaGatewayApplication.class, args);
 }
}

 

  关注公众号 [龗孖] 或搜索公众号[lingmaW] , 获得更多新干货!!! - 本文链接: https://blog.lingma.top/2022/12/01/36d5a1ed4a38/spring-cloud-gateway基于nacos如何去做灰度发布/index.html

  版权声明: 本博客所有文章除特别声明外,均采用 反996许可证版本1.0 许可协议。转载请注明出处!

  以上就是Spring Cloud GateWay基于nacos如何去做灰度发布(springboot 灰度发布)的详细内容,想要了解更多 Spring Cloud GateWay基于nacos如何去做灰度发布的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: