RabbitMQ(rabbitmq怎么保证消息不重复消费)

  本篇文章为你整理了RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,包含有rabbitmq如何保证消息不丢 rabbitmq怎么保证消息不重复消费 rabbitmq消息积压如何解决 rabbitmq和kafka的区别 RabbitMQ,希望能帮助你了解 RabbitMQ。

  Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker

  Connection: publisher / consumer和 broker之间的TCP连接

  Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP

  Connection的开销将是巨大的,效率也较低。Channel是在connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker识别 channel,所以channel 之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建TCP connection的开销

  Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic(publish-subscribe) and fanout

  (multicast)

  Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则

  binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列

  vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 "交换机exchange、绑定Binding、队列Queue",更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。

  五种常用模式

  1.simple (简单模式)

  一个消费者消费一个生产者生产的信息

  2.Work queues(工作模式)

  一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次

  3.Publish/Subscribe(发布订阅模式)

  生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息

  4.Routing(路由模式)

  生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列

  5.Topics(主题模式)

  生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者

  保证消息的稳定性

  消息持久化

  RabbitMQ的消息默认存在内存中的,一旦服务器意外挂掉,消息就会丢失

  消息持久化需做到三点

  Exchange设置持久化

  Queue设置持久化

  Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

  ACK确认机制

  多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

  这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

  设置集群镜像模式

  我们先来介绍下RabbitMQ三种部署模式:

  1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

  2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

  3)镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

  为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

  消息发送失败补偿方案

  消息发送失败处理方案

  场景一:消息找不到队列导致消息发送失败。

  设置mandatory=true

  当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。

  这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。

  

channel.basicPublish("EXCHANGE_NAME", "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "text".getBytes());

 

  channel.addReturnListener(new ReturnListener() {

   @Override

   public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {

   String message = new String(body);

   System.out.println("返回结果是:" + message);

  

 

  场景二:生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失

  方案一:开启MQ的事务机制

  在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。

  缺点:只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能。

  方案二:生产者将信道设置成confirm(确认)模式

  一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。

  消息发送失败补偿方案

  当消息发送失败后,结合MQ配置,对消息进行重试并记录error日志,达到重试次数后,将处理结果通过回调接口的方式告诉生产者,生产者去进行额外的补偿机制。

  confirm方案对比

  客户端实现生产者confirm有三种方式:

  普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。

  批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。

  异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方

  注意:

  批量模式极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

  消息消费失败处理方案

  设置死信队列

  当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。

  监控死信队列长度,日志记录及时预警。

  实现延迟队列

  RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果

  TTL来设置一个消息的的过期时间,DLX设置一个死信队列,将过期的消息推送到死信队列中,消费端监听死信队列来消费数据,从而达到消息延迟的效果。

  死信队列补偿机制

  当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。

  消息防堆积方案

  加强对不合理使用MQ的审批。

  监控消费能力(耗时 300ms),及时预警。

  框架层实现发送方限流。(默认值:100条/s)

  设置消息TTL。

  使用惰性队列。

  关键节点日志记录

  MQ成功接受消息时。(info)

  生产者消息发送失败时。(error)

  生产者confrim确认失败时。(error)

  生产消息量过大,限流时。(error)

  生产者连接MQ超时时。 (error)

  消息大小大于10KB时。(error)

  消费者成功消费消息是。(info)

  消费者连接MQ超时时。(error)

  消费者消费失败时。(error)

  消费者进入死信队列时。(error)

  消费耗时低于300ms时。(error)

  maven引入

  

 dependency 

 

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-amqp /artifactId

   /dependency

  

 

  yam配置

  

spring:

 

   #配置rabbitMq 服务器

   rabbitmq:

   host: 127.0.0.1

   port: 5672

   username: guest

   password: guest

   #虚拟host 可以不设置,使用server默认host

   virtual-host: itclub

   # 开启publisher-confirm,

   # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

   publisher-confirm-type: correlated

   # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

   publisher-returns: true

   # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

   template:

   mandatory: true

  

 

  经典案例代码实现

  1. 延迟队列实现

  

/**

 

   * 延迟队列

   * @Author : onePiece

  @Configuration

  public class TtlQueueConfig {

   * 普通交换机名称

   public static final String X_EXCHANGE = "X";

   * 死信交换机名称

   public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

   * 普通队列名称

   public static final String QUEUE_A = "QA";

   public static final String QUEUE_B = "QB";

   * 死信队列名称

   public static final String DEAD_LETTER_QUEUE = "QD";

   public static final String DEAD_LETTER_QUEUE_KEY = "YD";

   * 声明 XExchange

   @Bean

   public DirectExchange xExchange(){

   return new DirectExchange(X_EXCHANGE);

   * 声明 yExchange

   @Bean

   public DirectExchange yExchange(){

   return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);

   * 声明队列QA

   @Bean

   public Queue queueA(){

   Map String, Object arguments = new HashMap (3);

   // 设置死信交换机

   arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

   // 设置死信路由键

   arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);

   // 设置过期时间

   arguments.put("x-message-ttl", 2000);

   return new Queue(QUEUE_A, true, true, false, arguments);

   * 声明队列QB

   @Bean

   public Queue queueB(){

   Map String, Object arguments = new HashMap (3);

   // 设置死信交换机

   arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

   // 设置死信路由键

   arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);

   // 设置过期时间

   arguments.put("x-message-ttl", 4000);

   return new Queue(QUEUE_B, true, true, false, arguments);

   * 死信队列QD

   @Bean

   public Queue queueD(){

   return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();

   * 绑定交换机

   @Bean

   public Binding queueABindingX(){

   return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");

   @Bean

   public Binding queueBBindingX(){

   return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");

   @Bean

   public Binding queueDBindingY(){

   return BindingBuilder.bind(queueD()).to(yExchange()).with(DEAD_LETTER_QUEUE_KEY);

  

 

  生产端

  

/**

 

   * 延迟队列-生产端

   * @author: onePiece

   * @date: 2023-02-21

  @RestController

  @RequestMapping("demo")

  public class TtlProvider {

   @Autowired

   private RabbitTemplate rabbitTemplate;

   @GetMapping("ttl")

   public AjaxResult ttl(){

   for (int i = 0; i i++) {

   String messageId = Convert.toStr(UUID.randomUUID());

   String messageData = "ttl hello rabbit";

   String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

   Map String, Object map = new HashMap (16);

   map.put("messageId", messageId);

   map.put("messageData", messageData);

   map.put("createTime", createTime);

   if (i % 2 == 0) {

   rabbitTemplate.convertAndSend("X", "XA", map);

   }else {

   rabbitTemplate.convertAndSend("X", "XB", map);

   return AjaxResult.success();

  

 

  消费端

  

/**

 

   * 延迟消费

   * @author: onePiece

   * @date: 2023-02-21

  @Component

  public class TtlReceiver {

  // @RabbitListener(queues = "QA")

  // public void xa(Message message) throws Exception {

  // String nowDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

  // byte[] body = message.getBody();

  // ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));

  // System.out.println("QA队列消费: 当前时间" + nowDate + ois.readObject().toString());

   @RabbitListener(queues = "QD")

   public void xb(Message message) throws Exception {

   String nowDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

   byte[] body = message.getBody();

   ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));

   System.out.println("死信队列消费: 当前时间" + nowDate + ois.readObject().toString());

  

 

  2. 惰性队列实现

  

/**

 

   * 惰性队列

   * 防止消息堆积

   * @author: onePiece

  @SuppressWarnings("all")

  @Configuration

  public class LazyQueueConfig {

   public static final String LAZY_TOPIC_EXCHANGE = "lazy.topic.exchange";

   public static final String LAZY_TOPIC_QUEUE = "lazy_topic_queue";

   public static final String LAZY_TOPIC_ROUTING_KEY = "*.topic.*";

   * 声明队列

   @Bean

   public Queue topicLazyQueue(){

   Map String, Object args = new HashMap (2);

   args.put("x-queue-mode", "lazy");

   * 设置持久化队列

   return QueueBuilder.durable(LAZY_TOPIC_QUEUE).withArguments(args).build();

  
public TopicExchange topicLazyExchange(){

   TopicExchange exchange = new TopicExchange(LAZY_TOPIC_EXCHANGE);

   return exchange;

   * Topic交换器和队列通过bindingKey绑定

   * @return

   @Bean

   public Binding bindingTopicLazyExchangeQueue(){

   return BindingBuilder.bind(topicLazyQueue()).to(topicLazyExchange()).with(LAZY_TOPIC_ROUTING_KEY);

  

 

  消费端

  

/**

 

   * 惰性队列-生产端

   * @author: onePiece

   * @date: 2023-02-21

  @SuppressWarnings("all")

  @RestController("demo")

  public class LazyQueueProvider {

   @Autowired

   private RabbitTemplate rabbitTemplate;

   * 创建一个消息是否投递成功的回调方法

   private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

   * @param correlationData 消息的附加信息

   * @param ack true for ack, false for nack

   * @param cause 是一个可选的原因,对于nack,如果可用,否则为空。

   @Override

   public void confirm(CorrelationData correlationData, boolean ack, String cause) {

   if(!ack){

   //可以进行日志记录、异常处理、补偿处理等

   System.err.println("异常ack-"+ack+",id-"+correlationData.getId()+",cause:"+cause);

   }else {

   //更新数据库,可靠性投递机制

   System.out.println("正常ack-"+ack+",id-"+correlationData.getId());

   try{

   System.out.println(new String(correlationData.getReturnedMessage().getBody()));

   } catch (Exception e){

   * 创建一个消息是否被队列接收的监听对象,如果没有队列接收发送出的消息,则调用此方法进行后续处理

   private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {

   * @param message 被退回的消息

   * @param replyCode 错误编码

   * @param replyText 错误描述

   * @param exchange 交换器

   * @param routingKey 路由

   @Override

   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

   System.err.println("spring_returned_message_correlation:"+message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY)

   +"return exchange: " + exchange

   + ", routingKey: "+ routingKey

   + ", replyCode: " + replyCode

   + ", replyText: " + replyText

   + ",message:" + message);

   try {

   System.out.println(new String(message.getBody()));

   } catch (Exception e){

   * 扩展点,在消息转换完成之后,发送之前调用;可以修改消息属性、消息头信息

   private final MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

   @Override

   public Message postProcessMessage(Message message) throws AmqpException {

   MessageProperties properties = message.getMessageProperties();

   * 设置消息发送到队列之后多久被丢弃,单位:毫秒

   * 此种方案需要每条消息都设置此属性,比较灵活;

   * 还有一种方案是在声明队列的时候指定发送到队列中的过期时间;

   * * Queue queue = new Queue("test_queue2");

   * * queue.getArguments().put("x-message-ttl", 10000);

   * 这两种方案可以同时存在,以值小的为准

   //properties.setExpiration("10000");

   * 设置消息的优先级

   properties.setPriority(9);

   * 设置消息发送到队列中的模式,持久化非持久化(只存在于内存中)

   properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

   return message;

   * 发送消息

   * @param exchange 交换器

   * @param route 路由键

   * @param message 消息

   * @param properties

   @GetMapping("sendlazyQueue")

   public void sendMsg(String exchange, String routingKey, String message, MessageProperties properties){

   * 设置生产者消息publish-confirm回调函数

   this.rabbitTemplate.setConfirmCallback(confirmCallback);

   * 设置消息退回回调函数

   this.rabbitTemplate.setReturnCallback(returnCallback);

   * 新增消息转换完成后、发送之前的扩展点

   this.rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);

   try {

   if(null == properties){

   properties = new MessageProperties();

   * 设置消息唯一标识

   properties.setMessageId(UUID.randomUUID().toString());

   * 创建消息包装对象

   Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(properties).build();

   * 将消息主题和属性封装在Message类中

   Message returnedMessage = MessageBuilder.withBody(message.getBytes()).build();

   * 相关数据

   CorrelationData correlationData = new CorrelationData();

   * 消息ID,全局唯一

   correlationData.setId(msg.getMessageProperties().getMessageId());

   * 设置此相关数据的返回消息

   correlationData.setReturnedMessage(returnedMessage);

   * 如果msg是org.springframework.amqp.core.Message对象的实例,则直接返回,否则转化为Message对象

   this.rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);

   } catch (Exception e){

   e.printStackTrace();

  

 

  消费端

  

/**

 

   * 惰性队列-消费端

   * @author: onePiece

   * @date: 2023-02-21

  @SuppressWarnings("all")

  @Component

  public class LazyQueueReceiver {

   * @param channel 信道

   * @param message 消息

   * @throws Exception

   @RabbitListener(queues = LazyQueueConfig.LAZY_TOPIC_QUEUE)

   public void onMessage(Channel channel, Message message) throws Exception {

   System.out.println("--------------------------------------");

   System.out.println("消费端Payload: " + message.getPayload()+"-ID:"+message.getHeaders().getId()+"-messageId:"+message.getHeaders());

   Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

   //手工ACK,获取deliveryTag

   channel.basicAck(deliveryTag, false);

  

 

  以上就是RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,想要了解更多 RabbitMQ的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: