【RocketMQ】消息的消费(rocketmq消息模型)

  本篇文章为你整理了【RocketMQ】消息的消费(rocketmq消息模型)的详细内容,包含有rocketmq消息类型有多少种 rocketmq消息模型 rocketmq消息堆积消费慢 rocketmq消费过程 【RocketMQ】消息的消费,希望能帮助你了解 【RocketMQ】消息的消费。

  上一讲【RocketMQ】消息的拉取

  当RocketMQ进行消息消费的时候,是通过ConsumeMessageConcurrentlyService的submitConsumeRequest方法,将消息提交到线程池中进行消费,具体的处理逻辑如下:

  如果本次消息的个数小于等于批量消费的大小consumeBatchSize,构建消费请求ConsumeRequest,直接提交到线程池中进行消费即可

  如果本次消息的个数大于批量消费的大小consumeBatchSize,说明需要分批进行提交,每次构建consumeBatchSize个消息提交到线程池中进行消费

  如果出现拒绝提交的异常,调用submitConsumeRequestLater方法延迟进行提交

  RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求将消费任务提交到线程池处理即可,否则需要分批进行提交。

  

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

 

   @Override

   public void submitConsumeRequest(

   final List MessageExt msgs,

   final ProcessQueue processQueue,

   final MessageQueue messageQueue,

   final boolean dispatchToConsume) {

   final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

   // 如果消息的个数小于等于批量消费的大小

   if (msgs.size() = consumeBatchSize) {

   // 构建消费请求

   ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);

   try {

   // 加入到消费线程池中

   this.consumeExecutor.submit(consumeRequest);

   } catch (RejectedExecutionException e) {

   this.submitConsumeRequestLater(consumeRequest);

   } else {

   // 遍历消息

   for (int total = 0; total msgs.size(); ) {

   // 创建消息列表,大小为consumeBatchSize,用于批量提交使用

   List MessageExt msgThis = new ArrayList MessageExt (consumeBatchSize);

   for (int i = 0; i consumeBatchSize; i++, total++) {

   if (total msgs.size()) {

   // 加入到消息列表中

   msgThis.add(msgs.get(total));

   } else {

   break;

   // 创建ConsumeRequest

   ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);

   try {

   // 加入到消费线程池中

   this.consumeExecutor.submit(consumeRequest);

   } catch (RejectedExecutionException e) {

   for (; total msgs.size(); total++) {

   msgThis.add(msgs.get(total));

   // 如果出现拒绝提交异常,延迟进行提交

   this.submitConsumeRequestLater(consumeRequest);

  

 

  消费任务运行

  ConsumeRequest是ConsumeMessageConcurrentlyService的内部类,实现了Runnable接口,在run方法中,对消费任务进行了处理:

  
因为延迟消息的主题在后续处理的时候被设置为SCHEDULE_TOPIC_XXXX,所以这里需要重置。

  
获取消息监听器,调用消息监听器的consumeMessage进行消息消费,并返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESS和RECONSUME_LATER

  CONSUME_SUCCESS:表示消息消费成功。

  RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。

  
再次判断消息所属的处理队列是否处于删除状态,如果不处于删除状态,调用processConsumeResult方法处理消费结果

  


public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

 

   class ConsumeRequest implements Runnable {

   private final List MessageExt msgs;

   private final ProcessQueue processQueue; // 处理队列

   private final MessageQueue messageQueue; // 消息队列

   @Override

   public void run() {

   // 如果处理队列已被删除

   if (this.processQueue.isDropped()) {

   log.info("the message queue not be able to consume, because its dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);

   return;

   // 获取消息监听器

   MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

   ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);

   ConsumeConcurrentlyStatus status = null;

   // 重置消息重试主题名称

   defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

   ConsumeMessageContext consumeMessageContext = null;

   // 如果设置了钩子函数

   if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

   // ...

  // 执行钩子函数

   ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

   long beginTimestamp = System.currentTimeMillis();

   boolean hasException = false;

   ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

   try {

   if (msgs != null !msgs.isEmpty()) {

   for (MessageExt msg : msgs) {

   // 设置消费开始时间戳

   MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));

   // 通过消息监听器的consumeMessage进行消息消费,并返回消费结果状态

   status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

   } catch (Throwable e) {

   log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",

   RemotingHelper.exceptionSimpleDesc(e),

   ConsumeMessageConcurrentlyService.this.consumerGroup,

   msgs,

   messageQueue), e);

   hasException = true;

   // 计算消费时长

   long consumeRT = System.currentTimeMillis() - beginTimestamp;

   if (null == status) {

   if (hasException) {

   // 出现异常

   returnType = ConsumeReturnType.EXCEPTION;

   } else {

   // 返回NULL

   returnType = ConsumeReturnType.RETURNNULL;

   } else if (consumeRT = defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判断超时

   returnType = ConsumeReturnType.TIME_OUT; // 返回类型置为超时

   } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延迟消费

   returnType = ConsumeReturnType.FAILED; // 返回类置为失败

   } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功状态

   returnType = ConsumeReturnType.SUCCESS; // 返回类型为成功

   // ...

   // 如果消费状态为空

   if (null == status) {

   log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",

   ConsumeMessageConcurrentlyService.this.consumerGroup,

   msgs,

   messageQueue);

   // 状态置为延迟消费

   status = ConsumeConcurrentlyStatus.RECONSUME_LATER;

   // 如果设置了钩子函数

   if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

   consumeMessageContext.setStatus(status.toString());

   consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);

   // 执行executeHookAfter方法

   ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);

   ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()

   .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

   if (!processQueue.isDropped()) {

   // 处理消费结果

   ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

   } else {

   log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);

  // 重置消息重试主题

  public class DefaultMQPushConsumerImpl implements MQConsumerInner {

   public void resetRetryAndNamespace(final List MessageExt msgs, String consumerGroup) {

   // 获取消费组的重试主题:%RETRY% + 消费组名称

   final String groupTopic = MixAll.getRetryTopic(consumerGroup);

   for (MessageExt msg : msgs) {

   // 获取消息的重试主题名称

   String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

   // 如果重试主题不为空并且与消费组的重试主题一致

   if (retryTopic != null groupTopic.equals(msg.getTopic())) {

   // 设置重试主题

   msg.setTopic(retryTopic);

   if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {

   msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));

  // 消费结果状态

  public enum ConsumeConcurrentlyStatus {

   * 消费成功

   CONSUME_SUCCESS,

   * 消费失败,延迟进行消费

   RECONSUME_LATER;

  

 

  处理消费结果

  一、设置ackIndex

  ackIndex的值用来判断失败消息的个数,在processConsumeResult方法中根据消费结果状态进行判断,对ackIndex的值进行设置,前面可知消费结果状态有以下两种:

  CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消息大小 - 1,表示消息都消费成功。

  RECONSUME_LATER:消息消费失败,返回延迟消费状态,此时ackIndex置为-1,表示消息都消费失败。

  二、处理消费失败的消息

  广播模式

  广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。

  集群模式

  开启for循环,初始值为i = ackIndex + 1,结束条件为i consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

  消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。

  延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,调用sendMessageBack方法向Broker发送CONSUMER_SEND_MSG_BACK消息,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,加入到失败消息列表中,稍后重新提交消费任务进行处理。

  三、移除消息,更新拉取偏移量

  以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后调用updateOffset更新拉取偏移量。

  

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

 

   public void processConsumeResult(

   final ConsumeConcurrentlyStatus status,

   final ConsumeConcurrentlyContext context,

   final ConsumeRequest consumeRequest

   // 获取ackIndex

   int ackIndex = context.getAckIndex();

   if (consumeRequest.getMsgs().isEmpty())

   return;

   switch (status) {

   case CONSUME_SUCCESS: // 如果消费成功

   // 如果ackIndex大于等于消息的大小

   if (ackIndex = consumeRequest.getMsgs().size()) {

   // 设置为消息大小-1

   ackIndex = consumeRequest.getMsgs().size() - 1;

   // 计算消费成功的的个数

   int ok = ackIndex + 1;

   // 计算消费失败的个数

   int failed = consumeRequest.getMsgs().size() - ok;

   this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);

   this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);

   break;

   case RECONSUME_LATER: // 如果延迟消费

   // ackIndex置为-1

   ackIndex = -1;

   this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),

   consumeRequest.getMsgs().size());

   break;

   default:

   break;

   // 判断消费模式

   switch (this.defaultMQPushConsumer.getMessageModel()) {

   case BROADCASTING: // 广播模式

   for (int i = ackIndex + 1; i consumeRequest.getMsgs().size(); i++) {

   MessageExt msg = consumeRequest.getMsgs().get(i);

   log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());

   break;

   case CLUSTERING: // 集群模式

   List MessageExt msgBackFailed = new ArrayList MessageExt (consumeRequest.getMsgs().size());

   // 遍历消费失败的消息

   for (int i = ackIndex + 1; i consumeRequest.getMsgs().size(); i++) {

   // 获取消息

   MessageExt msg = consumeRequest.getMsgs().get(i);

   // 向Broker发送延迟消息

   boolean result = this.sendMessageBack(msg, context);

   // 如果发送失败

   if (!result) {

   // 消费次数+1

   msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

   // 加入失败消息列表中

   msgBackFailed.add(msg);

   // 如果不为空

   if (!msgBackFailed.isEmpty()) {

   consumeRequest.getMsgs().removeAll(msgBackFailed);

   // 稍后重新进行消费

   this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());

   break;

   default:

   break;

   // 从处理队列中移除消息

   long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());

   if (offset = 0 !consumeRequest.getProcessQueue().isDropped()) {

   // 更新拉取偏移量

   this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

  

 

  发送CONSUMER_SEND_MSG_BACK消息

  RocketMQ的延迟级别对应的延迟时间常量定义在MessageStoreConfig的messageDelayLevel变量中:

  

public class MessageStoreConfig {

 

   private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

  

 

  延迟级别与延迟时间对应关系:

  延迟级别0 --- 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费

  延迟级别1 --- 延迟时间5s

  延迟级别2 --- 延迟时间10s

  ...

  以此类推,最大的延迟时间为2h

  在sendMessageBack方法中,首先从上下文中获取了延迟级别(ConsumeConcurrentlyContext中可以看到,延迟级别默认为0),并对主题加上Namespace,然后调用defaultMQPushConsumerImpl的sendMessageBack发送消息:

  

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

 

   public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {

   // 获取延迟级别

   int delayLevel = context.getDelayLevelWhenNextConsume();

   // 对主题添加上Namespace

   msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));

   try {

   // 向Broker发送消息

   this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());

   return true;

   } catch (Exception e) {

   log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);

   return false;

  // 并发消费上下文

  public class ConsumeConcurrentlyContext {

   * -1,不进行重试,加入DLQ队列

   * 0, Broker控制重试频率

   * 0, 客户端控制

   private int delayLevelWhenNextConsume = 0; // 默认为0

  

 

  DefaultMQPushConsumerImp的sendMessageBack方法中又调用了MQClientAPIImpl的consumerSendMessageBack方法进行发送:

  

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

 

   public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)

   throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

   try {

   // 获取Broker地址

   String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)

   : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

   // 调用consumerSendMessageBack方法发送消息

   this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,

   this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());

   } catch (Exception e) {

   // ...

   } finally {

   msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));

  

 

  在MQClientAPIImpl的consumerSendMessageBack方法中,可以看到设置的请求类型是CONSUMER_SEND_MSG_BACK,然后设置了消息的相关信息,向Broker发送请求:

  

public class MQClientAPIImpl {

 

   public void consumerSendMessageBack(

   final String addr,

   final MessageExt msg,

   final String consumerGroup,

   final int delayLevel,

   final long timeoutMillis,

   final int maxConsumeRetryTimes

   ) throws RemotingException, MQBrokerException, InterruptedException {

   // 创建请求头

   ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();

   // 设置请求类型为CONSUMER_SEND_MSG_BACK

   RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

   // 设置消费组

   requestHeader.setGroup(consumerGroup);

   requestHeader.setOriginTopic(msg.getTopic());

   // 设置消息物理偏移量

   requestHeader.setOffset(msg.getCommitLogOffset());

   // 设置延迟级别

   requestHeader.setDelayLevel(delayLevel);

   // 设置消息ID

   requestHeader.setOriginMsgId(msg.getMsgId());

   // 设置最大消费次数

   requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

   // 向Broker发送请求

   RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),

   request, timeoutMillis);

   assert response != null;

   switch (response.getCode()) {

   case ResponseCode.SUCCESS: {

   return;

   default:

   break;

   throw new MQBrokerException(response.getCode(), response.getRemark(), addr);

  

 

  Broker对请求的处理

  Broker对CONSUMER_SEND_MSG_BACK类型的请求在SendMessageProcessor中,处理逻辑如下:

  根据消费组获取订阅信息配置,如果获取为空,记录错误信息,直接返回

  获取消费组的重试主题,然后从重试队列中随机选取一个队列,并创建TopicConfig主题配置信息

  根据消息的物理偏移量从commitlog中获取消息

  判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:

  如果条件满足,表示需要把消息放入到死信队列DLQ中,此时设置DLQ队列ID

  如果不满足,判断延迟级别是否为0,如果为0,使用3 + 消息的消费次数作为新的延迟级别

  
新建消息MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),会重新添加到CommitLog中,消息主题的设置有两种情况:

  达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中

  未达到DLQ队列的条件,此时主题为重试主题(%RETRY% + 消费组名称),之后重新进行消费

  
调用asyncPutMessage添加消息,详细过程可参考之前的文章【消息的存储】

  

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

 

   // 处理请求

   public CompletableFuture RemotingCommand asyncProcessRequest(ChannelHandlerContext ctx,

   RemotingCommand request) throws RemotingCommandException {

   final SendMessageContext mqtraceContext;

   switch (request.getCode()) {

   case RequestCode.CONSUMER_SEND_MSG_BACK:

   // 处理请求

   return this.asyncConsumerSendMsgBack(ctx, request);

   default:

   // ...

   private CompletableFuture RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx,

   RemotingCommand request) throws RemotingCommandException {

   final RemotingCommand response = RemotingCommand.createResponseCommand(null);

   final ConsumerSendMsgBackRequestHeader requestHeader =

   (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

   // ...

   // 根据消费组获取订阅信息配置

   SubscriptionGroupConfig subscriptionGroupConfig =

   this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());

   // 如果为空,直接返回

   if (null == subscriptionGroupConfig) {

   response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);

   response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "

   + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));

   return CompletableFuture.completedFuture(response);

   // ...

   // 获取消费组的重试主题

   String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

   // 从重试队列中随机选取一个队列

   int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();

   int topicSysFlag = 0;

   if (requestHeader.isUnitMode()) {

   topicSysFlag = TopicSysFlag.buildSysFlag(false, true);

   // 创建TopicConfig主题配置信息

   TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(

   newTopic,

   subscriptionGroupConfig.getRetryQueueNums(),

   PermName.PERM_WRITE PermName.PERM_READ, topicSysFlag);

   //...

   // 根据消息物理偏移量从commitLog文件中获取消息

   MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

   if (null == msgExt) {

   response.setCode(ResponseCode.SYSTEM_ERROR);

   response.setRemark("look message by offset failed, " + requestHeader.getOffset());

   return CompletableFuture.completedFuture(response);

   // 获取消息的重试主题

   final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

   if (null == retryTopic) {

   MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());

   msgExt.setWaitStoreMsgOK(false);

   // 延迟等级获取

   int delayLevel = requestHeader.getDelayLevel();

   // 获取最大消费重试次数

   int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

   if (request.getVersion() = MQVersion.Version.V3_4_9.ordinal()) {

   Integer times = requestHeader.getMaxReconsumeTimes();

   if (times != null) {

   maxReconsumeTimes = times;

   // 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0

   if (msgExt.getReconsumeTimes() = maxReconsumeTimes

   delayLevel 0) {

   // 获取DLQ主题

   newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

   // 选取一个队列

   queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;

   // 创建DLQ的topicConfig

   topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,

   DLQ_NUMS_PER_GROUP,

   PermName.PERM_WRITE PermName.PERM_READ, 0);

   // ...

   } else {

   // 如果延迟级别为0

   if (0 == delayLevel) {

   // 更新延迟级别

   delayLevel = 3 + msgExt.getReconsumeTimes();

   // 设置延迟级别

   msgExt.setDelayTimeLevel(delayLevel);

   // 新建消息

   MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

   msgInner.setTopic(newTopic); // 设置主题

   msgInner.setBody(msgExt.getBody()); // 设置消息

   msgInner.setFlag(msgExt.getFlag());

   MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 设置消息属性

   msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

   msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

   msgInner.setQueueId(queueIdInt); // 设置队列ID

   msgInner.setSysFlag(msgExt.getSysFlag());

   msgInner.setBornTimestamp(msgExt.getBornTimestamp());

   msgInner.setBornHost(msgExt.getBornHost());

   msgInner.setStoreHost(msgExt.getStoreHost());

   msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 设置消费次数

   // 原始的消息ID

   String originMsgId = MessageAccessor.getOriginMessageId(msgExt);

   // 设置消息ID

   MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

   msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

   // 添加重试消息

   CompletableFuture PutMessageResult putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

   return putMessageResult.thenApply((r) - {

   if (r != null) {

   switch (r.getPutMessageStatus()) {

   case PUT_OK:

   // ...

   return response;

   default:

   break;

   response.setCode(ResponseCode.SYSTEM_ERROR);

   response.setRemark(r.getPutMessageStatus().name());

   return response;

   response.setCode(ResponseCode.SYSTEM_ERROR);

   response.setRemark("putMessageResult is null");

   return response;

  

 

  延迟消息处理

  由【消息的存储】文章可知,消息添加会进入到asyncPutMessage方法中,首先获取了事务类型,如果未使用事务或者是提交事务的情况下,对延迟时间级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

  
获取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:

  

public class TopicValidator {

 

   // ...

   public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

  

 

  
更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,放在对应的延迟队列中进行处理

  


public class CommitLog {

 

   public CompletableFuture PutMessageResult asyncPutMessage(final MessageExtBrokerInner msg) {

   // ...

   // 获取事务类型

   final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

   // 如果未使用事务或者提交事务

   if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE

   tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

   // 判断延迟级别

   if (msg.getDelayTimeLevel() 0) {

   // 如果超过了最大延迟级别

   if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

   msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

   // 获取RMQ_SYS_SCHEDULE_TOPIC

   topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;

   // 根据延迟级别选取对应的队列

   int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

   // 备份之前的TOPIC和队列ID

   MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

   MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

   msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

   // 设置SCHEDULE_TOPIC

   msg.setTopic(topic);

   msg.setQueueId(queueId);

   // ...

  

 

  拉取进度持久化

  RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。

  LocalFileOffsetStore中使用了一个ConcurrentMap类型的变量offsetTable存储消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。

  在更新拉取进度的时候,从offsetTable中获取当前消息队列的拉取偏移量,如果为空,则新建并保存到offsetTable中,否则获取之前已经保存的偏移量,对值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘,持久化的操作在persistAll方法中:

  

public class LocalFileOffsetStore implements OffsetStore {

 

   // offsetTable:KEY为消息队列,value为该消息队列的拉取偏移量

   private ConcurrentMap MessageQueue, AtomicLong offsetTable =

   new ConcurrentHashMap MessageQueue, AtomicLong

   @Override

   public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {

   if (mq != null) {

   // 获取之前的拉取进度

   AtomicLong offsetOld = this.offsetTable.get(mq);

   if (null == offsetOld) {

   // 如果之前不存在,进行创建

   offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));

   // 如果不为空

   if (null != offsetOld) {

   if (increaseOnly) {

   MixAll.compareAndIncreaseOnly(offsetOld, offset);

   } else {

   // 更新拉取偏移量

   offsetOld.set(offset);

  

 

  由于广播模式下消费进度保存在消费者端,所以需要从本地磁盘加载之前保存的消费进度文件。

  LOCAL_OFFSET_STORE_DIR:消费进度文件所在的根路径

  

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(

 

   "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

  

 

  在LocalFileOffsetStore的构造函数中可以看到,对拉取偏移量的保存文件路径进行了设置,为LOCAL_OFFSET_STORE_DIR + 客户端ID + 消费组名称 + offsets.json,从名字上看,消费进度的数据格式是以JSON的形式进行保存的:

  

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +

 

   this.groupName + File.separator + "offsets.json";

  

 

  在load方法中,首先从本地读取 offsets.json文件,并序列化为OffsetSerializeWrapper对象,然后将保存的消费进度加入到offsetTable中:

  

 public class LocalFileOffsetStore implements OffsetStore {

 

   // 文件路径

   public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(

   "rocketmq.client.localOffsetStoreDir",

   System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

   private final String storePath;

   // ...

   public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {

   this.mQClientFactory = mQClientFactory;

   this.groupName = groupName;

   // 设置拉取进度文件的路径

   this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +

   this.mQClientFactory.getClientId() + File.separator +

   this.groupName + File.separator +

   "offsets.json";

   @Override

   public void load() throws MQClientException {

   // 从本地读取拉取偏移量

   OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();

   if (offsetSerializeWrapper != null offsetSerializeWrapper.getOffsetTable() != null) {

   // 加入到offsetTable中

   offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

   for (Entry MessageQueue, AtomicLong mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {

   AtomicLong offset = mqEntry.getValue();

   log.info("load consumers offset, {} {} {}",

   this.groupName,

   mqEntry.getKey(),

   offset.get());

   // 从本地加载文件

   private OffsetSerializeWrapper readLocalOffset() throws MQClientException {

   String content = null;

   try {

   // 读取文件

   content = MixAll.file2String(this.storePath);

   } catch (IOException e) {

   log.warn("Load local offset store file exception", e);

   if (null == content content.length() == 0) {

   return this.readLocalOffsetBak();

   } else {

   OffsetSerializeWrapper offsetSerializeWrapper = null;

   try {

   // 序列化

   offsetSerializeWrapper =

   OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);

   } catch (Exception e) {

   log.warn("readLocalOffset Exception, and try to correct", e);

   return this.readLocalOffsetBak();

   return offsetSerializeWrapper;

  

 

  OffsetSerializeWrapper

  OffsetSerializeWrapper中同样使用了ConcurrentMap,从磁盘的offsets.json文件中读取数据后,将JSON转为OffsetSerializeWrapper对象,就可以通过OffsetSerializeWrapper的offsetTable获取到之前保存的每个消息队列的消费进度,然后加入到LocalFileOffsetStore的offsetTable中:

  

public class OffsetSerializeWrapper extends RemotingSerializable {

 

   private ConcurrentMap MessageQueue, AtomicLong offsetTable =

   new ConcurrentHashMap MessageQueue, AtomicLong

   public ConcurrentMap MessageQueue, AtomicLong getOffsetTable() {

   return offsetTable;

   public void setOffsetTable(ConcurrentMap MessageQueue, AtomicLong offsetTable) {

   this.offsetTable = offsetTable;

  

 

  持久化进度

  updateOffset更新只是将内存中的数据进行了更改,并未保存到磁盘中,持久化的操作是在persistAll方法中实现的:

  创建OffsetSerializeWrapper对象

  遍历LocalFileOffsetStore的offsetTabl。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: