为什么选择rocketmq,RocketMq

  为什么选择rocketmq,RocketMq

  java基础教程栏目今天详细介绍有关RocketMQ知识。

  很久没有写博客了。虽然我们可以找出无数不写博客的理由,但归根结底还是一个字“懒”。今天终于吃了一颗懒癌的药丸,决定写博客了。有什么好介绍的?想想吧。下面介绍一下RocketMQ。毕竟我写了30多个博客,还没写过一篇关于MQ的好博客。这个博客比较基础,不涉及源代码分析,只是素养。

  

MQ有什么用

  

解耦

  我觉得从某个角度来说,微服务推动了MQ的蓬勃发展。最初,一个系统有n个模块,所有模块都强耦合在一起。现在微服务,一个模块就是一个系统,系统之间必须要有交互。常见的交互方式有三种,一种是RPC,一种是HTTP,一种是MQ。

  

异步

  本来一个业务分为N步,要一步一步处理,最后结果才能返回给用户。现在有了MQ,先处理最关键的部分,然后把消息发给MQ,直接返回给用户OK。至于后面的步骤,在后台慢慢打理,真的是提升用户体验的神器。

  

削峰

  一个接口的请求量突然激增,势必会对应用服务器和数据库服务器造成很大的压力。现在有了MQ,就不怕来多少请求了,可以在后台慢慢处理。

  

RocketMQ简介

   RocketMQ是用Java写的,是阿里的开源消息中间件,吸收了Kafka的很多优点。Kafka也是流行的消息中间件,但是Kafka是用Scala编写的,不利于Java程序员阅读源代码,也不利于Java程序员做一些定制化的开发。接触过卡夫卡的朋友都知道,用好卡夫卡并不容易。相对来说,RocketMQ简单很多。况且RocketMQ得到了阿里的加持,经过了N次双11的考验,所以更适合国内的互联网公司。所以国内使用RocketMQ的公司很多。

  

RocketMQ四大组件

  图片来自gitee.com/mirrors/roc…

  您可以看到RocketMQ有四个主要组件:

  

NameServer

  无状态服务、注册表、集群部署,但名称服务器节点之间没有数据交互。博克将定期向所有域名服务器报告主题路由信息。生产者和消费者将随机选择一个名称服务器主题来定期更新路由信息。主题路由信息在名称服务器集群中采用最终一致性。保证AP。

Borker

   Rocket MQ的服务器用于存储和分发消息。Borker将定期向名称服务器报告其拥有的所有主题路由信息。博克有两个角色:主人和追随者。主设备负责读取(消耗消息)和写入(产生消息)。如果主人忙或不在,跟随者可以进行阅读。BorkerId=0,表示Matser,BorkerId!=0,表示跟随者。有两点需要注意:

  第一,到目前为止,BorkerId=1的Follower只能承担读操作;

  第二,只有更高版本的RocketMQ支持当主节点挂机时,从节点会自动升级到主节点。

Producer

  生产者定期向名称服务器发起主题的路由信息查询。

  

Consumer

  消费者定期向名称服务器发起主题的路由信息查询。

  

为什么注册中心不选用Zookeeper

  其实在RocketMQ的较低版本中,确实选择了Zookeeper作为注册中心,只是后来改成了现在的NameServer。我猜主要原因是:

  RocketMQ已经是中间件了,我不想依赖其他中间件。Zookeeper比较重,还有很多RocketMQ用不到的功能。不如写一个轻量级的注册表。Zookeeper是CP。一旦触发领导人选举,注册中心将不可用。但是RocketMQ的注册中心并不需要很强的一致性,只要保证最终的一致性即可。

RocketMQ消息领域模型

  

Message

  发送的消息。消息必须有主题。一个消息可以有多个标签和多个键,这些可以看作是消息的附加属性。

Topic

  一类消息的集合。每封邮件都必须有一个主题。消息的第一级类型。

Tag

  一条消息除了Topic之外,还可以有Tag,用来细分同一主题下的不同种类的消息。标签不是必需的。消息的第二级类型。

Group

  分为ProducerGroup和ConsumerGroup。我们更关注包含多个消费者的ConsumerGroup。

  在集群消费模式下,一个ConsumerGroup中的消费者共享一个主题,每个消费者会被分配到n个队列中,但一个队列只会被一个消费者消费,不同的消费者群可以消费同一个主题,一条消息会被订阅这个主题的所有消费者群消费。

  

Queue

  默认情况下,一个主题包含四个队列。在群集使用模式下,同一个ConsumerGroup中的使用者可以使用来自多个队列的消息,但是一个队列只能由一个使用者使用。队列中的消息是有序的。分为读队列和写队列。一般来说,读队列的数量和写队列的数量是一样的,否则很容易出错。

消费模式

  有两种消费模式:集群(集群消费)和广播(广播消费)。

  与其他MQ不同,其他MQ在发送消息时指定是集群消费还是广播消费,RocketMQ在消费端设置是集群消费还是广播消费。

  

Clustering(集群消费)

  是默认的集群消耗模式。在这种模式下,ConsumerGroup的所有消费者共享一个主题的消息,每个消费者负责消费n个队列的消息(n也可以是1甚至0,不分配给队列),但一个队列只会被一个消费者消费。如果一个消费者挂断电话,ConsumerGroup下的其他消费者将代替被挂断的消费者继续消费。

  在集群消费模式下,消费进度维护在Borker端,存储路径为$ { rocket _ home }/store/config/consumer offset . JSON,如下图所示:以topicName@consumerGroupName为键,以消费进度为值,值的形式为queueId:offset,这意味着如果有多个consumer group,每个消费组的消费进度是不同的,需要单独存储。

  

Broadcasting(广播消费)

  广播消费消息将发送给ConsumerGroup中的所有消费者。

  在广播模式下,消费进度在消费者端维护。

  

消费队列负载算法与重平衡机制

  

消费队列负载算法

  我们知道,在集群消费模式下,ConsumerGroup中的所有消费者共享一个主题消息,每个消费者负责消费N个队列的消息。那么是怎么分配的呢?这涉及到消费队列加载算法。

  RocketMQ提供了很多消耗队列加载的算法,其中常用的有两种算法,分别是AllocateMessageQueueAveragely和AllocateMessageQueueAveragelyByCircle。我们来看看这两种算法的区别。

  现在假设,一个主题有16个队列,用q0~q15表示,3个消费者,用c0-c2表示。

  使用AllocateMessageQueueAveragely消耗队列加载算法的结果如下:

  C0:Q0 Q1 Q2 Q3 Q4 q5 C1:q6 q 7 q 8 q 9 q 10 c 2:q 11 q 12 q 13 q 14 q 15分配messagequeueAveragelyByCircle消耗队列负载算法的结果如下:

  C0:q0q 3 q 6 q 9 q 12 q 15 c 1:q1q 4 q 7 q 10 q 13 c 2:Q2 q 5 q 8 q 11 q 14消费群所有消费者分享一个话题消息。每个消费者负责消费N个队列的消息,但是一个队列不能同时被N个消费者消费。这是什么意思?

  聪明的你可以认为,如果一个主题只有四个队列和五个消费者,那么一个消费者不会被分配到任何队列。所以在RocketMQ中,Topic下的队列数量直接决定了消费者的最大数量,也就是说不能仅仅通过增加消费者来提高消费率。

  

重平衡

  虽然建议在创建话题时充分考虑排队人数,但实际情况往往不尽如人意。即使排队人数不变,消费者人数也一定会变,比如消费者的线上和线下,比如一个挂断的消费者,或者一个新的消费者。队列的伸缩和消费者的伸缩会导致再平衡,也就是为消费者重新分配消费的队列。

  在RocketMQ中,消费者会定期查询话题的排队人数和消费者人数。如果有变化,就会引发再平衡。

  再平衡是RocketMQ内部实现的,所以程序员不需要关心。

  

Pull OR Push?

  一般来说,MQ有两种获取消息的方法:

  拉:消费者主动拉消息。好处是消费者可以控制消息的频率和数量。消费者知道自己的消费能力,所以在消费端不容易造成消息堆积,但是实时性不是很好,效率比较低。推送:Broker主动发送消息,具有实时、高效的优点。但是Broker无法知道消费者的消费能力,如果发送给消费者的消息太多,消费者身上的消息就会积累。如果发送给消费者的数据太少,消费者将再次闲置。不管是拉还是推,消费者总会和经纪人互动。一般有三种交互方式:短连接、长连接和轮询。

  看似RocketMQ同时支持拉和推,实际上推也是用拉实现的。那么,消费者如何与经纪人互动呢?

  这是RocketMQ设计的巧妙之处。既不是短连接,也不是长连接,也不是轮询,而是长轮询。

  

长轮询

  消费者发起提取消息的请求。有两种情况:

  消息:消费者得到消息后,连接断开。无消息:Borker保持连接一定时间,每隔5秒,检查是否有消息,如果有,给消费者,连接断开。

事务消息

   RocketMQ支持事务消息。生产者将交易消息发送给代理后,代理会将消息存储在系统主题中:rmq _ sys _ trans _ half _ topic,这样消费者就不能消费这个消息了。

  Broker会有一个预定的任务来消费RMQ_SYS_TRANS_HALF_TOPIC的消息,并向生产者发起一个backcheck。有三种回溯检查状态:提交、回滚和未知。

  如果复核状态为提交和回滚,则触发消息的提交和回滚;如果它是未知的,它将等待下一次检查。RocketMQ可以设置消息的检查间隔和检查次数。如果超过一定的检查次数,消息会自动回滚。

延迟消息

  延迟消息是指利息发送给经纪人后,不能立即被消费者消费,需要一定的时间才能被消费。RocketMQ只支持特定的延迟时间:1s 5s 10s 30s 1m 2m 3m 4m 6m 7m 9m 20m 30m 1h 2h。

  

消费形式

   RocketMQ支持两种消费形式:并发消费和顺序消费。

  如果是顺序消费,就要保证排序后的消息在同一个队列中。如何选择发送消息的队列?RocketMQ发送消息的方法中有几个重载,其中一个重载方法支持队列的选择。

  

同步刷盘、异步刷盘

   Producer向Borker发送消息,Borker需要持久化消息。RocketMQ支持两种持久性策略:

  同步刷盘:只有在消息持续之后,Borker才会向生产者返回ACK。优点是消息可靠性高,但是效率慢。异步刷盘:Broker将消息写入PageCache,并向生产者返回ACK。好处是效率极高,但如果服务器挂机,可能会丢失消息。如果只是RocketMQ服务挂机,消息不会丢失。

同步复制、异步复制

  为了MQ的可靠性和可用性,在生产环境中,通常部署从节点,从节点会复制主节点的数据。RocketMQ支持两种复制策略:

  同步:主节点和从节点都成功写入消息,然后向生产者返回ACK,可靠性高,但效率较低。异步复制:只要主节点写成功,就会向生产者返回ACK,效率更高,但可能会丢失消息。“写入”是写入PageCache还是硬盘取决于Follower Broker的配置。

  

再谈谈Producer

   RocketMQ提供了三种发送消息的方式:

  单向:火了就忘,单向消息,意思是消息发出后,就无所谓了。此方法没有返回值。同步:消息发送后,同步等待Borker的响应。异步:消息发出后,会立即返回。收到Boker的响应后,将执行函数调用方法。在实际开发中,一般采用同步的方法。如果要提升RocketMQ的性能,一般会修改Borker的参数,尤其是刷盘策略和复制策略。

  

发送消息重试

  发送消息时,如果使用MessageQueueSelector,消息发送的重试机制将无效。

  发送消息有四种可能的响应:

  公共枚举发送状态{

  SEND_OK,

  刷新磁盘超时,

  FLUSH_SLAVE_TIMEOUT

  SLAVE _不可用,

  }除了第一条,复制代码都有问题。为了确保消息不丢失,需要设置生产者参数:retryanotherbrokerwinnotstoreook为true。

  

故障规避机制

  如果消息发送失败,重试的时候还是发送到这个Borker,那么大概率还是发送失败。RockteMQ的巧妙设计在于,重试时会自动避开这个Borker,选择其他Borker。但是到目前为止,异步发送并没有那么智能,只会在一个Borker上重试,所以强烈建议选择同步发送方式。

  RocketMQ提供了两种故障避免机制。使用参数SendLatencyFaultEnable进行控制。

  True:默认值,只有当您再次尝试时,才会启用故障避免机制。例如,向BorkerA发送消息失败。当你再次尝试时,BorkerB将被选中,但下次发送消息时,你仍会选择发送给BorkerA。True:打开延迟回退机制。一旦发送给BorkerA的消息失败,就会悲观的认为BorkerA在一定时间内不可用,以后也不会再有消息发送给BorkerA。延迟退避机制看起来效果不错,但是一般来说,Borker侧比较忙,导致Borker不可用或者网络不可用。可以立即恢复。如果延迟退避机制开启,原来可用的Borker已经被绕过一段时间,其他Borker更忙,可能会使情况变得更糟。

  

再谈谈Consumer

  

Consumer线程注意事项

   Consumer有两个参数,分别是Consumer eradmin和ConsumeThreadMax,这两个参数似乎给人的感觉是,如果在消费端堆积的消息较少,那么消费线程的数量就是Consumer eradmin;如果在消费端堆积了更多的消息,将会自动启动一个新的线程进行消费,直到消费线程的数量达到ConsumeThreadMax。但事实并非如此。消费者内部持有一个线程池,选择一个无界队列,即ConsumerReadMax参数无效。因此,在实际开发中,ConsumeThreadMin和ConsumerReadMax往往被设置为相同的值。

  

ConsumeFromWhere

  如果无法查询消费进度,消费者从哪里开始消费?RocketMQ支持从最新消息、最早消息和指定时间戳进行消费。

  

消费消息重试

   RocketMQ将为每个ConsumerGroup设置一个主题名为% retry% ConsumerGroup的重试队列,用于保存ConsumerGroup需要重试的消息,但重试需要一定的延迟时间。RocketMQ通过将重试消息保存到主题名为SCHEDULE_TOPIC_XXXX的延迟队列来处理重试消息,后台调度任务根据相应的时间进行延迟,然后保存到%RETRY% consumerGroup的重试队列中。

  

消息堆积、消费能力不够,怎么办

  提高消费进度。这是最好的方法。增加排队人数,增加消费者。原来的消费者作为一个砖家,按照一定的规则把新闻“搬”到了多个新的话题上,然后开了几个ConsumerGroup消费不同的话题。打开一个新的ConsumerGroup进行消费,即两个consumer group同时消费一个话题,但要注意offset的判断。例如,一个ConsumerGroup使用奇数偏移量的消息,而另一个consumer group使用偶数偏移量的消息。本来觉得写扫盲文章应该是一帆风顺的,但还是想多了。因为是素养篇,所以针对接触RocketMQ不多的小伙伴。但是RocketMQ这么简单,不可能用一个博客让接触RocketMQ不多的小伙伴顺利上手吗?所以写博客的时候,我一直在想,这个东西重要吗?需要仔细描述吗?这个东西能不能忽略,能不能不引入等等?你可以看到这篇文章基本上介绍了各种概念,几乎不涉及API层面,因为一旦涉及到API,估计要两周才能写完。

  

End

  以上终于来了.RocketMQ素养文章详情,更多请关注我们的其他相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: