rabbitmq消息过期时间,rabbitmq保证消息持久化成功的条件

  rabbitmq消息过期时间,rabbitmq保证消息持久化成功的条件

  00-1010 1.前言2。设置消息1的有效期。设置队列TTL2的有效期。设置队列Expire3的有效期。发送消息时设置有效期3。截止时间开关DLX

  

目录

RabbitMQ的TTL称为生存时间,表示消息的有效期。如果消息在队列中没有被消耗,并且其存在时间超过TTL,则该消息将成为“死消息”,并且在将来不能被消耗。如果未设置TTL,此消息将永远有效(默认消息不会无效)。如果TTL设置为0,意味着如果消息不能被立即消费,它将被立即丢弃。这个特性可以部分替换RabbitMQ3.0支持的immediate参数,之所以部分替换,是因为immediate参数会有basic.return方法在传递失败时返回消息体(这个功能可以通过死信队列实现)。

 

  设置TTL有两种方法:

  队列有效期:声明队列时在队列的属性中设置,使队列中的所有消息具有相同的有效期。消息有效期:发送消息时,可以为每条消息设置不同的TTL。如果两种方法都设置了,则以较小的一个为准。

  区别:如果声明队列时设置了有效期,则消息过期后会被删除;如果是发送消息时设置的过期日期,消息过期后不会立即删除,因为此时消息是否过期是在送达消费者时判断的。

  

一.前言

 

  00-1010定义队列的方法如下:

  排队。declare ok queue declare(String queue、boolean durable、boolean exclusive、boolean autoDelete、MapString、Object arguments)抛出IOException这个方法的arguments参数可以设置队列的属性,属性名是x-message-ttl,单位是毫秒。添加如下背景词:

  代码设置如下:

  MapString,Object arguments=new hashmap string,Object();arguments.put(x-message-ttl ,10000);//10秒的单位是毫秒channel.queuedeclare (queuename,durable,exclusive,autodelete,arguments);要设置的命令行模式:

   Bitmqctl set _ policyttl 。* { message-TTL 336010000 } -apply-to-queues是通过HTTP接口调用的:

  $ curl-I-u guest : guest-H content-type : application/JSON -XPUT-d { auto _ delete : false, durable:true, arguments : { x-message-TTL : 100000 } } http://IP :15672/API/queues/{ vhost }/{ queue name }

  00-1010 Expire可以使队列自动过期,如果在指定时间内“未使用”则删除。未使用意味着队列中没有使用者,队列没有被重新声明,并且在到期时间内没有调用basic.get命令。例如,这种方法可用于RPC样式的回复队列,其中许多将被创建但从未被使用过。

  服务器将确保队列在到期时间后被删除,但它不保证删除会有多及时。服务器重新启动后,将重新计算持久队列的超时值。x-expires参数的值以毫秒为单位,它受与x-message-ttl相同的约束,不能设置为0。因此,如果将该参数设置为10000,则意味着如果在10s内没有使用,队列将被删除。

  代码如下:

  MapString,Object args=new HashMapString,Object();args.put(x-expires ,10000);channel.queueDeclare(queue ,fa

  lse, false, false, args);

 

  

3.通过发送消息时设置有效期

发送消息的方法如下:

 

  

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在该方法的props参数可以设置其有效期:

 

  

 Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 消息持久 .contentEncoding("UTF-8") // 编码方式 .contentType("text/plain") .expiration("100000") .headers(headers) .build(); channel.basicPublish("", queueName, properties, message.getBytes());

通过HTTPAPI 接口设置:

 

  

$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"} http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

 

  

三.死信交换机DLX

介绍

 

  死信队列:DLX,dead-letter-exchange利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX消息变成死信几种情况

  消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false消息过期队列达到最大长度死信处理过程

  DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息做相应的处理。用途

  通过监控消费死信队列中消息,来观察和分析数据。结合TTL实现延迟队列(比如下单超过多长时间自动关闭)

  使用

  代码如下:

  

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchangeMap<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置死信交换机args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)channel.queueDeclare("myqueue" , false , false , false , args);

实例

 

  

public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//声明一个交换机,做死信交换机用channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);//声明一个队列,做死信队列用channel.queueDeclare("dlx_queue", true, false, false, null);//队列绑定到交换机上channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);Map<String, Object> arguments=new HashMap<String, Object>();arguments.put("x-message-ttl" , 1000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLXarguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLXarguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键//为队列normal_queue 添加DLXchannel.queueDeclare("normal_queue", true, false, false, arguments);channel.queueBind("normal_queue", "normal_exchange", "");channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));channel.close();connection.close();}

说明:

 

  申明死信队列dlx_queue的绑定如下,与死信交换机dlx_exchange(topic类型)进行绑定,routing key为"dlx.*"申明队列normal_queue,与交换机normal_exchange(fanout类型)进行绑定

  执行流程:

  消息发送到交换机normal_exchange,然后路由到队列normal_queue上因为队列normal_queue没有消费者,消息过期后成为死信消息死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechagedlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。参考:

  https://www.jianshu.com/p/986ee5eb78bc

  https://blog.csdn.net/u012988901/article/details/88958654

  到此这篇关于RabbitMQ之消息有效期与死信的文章就介绍到这了,更多相关RabbitMQ消息有效期内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: