springboot+rabbitmq,springboot rabbitmq分布式事务

  springboot+rabbitmq,springboot rabbitmq分布式事务

  00-1010前言环境配置配置文件业务消费者截止日期消费者测试

  00-1010 springboot用于实现以下功能。有两个队列1和2,消息被发送到它们。如果处理失败并出现异常,您可以重试三次。如果三次重试失败,消息将被发送到死信队列进行统一处理,如记录数据库、报警等完整的演示项目代码https://gitee.com/daenmax/rabbit-mq-demo.

  

目录

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.41。双击c : program files rabbit MQ server rabbit MQ _ server-3 . 10 . 4 sbin rabbit MQ-server . bat启动MQ服务。2.然后访问http://localhost336015672/,默认账号密码都是guest。3.手动添加虚拟主机作为admin_host,并手动创建具有管理员密码的用户帐户。

 

  pom.xml

  !-rabbit MQ-dependency groupIdorg.springframework.boot/groupId artifactId spring-boot-starter-amqp/artifactId版本2.7.0/version /dependency

  

前言

spring : rabbit MQ : host : 127 . 0 . 0 . 1 port : 5672 username : admin password : admin virtual-host : admin _ host publisher-co nfirm-type : correlated publisher-returns : true listener 3360 simple : acknowledge-mode 3: manual Retry : enabled :

 

  

环境

兔子配置

 

  包com . example . rabitmqdemo . my demo . config;导入lombok . extern . SLF 4j . SLF 4j;导入org . spring framework . amqp . core . *;导入org . spring framework . beans . factory . annotation . qualifier;导入org . spring framework . context . annotation . bean;导入org . spring framework . stereotype.component;导入Java . util . hashmap;导入Java . util . map;/* * * Broker3360它提供传输服务。它的作用是维护一条从生产者到消费者的路线,并确保数据能够以指定的方式传输。* Exchange:消息开关,它指定消息路由到的规则和队列。*承运队列:条消息,每条消息将被投射到一个或多个队列中。* Binding:绑定,其作用是根据路由规则绑定交换和队列。* RoutingKey3360路由关键字,exchange根据该关键字传递邮件。* vhost:虚拟主机。一个代理中可以有多个VHOSTs,用来分隔不同用户的权限。* Producer:消息生成器是传递消息的程序。* Consumer:消息消费者是接收消息的程序。* Channel:消息通道,在每个连接的客户端中,你可以设置多个通道。*/@ slf4j @ component公共类rabbit config {//公共静态最终字符串exchange _ phcp= phcp//业务队列1公共静态最终字符串queue _ company= company//业务队列1的key公共静态最终St

  ring ROUTINGKEY_COMPANY = "companyKey"; //业务队列2 public static final String QUEUE_PROJECT = "project"; //业务队列2的key public static final String ROUTINGKEY_PROJECT = "projectKey"; //死信交换机 public static final String EXCHANGE_PHCP_DEAD = "phcp_dead"; //死信队列1 public static final String QUEUE_COMPANY_DEAD = "company_dead"; //死信队列2 public static final String QUEUE_PROJECT_DEAD = "project_dead"; //死信队列1的key public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead"; //死信队列2的key public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";// /**// * 解决重复确认报错问题,如果没有报错的话,就不用启用这个// *// * @param connectionFactory// * @return// */// @Bean// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// factory.setConnectionFactory(connectionFactory);// factory.setMessageConverter(new Jackson2JsonMessageConverter());// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// return factory;// } /** * 声明业务交换机 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Bean("exchangePhcp") public DirectExchange exchangePhcp() { return new DirectExchange(EXCHANGE_PHCP); } * 声明死信交换机 @Bean("exchangePhcpDead") public DirectExchange exchangePhcpDead() { return new DirectExchange(EXCHANGE_PHCP_DEAD); * 声明业务队列1 * * @return @Bean("queueCompany") public Queue queueCompany() { Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD); //绑定该队列到死信交换机的队列1 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD); return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build(); * 声明业务队列2 @Bean("queueProject") public Queue queueProject() { //绑定该队列到死信交换机的队列2 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD); return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build(); * 声明死信队列1 @Bean("queueCompanyDead") public Queue queueCompanyDead() { return new Queue(QUEUE_COMPANY_DEAD); * 声明死信队列2 @Bean("queueProjectDead") public Queue queueProjectDead() { return new Queue(QUEUE_PROJECT_DEAD); * 绑定业务队列1和业务交换机 * @param queue * @param directExchange @Bean public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY); * 绑定业务队列2和业务交换机 public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT); * 绑定死信队列1和死信交换机 public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD); * 绑定死信队列2和死信交换机 public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);}生产者

  RabbltProducer

  

package com.example.rabitmqdemo.mydemo.producer;import com.example.rabitmqdemo.mydemo.config.RabbitConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.nio.charset.StandardCharsets;import java.util.UUID;@Component@Slf4jpublic class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{ @Resource private RabbitTemplate rabbitTemplate; /** * 初始化消息确认函数 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setMandatory(true); } /** * 发送消息服务器确认函数 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息发送成功" + correlationData); } else { System.out.println("消息发送失败:" + cause); } } /** * 消息发送失败,消息回调函数 * @param returnedMessage */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { String str = new String(returnedMessage.getMessage().getBody()); System.out.println("消息发送失败:" + str); } /** * 处理消息发送到队列1 * @param str */ public void sendCompany(String str){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData); //也可以用下面的方式 //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData); } /** * 处理消息发送到队列2 * @param str */ public void sendProject(String str){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData); //也可以用下面的方式 //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData); }}

 

  

业务消费者

RabbitConsumer

 

  

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * 监听业务交换机 * @author JeWang */@Component@Slf4jpublic class RabbitConsumer { /** * 监听业务队列1 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "company") public void company(Message message, Channel channel) throws IOException { try{ System.out.println("次数" + message.getMessageProperties().getDeliveryTag()); channel.basicQos(1); Thread.sleep(2000); String s = new String(message.getBody()); log.info("处理消息"+s); //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机 //String str = null; //str.split("1"); //处理成功,确认应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("处理消息时发生异常:"+e.getMessage()); Boolean redelivered = message.getMessageProperties().getRedelivered(); if(redelivered){ log.error("异常重试次数已到达设置次数,将发送到死信交换机"); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); }else { log.error("消息即将返回队列处理重试"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } /** * 监听业务队列2 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "project") public void project(Message message, Channel channel) throws IOException { try{ System.out.println("次数" + message.getMessageProperties().getDeliveryTag()); channel.basicQos(1); Thread.sleep(2000); String s = new String(message.getBody()); log.info("处理消息"+s); //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机 //String str = null; //str.split("1"); //处理成功,确认应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("处理消息时发生异常:"+e.getMessage()); Boolean redelivered = message.getMessageProperties().getRedelivered(); if(redelivered){ log.error("异常重试次数已到达设置次数,将发送到死信交换机"); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); }else { log.error("消息即将返回队列处理重试"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }}

 

  

死信消费者

RabbitConsumer

 

  

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * 监听死信交换机 * @author JeWang */@Component@Slf4jpublic class RabbitConsumerDead { /** * 处理死信队列1 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "company_dead") public void company_dead(Message message, Channel channel) throws IOException { try{ channel.basicQos(1); String s = new String(message.getBody()); log.info("处理死信"+s); //在此处记录到数据库、报警之类的操作 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("接收异常:"+e.getMessage()); } } /** * 处理死信队列2 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "project_dead") public void project_dead(Message message, Channel channel) throws IOException { try{ channel.basicQos(1); String s = new String(message.getBody()); log.info("处理死信"+s); //在此处记录到数据库、报警之类的操作 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("接收异常:"+e.getMessage()); } }}

 

  

测试

MqController

 

  

package com.example.rabitmqdemo.mydemo.controller;import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RequestMapping("/def")@RestController@Slf4jpublic class MsgController { @Resource private RabbltProducer rabbltProducer; @RequestMapping("/handleCompany") public void handleCompany(@RequestBody String jsonStr){ rabbltProducer.sendCompany(jsonStr); }}

到此这篇关于SpringBoot整合RabbitMQ实战附加死信交换机的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ死信交换机内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

 

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: