springcloud rabbitmq消息确认,spring boot rabbitmq手动确认

  springcloud rabbitmq消息确认,spring boot rabbitmq手动确认

  

目录

简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册确认回拨和返回回调消费者消息确认介绍手动确认三种方式

 

  

简介

本文介绍跳羚整合兔子英语字母表中第十七个字母如何进行消息的确认。

 

  

生产者消息确认

 

  

介绍

发送消息确认:用来确认消息从生产者发送到经纪人然后经纪人的交换到长队过程中,消息是否成功投递。

 

  如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。

  

流程

如果消息没有到达交换,则确认回调,ack=false如果消息到达交换,则确认回调,ack=trueexchange到长队成功,则不回调退货交换到长队失败,则回调返回(需设置强制=真,否则不会回调,这样消息就丢了)

 

  

配置

应用程序.阳明海运股份有限公司

 

  # 发送者开启确认确认机制春天。兔子MQ。publisher-confirms=true #发送者开启返回确认机制publisher-returns=true

  

ConfirmCallback

确认回拨:消息只要被兔子q经纪人接收到就会触发确认方法。

 

  @Slf4j@Componentpublic类ConfirmCallbackService实现兔子模板.确认回调{ @ Override public void confirm(correlation data关联数据,boolean ack,String原因){ if(!ack) { log.error(confirm==发送到经纪人失败 r n correlation data={ } r n ack={ } r n cause={ } ,相关数据,ack,cause);} else { log.info(confirm==发送到经纪人成功 r n correlation data={ } r n ack={ } r n cause={ } ,相关数据,ack,cause);} } }相关数据:对象内部有id(消息的唯一性)和消息.

  (若确认字符(确认字符)为假的,则消息不为空,可将消息数据重新投递;若确认字符(确认字符)是没错,则相关数据为空)

  确认:消息投递到交换的状态,真的表示成功。

  原因:表示投递失败的原因。 (若确认字符(确认字符)为假的,则原因不为空若确认字符(确认字符)是没错,则原因为空)

  给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。

  public void send(String dataId,String exchangeName,String rountingKey,String message){ correlation data correlation data=new correlation data();相关数据。setid(dataId);兔子模板。convertandsend(交换名称、rountingKey、消息、相关数据);}公共字符串英语字母表中第十八个字母

  eceive(String queueName){ return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));}2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。

  

CorrelationData cd1 = new CorrelationData();this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

 

  

ReturnCallback

ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。

 

  若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

  注意:需要rabbitTemplate.setMandatory(true);

  当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。

  代码:

  

@Slf4j@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> rn" + "message={}rn" + "replyCode={}rn" + "replyText={}rn" + "exchange={}rn" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); }}

message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

 

  

 

  

注册ConfirmCallback和ReturnCallback

整合后的写法

 

  

package com.example.config; import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j@Configurationpublic class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); return rabbitTemplate; } // 下边这样写也可以 // @Autowired // private RabbitTemplate rabbitTemplate; // @PostConstruct // public void init() { // rabbitTemplate.setMandatory(true); // rabbitTemplate.setReturnCallback(this); // rabbitTemplate.setConfirmCallback(this); // } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("confirm==>发送到broker失败rn" + "correlationData={}rn" + "ack={}rn" + "cause={}", correlationData, ack, cause); } else { log.info("confirm==>发送到broker成功rn" + "correlationData={}rn" + "ack={}rn" + "cause={}", correlationData, ack, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> rn" + "message={}rn" + "replyCode={}rn" + "replyText={}rn" + "exchange={}rn" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); }}

 

  

消费者消息确认

介绍

确认方式简介详述auto(默认)根据消息消费的情况,智能判定若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。nonemq发出消息后直接确认消息manual消费端手动确认消息消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

 

  消息确认三种方式配置方法

  spring.rabbitmq.listener.simple.acknowledge-mode=manual

  spring.rabbitmq.listener.direct.acknowledge-mode=manual

  

 

  

手动确认三种方式

basicAck,basicNack,basicReject

 

  basicAck

  含义

  表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。

  函数原型

  void basicAck(long deliveryTag, boolean multiple)

  deliveryTag

  消息投递序号每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。multiple

  是否批量确认值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

  实例

  

@RabbitHandlerpublic void process(String content, Channel channel, Message message){ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}

basicNack

 

  含义

  表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

  函数原型

  void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  deliveryTag:表示消息投递序号。multiple:是否批量确认。requeue:值为 true 消息将重新入队列。basicReject

  含义

  拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

  函数原型

  void basicReject(long deliveryTag, boolean requeue)

  deliveryTag:表示消息投递序号。requeue:值为 true 消息将重新入队列。以上就是详解SpringBoot整合RabbitMQ如何实现消息确认的详细内容,更多关于SpringBoot RabbitMQ消息确认的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: