本篇文章为你整理了RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,包含有rabbitmq如何保证消息不丢 rabbitmq怎么保证消息不重复消费 rabbitmq消息积压如何解决 rabbitmq和kafka的区别 RabbitMQ,希望能帮助你了解 RabbitMQ。
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Connection: publisher / consumer和 broker之间的TCP连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP
Connection的开销将是巨大的,效率也较低。Channel是在connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker识别 channel,所以channel 之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建TCP connection的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic(publish-subscribe) and fanout
(multicast)
Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则
binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列
vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 "交换机exchange、绑定Binding、队列Queue",更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。
五种常用模式
1.simple (简单模式)
一个消费者消费一个生产者生产的信息
2.Work queues(工作模式)
一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次
3.Publish/Subscribe(发布订阅模式)
生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息
4.Routing(路由模式)
生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列
5.Topics(主题模式)
生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者
保证消息的稳定性
消息持久化
RabbitMQ的消息默认存在内存中的,一旦服务器意外挂掉,消息就会丢失
消息持久化需做到三点
Exchange设置持久化
Queue设置持久化
Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
ACK确认机制
多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除
这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。
设置集群镜像模式
我们先来介绍下RabbitMQ三种部署模式:
1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
3)镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案
为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。
消息发送失败补偿方案
消息发送失败处理方案
场景一:消息找不到队列导致消息发送失败。
设置mandatory=true
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。
这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。
channel.basicPublish("EXCHANGE_NAME", "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "text".getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("返回结果是:" + message);
场景二:生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失
方案一:开启MQ的事务机制
在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。
缺点:只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能。
方案二:生产者将信道设置成confirm(确认)模式
一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。
消息发送失败补偿方案
当消息发送失败后,结合MQ配置,对消息进行重试并记录error日志,达到重试次数后,将处理结果通过回调接口的方式告诉生产者,生产者去进行额外的补偿机制。
confirm方案对比
客户端实现生产者confirm有三种方式:
普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方
注意:
批量模式极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
消息消费失败处理方案
设置死信队列
当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。
监控死信队列长度,日志记录及时预警。
实现延迟队列
RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果
TTL来设置一个消息的的过期时间,DLX设置一个死信队列,将过期的消息推送到死信队列中,消费端监听死信队列来消费数据,从而达到消息延迟的效果。
死信队列补偿机制
当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。
消息防堆积方案
加强对不合理使用MQ的审批。
监控消费能力(耗时 300ms),及时预警。
框架层实现发送方限流。(默认值:100条/s)
设置消息TTL。
使用惰性队列。
关键节点日志记录
MQ成功接受消息时。(info)
生产者消息发送失败时。(error)
生产者confrim确认失败时。(error)
生产消息量过大,限流时。(error)
生产者连接MQ超时时。 (error)
消息大小大于10KB时。(error)
消费者成功消费消息是。(info)
消费者连接MQ超时时。(error)
消费者消费失败时。(error)
消费者进入死信队列时。(error)
消费耗时低于300ms时。(error)
maven引入
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-amqp /artifactId
/dependency
yam配置
spring:
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
virtual-host: itclub
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
经典案例代码实现
1. 延迟队列实现
/**
* 延迟队列
* @Author : onePiece
@Configuration
public class TtlQueueConfig {
* 普通交换机名称
public static final String X_EXCHANGE = "X";
* 死信交换机名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
* 普通队列名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
* 死信队列名称
public static final String DEAD_LETTER_QUEUE = "QD";
public static final String DEAD_LETTER_QUEUE_KEY = "YD";
* 声明 XExchange
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
* 声明 yExchange
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
* 声明队列QA
@Bean
public Queue queueA(){
Map String, Object arguments = new HashMap (3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
// 设置过期时间
arguments.put("x-message-ttl", 2000);
return new Queue(QUEUE_A, true, true, false, arguments);
* 声明队列QB
@Bean
public Queue queueB(){
Map String, Object arguments = new HashMap (3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
// 设置过期时间
arguments.put("x-message-ttl", 4000);
return new Queue(QUEUE_B, true, true, false, arguments);
* 死信队列QD
@Bean
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
* 绑定交换机
@Bean
public Binding queueABindingX(){
return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
@Bean
public Binding queueBBindingX(){
return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
@Bean
public Binding queueDBindingY(){
return BindingBuilder.bind(queueD()).to(yExchange()).with(DEAD_LETTER_QUEUE_KEY);
生产端
/**
* 延迟队列-生产端
* @author: onePiece
* @date: 2023-02-21
@RestController
@RequestMapping("demo")
public class TtlProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("ttl")
public AjaxResult ttl(){
for (int i = 0; i i++) {
String messageId = Convert.toStr(UUID.randomUUID());
String messageData = "ttl hello rabbit";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map String, Object map = new HashMap (16);
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("X", "XA", map);
}else {
rabbitTemplate.convertAndSend("X", "XB", map);
return AjaxResult.success();
消费端
/**
* 延迟消费
* @author: onePiece
* @date: 2023-02-21
@Component
public class TtlReceiver {
// @RabbitListener(queues = "QA")
// public void xa(Message message) throws Exception {
// String nowDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// byte[] body = message.getBody();
// ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
// System.out.println("QA队列消费: 当前时间" + nowDate + ois.readObject().toString());
@RabbitListener(queues = "QD")
public void xb(Message message) throws Exception {
String nowDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
byte[] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
System.out.println("死信队列消费: 当前时间" + nowDate + ois.readObject().toString());
2. 惰性队列实现
/**
* 惰性队列
* 防止消息堆积
* @author: onePiece
@SuppressWarnings("all")
@Configuration
public class LazyQueueConfig {
public static final String LAZY_TOPIC_EXCHANGE = "lazy.topic.exchange";
public static final String LAZY_TOPIC_QUEUE = "lazy_topic_queue";
public static final String LAZY_TOPIC_ROUTING_KEY = "*.topic.*";
* 声明队列
@Bean
public Queue topicLazyQueue(){
Map String, Object args = new HashMap (2);
args.put("x-queue-mode", "lazy");
* 设置持久化队列
return QueueBuilder.durable(LAZY_TOPIC_QUEUE).withArguments(args).build();
public TopicExchange topicLazyExchange(){
TopicExchange exchange = new TopicExchange(LAZY_TOPIC_EXCHANGE);
return exchange;
* Topic交换器和队列通过bindingKey绑定
* @return
@Bean
public Binding bindingTopicLazyExchangeQueue(){
return BindingBuilder.bind(topicLazyQueue()).to(topicLazyExchange()).with(LAZY_TOPIC_ROUTING_KEY);
消费端
/**
* 惰性队列-生产端
* @author: onePiece
* @date: 2023-02-21
@SuppressWarnings("all")
@RestController("demo")
public class LazyQueueProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
* 创建一个消息是否投递成功的回调方法
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
* @param correlationData 消息的附加信息
* @param ack true for ack, false for nack
* @param cause 是一个可选的原因,对于nack,如果可用,否则为空。
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ack){
//可以进行日志记录、异常处理、补偿处理等
System.err.println("异常ack-"+ack+",id-"+correlationData.getId()+",cause:"+cause);
}else {
//更新数据库,可靠性投递机制
System.out.println("正常ack-"+ack+",id-"+correlationData.getId());
try{
System.out.println(new String(correlationData.getReturnedMessage().getBody()));
} catch (Exception e){
* 创建一个消息是否被队列接收的监听对象,如果没有队列接收发送出的消息,则调用此方法进行后续处理
private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
* @param message 被退回的消息
* @param replyCode 错误编码
* @param replyText 错误描述
* @param exchange 交换器
* @param routingKey 路由
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("spring_returned_message_correlation:"+message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY)
+"return exchange: " + exchange
+ ", routingKey: "+ routingKey
+ ", replyCode: " + replyCode
+ ", replyText: " + replyText
+ ",message:" + message);
try {
System.out.println(new String(message.getBody()));
} catch (Exception e){
* 扩展点,在消息转换完成之后,发送之前调用;可以修改消息属性、消息头信息
private final MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = message.getMessageProperties();
* 设置消息发送到队列之后多久被丢弃,单位:毫秒
* 此种方案需要每条消息都设置此属性,比较灵活;
* 还有一种方案是在声明队列的时候指定发送到队列中的过期时间;
* * Queue queue = new Queue("test_queue2");
* * queue.getArguments().put("x-message-ttl", 10000);
* 这两种方案可以同时存在,以值小的为准
//properties.setExpiration("10000");
* 设置消息的优先级
properties.setPriority(9);
* 设置消息发送到队列中的模式,持久化非持久化(只存在于内存中)
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
* 发送消息
* @param exchange 交换器
* @param route 路由键
* @param message 消息
* @param properties
@GetMapping("sendlazyQueue")
public void sendMsg(String exchange, String routingKey, String message, MessageProperties properties){
* 设置生产者消息publish-confirm回调函数
this.rabbitTemplate.setConfirmCallback(confirmCallback);
* 设置消息退回回调函数
this.rabbitTemplate.setReturnCallback(returnCallback);
* 新增消息转换完成后、发送之前的扩展点
this.rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);
try {
if(null == properties){
properties = new MessageProperties();
* 设置消息唯一标识
properties.setMessageId(UUID.randomUUID().toString());
* 创建消息包装对象
Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(properties).build();
* 将消息主题和属性封装在Message类中
Message returnedMessage = MessageBuilder.withBody(message.getBytes()).build();
* 相关数据
CorrelationData correlationData = new CorrelationData();
* 消息ID,全局唯一
correlationData.setId(msg.getMessageProperties().getMessageId());
* 设置此相关数据的返回消息
correlationData.setReturnedMessage(returnedMessage);
* 如果msg是org.springframework.amqp.core.Message对象的实例,则直接返回,否则转化为Message对象
this.rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
} catch (Exception e){
e.printStackTrace();
消费端
/**
* 惰性队列-消费端
* @author: onePiece
* @date: 2023-02-21
@SuppressWarnings("all")
@Component
public class LazyQueueReceiver {
* @param channel 信道
* @param message 消息
* @throws Exception
@RabbitListener(queues = LazyQueueConfig.LAZY_TOPIC_QUEUE)
public void onMessage(Channel channel, Message message) throws Exception {
System.out.println("--------------------------------------");
System.out.println("消费端Payload: " + message.getPayload()+"-ID:"+message.getHeaders().getId()+"-messageId:"+message.getHeaders());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK,获取deliveryTag
channel.basicAck(deliveryTag, false);
以上就是RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,想要了解更多 RabbitMQ的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。