RabbitMQ(rabbitmq怎么保证消息不重复消费)_3

  本篇文章为你整理了RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,包含有rabbitmq如何保证消息不丢 rabbitmq怎么保证消息不重复消费 rabbitmq消息积压如何解决 rabbitmq和kafka的区别 RabbitMQ,希望能帮助你了解 RabbitMQ。

  流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

  
3.MQ实现

  MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

  几种常见MQ的对比:

  
在课前资料已经提供了镜像包:

  上传到虚拟机中后,使用命令加载镜像即可:

  

docker load -i mq.tar

 

  

 

  1.2.安装MQ

  执行下面的命令来运行MQ容器:

  

docker run \

 

   -e RABBITMQ_DEFAULT_USER=itcast \

   -e RABBITMQ_DEFAULT_PASS=123321 \

   --name mq \

   --hostname mq1 \

   -p 15672:15672 \

   -p 5672:5672 \

   -d \

   rabbitmq:3-management

  

 

  注意5672是MQ消息通信的端口,15672是ui端口

  三、RabbitMQ

  大致分为两种

  1.**无交换机的 **基本消息队列 和 工作消息队列

  2.有交换机的发布订阅,路由,主题

  2.基于SpringAMQP使用

  2.1基础消息队列

  
public static void main(String[] args) throws IOException, TimeoutException {

   // 1.建立连接

   ConnectionFactory factory = new ConnectionFactory();

   // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码

   factory.setHost("116.62.32.68");

   factory.setPort(5672);

   factory.setVirtualHost("/");

   factory.setUsername("itcast");

   factory.setPassword("123321");

   // 1.2.建立连接

   Connection connection = factory.newConnection();

   // 2.创建通道Channel

   Channel channel = connection.createChannel();

   // 3.创建队列

   String queueName = "simple.queue";

   channel.queueDeclare(queueName, false, false, false, null);

   // 4.订阅消息

   channel.basicConsume(queueName, true, new DefaultConsumer(channel){

   @Override

   public void handleDelivery(String consumerTag, Envelope envelope,

   AMQP.BasicProperties properties, byte[] body) throws IOException {

   // 5.处理消息

   String message = new String(body);

   System.out.println("接收到消息:【" + message + "】");

   System.out.println("等待接收消息。。。。");

  

 

 

  
groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-amqp /artifactId

   /dependency

  

 

 

  
@RabbitListener(queues = "simple.queue")

   public void listenSimpleQueueMessage(String msg) throws InterruptedException {

   System.out.println("spring 消费者接收到消息:【" + msg + "】");

  

 

 

  
2.2工作消息队列

  消费者:

  

//工作队列

 

  @RabbitListener(queues = "simple.queue")

  public void listenWorkQueue1Message(String msg) throws InterruptedException {

   System.out.println("spring 消费者1接收到消息:【" + msg + "】");

  @RabbitListener(queues = "simple.queue")

  public void listenSimpleQueue2Message(String msg) throws InterruptedException {

   System.out.println("spring 消费者2接收到消息:【" + msg + "】");

  

 

  

logging:

 

   pattern:

   dateformat: MM-dd HH:mm:ss:SSS

  spring:

   rabbitmq:

   host: # 主机名

   port: 5672 # 端口

   virtual-host: / # 虚拟主机

   username: itcast # 用户名

   password: 123321 # 密码

   listener:

   direct:

   prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个

  

 

  2.3广播Fanout

  多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。

  广播:只要队列绑定了交换机,就发送消息到队列

  1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)

  

//3.广播(绑定队列,同时绑定交换机)

 

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "fanout.queue1"),

   exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)

  public void listenFanoutQueue1(String message){

   System.out.println("广播方式队列1接受消息:"+message);

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "fanout.queue2"),

   exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)

  public void listenFanoutQueue2(String message){

   System.out.println("广播方式队列2接受消息:"+message);

  

 

  2.生产者

  

//广播类型交换机

 

  @Test

  public void testFanoutExchangeQueue(){

   //交换机名称

   String exchangeName = "fanout.exange";

   //消息

   String message = "广播消息";

   //发送,第二个参数是队列的key,在路由类型中可以使用到

   rabbitTemplate.convertAndSend(exchangeName,"",message);

  

 

  2.4路由Route

  和广播相比,在队列中多了一个key属性。

  交换机通过消息的key来对应路由到相同key的队列上。

  1.消费者:

  

/**

 

   4.路由

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "direct.queue1"),

   exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),

   key = {"red","blue"}

  public void listenDirectQueue1(String message){

   System.out.println("路由红、蓝队列接受消息:"+message);

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "direct.queue2"),

   exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),

   key = {"red","green"}

  public void listenDirectQueue2(String message){

   System.out.println("路由红、绿队列接受消息:"+message);

  

 

  2.生产者

  

//路由类型交换机

 

  @Test

  public void testDirectExchangeQueue(){

   //交换机名称

   String exchangeName = "direct.exchange";

   //消息

   String message = "路由消息";

   //发送,第二个参数是队列的key,在路由类型中可以使用到

   rabbitTemplate.convertAndSend(exchangeName,"red",message);

  

 

  2.5主题Topic

  key不再是一组单词,而是一个规则

  通配符规则:

  #:匹配一个或多个词

  *:匹配不多不少恰好1个词

  举例:

  item.#:能够匹配item.spu.insert 或者 item.spu

  item.*:只能匹配item.spu

  1.消费者

  

/**

 

   * 主题

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "topic.queue1"),

   exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),

   key = "china.#"

  public void listenTopicQueue1(String message){

   System.out.println("中国队列接受消息:"+message);

  @RabbitListener(bindings = @QueueBinding(

   value = @Queue(name = "topic.queue2"),

   exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),

   key = "#.news"

  public void listenTopicQueue2(String message){

   System.out.println("新闻队列接受消息:"+message);

  

 

  2.生产者

  

//主题类型交换机

 

  @Test

  public void testTopicExchangeQueue(){

   //交换机名称

   String exchangeName = "topic.exchange";

   //消息

   String message = "china.lala消息";

   //发送,第二个参数是队列的key,在路由类型中可以使用到

   rabbitTemplate.convertAndSend(exchangeName,"china.news",message);

  

 

  3.消息转换器

  Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

  默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  因此需要配置一个Json转换器

  在publisher和consumer两个服务中都引入依赖:

  

 dependency 

 

   groupId com.fasterxml.jackson.dataformat /groupId

   artifactId jackson-dataformat-xml /artifactId

   version 2.9.10 /version

   /dependency

  

 

  配置消息转换器。在启动类中添加一个Bean即可:

  

@Bean

 

  public MessageConverter jsonMessageConverter(){

   return new Jackson2JsonMessageConverter();

  

 

  以上就是RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,想要了解更多 RabbitMQ的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: