RabbitMQ学习笔记(rabbit mq教程)

  本篇文章为你整理了RabbitMQ学习笔记(rabbit mq教程)的详细内容,包含有rabbitmq教程 pdf rabbit mq教程 rabbitmq简书 rabbitmq入门到精通 RabbitMQ学习笔记,希望能帮助你了解 RabbitMQ学习笔记。

   * 2、给容器中自动配置了

   * RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

   * 所有的属性都是在

   * @EnableConfigurationProperties(RabbitProperties.class)

   * @ConfigurationProperties(prefix = "spring.rabbitmq")

   * public class RabbitProperties

   * 3、给配置文件中配置 spring.rabbitmq 信息

   * 4、@EnableRabbit 开启功能

   * 5、监听消息:使用 @RabbitListener,必须有 @EnableRabbit

   * @RabbitListener:类 + 方法上

   * @RabbitHandler: 只能标在方法上

  

 

 

  

 dependency 

 

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-amqp /artifactId

   /dependency

  

 

  

# rabbit 配置文件

 

  spring.rabbitmq.host=192.168.106.101

  spring.rabbitmq.port=5672

  spring.rabbitmq.virtual-host=/

  

 

  测试

  

package com.atguigu.gulimall.order;

 

  import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;

  import lombok.extern.slf4j.Slf4j;

  import org.junit.Test;

  import org.junit.runner.RunWith;

  import org.springframework.amqp.core.AmqpAdmin;

  import org.springframework.amqp.core.Binding;

  import org.springframework.amqp.core.DirectExchange;

  import org.springframework.amqp.core.Queue;

  import org.springframework.amqp.rabbit.core.RabbitTemplate;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.boot.test.context.SpringBootTest;

  import org.springframework.test.context.junit4.SpringRunner;

  import java.util.Date;

  
* 2、如何收发消息 - RabbitTemplate

   * 如果发送的消息是个对象,使用序列化机制,将对象写出去,对象实现 Serializable 接口

   * 自定义序列化添加配置

   * @Configuration

   * public class MyRabbitConfig {

   * @Bean

   * public MessageConverter messageConverter() {

   * return new Jackson2JsonMessageConverter();

   @Test

   public void sendMessageTest() {

   String msg = "Hello World";

   OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();

   orderReturnApplyEntity.setId(1L);

   orderReturnApplyEntity.setSkuName("华为");

   orderReturnApplyEntity.setCreateTime(new Date());

   rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity);

   log.info("消息发送完成:{}", orderReturnApplyEntity);

   @Test

   public void createExchange() {

   //amqpAdmin

   * DirectExchange

   * public DirectExchange(String name, boolean durable, boolean autoDelete, Map String, Object arguments)

   DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false);

   amqpAdmin.declareExchange(exchange);

   log.info("Exchange[{}]创建成功", "hello.java.exchange");

   @Test

   public void createQueue() {

   * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)

   Queue queue = new Queue("hello-java-queue", true, false,true);

   amqpAdmin.declareQueue(queue);

   log.info("Queue[{}]创建成功", "hello-java-queue");

   @Test

   public void createBinding() {

   * public Binding(String destination【目的地】,

   * DestinationType destinationType【目的地类型】,

   * String exchange【交换机】,

   * String routingKey【路由键】,

   * Map String, Object arguments)【参数】

   * 将 exchange 指定交换机和 destination目的地进行绑定,使用routingKey作为指定路由键

   Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);

   amqpAdmin.declareBinding(binding);

   log.info("Binding == 创建成功");

  

 

  测试监听消息

  

/**

 

   * queues:声明需要监听的所欲队列

   * org.springframework.amqp.core.Message;

   * 参数可以写以下类型

   * 1、Message message;原生消息详细信息,头 + 体

   * 2、T 发送的消息的类型 OrderReturnApplyEntity content

   * 3、Channel channel:当前传输数据的通道

   * Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息

   * 1、订单服务启动多个:同一个消息,只能有一个客户端收到

   * 2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息

  //@RabbitListener(queues = {"hello-java-queue"})

  @RabbitHander

  public void receiveMessage(Message message, OrderReturnReasonEntity content) {

   System.out.println("接收到消息....:"+ message + "=== 内容;" + content + "类型是:" + message.getClass());

   byte[] body = message.getBody();

   //消息头属性信息

   MessageProperties properties = message.getMessageProperties();

   try {

   Thread.sleep(3000);

   } catch (InterruptedException e) {

   e.printStackTrace();

   System.out.println("消息处理完成=》" + content.getName());

  

 

  @RabbitListener

  简介:

  

1.用于标注在监听类或监听方法上,接收消息,需要指定监听的队列(数组)

 

  2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit

  3.@RabbitListener即可以标注在方法上又可以标注在类上

   标注在类上:表示该类是监听类,使得@RabbitHandler注解生效

   标注在方法上:表示该方法时监听方法,会监听指定队列获得消息

  4.一般只标注在方法上,并配合@RabbitHandler使用,重载的方式接收不同消息对象

  

 

  @RabbitHandler

  作用:

  

配合@RabbitListener,使用方法重载的方法接收不同的消息类型

 

  

 

  简介:

  

1.用于标注在监听方法上,接收消息,不需要指定监听的队列

 

  2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit

  3.@RabbitListener只可以标注在方法,重载的方式接收不同消息对象

  

 

  发送端消息确认配置

  1、配置

  2、定制 RabbitTemplate,设置确认回调

  

# rabbit 配置文件

 

  spring.rabbitmq.host=192.168.106.101

  spring.rabbitmq.port=5672

  spring.rabbitmq.virtual-host=/

  # 开启发送端确认

  spring.rabbitmq.publisher-confirms=true

  #开启发送端消息抵达确认

  spring.rabbitmq.publisher-returns=true

  #只要抵达队列。以异步发送优先回调returnconfirm

  spring.rabbitmq.template.mandatory=true

  # 手动ack消息

  spring.rabbitmq.listener.simple.acknowledge-mode=manual

  

 

  

package com.atguigu.gulimall.order.config;

 

  import org.springframework.amqp.core.Message;

  import org.springframework.amqp.rabbit.connection.CorrelationData;

  import org.springframework.amqp.rabbit.core.RabbitTemplate;

  import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

  import org.springframework.amqp.support.converter.MessageConverter;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

  import javax.annotation.PostConstruct;

  @Configuration

  public class MyRabbitConfig {

   @Autowired

   RabbitTemplate rabbitTemplate;

   @Bean

   public MessageConverter messageConverter() {

   return new Jackson2JsonMessageConverter();

   * 定制 rabbitTemplate

   * 1、服务收到消息就回调

   * 1、spring.rabbitmq.publisher-confirms=true

   * 2、设置确认回调ConfirmCallback

   * 2、消息正确地打队列进行回调

   * 1、spring.rabbitmq.publisher-returns=true

   * spring.rabbitmq.template.mandatory=true

   * 2、设置消息抵达队列的回调

   * 3、消费端确认【保证每一个消息被正确消费,此时才可以让broker删除】

   * 1、默认是自动确认,只要消息接受到,自动确认,服务端就会移除这个消息

   * 2、手动确认默认,只要没有明确告诉MQ,货物被签收,没有ACK,消息一直是unacked状态。

   * 即使Cosumer宕机,消息也不会丢失,会重新变成Ready,等待下一次新的consumer链接发给他

   * 3、如果手动确认:Channel channel - long deliveryTag = properties.getDeliveryTag(); - channel.basicAck(deliveryTag, false);

   * channel.basicAck(deliveryTag, false); 签收

   * channel.basicNack(deliveryTag, false, true); 拒签

   @PostConstruct // MyRabbitConfig 对象创建完成以后执行这个方法

   public void initRabbitTemplate(){

   rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

   * 只要抵达服务器,ack就确认为true

   * @param correlationData 当前消息的唯一关联数据(消息的唯一id)

   * @param ack 是否成功或者失败

   * @param cause 失败的原因

   @Override

   public void confirm(CorrelationData correlationData, boolean ack, String cause) {

   System.out.println("confirm..." + correlationData + "== ack:" + ack + "== cause:" + cause);

   //设置消息抵达队列的回调

   rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

   * 只要消息没有投递给指定的队列,就触发失败回调

   * @param message 投递失败的消息详细信息

   * @param replyCode 回复的状态码

   * @param replyText 回复的文本内容

   * @param exchange 消息发给那个交换机

   * @param routingKey 当时这个消息使用哪个路由键

   @Override

   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

   System.out.println("Fail Message:" + message + "== replyTest:" + replyText + "== exchange" + exchange + "== routingKey:" + routingKey);

  

 

  

/**

 

   * queues:声明需要监听的所欲队列

   * p

   * org.springframework.amqp.core.Message;

   * p

   * 参数可以写以下类型

   * 1、Message message;原生消息详细信息,头 + 体

   * 2、T 发送的消息的类型 OrderReturnApplyEntity content

   * 3、Channel channel:当前传输数据的通道

   * p

   * Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息

   * 1、订单服务启动多个:同一个消息,只能有一个客户端收到

   * 2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息

  @RabbitListener(queues = {"hello-java-queue"})

  public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {

   //System.out.println("接收到消息....:"+ message + "=== 内容;" + content + "类型是:" + message.getClass());

   System.out.println("接收到消息....:" + content);

   byte[] body = message.getBody();

   //消息头属性信息

   MessageProperties properties = message.getMessageProperties();

   /*try {

   Thread.sleep(3000);

   } catch (InterruptedException e) {

   e.printStackTrace();

   System.out.println("消息处理完成=》" + content.getName());

   long deliveryTag = properties.getDeliveryTag();

   System.out.println("deliverTag: " + deliveryTag);

   if (deliveryTag % 2 == 0) {

   //收货

   // 签收获取,非批量模式

   channel.basicAck(deliveryTag, false);

   } else {

   //requeue 重新入队

   //basicNack(long deliveryTag, boolean multiple, boolean requeue)

   channel.basicNack(deliveryTag, false, true);

   System.out.println("没有签收的货物....." + deliveryTag);

  

 

  

1.导入mq依赖

 

   !--amqp高级消息队列协议,rabbitmq实现--

   dependency

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-amqp /artifactId

   /dependency

  2.ware模块导入配置

  spring:

   rabbitmq:

   host: 192.168.56.10

   port: 5672

   # 虚拟主机

   virtual-host: /

   # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】

   publisher-confirm-type: correlated

   # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】

   publisher-returns: true

   # 消息在没有被队列接收时是否强行退回

   template:

   mandatory: true

   # 消费者手动确认模式,关闭自动确认,否则会消息丢失

   listener:

   simple:

   acknowledge-mode: manual

  3.添加注解

  // 开启rabbit

  @EnableRabbit

  4.创建配置类

   * @Author: wanzenghui

   * @Date: 2021/12/15 0:04

  @Configuration

  public class MyRabbitConfig {

   @Autowired

   RabbitTemplate rabbitTemplate;

   @Bean

   public MessageConverter messageConverter() {

   // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式

   return new Jackson2JsonMessageConverter();

   * 定制RabbitTemplate

   * 1、服务收到消息就会回调

   * 1、spring.rabbitmq.publisher-confirms: true

   * 2、设置确认回调

   * 2、消息正确抵达队列就会进行回调

   * 1、spring.rabbitmq.publisher-returns: true

   * spring.rabbitmq.template.mandatory: true

   * 2、设置确认回调ReturnCallback

   * p

   * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)

   @PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法)

   public void initRabbitTemplate() {

   * 发送消息触发confirmCallback回调

   * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)

   * @param ack:消息是否成功收到(ack=true,消息抵达Broker)

   * @param cause:失败的原因

   rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {

   System.out.println("发送消息触发confirmCallback回调" +

   "\ncorrelationData === " + correlationData +

   "\nack === " + ack + "" +

   "\ncause === " + cause);

   System.out.println("=================================================");

   * 消息未到达队列触发returnCallback回调

   * 只要消息没有投递给指定的队列,就触发这个失败回调

   * @param message:投递失败的消息详细信息

   * @param replyCode:回复的状态码

   * @param replyText:回复的文本内容

   * @param exchange:接收消息的交换机

   * @param routingKey:接收消息的路由键

   rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {

   // 需要修改数据库 消息的状态【后期定期重发消息】

   System.out.println("消息未到达队列触发returnCallback回调" +

   "\nmessage === " + message +

   "\nreplyCode === " + replyCode +

   "\nreplyText === " + replyText +

   "\nexchange === " + exchange +

   "\nroutingKey === " + routingKey);

   System.out.println("==================================================");

  5.创建ware解锁库存的延时队列、死信队列、交换机、绑定关系

   * 创建队列,交换机,延时队列,绑定关系 的configuration

   * 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖

   * 2.懒加载,只有第一次使用的时候才会创建(例如监听队列)

  @Configuration

  public class MyRabbitMQConfig {

   * 用于首次创建队列、交换机、绑定关系的监听

   * @param message

   @RabbitListener(queues = "stock.release.stock.queue")

   public void handle(Message message) {

   * 交换机

   * Topic,可以绑定多个队列

   @Bean

   public Exchange stockEventExchange() {

   //String name, boolean durable, boolean autoDelete, Map String, Object arguments

   return new TopicExchange("stock-event-exchange", true, false);

   * 死信队列

   @Bean

   public Queue stockReleaseStockQueue() {

   //String name, boolean durable, boolean exclusive, boolean autoDelete, Map String, Object arguments

   return new Queue("stock.release.stock.queue", true, false, false);

   * 延时队列

   @Bean

   public Queue stockDelay() {

   HashMap String, Object arguments = new HashMap ();

   arguments.put("x-dead-letter-exchange", "stock-event-exchange");

   arguments.put("x-dead-letter-routing-key", "stock.release");

   // 消息过期时间 2分钟

   arguments.put("x-message-ttl", 120000);

   return new Queue("stock.delay.queue", true, false, false,arguments);

   * 绑定:交换机与死信队列

   @Bean

   public Binding stockLocked() {

   //String destination, DestinationType destinationType, String exchange, String routingKey,

   // Map String, Object arguments

   return new Binding("stock.release.stock.queue",

   Binding.DestinationType.QUEUE,

   "stock-event-exchange",

   "stock.release.#",

   null);

   * 绑定:交换机与延时队列

   @Bean

   public Binding stockLockedBinding() {

   return new Binding("stock.delay.queue",

   Binding.DestinationType.QUEUE,

   "stock-event-exchange",

   "stock.locked",

   null);

  

 

  以上就是RabbitMQ学习笔记(rabbit mq教程)的详细内容,想要了解更多 RabbitMQ学习笔记的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: