springcloud集成kafka,spring集成kafka的原理
00-1010一、什么是消息总线二。集成消息总线实现配置自动刷新2.1面向客户端的基础架构2.2面向服务器的架构三。用kafka实现消息总线3.1 Spring Boot集成kafka3.2实现动态刷新3.3指定刷新范围。
00-1010我相信大部分读者之前都用过各种消息队列,比如RabbitMQ,kafka等。消息总线类似于他的概念。在微服务系统的架构中,我们通常使用轻量级消息代理来构建一个公共消息主题,以连接系统中的所有微服务。由于本主题中生成的消息将被所有实例监控和使用,因此我们称之为消息总线。总线上的每个实例可以方便地广播一些消息,这些消息需要被连接到该主题的其他实例所知道,例如配置更改或其他管理操作。
00-1010在上一篇博客中,我们在spring cloud config中实现了微服务架构中的分布式配置中心。但是有一个问题,就是我们在git上修改配置的时候,需要手动通知每个服务实例。这样的操作会在有很多实例的项目中害死人。对于这样的问题,斯平云族一定会考虑并给出解决方案。让我们来看看吧。
目录
当我们的系统按照上图启动时,图中serviceA的三个实例会请求Config Server获取配置,Config Server从Git repository获取配置信息,并按照应用配置的规则返回。
此时,如果我们要修改serviceA的配置。首先去git服务器修改相应的参数值,但这不会触发serviceA实例的属性更新。此时,我们向实例3发送一个post请求。此时实例3会向消息总线发送刷新请求,serviceA的实例1和实例2从总线获取消息事件,并再次从config server获取它们的配置信息,实现配置信息的动态更新。
一、什么是消息总线
在以前的架构中,服务的配置更新需要通过特定服务的实例发送请求,然后触发整个服务集群的配置更新。虽然可以是sad和functional,但这样一来,我们指定的应用实例会和集群中的其他应用实例不一样,会增加集群内容的复杂性,对以后的运维工作不利。
二、整合消息总线实现配置自动刷新
00-1010可以参考这篇文章。
如果spring boot版本采用2.2.5,kafka版本采用2.4.0.RELEASE
00-1010我们利用了上一篇博客中config的两个项目来进行转换。
3.2.1 服务端改造
增加依赖性:
依赖groupIdorg.springframework.cloud/groupId artifactId spring-cloud-starter-bus-Kafka/artifactId/dependency依赖groupIdorg.springframework.cloud/groupId artifactId spring-cloud-starter-stream-Kafka/artifactId/dependency依赖groupIdorg.springframework.cloud/groupId artifactId spring-cloud-bus/artifactId/dependency依赖groupIdorg.springframework.boot/groupId artifactId spring-boot-starter-actuator/artifactor id
</dependency>增加配置:
spring.kafka.bootstrap-servers=211.159.167.180:9092spring.kafka.consumer.group-id=test-consumer-groupspring.cloud.bus.enabled=truemanagement.endpoints.web.exposure.include= *
关于management.endpoints.web.exposure.include= * 的配置需要注意
注意:
__如果是yum的话 ‘’ 需要加 ‘ ’ 单引号*include: ‘*’ http://localhost:8769/actuator/bus-refresh 刷新所有微服务include: ‘refresh’ http://localhost:8769/actuator/bus-refresh 不能访问3.2.2 客户端改造
增加依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-bus</artifactId> </dependency>
增加配置:
management.endpoints.web.exposure.include= *spring.kafka.bootstrap-servers=211.159.167.180:9092spring.cloud.bus.enabled=true
这样就ok 了,启动项目以后,当配置修改以后,我们 给服务端发发送POST请求:http://localhost:7071/actuator/bus-refresh
就可以实现动态刷新:
完整项目地址:https://github.com/zhenghaoxiao/spring-cloud-in-action/tree/bus bus 分支
3.3 指定刷新范围
在上面的例子中,我们通过向服务端请求/actuator/bus-refresh接口,从而触发总线上所有服务实例刷新,但是在一些特殊场景下,我们希望可以刷新服务中某个具体实例的配置,Spring Cloud Bus 对这种场景也有很好的支持,/actuator/bus-refreshdestination=customers:9000 提供了一个destination参数,用来定位具体要刷新的应用程序。当我们调用带有destination参数的 接口时,此时总线上的个应用实例会根据destination属性的值来判断是否为自己的实例名,若符合才进行配置刷新,若不符合就忽略该 消息。
到此这篇关于基于kafka实现Spring Cloud Bus消息总线的文章就介绍到这了,更多相关kafka消息总线Spring Cloud Bus内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。