springboot rabbitmq延迟队列,rabbitmq死信队列和延时队列

  springboot rabbitmq延迟队列,rabbitmq死信队列和延时队列

  

目录

简介实例代码路由配置控制器发送器接收器应用程序。阳明海运股份有限公司实例测试

 

  

简介

说明

 

  本文用示例介绍跳羚整合兔子q时如何处理死信队列/延迟队列。

  RabbitMQ消息简介

  兔子q的消息默认不会超时。

  什么是死信队列?什么是延迟队列?

  死信队列:

  DLX,全称为死信交换,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(死信)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定尊贵型的队列就称之为死信队列。

  以下几种情况会导致消息变成死信:

  消息被拒绝(基础。拒绝/基本. Nack),并且设置请求参数为假的;消息过期;队列达到最大长度。延迟队列:

  延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

  相关网址

  详解兔子q中死信队列和延迟队列的使用详解

  

实例代码

 

  

路由配置

包com。举例。配置;导入org。spring框架。amqp。核心。*;导入org。spring框架。豆子。工厂。注释。自动连线;导入org。spring框架。语境。注释。豆;导入org。spring框架。语境。注释。配置;@ configuration公共类RabbitRouterConfig { public static final String EXCHANGE _ TOPIC _ WELCOME= EXCHANGE @ TOPIC。‘欢迎’;公共静态最终字符串EXCHANGE _ FANOUT _ un route= EXCHANGE @ FANOUT。“联合国路线”;公共静态最终字符串EXCHANGE _ TOPIC _ DELAY= EXCHANGE @ TOPIC。延迟;public static final String routing key _ HELLOS= hello .#;公共静态最终字符串路由key _ DELAY= DELAY .#;public static final String QUEUE _ HELLO= QUEUE @ HELLO ;公共静态最终字符串QUEUE _ HI= QUEUE @ HI public static final String QUEUE _ un route= QUEUE @ un route ;公共静态最终字符串QUEUE _ DELAY= Queue @ delay公共静态最终整数TTL _ QUEUE _ MESSAGE=5000 @ auto wired amqpad min amqpad min @ Bean Object init binding test(){ amqpad min。申报交易所(交易所生成器。扇出交换(交换扇出路由).耐用(真)。自动删除()。build());amqp管理。申报交易所(交易所生成器。话题交换(EXCHANGE _ TOPIC _ DELAY))。耐用(真)。自动删除()。build());amqp管理。申报交易所(交易所生成器。话题交流(EXCHANGE _ TOPIC _ WELCOME))。耐用(真)。autoDe

 

  lete() .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO) .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY) .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY) .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build()); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null)); return new Object(); }}

 

  

控制器

package com.example.controller; import com.example.config.RabbitRouterConfig;import com.example.mq.Sender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestControllerpublic class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); }}

 

  

发送器

package com.example.mq; import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; import java.util.Date; @Componentpublic class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); }}

 

  

接收器

package com.example.mq; import com.example.config.RabbitRouterConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void hello(String hello) throws InterruptedException { // System.out.println ("Receiver(hello) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(hello):sleep over"); // } // // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) // public void unroute(String hello) throws InterruptedException { // System.out.println ("Receiver(unroute) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(unroute):sleep over"); // } @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY) public void delay(String hello) throws InterruptedException { System.out.println ("Receiver(delay) : " + hello); Thread.sleep(5 * 1000); System.out.println("(delay):sleep over"); }}

 

  

application.yml

server:# port: 9100 port: 9101spring: application:# name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456# virtualHost: / publisher-confirms: true publisher-returns: true# listener:# simple:# acknowledge-mode: manual# direct:# acknowledge-mode: manual

 

  

实例测试

分别启动发送者和接收者。

 

  访问:http://localhost:9100/hello2

  五秒钟后输出:

  

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548(delay):sleep over

 

  

以上就是SpringBoot整合RabbitMQ处理死信队列和延迟队列的详细内容,更多关于SpringBoot RabbitMQ死信队列 延迟队列的资料请关注盛行IT其它相关文章!

 

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: