rocketmq广播模式风险,rocketmq事务消息实现

  rocketmq广播模式风险,rocketmq事务消息实现

  消息队列消息模式主要有两种:广播模式、集群模式(负载均衡模式)

  广播模式是每个消费者,都会消费消息;

  负载均衡模式是每一个消费只会被某一个消费者消费一次;

  我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;

  我们可以通过@RocketMQMessageListener的消息模型属性值来设置,消息模型.广播是广播模式,消息模型.使聚集是默认集群负载均衡模式

  下面来介绍下springboot rockermq整合实现广播消息

  创建跳羚项目,添加rockermq依赖!- rocketMq依赖-依赖关系groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId版本2 . 2 . 1/版本/依赖关系配置rocketmq#端口服务器:端口: 8083

  # 配置rocketmqrocketmq:名称-服务器: 127 .0 .0 .133609876 #生产者生产者: #生产者组名,规定在一个应用里面必须唯一组:组1 #消息发送的超时时间默认3000毫秒发送消息超时: 3000 #消息达到4096字节的时候,消息就会被压缩。默认4096压缩消息正文阈值: 4096 #最大的消息限制,默认为128K最大邮件大小: 4194304 #同步消息发送失败重试次数发送失败时重试次数: 3 #在内部发送失败时是否重试其他代理,这个参数在有多个经纪人时才生效重试-下一个-服务器:真#异步消息发送失败重试的次数发送异步失败时的重试次数: 3

  生产端:新建一个控制器来做消息发送生产端按正常发送逻辑发送消息即可

  包com。举例。springbootrockeddemo。控制器;导入org。阿帕奇。火箭MQ。春天。核心。火箭MQ模板;导入org。spring框架。豆子。工厂。注释。自动连线;导入org。spring框架。网络。绑定。注释。请求映射;导入org。spring框架。网络。绑定。注释。休息控制器;/** * 广播消息* @ author qzz */@ RestControllerpublic class RocketMQBroadCOntroller { @ auto wired private RocketMQTemplate RocketMQTemplate;/** * 发送广播消息*/@请求映射(/testBroadSend )public void testSyncSend(){//参数一:主题如果想添加标签,可以使用主题:标签的写法//参数二:消息内容for(int I=0;i10I){ rocket MQ模板。convertandsend(测试主题范围,测试消息 I);} }}创建两个消费者来消费消息我们先集群负载均衡测试,加上消息模型=消息模型。使聚集

  消费者1:

  包com。举例。springbootrockedemo。配置;导入org。阿帕奇。火箭MQ。春天。注释。消息模型;导入组织。阿帕奇。火箭MQ。春天。注释。rocket MQ messagelistener进口

  org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); }}消费者2: 与消费者1在 同一个consumerGroup 和 topic

  

package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); }}

启动服务,测试 集群模式消费集群模式测试: 两个消费者平摊 消息

 

  

 

  把上面两个消费者的 messageModel 属性值修改成 广播模式消费者1:

  

package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); }}

消费者2: 与消费者1在 同一个consumerGroup 和 topic

 

  

package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); }}

重启服务,测试 广播模式消费

 

  广播模式消费下,两个消费者都消费到Topic的所有消息。

  测试成功!

  到此这篇关于Springboot详解RocketMQ实现广播消息流程的文章就介绍到这了,更多相关Springboot广播消息内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: