rocketmq广播模式风险,rocketmq事务消息实现
消息队列消息模式主要有两种:广播模式、集群模式(负载均衡模式)
广播模式是每个消费者,都会消费消息;
负载均衡模式是每一个消费只会被某一个消费者消费一次;
我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;
我们可以通过@RocketMQMessageListener的消息模型属性值来设置,消息模型.广播是广播模式,消息模型.使聚集是默认集群负载均衡模式
下面来介绍下springboot rockermq整合实现广播消息
创建跳羚项目,添加rockermq依赖!- rocketMq依赖-依赖关系groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId版本2 . 2 . 1/版本/依赖关系配置rocketmq#端口服务器:端口: 8083
# 配置rocketmqrocketmq:名称-服务器: 127 .0 .0 .133609876 #生产者生产者: #生产者组名,规定在一个应用里面必须唯一组:组1 #消息发送的超时时间默认3000毫秒发送消息超时: 3000 #消息达到4096字节的时候,消息就会被压缩。默认4096压缩消息正文阈值: 4096 #最大的消息限制,默认为128K最大邮件大小: 4194304 #同步消息发送失败重试次数发送失败时重试次数: 3 #在内部发送失败时是否重试其他代理,这个参数在有多个经纪人时才生效重试-下一个-服务器:真#异步消息发送失败重试的次数发送异步失败时的重试次数: 3
生产端:新建一个控制器来做消息发送生产端按正常发送逻辑发送消息即可
包com。举例。springbootrockeddemo。控制器;导入org。阿帕奇。火箭MQ。春天。核心。火箭MQ模板;导入org。spring框架。豆子。工厂。注释。自动连线;导入org。spring框架。网络。绑定。注释。请求映射;导入org。spring框架。网络。绑定。注释。休息控制器;/** * 广播消息* @ author qzz */@ RestControllerpublic class RocketMQBroadCOntroller { @ auto wired private RocketMQTemplate RocketMQTemplate;/** * 发送广播消息*/@请求映射(/testBroadSend )public void testSyncSend(){//参数一:主题如果想添加标签,可以使用主题:标签的写法//参数二:消息内容for(int I=0;i10I){ rocket MQ模板。convertandsend(测试主题范围,测试消息 I);} }}创建两个消费者来消费消息我们先集群负载均衡测试,加上消息模型=消息模型。使聚集
消费者1:
包com。举例。springbootrockedemo。配置;导入org。阿帕奇。火箭MQ。春天。注释。消息模型;导入组织。阿帕奇。火箭MQ。春天。注释。rocket MQ messagelistener进口
org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); }}消费者2: 与消费者1在 同一个consumerGroup 和 topic
package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); }}
启动服务,测试 集群模式消费集群模式测试: 两个消费者平摊 消息
把上面两个消费者的 messageModel 属性值修改成 广播模式消费者1:
package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); }}
消费者2: 与消费者1在 同一个consumerGroup 和 topic
package com.example.springbootrocketdemo.config;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */@Service@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); }}
重启服务,测试 广播模式消费
广播模式消费下,两个消费者都消费到Topic的所有消息。
测试成功!
到此这篇关于Springboot详解RocketMQ实现广播消息流程的文章就介绍到这了,更多相关Springboot广播消息内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。