【RocketMQ】消息的拉取(rocketmq消息推送)

  本篇文章为你整理了【RocketMQ】消息的拉取(rocketmq消息推送)的详细内容,包含有rocketmq接收消息 rocketmq消息推送 rocketmq消息存储 rocketmq消息堆积怎么处理 【RocketMQ】消息的拉取,希望能帮助你了解 【RocketMQ】消息的拉取。

  RocketMQ消息的消费以组为单位,有两种消费模式:

  广播模式:同一个消息队列可以分配给组内的每个消费者,每条消息可以被组内的消费者进行消费。
 

  集群模式:同一个消费组下,一个消息队列同一时间只能分配给组内的一个消费者,也就是一条消息只能被组内的一个消费者进行消费。(一般情况下都使用的是集群模式)
 

  消息的获取也有两种模式:

  拉模式:消费者主动发起拉取消息的请求,获取消息进行消费。

  推模式:消息到达Broker后推送给消费者。RocketMQ对拉模式进行了包装去实现推模式,本质还是需要消费者去拉取,一个拉取任务完成后继续下一次拉取。

  首先来看一个RocketMQ源码中基于推模式DefaultMQPushConsumer进行消费的例子,首先为消费者设置了消费者组名称,然后注册了消息监听器,并设置订阅的主题,最后调用start方法启动消费者,接下来就去看看DefaultMQPushConsumer如何进行消息消费的:

  

@RunWith(MockitoJUnitRunner.class)

 

  public class DefaultMQPushConsumerTest {

   private String consumerGroup;

   private String topic = "FooBar";

   private String brokerName = "BrokerA";

   private MQClientInstance mQClientFactory;

   @Mock

   private MQClientAPIImpl mQClientAPIImpl;

   private static DefaultMQPushConsumer pushConsumer;

   @Before

   public void init() throws Exception {

   // ...

   // 消费者组

   consumerGroup = "FooBarGroup" + System.currentTimeMillis();

   // 实例化DefaultMQPushConsumer

   pushConsumer = new DefaultMQPushConsumer(consumerGroup);

   pushConsumer.setNamesrvAddr("127.0.0.1:9876");

   // 设置拉取间隔

   pushConsumer.setPullInterval(60 * 1000);

   // 注册消息监听器

   pushConsumer.registerMessageListener(new MessageListenerConcurrently() {

   @Override

   public ConsumeConcurrentlyStatus consumeMessage(List MessageExt msgs,

   ConsumeConcurrentlyContext context) {

   return null;

   // ...

   // 设置订阅的主题

   pushConsumer.subscribe(topic, "*");

   // 启动消费者

   pushConsumer.start();

  

 

  消费者的启动

  DefaultMQPushConsumer实现了MQPushConsumer接口,它引用了默认的消息推送实现类DefaultMQPushConsumerImpl,在构造函数中可以看到对其进行了实例化,并在start方法中进行了启动:

  

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

 

   * 默认的消息推送实现类

   protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

   * 构造函数

   public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,

   AllocateMessageQueueStrategy allocateMessageQueueStrategy) {

   this.consumerGroup = consumerGroup;

   this.namespace = namespace;

   this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;

   // 实例化DefaultMQPushConsumerImpl

   defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);

   * 启动

   @Override

   public void start() throws MQClientException {

   setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));

   // 启动消费者

   this.defaultMQPushConsumerImpl.start();

   if (null != traceDispatcher) {

   try {

   traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());

   } catch (MQClientException e) {

   log.warn("trace dispatcher start failed ", e);

  

 

  DefaultMQPushConsumerImpl的start方法中处理逻辑如下:

  调用copySubscription方法处理消息订阅,主要是将订阅信息包装成SubscriptionData对象,加入到负载均衡对象rebalanceImpl中

  创建客户端实例对象mQClientFactory,对应实现类为MQClientInstance,拉取服务线程、负载均衡线程都是通过MQClientInstance启动的

  为负载均衡对象RebalanceImpl设置消费组、消费模式、分配策略,RebalanceImpl是一个抽象类,在实例化时可以看到使用的是RebalancePushImpl类型的

  创建消息拉取API对象PullAPIWrapper,用于向Broker发送拉取消息的请求

  根据消费模式,初始化消费进度存储对象offsetStore

  集群模式:消息的消费进度保存在Broker中,使用RemoteBrokerOffsetStore。

  广播模式:消息的消费进度保存在消费者端,使用LocalFileOffsetStore。

  
调用MQClientInstance 的registerConsumer将消费者组的信息注册到MQClientInstance的consumerTable中

  调用mQClientFactory的start方法启动MQClientInstance

  调用mQClientFactory的rebalanceImmediately方法进行负载均衡

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   // MQClientInstance

   private MQClientInstance mQClientFactory;

   // 负载均衡对象,具体使用的是RebalancePushImpl进行实例化

   private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);

   // 消息拉取API对象PullAPIWrapper

   private PullAPIWrapper pullAPIWrapper;

   // 消费进度存储对象

   private OffsetStore offsetStore;

   public synchronized void start() throws MQClientException {

   // 判断状态

   switch (this.serviceState) {

   case CREATE_JUST: // 如果是创建未启动状态

   log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),

   this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

   // 先置为失败状态

   this.serviceState = ServiceState.START_FAILED;

   // 检查配置

   this.checkConfig();

   // 处理消息订阅

   this.copySubscription();

   // 如果是集群模式

   if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

   this.defaultMQPushConsumer.changeInstanceNameToPID();

   // 创建MQClientInstance

   this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

   // 设置消费者组

   this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

   // 设置消费模式

   this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

   // 设置分配策略

   this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

   // 设置MQClientInstance

   this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

   // 创建消息拉取API对象

   this.pullAPIWrapper = new PullAPIWrapper(

   mQClientFactory,

   this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

   // 注册消息过滤钩子

   this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

   if (this.defaultMQPushConsumer.getOffsetStore() != null) {

   this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

   } else {

   // 消费模式判断

   switch (this.defaultMQPushConsumer.getMessageModel()) {

   case BROADCASTING: // 广播模式

   // 消费进度存储在消费者本地,从本地获取

   this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

   break;

   case CLUSTERING: // 集群模式

   // 消费进度需要从Broker获取

   this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

   break;

   default:

   break;

   // 设置消费进度

   this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

   // 加载消费进度

   this.offsetStore.load();

   // 如果是顺序消费

   if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

   this.consumeOrderly = true;

   // 创建顺序消费service:ConsumeMessageOrderlyService

   this.consumeMessageService =

   new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

   } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

   this.consumeOrderly = false;

   // 非顺序消费,使用ConsumeMessageConcurrentlyService

   this.consumeMessageService =

   new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

   // 启动消费服务

   this.consumeMessageService.start();

   // 将消费者信息注册到mQClientFactory中,key为消费者组名称,value为消费者也就是当前对象

   boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

   if (!registerOK) {

   this.serviceState = ServiceState.CREATE_JUST;

   this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());

   throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()

   + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

   null);

   // 启动MQClientInstance

   mQClientFactory.start();

   log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());

   // 状态更改为运行中

   this.serviceState = ServiceState.RUNNING;

   break;

   case RUNNING:

   case START_FAILED:

   case SHUTDOWN_ALREADY:

   throw new MQClientException("The PushConsumer service state not OK, maybe started once, "

   + this.serviceState

   + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

   null);

   default:

   break;

   this.updateTopicSubscribeInfoWhenSubscriptionChanged();

   this.mQClientFactory.checkClientInBroker();

   this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

   // 进行负载均衡

   this.mQClientFactory.rebalanceImmediately();

  public class MQClientInstance {

   // 注册消费者

   public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {

   if (null == group null == consumer) {

   return false;

   // 将消费者组信息添加到consumerTable中

   MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);

   if (prev != null) {

   log.warn("the consumer group[" + group + "] exist already.");

   return false;

   return true;

  

 

  主题订阅处理

  在copySubscription方法中,从defaultMQPushConsumer获取了设置的主题订阅信息,在前面的例子中可以看到向defaultMQPushConsumer中添加了订阅的主题信息,所以这里获取到了之前添加的主题信息MAP集合,其中KEY为主题,VALUE为表达式,然后遍历订阅信息集合,将订阅信息包装成SubscriptionData对象,并加入到负载均衡对象rebalanceImpl中:

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   // DefaultMQPushConsumer

   private final DefaultMQPushConsumer defaultMQPushConsumer;

   private void copySubscription() throws MQClientException {

   try {

   // 获取订阅信息,KEY为主题,VALUE为表达式

   Map String, String sub = this.defaultMQPushConsumer.getSubscription();

   if (sub != null) {

   for (final Map.Entry String, String entry : sub.entrySet()) {

   // 获取主题

   final String topic = entry.getKey();

   // 获取表达式

   final String subString = entry.getValue();

   // 构建主题信息对象

   SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);

   // 加入到负载均衡实现类中

   this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

   if (null == this.messageListenerInner) {

   this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();

   switch (this.defaultMQPushConsumer.getMessageModel()) {

   case BROADCASTING:

   break;

   case CLUSTERING:

   // 获取重试主题

   final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());

   SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);

   // 订阅重试主题

   this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);

   break;

   default:

   break;

   } catch (Exception e) {

   throw new MQClientException("subscription exception", e);

  

 

  创建MQClientInstance

  MQClientInstance中有以下几个主要的成员变量:

  pullMessageService:对应实现类为PullMessageService,是用来拉取消息的服务

  rebalanceService:对应的实现类为RebalanceService,是用来进行负载均衡的服务

  consumerTable:消费者组信息,key为消费者组名称,value为注册的消费者,上面可知在start方法中调用了registerConsumer方法进行了消费者注册

  RebalanceService和PullMessageService都继承了ServiceThread,在MQClientInstance的start方法中,分别调用了pullMessageService和rebalanceService的start方法启动拉取服务线程和负载均衡线程:

  

public class MQClientInstance {

 

   // 拉取消息Service

   private final PullMessageService pullMessageService;

   // 负载均衡service

   private final RebalanceService rebalanceService

   // 消费者组信息,key为消费者组名称,value为注册的消费者

   private final ConcurrentMap String, MQConsumerInner consumerTable = new ConcurrentHashMap String, MQConsumerInner

   public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {

   // ...

   // 创建MQClientAPIImpl

   this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

   // ...

   this.mQAdminImpl = new MQAdminImpl(this);

   // 创建拉取消息service

   this.pullMessageService = new PullMessageService(this);

   // 创建负载均衡service,并在构造函数中传入了当前对象

   this.rebalanceService = new RebalanceService(this);

   this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);

   this.defaultMQProducer.resetClientConfig(clientConfig);

   // ...

   // 启动

   public void start() throws MQClientException {

   synchronized (this) {

   switch (this.serviceState) {

   case CREATE_JUST:

   // ...

   this.startScheduledTask();

   // 启动拉取消息服务

   this.pullMessageService.start();

   // 启动负载均衡服务

   this.rebalanceService.start();

   // ...

   break;

   case START_FAILED:

   throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

   default:

   break;

  

 

  消息拉取服务启动

  PullMessageService继承了ServiceThread,并且使用了阻塞队列pullRequestQueue存储消息拉取请求,PullMessageService被启动后,在run方法中等待pullRequestQueue中拉取请求的到来,然后调用pullMessage方法拉取消息, 在pullMessage中又是调用DefaultMQPushConsumerImpl 的pullMessage进行消息拉取的:

  

public class PullMessageService extends ServiceThread {

 

   // 拉取请求阻塞队列

   private final LinkedBlockingQueue PullRequest pullRequestQueue = new LinkedBlockingQueue PullRequest

   @Override

   public void run() {

   log.info(this.getServiceName() + " service started");

   while (!this.isStopped()) {

   try {

   // 拉取消息

   PullRequest pullRequest = this.pullRequestQueue.take();

   this.pullMessage(pullRequest);

   } catch (InterruptedException ignored) {

   } catch (Exception e) {

   log.error("Pull Message Service Run Method exception", e);

   log.info(this.getServiceName() + " service end");

   // 拉取消息

   private void pullMessage(final PullRequest pullRequest) {

   final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

   if (consumer != null) {

   // 转换为DefaultMQPushConsumerImpl

   DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;

   // 调用pullMessage拉取消息

   impl.pullMessage(pullRequest);

   } else {

   log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);

  

 

  这里可能会有一个疑问,既然PullMessageService在等待拉取请求的到来,那么什么时候会往pullRequestQueue中添加拉取消息的请求?

  可以看到在PullMessageService的executePullRequestImmediately方法中,将拉取请求添加到了阻塞队列pullRequestQueue中:

  

public class PullMessageService extends ServiceThread {

 

   // 拉取请求阻塞队列

   private final LinkedBlockingQueue PullRequest pullRequestQueue = new LinkedBlockingQueue PullRequest

   public void executePullRequestImmediately(final PullRequest pullRequest) {

   try {

   // 向队列中添加拉取消息的请求信息

   this.pullRequestQueue.put(pullRequest);

   } catch (InterruptedException e) {

   log.error("executePullRequestImmediately pullRequestQueue.put", e);

  

 

  那么接下来只需看看哪里调用了PullMessageService的executePullRequestImmediately方法就可以找到在何时向队列中添加拉取请求的:

  可以看到DefaultMQPushConsumerImpl的executePullRequestImmediately方法中调用了PullMessageService的executePullRequestImmediately方法:

  

 public void executePullRequestImmediately(final PullRequest pullRequest) {

 

   // 调用PullMessageService的executePullRequestImmediately方法

   this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);

  

 

  接下来再看看哪里调用了DefaultMQPushConsumerImpl的executePullRequestImmediately:
 

  发现有两处进行了调用:

  DefaultMQPushConsumerImpl的pullMessage方法

  RebalancePushImpl的dispatchPullRequest方法

  前面可知PullMessageService处理拉取请求的时候就是调用的DefaultMQPushConsumerImpl的pullMessage方法进行处理的,所以如果是首次添加拉取请求,一定不是从这个入口添加的,那么首次大概就是从RebalancePushImpl这个地方添加的,接下来就去看看RebalancePushImpl如何添加拉取请求的。

  负载均衡服务启动

  MQClientInstance的start方法中,启动了负责均衡服务的线程,在RebalanceService的run方法中,调用了waitForRunning方法进行阻塞等待,如果负责均衡服务被唤醒,将会调用MQClientInstance的doRebalance进行负载均衡:

  

public class RebalanceService extends ServiceThread {

 

   private static long waitInterval =

   Long.parseLong(System.getProperty(

   "rocketmq.client.rebalance.waitInterval", "20000"));

   private final InternalLogger log = ClientLogger.getLog();

   private final MQClientInstance mqClientFactory; // 引用了MQClientInstance

   // 构造函数

   public RebalanceService(MQClientInstance mqClientFactory) {

   // 设置MQClientInstance

   this.mqClientFactory = mqClientFactory;

   @Override

   public void run() {

   log.info(this.getServiceName() + " service started");

   while (!this.isStopped()) {

   // 等待运行

   this.waitForRunning(waitInterval);

   // 进行负载均衡

   this.mqClientFactory.doRebalance();

   log.info(this.getServiceName() + " service end");

  

 

  负载均衡服务的唤醒

  前面可知DefaultMQPushConsumerImpl在启动的时候调用了MQClientInstance的rebalanceImmediately方法,在rebalanceImmediately方法中可以看到,调用了rebalanceService的wakeup方法唤醒负载均衡线程,(关于wakeup方法的实现前面在讲解消息发送时已经分析过这里不再赘述):

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   public synchronized void start() throws MQClientException {

   // ...

   // 唤醒负载均衡服务,也就是调用MQClientInstance的rebalanceImmediately方法

   this.mQClientFactory.rebalanceImmediately();

  public class MQClientInstance {

   public void rebalanceImmediately() {

   // 唤醒负载均衡服务

   this.rebalanceService.wakeup();

  

 

  负责均衡服务被唤醒后,会调用MQClientInstance的doRebalance进行负载均衡,处理逻辑如下:

  从consumerTable中获取注册的消费者组信息,前面可知consumerTable中存放了注册的消费者信息,Key为组名称,value为消费者

  对consumerTable进行遍历,调用消费者的doRebalance方法对每一个消费者进行负载均衡,前面可知消费者是DefaultMQPushConsumerImpl类型的

  

public class MQClientInstance {

 

   public void doRebalance() {

   // 遍历注册的消费者

   for (Map.Entry String, MQConsumerInner entry : this.consumerTable.entrySet()) {

   MQConsumerInner impl = entry.getValue();

   if (impl != null) {

   try {

   // 负载均衡,前面可知消费者是DefaultMQPushConsumerImpl类型的

   impl.doRebalance();

   } catch (Throwable e) {

   log.error("doRebalance exception", e);

  

 

  接下来进入到DefaultMQPushConsumerImpl的doRebalance,可以看到它又调用了rebalanceImpl的doRebalance进行负载均衡:

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   @Override

   public void doRebalance() {

   if (!this.pause) {

   // 这里又调用了rebalanceImpl的doRebalance进行负载均衡

   this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

  

 

  RebalanceImpl

  RebalanceImpl的doRebalance处理逻辑如下:

  获取订阅的主题信息集合,在订阅处理章节中,可以看到将订阅的主题信息封装成了SubscriptionData并加入到了RebalanceImpl中

  对获取到的订阅主题信息集合进行遍历,调用rebalanceByTopic对每一个主题进行负载均衡

  

public abstract class RebalanceImpl {

 

   public void doRebalance(final boolean isOrder) {

   // 获取订阅的主题信息

   Map String, SubscriptionData subTable = this.getSubscriptionInner();

   if (subTable != null) {

   // 遍历所有订阅的主题

   for (final Map.Entry String, SubscriptionData entry : subTable.entrySet()) {

   final String topic = entry.getKey();

   try {

   // 根据主题进行负载均衡

   this.rebalanceByTopic(topic, isOrder);

   } catch (Throwable e) {

   if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

   log.warn("rebalanceByTopic Exception", e);

   this.truncateMessageQueueNotMyTopic();

  

 

  根据主题进行负载均衡

  rebalanceByTopic方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:

  
如果主题对应的消息队列集合和消费者ID都不为空,对消息队列集合和消费ID集合进行排序

  
获取分配策略,根据分配策略,为当前的消费者分配对应的消费队列,RocketMQ默认提供了以下几种分配策略:

  
AllocateMessageQueueAveragely:平均分配策略,根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数。
 

  
AllocateMessageQueueAveragelyByCircle:平均轮询分配策略,将消息队列逐个分发给每个消费者。
 

  
根据最新分配的消息队列,调用updateProcessQueueTableInRebalance更新当前消费者消费的队列信息

  
// 根据主题进行负载均衡

   private void rebalanceByTopic(final String topic, final boolean isOrder) {

   switch (messageModel) {

   case BROADCASTING: { // 广播模式

   Set MessageQueue mqSet = this.topicSubscribeInfoTable.get(topic);

   // ...

   break;

   case CLUSTERING: { // 集群模式

   // 根据主题获取订阅的消息队列

   Set MessageQueue mqSet = this.topicSubscribeInfoTable.get(topic);

   // 获取所有订阅了该主题的消费者id

   List String cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

   // ...

   if (mqSet != null cidAll != null) { // 如果都不为空

   List MessageQueue mqAll = new ArrayList MessageQueue

   mqAll.addAll(mqSet);

   // 对消息队列排序

   Collections.sort(mqAll);

   // 对消费者排序

   Collections.sort(cidAll);

   // 获取分配策略

   AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

   List MessageQueue allocateResult = null;

   try {

   // 根据分配策略,为当前的消费者分配消费队列

   allocateResult = strategy.allocate(

   this.consumerGroup,

   this.mQClientFactory.getClientId(),

   mqAll,

   cidAll);

   } catch (Throwable e) {

   log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),

   return;

   // 分配给当前消费的消费队列

   Set MessageQueue allocateResultSet = new HashSet MessageQueue

   if (allocateResult != null) {

   // 将分配结果加入到结果集合中

   allocateResultSet.addAll(allocateResult);

   // 根据分配信息更新处理队列

   boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

   // ...

   break;

   default:

   break;

  

 

 

  更新处理队列

  updateProcessQueueTableInRebalance方法同样在RebalanceImpl中,RebalanceImpl中使用了一个ConcurrentMap类型的处理队列表存储消息队列及对应的队列处理信息,updateProcessQueueTableInRebalance方法的入参中topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列,主要处理逻辑如下:

  获取处理队列表processQueueTable进行遍历,处理每一个消息队列,如果队列表为空直接进入第2步:

  判断消息队列所属的主题是否与方法中指定的主题一致,如果不一致继续遍历下一个消息队列

  如果主题一致,判断mqSet中是否包含当前正在遍历的队列,如果不包含,说明此队列已经不再分配给当前的消费者进行消费,需要将消息队列置为dropped,表示删除

  
创建消息拉取请求集合pullRequestList,并遍历本次分配的消息队列集合,如果某个消息队列不在processQueueTable中,需要进行如下处理:

  计算消息拉取偏移量,如果消息拉取偏移量大于0,创建ProcessQueue,并放入处理队列表中processQueueTable

  构建PullRequest,设置消息的拉取信息,并加入到拉取消息请求集合pullRequestList中

  
可以看到,经过这一步,如果分配给当前消费者的消费队列不在processQueueTable中,就会构建拉取请求PullRequest,然后调用dispatchPullRequest处理消息拉取请求。

  

public abstract class RebalanceImpl {

 

   // 处理队列表,KEY为消息队列,VALUE为对应的处理信息

   protected final ConcurrentMap MessageQueue, ProcessQueue processQueueTable = new ConcurrentHashMap MessageQueue, ProcessQueue (64);

   // 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列

   private boolean updateProcessQueueTableInRebalance(final String topic, final Set MessageQueue mqSet,

   final boolean isOrder) {

   boolean changed = false;

   // 处理队列表

   Iterator Entry MessageQueue, ProcessQueue it = this.processQueueTable.entrySet().iterator();

   while (it.hasNext()) {

   Entry MessageQueue, ProcessQueue next = it.next();

   // 获取消息队列

   MessageQueue mq = next.getKey();

   // 获取处理队列

   ProcessQueue pq = next.getValue();

   // 主题是否一致

   if (mq.getTopic().equals(topic)) {

   // 如果队列集合中不包含当前的队列

   if (!mqSet.contains(mq)) {

   // 设置为dropped

   pq.setDropped(true);

   if (this.removeUnnecessaryMessageQueue(mq, pq)) {

   it.remove();

   changed = true;

   log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);

   } else if (pq.isPullExpired()) { // 是否过期

   switch (this.consumeType()) {

   case CONSUME_ACTIVELY:

   break;

   case CONSUME_PASSIVELY:

   pq.setDropped(true); // 设置为删除

   // ...

   break;

   default:

   break;

   // 创建拉取请求集合

   List PullRequest pullRequestList = new ArrayList PullRequest

   // 遍历本次分配的消息队列集合

   for (MessageQueue mq : mqSet) {

   // 如果之前不在processQueueTable中

   if (!this.processQueueTable.containsKey(mq)) {

   // ...

   // 创建ProcessQueue

   ProcessQueue pq = new ProcessQueue();

   long nextOffset = -1L;

   try {

   // 计算消息拉取偏移量

   nextOffset = this.computePullFromWhereWithException(mq);

   } catch (Exception e) {

   log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);

   continue;

   // 如果偏移量大于等于0

   if (nextOffset = 0) {

   // 放入处理队列表中

   ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);

   // 如果之前已经存在,不需要进行处理

   if (pre != null) {

   log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);

   } else {

   // 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取

   log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);

   PullRequest pullRequest = new PullRequest();

   pullRequest.setConsumerGroup(consumerGroup);// 设置消费组

   pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量

   pullRequest.setMessageQueue(mq);// 设置消息队列

   pullRequest.setProcessQueue(pq);// 设置处理队列

   pullRequestList.add(pullRequest);// 加入到拉取消息请求集合

   changed = true;

   } else {

   log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);

   // 添加消息拉取请求

   this.dispatchPullRequest(pullRequestList);

   return changed;

  

 

  添加拉取请求

  在dispatchPullRequest方法中可以看到,对pullRequestList进行了遍历,然后将每一个拉取请求调用defaultMQPushConsumerImpl的executePullRequestImmediately方法添加到了PullMessageService的阻塞队列中等待进行消息拉取:

  

public class RebalancePushImpl extends RebalanceImpl {

 

   @Override

   public void dispatchPullRequest(List PullRequest pullRequestList) {

   for (PullRequest pullRequest : pullRequestList) {

   // 加入到阻塞队列中

   this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);

   log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);

  

 

  上面可知,如果阻塞队列中添加了拉取消息的请求,接下来会调用DefaultMQPushConsumerImpl的pullMessage方法进行消息拉取,处理逻辑如下:

  从拉取请求中获取处理队列processQueue,判断是否置为Dropped删除状态,如果处于删除状态不进行处理

  从处理队列中获取消息的数量和大小,判断是否超过限制,如果超过限制延迟50毫秒后重新加入到拉取请求队列中进行处理

  判断是否是顺序消费,这里先不讨论顺序消费,如果是非顺序消费,判断processQueue中队列最大偏移量和最小偏移量的间距是否超过ConsumeConcurrentlyMaxSpan的值,如果超过需要进行流量控制,延迟50毫秒后重新加入队列中进行处理

  获取拉取主题的订阅信息,如果为空,延迟3000毫秒后重新进行拉取

  创建消息拉取后的回调函数PullCallback

  构建消息拉取系统标记

  通过PullAPIWrapper的pullKernelImpl方法向Broker发送拉取消息请求

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   * 拉取延迟毫秒数

   private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;

   * 出现异常后的延迟处理毫秒数

   private long pullTimeDelayMillsWhenException = 3000;

   public void pullMessage(final PullRequest pullRequest) {

   // 从请求中获取处理队列

   final ProcessQueue processQueue = pullRequest.getProcessQueue();

   // 如果被置为Droppe。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: