rabbitmq死信队列,rabbitmq延迟消息队列

  rabbitmq死信队列,rabbitmq延迟消息队列

  00-1010死信队列简介示例延迟队列简介使用场景

  00-1010本文介绍RabbitMQ的死信队列和延迟队列。

  这个内容也是Java后端面试的常见问题。

  

目录

 

  

简介

DLX,全称为死信交换,可以被称为死信交换器,或者有人称之为死信邮箱。当一个消息在一个队列中成为死消息时,它可以被重新发送到另一个交换机,即DLX。绑定到DLX的队列称为死消息队列。

 

  以下情况会导致邮件成为一封死信:

  消息被拒绝(基本。拒绝/基本。Nack),requeue参数设置为false消息已过期;已达到最大队列长度。DLX是一个普通的开关,和普通的开关没什么区别。可以在任何队列上指定,其实就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ会自动将消息重新发布到设定的DLX,然后路由到另一个队列,即死信队列。您可以监视此队列中的消息以进行相应的处理。此功能与将消息的TTL设置为0结合使用时,可以弥补immediate参数的功能。

  为队列添加DLX的方法

  1.编码方法

  //create dlx : dlx _ exchange channel . exchange declare( dlx _ exchange , direct );MapString,Object args=new HashMapString,Object;args . put( x-dead-letter-exchange , dlx _ exchange );//为队列myqueue添加dlx channel . queue declare( my queue ,false,false,false,args);您还可以为此DLX指定路由关键字。(除非另有说明,否则将使用原始队列的路由关键字)

  args . put( x-dead-letter-routing-key , dlx-routing-key );法2:命令方式

  rabbitmqctl set_policy DLX。* { dead-letter-exchange : dlx _ exchange } -应用于队列

  00-1010代码

  channel . exchange declare( exchange . dlx , direct ,true);channel . exchange declare( exchange . normal , fanout ,true);MapString,Object args=new HashMapString,Object();args.put(x-message-ttl ,10000);args . put( x-dead-letter-exchange , exchange . dlx );args . put( x-dead-letter-routing-key , routing key );channel . queue declare( queue . normal ,true,false,false,args);channel . queue bind( queue . normal , exchange.normal , );channel . queue declare( queue . dlx ,true,false,false,null);channel.queueBind(queue.dlx , exchange.dlx , routing key );channel . basic publish( exchange . normal , rk ,MessageProperties。PERSISTENT_TEXT_PLAIN, dlx 。getBytes());这里创建了两个交换机exchange.normal和exchange.dlx,分别绑定了两个队列queue.normal和queue.dlx。

  Web管理页面结果

  从下图(图1-1)的Web管理页面可以看出,两个队列都标有“D”,是durable的缩写,即设置了队列持久性。Queue.normal这个队列也配置了TTL、DLX和DLK,其中DLX指的是x-dead-letter-routing-ke。

  y这个属性。

  

 

  

图1-1

  案例分析

  参考下图(图1-2),生产者首先发送一条携带路由键为rk的消息,然后经过交换器exchange.normal顺利地存储到队列queue.normal中。由于队列queue.normal设置了过期时间为10s,在这10s内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了DLX,过期之时,消息被丢给交换器exchange.dlx中,这时找到与exchange.dlx匹配的队列queue.dlx,最后消息被存储在queue.dk这个死信队列中。

  

 

  

图1-2

  对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。DLX配合TTL使用还可以实现延迟队列的功能,详细请看下一节。

  

 

  

延迟队列

简介

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

 

  在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是有两种方案来间接实现:

  方案1:采用rabbitmq-delayed-message-exchange 插件实现。(RabbitMQ 3.6.x开始支持)方案2:通过前面所介绍的DLX和TTL模拟出延迟队列的功能。在图1-2中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于queue.dlx这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为10秒的延迟,

  生产者通过exchange.normal这个交换器将发送的消息存储在queue.normal这个队列中。消费者订阅的并非是queue.normal这个队列,而是queue.dlx这个队列。当消息从queue.normal这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。

  在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为5秒、10秒、30秒、1分钟、5分钟、10分钟、30分钟、1小时这几个维度,当然也可以再细化一下。

  以下图(图2-1)为例进行说明。为简化,只设置5秒、10秒、30秒、1分钟这四个等级。根据需求的不同,生产者发送消息的时候通过设置不同的路由键,将消息发送到与交换器绑定的不同的队列中。这里队列也分别配置了DLX和相应的死信队列,当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

  

 

  

图2-1

  

 

  

使用场景

延迟队列的使用场景有很多,比如:

 

  用户下订单场景:用户下单后有30分钟的时间支付,若30分钟内没有支付,则将这个订单取消。

  方案:用户下单后将取消订单的消息发送到延迟队列,延迟时间设置为30分钟。取消订单这个消息的订阅者程序在30分钟后收到消息,判断该订单的状态是否为已支付,若还没支付,则将该订单状态设置为:已取消。

  定时遥控场景:用户想用手机远程遥控家里的智能设备在指定的时间工作。

  方案:假设用户想要的操作是:开启热水器。首先,将开启热水器这个消息发送到延迟队列,延迟时间设置到用户想要的时间到现在时间的差值。开启热水器这个消息的订阅者程序在指定时间收到消息,再将指令推送到智能设备。

  需要注意的是,延迟队列的消息是不能取消的,解决方案是:在消费消息的时候判断这个消息对应的业务的当前状态。例如:对于取消订单来说,收到消息时,读取这个消息所对应的数据库信息,如果已经是已付款状态了,就不进行任何操作了,如果是未支付状态,则改为已取消。

  到此这篇关于详解RabbitMQ中死信队列和延迟队列的使用详解的文章就介绍到这了,更多相关RabbitMQ死信队列 延迟队列内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: