rabbitmq实际应用,rabbitmq 实现

  rabbitmq实际应用,rabbitmq 实现

  00-10101、消息可靠传递1.1、确认模式1.2、返回模式1.3、确认机制2、消费端限流3、消息过期时间4、死信队列4.1、死信概念4.2、延迟队列。

  00-1010使用RabbitMQ时,如果生产者在传递消息时想知道消息是否成功传递到相应的交换机和队列,有两种方法可以控制消息传递的可靠性模式。

  按照上图的整个消息传递过程,生产者的消息在进入中间件时会先到达交换机,然后从交换机传递到队列,这是一个两步走的策略。那么消息的丢失就会发生在这两个阶段,RabbitMQ为我们提供了这两部分可靠的新交付模式:

  confirm 模式.return 模式.这两种回调模式用于确保消息的可靠传递。

  00-1010从生成器传递到交换机的消息将返回confirmCallback的回调。您可以直接在rabbitTemplate实例中设置确认逻辑。如果您使用XML配置,您需要在工厂配置中打开publisher-confirms="true"publisher-confirm-type: correlated,直接配置YAML,默认为无,需要手动打开。

  @ run with(spring JUnit 4 class runner . class)@ context configuration(locations= class path : spring-rabbit MQ . XML )公共类生成器{ @ Autowired private rabbit template rabbit template;@Test public void producer()抛出interrupted exception { rabbit template . setconfirm callback(new rabbit template。confirm callback(){ @ Override public void confirm(correlation data correlation data,boolean b,String s){ system . out . println();如果(!B) {//处理System.out.println(s)如消息重发;} else {System.out.println(交换机成功接收消息);} } });rabbit template . convertandsend( default _ exchange , default_queue , hello world beordie );时间单位。秒.睡眠(5);}}}上面的确认是由一个confirm函数执行的,该函数携带三个参数,第一个参数是配置相关信息,第二个参数表示交换机是否成功接收到消息,第三个参数是指消息没有成功接收的原因。

  00-1010从交换机到消息队列的传递失败将返回returnCallback。在出厂配置中开启回退模式publisher-returns="true",设置交换机处理报文失败的模式(默认为false,直接丢弃报文),增加回退处理的逻辑。

  @ run with(spring JUnit 4 class runner . class)@ context configuration(locations= class path : spring-rabbit MQ . XML )公共类生成器{ @ Autowired private rabbit template rabbit template

  e; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重发逻辑处理 System.out.println(message.getBody() + " 投递消息队列失败"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); }} returnedMessage 中携带五个参数、分别指的是消息对象、错误码、错误信息、交换机、路由键。

  

 

  

1.3、确认机制

 在消费者抓取消息队列中的数据取消费之后会有一个确认机制进行消息的确认,防止因为抓取消息之后但没有消费成功而导致的消息丢失。有三种确认方式:

 

  自动确认acknowledge="none"

  手动确认acknowledge="manual"

  根据异常情况确认acknowledge="auto"

   其中自动确认是指一旦消息被消费者抓取就自动默认成功,并将消息从消息队列中进行移除,如果这个时候消费端消费出现问题,那么也会是默认消息消费成功,但是实际上是没有消费成功的,也就是当前的消息丢失了。默认的情况就是自动确认机制。

   如果设置手动确认的方式,就需要在正常消费消息之后进行回调确认 channel.basicAck(),手动签收。如果业务处理过程中发生了异常则调用 channel.basicNack() 重新发送消息。

   首先需要在队列绑定时进行确认机制的配置,设置为手动签收。

  

<!-- 绑定队列 --><rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/></rabbit:listener-container>

 生产者一端不用更改,只需要改变消费者的实现进行消息自动签收就可以了,正常执行业务则签收消息,业务发生错误则选择消息拒签,消息重发或者丢弃。

 

  

public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息异常"); channel.basicNack(tag, true, true); e.printStackTrace(); } }}

 里面涉及三个简单的签收函数,一是正确签收的 basicAck ,二是单条拒签的 basicReject ,三是批量拒签的 basicNack

 

  basicAck 第一个参数表示消息在通道中的唯一ID,只针对当前的 Channel;第二个参数表示是否批量同意,如果是 false 的话只会同意签收当前ID的一条消息,将其从消息队列中进行删除,而如果是 true 的话将会把此ID之前的消息一起给同意签收了。basicReject 第一个参数依旧表示消息的唯一ID,第二个参数表示是否重新回队发送,false 表示直接丢弃该条消息或者有死信队列可以接收, true 则表示重新回队进行消息发送,所有操作只针对当前的消息。basicNack 比第二个多了一个参数,也就是处于中间位置的布尔值,表示是否批量进行。

 

  

2、消费端限流

 在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。

 

  

 

   只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。

  

<!-- 绑定队列 --><rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/></rabbit:listener-container>

 

  

3、消息过期时间

 消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。

 

   队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。

  

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments></rabbit:queue>

 单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。

 

   如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。

  

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置 message 的过期时间 message.getMessageProperties().setExpiration("5000"); //返回该消息 return message; }};rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。

 

  

 

  

4、死信队列

 

  

4.1、死信概念

死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。

 

  

 

  消息成为死信消息有几种情况队列的消息长度达到限制

  消费者拒接消息的时候不把消息重新放入队列中

  队列存在消息过期设置,消息超时未被消费

  消息存在过期时间,在投递给消费者时发现过期

   在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。

  

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交换机 --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 队列过期时间 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 队列长度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments></rabbit:queue>

 

  

4.2、延迟队列

 延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。

 

   消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL + 死信队列 的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。

   再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。

   例如实现一个下单超时支付取消订单的功能:

  

 

  到此这篇关于java中RabbitMQ高级应用的文章就介绍到这了,更多相关java RabbitMQ内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: