Springboot整合RabbitMQ基本使用()

  本篇文章为你整理了Springboot整合RabbitMQ基本使用()的详细内容,包含有 Springboot整合RabbitMQ基本使用,希望能帮助你了解 Springboot整合RabbitMQ基本使用。

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-amqp /artifactId

   /dependency

  

 

 

  2、rabbitmq链接配置

  

spring:

 

   rabbitmq:

   host: 127.0.0.1

   port: 5672

   username: wq

   password: qifeng

   virtual-host: /

   #开启ack

   listener:

   direct:

   acknowledge-mode: manual

   prefetch: 1 # 限制一次拉取消息的数量

   simple:

   acknowledge-mode: manual #采取手动应答

   #concurrency: 1 # 指定最小的消费者数量

   #max-concurrency: 1 #指定最大的消费者数量

   retry:

   enabled: true # 是否支持重试

  

 

  3、生产者

  声明交换机、队列、绑定关于

  

package com.wanqi.mq;

 

  import org.springframework.amqp.core.*;

  import org.springframework.beans.factory.annotation.Qualifier;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Configuration

  public class RabbitMQConfig {

   public static final String EXCHANGE_NAME = "boot_topic_exchange";

   public static final String QUEUE_NAME = "boot_queue";

   //交换机

   @Bean("bootExchange")

   public Exchange bootExchange(){

   return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true)

   .build();

   @Bean("bootQueue")

   public Queue bootQueue(){

   return QueueBuilder.durable(QUEUE_NAME).build();

   //队列和交换机绑定关系

   知道哪个队列

   知道哪个交换机

   知道routing key

   @Bean

   public Binding bindingQueueExchange(@Qualifier("bootQueue") Queue queue,

   @Qualifier("bootExchange") Exchange exchange){

   return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();

  

 

  

package com.wanqi;

 

  import com.wanqi.mq.RabbitMQConfig;

  import org.junit.jupiter.api.Test;

  import org.springframework.amqp.rabbit.core.RabbitTemplate;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.boot.test.context.SpringBootTest;

  @SpringBootTest

  class SpringbootMqProducersApplicationTests {

   @Autowired

   private RabbitTemplate rabbitTemplate;

   @Test

   void contextLoads() {

   rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "boot mq hello~~~");

  

 

  4、消费者

  

package com.wanqi.listener;

 

  import com.rabbitmq.client.Channel;

  import org.springframework.amqp.core.Message;

  import org.springframework.amqp.rabbit.annotation.RabbitListener;

  import org.springframework.stereotype.Component;

  import java.io.IOException;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Component

  public class RabbitMQListener {

   @RabbitListener(queues = {"boot_queue"})

   public void listenerQueue(Object msg, Message message, Channel channel) {

   final long deliveryTag = message.getMessageProperties().getDeliveryTag();

   try {

   System.out.println(msg.toString());

   System.out.println(new String(message.getBody()));

  // int x = 3/0;

   channel.basicAck(deliveryTag, true);

   } catch (Exception e) {

   try {

   channel.basicNack(deliveryTag, true, true);

   } catch (IOException ex) {

   throw new RuntimeException(ex);

  

 

  5、死信队列

  消息成为死信的三种情况
 

  1、队列消息长度到达限制;
 

  2、消费者拒接消费消息,并且不重回队列;
 

  3、原队列存在消息过期设置,消息到达超时时间未被消费;

  5.1、声明交换机、队列、绑定队列

  topic模式

  

package com.wanqi.mq;

 

  import org.springframework.amqp.core.*;

  import org.springframework.beans.factory.annotation.Qualifier;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Configuration

  public class QDRabbitMQConfig {

   //交换机名称

   public static final String ITEM_EXCHANGE = "item_exchange";

   public static final String DEAD_EXCHANGE = "dead_exchange";

   //队列名称

   public static final String ITEM_QUEUE = "item_queue";

   public static final String DEAD_QUEUE = "dead_queue";

   //声明业务交换机

   @Bean("itemExchange")

   public Exchange itemExchange(){

   return ExchangeBuilder.topicExchange(ITEM_EXCHANGE).durable(true).build();

   //声明死信交换机

   @Bean("deadExchange")

   public Exchange deadExchange(){

   return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();

   * 声明普通队列,设置队列消息过期时间,队列长度,绑定的死信队列

   @Bean("itemQueue")

   public Queue itemQueue(){

   return QueueBuilder

   .durable(ITEM_QUEUE)

   // 队列消息过期时间

   .ttl(10000)

   // 队列长度

   .maxLength(15)

   // 声明当前队列绑定的死信交换机

   .deadLetterExchange(DEAD_EXCHANGE)

   // 声明当前队列死信转发的路由key

   .deadLetterRoutingKey("infoDead.haha")

   .build();

   * 死信队列,消费者需要监听的队列

   @Bean("deadQueue")

   public Queue deadQueue(){

   return QueueBuilder

   .durable(DEAD_QUEUE)

   .build();

   //绑定队列和交换机(业务)

   @Bean

   public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,

   @Qualifier("itemExchange") Exchange exchange){

   return BindingBuilder.bind(queue).to(exchange).with("infoRouting.#").noargs();

   //绑定队列和交换机(死信)

   @Bean

   public Binding deadQueueExchange(@Qualifier("deadQueue") Queue queue,

   @Qualifier("deadExchange") Exchange exchange){

   return BindingBuilder.bind(queue).to(exchange).with("infoDead.#").noargs();

  

 

  5.2、发送测试消息

  

package com.wanqi;

 

  import com.wanqi.mq.RabbitMQConfig;

  import com.wanqi.mq.RabbitMQConfig2;

  import org.junit.jupiter.api.Test;

  import org.springframework.amqp.core.Message;

  import org.springframework.amqp.core.MessageProperties;

  import org.springframework.amqp.rabbit.core.RabbitTemplate;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.boot.test.context.SpringBootTest;

  @SpringBootTest

  class SpringbootMqProducersApplicationTests {

   @Autowired

   private RabbitTemplate rabbitTemplate;

   // 原队列存在消息过期设置,消息到达超时时间未被消费;

   @Test

   void contextLoads3() {

   MessageProperties messageProperties = new MessageProperties();

   // 设置过期时间,单位:毫秒

   messageProperties.setExpiration("5000");

   byte[] msgBytes = "rabbitmq ttl message ...".getBytes();

   Message message = new Message(msgBytes, messageProperties);

   //发送消息

   rabbitTemplate.convertAndSend(QDRabbitMQConfig.ITEM_EXCHANGE,"infoRouting.hehe",message);

   System.out.println("发送消息成功");

   // 模拟队列消息长度到达限制

   @Test

   void contextLoads4() {

   for (int i = 0; i i++) {

   rabbitTemplate.convertAndSend(QDRabbitMQConfig.ITEM_EXCHANGE,"infoRouting.hehe",i + "---message");

  

 

  5.3、消费者

  

package com.wanqi.listener;

 

  import com.rabbitmq.client.Channel;

  import org.springframework.amqp.core.Message;

  import org.springframework.amqp.rabbit.annotation.RabbitListener;

  import org.springframework.stereotype.Component;

  import java.io.IOException;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Component

  public class RabbitMQListener {

   @RabbitListener(queues = {"dead_queue"})

   public void listenerQueue2(Object msg, Message message, Channel channel) {

   final long deliveryTag = message.getMessageProperties().getDeliveryTag();

   try {

   System.out.println(msg.toString());

   channel.basicAck(deliveryTag, true);

   } catch (Exception e) {

   try {

   channel.basicNack(deliveryTag, true, true);

   } catch (IOException ex) {

   throw new RuntimeException(ex);

  

 

  6、延迟队列(TTL + 死信队列)

  6.1、声明交换机、队列、绑定队列

  direct模式

  

 

 

  package com.wanqi.mq;

  import org.springframework.amqp.core.*;

  import org.springframework.beans.factory.annotation.Qualifier;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Configuration

  public class TtlQueueConfig {

   //普通交换机名称

   public static final String X_CHANGE = "X_Exchange";

   //死信交换机名称

   public static final String Y_DEAD_CHANGE = "Y_Exchange";

   //普通队列

   public static final String QUEUE_A = "QA_QUEUE";

   public static final String QUEUE_B = "QB_QUEUE";

   //死信队列

   public static final String DEAD_QUEUE_D = "QD_QUEUE";

   //声明普通交换机

   @Bean("xExchange")

   public DirectExchange xExchange() {

   return ExchangeBuilder.directExchange(X_CHANGE).durable(true)

   .build();

   //声明死信交换机

   @Bean("yExchange")

   public DirectExchange yExchange() {

   return ExchangeBuilder.directExchange(Y_DEAD_CHANGE).durable(true)

   .build();

   * 声明队列,延迟10秒

   @Bean("queueA")

   public Queue queueA() {

   return QueueBuilder.durable(QUEUE_A)

   .deadLetterExchange(Y_DEAD_CHANGE) //死信交换机

   .deadLetterRoutingKey("YD") //死信RoutingKey

   .ttl(10000) //消息过期时间

   .build();

   * 声明队列,延迟40秒

   * @return

   @Bean("queueB")

   public Queue queueB() {

   return QueueBuilder.durable(QUEUE_B)

   .deadLetterExchange(Y_DEAD_CHANGE) //死信交换机

   .deadLetterRoutingKey("YD") //死信RoutingKey

   .ttl(40000) //消息过期时间

   .build();

  
@Bean

   public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {

   return BindingBuilder.bind(queueA).to(xExchange).with("XA");

   //绑定 X_CHANGE绑定queueB

   @Bean

   public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {

   return BindingBuilder.bind(queueB).to(xExchange).with("XB");

  
@Bean

   public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {

   return BindingBuilder.bind(queueD).to(yExchange).with("YD");

  

 

  6.2、发送消息

  

 @Test

 

   void contextLoads5() {

   String message = "延迟队列消息";

   rabbitTemplate.convertAndSend(TtlQueueConfig.X_CHANGE, "XA", "TTL=10s的队列:" + message);

   rabbitTemplate.convertAndSend(TtlQueueConfig.X_CHANGE, "XB", "TTL=40s的队列:" + message);

  

 

  6.3、消费

  

package com.wanqi.listener;

 

  import com.rabbitmq.client.Channel;

  import org.springframework.amqp.core.Message;

  import org.springframework.amqp.rabbit.annotation.RabbitListener;

  import org.springframework.stereotype.Component;

  import java.io.IOException;

  import java.util.Date;

   * @Description TODO

   * @Version 1.0.0

   * @Date 2022/11/19

   * @Author wandaren

  @Component

  public class RabbitMQListener {

   @RabbitListener(queues = "QD_QUEUE")

   public void receiveMessage(Object msg,Message message, Channel channel){

   final long deliveryTag = message.getMessageProperties().getDeliveryTag();

   try {

   System.out.println(msg.toString());

   System.out.print("当前时间: " + new Date());

   System.out.println(" 收到死信队列的消息:" + msg);

   channel.basicAck(deliveryTag, true);

   } catch (Exception e) {

   try {

   channel.basicNack(deliveryTag, true, true);

   } catch (IOException ex) {

   throw new RuntimeException(ex);

  

 

  以上就是Springboot整合RabbitMQ基本使用()的详细内容,想要了解更多 Springboot整合RabbitMQ基本使用的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: