rabbitmq延迟发送,springboot rabbitmq 死信队列
00-1010 0.导言1。死信队列1.2什么是死信?1.3什么是死信队列?1.4创建死信交换机和死信队列1.5实现死信消息1.5.1实现基于消费者reject或nack的死信消息1.5.2实现基于生存时间的死信消息1.5.3实现基于队列max_length的消息1.6实现基于死信队列的消息延迟发送2 .延迟开关3。应用场景4。练习题
00-1010死信队列是消息队列中一个非常重要的概念。同时,我们在所有业务场景中都需要延迟发货的概念,比如12306中30分钟后取消未付款订单。那么本期我们就来讲解一下死信队列,以及如何通过延时开关实现延时发送的需求。
目录
00-1010在了解死信队列之前,我们先来解释一下什么是死信。所谓死信,就是没有消费成功的消息,但并不是所有没有消费成功的消息都是死信消息。死信消息来自以下三种途径:(1)被消费者拒绝的消息,参数requeue设置为false(2)过期消息。过期消息有两种:a .发送消息时,设置一条消息的消息TTL,如果该消息尚未消费,则标记为死信消息。b .设置队列的消息生存期,如果消息未被消费,队列中的所有消息都将被标记为死信消息。(3)当队列达到最大长度时,再次发送的消息将直接成为死信消息。
00-1010直接来说,用来存放死信的队列就是死信队列,这似乎是一句废话,所以它的重点是理解死信的概念。
死信队列的作用:(1)当队列满时,会将消息发送到死信队列,这样消息就不会丢失。您可以将消息从死信队列中取出,供以后使用。(2)基于死信队列可以达到延缓消耗的效果。具体实现我们后面再解释。
00-1010死信交换机和死信队列都是普通的交换机和队列,只是专门声明用于存储死信消息。我们只需要用deadLetterExchange方法声明死信开关,然后用deadLetterRoutingKey方法声明死信队列。
如下面的代码所示,我们创建了test.queue、test.exchange和dead.queue、dead.exchange,并将死信开关和死信路由分配给test.queue中的测试队列。
注意:如果涉及修改队列和交换机的属性,如果队列和交换机已经存在,需要删除后才能生效;否则,可能会报告错误。
@ configuration public class rabbit MQ config { private static final String TEST _ EXCHANGE= TEST . EXCHANGE ;私有静态最终字符串TEST _ QUEUE= test.queue私有静态最终字符串TEST _ ROUTING _ KEY= TEST . ROUTING . KEY ;私有静态最终字符串DEAD _ EXCHANGE= dead.exchange私有静态最终字符串DEAD _ QUEUE= dead.queue私有静态最终字符串DEAD _ ROUTING _ KEY= DEAD . ROUTING . KEY ;@Bean公共队列deadQueue(){返回新队列(DEAD _ Queue);} public Direct exchange dead exchange(){//设置演示,使用Direct exchange,可以根据自己的业务情况为其他类型的交易所申报返回新的Direct exchange(dead _ exchange);公共绑定deadBinding(Queue deadQueue,Exchange dead Exchange){ return Binding builder . bind(dead Queue)。至(deadExchange)。with(DEAD_ROUTING_KEY)。noargs();公共队列testQueue(){ return QueueBu
ilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build(); public DirectExchange testExchange(){ return new DirectExchange(TEST_EXCHANGE); public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){ return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);}
1.5 实现死信消息
1.5.1 基于消费者进行reject或nack实现死信消息
@Componentpublic class QueueListener { @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE) public void handler(MyMessage messageInfo, Message message, Channel channel) { try{ System.out.println("接收的消息:"+messageInfo.toString()); // requeue参数设置为false 设置死信消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); // multiple和requeue设置为false 设置死信消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // 返回ack 确认接收到消息// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (IOException e){ try { channel.basicRecover(); } catch (IOException ex) { ex.printStackTrace(); log.error("消息处理失败:{}",e.getMessage()); } } }}
1.5.2 基于生存时间实现
(1)发送消息时设置生存时间
@GetMapping("sendTestQueueWithExpiration") public String sendTestQueueWithExpiration(){ MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date()); rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> { msg.getMessageProperties().setExpiration("5000"); return msg; }); return "发送成功"; }
(2)队列设置生存时间
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 10s 过期 .ttl(10000) .build(); }
1.5.3 基于队列max_length实现
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 容量最大100条 .maxLength(100) .build(); }
1.6 基于死信队列实现消息延迟发送
上述我们说过死信队列还可以消息延迟发送,其思路就是: (1)消息发送时设置消息的生存时间,其生存时间就是我们想要延迟的时间 (2)消息者监控死信队列进行消费
正常队列的消息因为没有消费者消费,同时又指定了生存时间,到达时间后消息转发到死信队列中,消费者监听了死信队列从而将其消费掉。
基于死信队列实现消息延迟发送的问题
如果有两个消息,一个是5s生存时间,一个是10s生存时间,当我们先发送了10s生存时间的消息到queue中时,因为rabbitmq只会监控队列最外侧的消息的生存时间,也就是监控10s生存时间的消息,而5s生存时间的消息只会在最外侧的10s消息到期后才会监控,也就导致我实际需要5s生存的消息,实际需要10s才监听到了。
所以呢,基于死信队列实现的延迟消息,只使用于延迟时间一致的消息。
为了适配更多的延迟场景,已经更加简单的实现延迟消息,我们引入了延迟交换机
2. 延迟交换机
延迟交换机并不是rabbitmq自带的功能,而是要通过安装延迟交换机插件delayed_message_exchange
来实现
其插件的安装我们之间已经讲解过,不再累叙,可以参考如下博文 springcloud:安装rabbitmq并配置延迟队列插件
通过延迟交换机实现的延迟消息,其重点主要在交换机上,队列就是普通队列,消息发送到交换机上后,会记录消息的延迟时间,到达时间后才会发送到队列中,这样消费者通过监控队列,就能在指定时间获取到消息
因此延迟交换机与普通交换机的实现,只在创建交换机时,其他的操作与普通交换机无异,因此使用起来也很方便
创建延迟交换机,通过x-delayed-type
属性声明交换机类型,可以是direct也可以是topic,具体支持4中交换机类型,如果不清楚的可以参考之前的博文
@Configurationpublic class RabbitMqDelayConfig { public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_QUEUE = "delay.queue"; public static final String DELAY_ROUTING_KEY = "delay.routing.key"; @Bean public Exchange delayExchange(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type","direct"); return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Queue delayQueue(){ return new Queue(DELAY_QUEUE); } @Bean public Binding delayBinding(Queue delayQueue, Exchange delayExchange){ return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); }}
发送消息时指定延迟时间,单位毫秒
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(30000); return message; } });
我们还可以将该方法封装为工具类方法,方便之后调用
/** * 发送 延迟队列 * @param exchange 交换机 * @param routeKey 路由 * @param message 消息 * @param delaySecond 延迟秒数 */public void send(String exchange, String routeKey, Object message, int delaySecond){ rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {// 消息持久化 msg.getMessageProperties().setDelay(delaySecond * 1000);return msg;});}
3. 应用场景
延迟消息的应用场景丰富,除了我们开篇所说的30分钟未支付自动取消订单,还比如到货后72小时未签收自动签收
基本上所有需要延迟触发的业务场景都可以用rabbitmq延迟队列来实现。
4. 练习题
对于刚接触rabbitmq的同学,这里我提供一个练习题给大家,也让大家在实操中加强对于rabbitmq的理解:
需求:订单到货后72小时未签收,自动签收 讲解:我们这里要实现订单到货后的自动签收功能,订单到货后会触发发送自动签收消息的方法,订单已签收的状态status为2,到货状态为1,如果72小时前已经签收了即status被更新为2了,那么需要取消自动签收(不执行自动签收,即忽略自动签收消息)
到此这篇关于springcloud:RabbitMQ死信队列与延迟交换机实现的文章就介绍到这了,更多相关springcloudRabbitMQ死信队列与延迟交换机内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。