rabbitmq 消息发送的流程,rabbitmq发送消息

  rabbitmq 消息发送的流程,rabbitmq发送消息

  

目录
环境配置消息丢失分析生产阶段生产端模拟消息丢失兔子q消费端

  

环境配置

跳靴整合兔子q实现消息的发送。

  1.添加专家依赖

  依赖性groupIdorg.springframework.boot/groupId工件id spring-boot-starter/工件id/依赖性依赖性groupIdorg.springframework.boot/groupId工件id spring-boot-starter-web/工件id/依赖项依赖性groupIdorg.springframework.boot/groupId工件id spring-boot-starter-amqp/工件id/依赖关系2 .添加应用程序。阳明海运股份有限公司配置文件

  春天:兔子MQ :主机: 192。168 .3 .19端口: 5672用户名:管理员密码: xxxx3 .配置交换机、队列以及绑定

  @ Bean public direct exchange my exchange(){ direct exchange direct exchange=new direct exchange(我的exchange );返回directExchange} @Bean公共队列my Queue(){ Queue Queue=new Queue( my Queue );返回队列;} @Bean公共绑定Binding(){返回绑定生成器。bind(我的队列()).到(myExchange()).with( myRoutingKey );}4.生产发送消息

  @ Autowired私兔模板兔模板;@ get mapping(/send )public String send(字符串消息){ rabbit template。convertandsend(我的交换, myRoutingKey ,message);System.out.println(【发送消息】消息)返回【发送消息】消息;}5.消费者接收消息

  @ rabbit listener(queuesToDeclare=@ Queue( my Queue ))public void process(String msg,Channel channel,Message Message){ simple date format SDF=new simple date format( yyyy-MM-DD hh :MM 3360 ss );Date Date=new Date();字符串时间=SDF。格式(日期);System.out.println(【接收信息】味精当前时间时间);6.调用生产端发送消息你好,控制台输出:

  【发送消息】你好【接收信息】你好当前时间2022-05-12 10:21:14

  说明消息已经被成功接收。

  odian">

  

消息丢失分析

  一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

  生产端丢失: 生产者无法传输到RabbitMQ存储端丢失:RabbitMQ存储自身挂了消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费RabbitMQ从生产端、储存端、消费端都对可靠性传输做很好的支持。

  

  

生产阶段

生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

  配置application.yml

  

spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: true
配置

  

@Configuration@Slf4jpublic class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; }}
消息从生产者交换机, 有confirmCallback确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据ack判断消息是否发送成功。

  消息从交换机队列,有returnCallback退回模式。

  发送消息product message控制台输出如下:

  

【发送消息】product message【接收信息】product message 当前时间2022-05-12 11:27:56【correlationData】:null【ack】true【cause】null【发送成功】

  

  

生产端模拟消息丢失

这里有两个方案:

  发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。发送不存在的交换机:

// myExchange 修改成 myExchangexxxxxrabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
结果:

  

【correlationData】:null【ack】false【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)【发送失败】

  

当发送失败可以对消息进行重试

  交换机正确,发送不存在的队列:

  交换机接收到消息,返回成功通知,控制台输出:

  

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]【ack】true【cause】null【发送成功】

  

交换机没有找到队列,返回失败信息:

  

【消息发送失败】【message】product message【replyCode】312

  

  

RabbitMQ

开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

  修改队列的持久化,修改成非持久化:

  

 @Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
发送消息之后,消息存放在队列中,然后重启RabbitMQ,消息不存在了。设置队列持久化:

  

 @Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
重启之后,队列的消息还存在。

  

  

消费端

消费端默认开始ack自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

  修改application.yml文件

  

spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual
消费接收消息之后需要手动确认:

  

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 @RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }
如果不添加:

  

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
发送两条消息

  消息被接收后,没有确认,重新放到队列中:

  

  重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

  加上channel.basicAck之后,再重启项目

  

  队列消息就被删除了

  basicAck方法最后一个参数multiple表示是删除之前的队列。

  multiple设置成true,把后面的队列都清理掉了

  

  到此这篇关于SpringBoot+RabbitMQ实现消息可靠传输详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ消息可靠传输内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: