本篇文章为你整理了【RocketMQ】消息的消费(rocketmq消息模型)的详细内容,包含有rocketmq消息类型有多少种 rocketmq消息模型 rocketmq消息堆积消费慢 rocketmq消费过程 【RocketMQ】消息的消费,希望能帮助你了解 【RocketMQ】消息的消费。
上一讲【RocketMQ】消息的拉取
当RocketMQ进行消息消费的时候,是通过ConsumeMessageConcurrentlyService的submitConsumeRequest方法,将消息提交到线程池中进行消费,具体的处理逻辑如下:
如果本次消息的个数小于等于批量消费的大小consumeBatchSize,构建消费请求ConsumeRequest,直接提交到线程池中进行消费即可
如果本次消息的个数大于批量消费的大小consumeBatchSize,说明需要分批进行提交,每次构建consumeBatchSize个消息提交到线程池中进行消费
如果出现拒绝提交的异常,调用submitConsumeRequestLater方法延迟进行提交
RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求将消费任务提交到线程池处理即可,否则需要分批进行提交。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
@Override
public void submitConsumeRequest(
final List MessageExt msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 如果消息的个数小于等于批量消费的大小
if (msgs.size() = consumeBatchSize) {
// 构建消费请求
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// 加入到消费线程池中
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
} else {
// 遍历消息
for (int total = 0; total msgs.size(); ) {
// 创建消息列表,大小为consumeBatchSize,用于批量提交使用
List MessageExt msgThis = new ArrayList MessageExt (consumeBatchSize);
for (int i = 0; i consumeBatchSize; i++, total++) {
if (total msgs.size()) {
// 加入到消息列表中
msgThis.add(msgs.get(total));
} else {
break;
// 创建ConsumeRequest
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
// 加入到消费线程池中
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total msgs.size(); total++) {
msgThis.add(msgs.get(total));
// 如果出现拒绝提交异常,延迟进行提交
this.submitConsumeRequestLater(consumeRequest);
消费任务运行
ConsumeRequest是ConsumeMessageConcurrentlyService的内部类,实现了Runnable接口,在run方法中,对消费任务进行了处理:
因为延迟消息的主题在后续处理的时候被设置为SCHEDULE_TOPIC_XXXX,所以这里需要重置。
获取消息监听器,调用消息监听器的consumeMessage进行消息消费,并返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESS和RECONSUME_LATER
CONSUME_SUCCESS:表示消息消费成功。
RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。
再次判断消息所属的处理队列是否处于删除状态,如果不处于删除状态,调用processConsumeResult方法处理消费结果
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final List MessageExt msgs;
private final ProcessQueue processQueue; // 处理队列
private final MessageQueue messageQueue; // 消息队列
@Override
public void run() {
// 如果处理队列已被删除
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because its dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
// 获取消息监听器
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// 重置消息重试主题名称
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
// 如果设置了钩子函数
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
// ...
// 执行钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
// 设置消费开始时间戳
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
// 通过消息监听器的consumeMessage进行消息消费,并返回消费结果状态
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
// 计算消费时长
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
// 出现异常
returnType = ConsumeReturnType.EXCEPTION;
} else {
// 返回NULL
returnType = ConsumeReturnType.RETURNNULL;
} else if (consumeRT = defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判断超时
returnType = ConsumeReturnType.TIME_OUT; // 返回类型置为超时
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延迟消费
returnType = ConsumeReturnType.FAILED; // 返回类置为失败
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功状态
returnType = ConsumeReturnType.SUCCESS; // 返回类型为成功
// ...
// 如果消费状态为空
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
// 状态置为延迟消费
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
// 如果设置了钩子函数
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
// 执行executeHookAfter方法
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) {
// 处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
// 重置消息重试主题
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void resetRetryAndNamespace(final List MessageExt msgs, String consumerGroup) {
// 获取消费组的重试主题:%RETRY% + 消费组名称
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
// 获取消息的重试主题名称
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
// 如果重试主题不为空并且与消费组的重试主题一致
if (retryTopic != null groupTopic.equals(msg.getTopic())) {
// 设置重试主题
msg.setTopic(retryTopic);
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
// 消费结果状态
public enum ConsumeConcurrentlyStatus {
* 消费成功
CONSUME_SUCCESS,
* 消费失败,延迟进行消费
RECONSUME_LATER;
处理消费结果
一、设置ackIndex
ackIndex的值用来判断失败消息的个数,在processConsumeResult方法中根据消费结果状态进行判断,对ackIndex的值进行设置,前面可知消费结果状态有以下两种:
CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消息大小 - 1,表示消息都消费成功。
RECONSUME_LATER:消息消费失败,返回延迟消费状态,此时ackIndex置为-1,表示消息都消费失败。
二、处理消费失败的消息
广播模式
广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。
集群模式
开启for循环,初始值为i = ackIndex + 1,结束条件为i consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:
消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,调用sendMessageBack方法向Broker发送CONSUMER_SEND_MSG_BACK消息,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,加入到失败消息列表中,稍后重新提交消费任务进行处理。
三、移除消息,更新拉取偏移量
以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后调用updateOffset更新拉取偏移量。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
// 获取ackIndex
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS: // 如果消费成功
// 如果ackIndex大于等于消息的大小
if (ackIndex = consumeRequest.getMsgs().size()) {
// 设置为消息大小-1
ackIndex = consumeRequest.getMsgs().size() - 1;
// 计算消费成功的的个数
int ok = ackIndex + 1;
// 计算消费失败的个数
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER: // 如果延迟消费
// ackIndex置为-1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
// 判断消费模式
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 广播模式
for (int i = ackIndex + 1; i consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
break;
case CLUSTERING: // 集群模式
List MessageExt msgBackFailed = new ArrayList MessageExt (consumeRequest.getMsgs().size());
// 遍历消费失败的消息
for (int i = ackIndex + 1; i consumeRequest.getMsgs().size(); i++) {
// 获取消息
MessageExt msg = consumeRequest.getMsgs().get(i);
// 向Broker发送延迟消息
boolean result = this.sendMessageBack(msg, context);
// 如果发送失败
if (!result) {
// 消费次数+1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
// 加入失败消息列表中
msgBackFailed.add(msg);
// 如果不为空
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// 稍后重新进行消费
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
break;
default:
break;
// 从处理队列中移除消息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset = 0 !consumeRequest.getProcessQueue().isDropped()) {
// 更新拉取偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
发送CONSUMER_SEND_MSG_BACK消息
RocketMQ的延迟级别对应的延迟时间常量定义在MessageStoreConfig的messageDelayLevel变量中:
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
延迟级别与延迟时间对应关系:
延迟级别0 --- 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 --- 延迟时间5s
延迟级别2 --- 延迟时间10s
...
以此类推,最大的延迟时间为2h
在sendMessageBack方法中,首先从上下文中获取了延迟级别(ConsumeConcurrentlyContext中可以看到,延迟级别默认为0),并对主题加上Namespace,然后调用defaultMQPushConsumerImpl的sendMessageBack发送消息:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
// 获取延迟级别
int delayLevel = context.getDelayLevelWhenNextConsume();
// 对主题添加上Namespace
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
// 向Broker发送消息
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
return false;
// 并发消费上下文
public class ConsumeConcurrentlyContext {
* -1,不进行重试,加入DLQ队列
* 0, Broker控制重试频率
* 0, 客户端控制
private int delayLevelWhenNextConsume = 0; // 默认为0
DefaultMQPushConsumerImp的sendMessageBack方法中又调用了MQClientAPIImpl的consumerSendMessageBack方法进行发送:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
// 获取Broker地址
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
// 调用consumerSendMessageBack方法发送消息
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
// ...
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
在MQClientAPIImpl的consumerSendMessageBack方法中,可以看到设置的请求类型是CONSUMER_SEND_MSG_BACK,然后设置了消息的相关信息,向Broker发送请求:
public class MQClientAPIImpl {
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
// 创建请求头
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
// 设置请求类型为CONSUMER_SEND_MSG_BACK
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
// 设置消费组
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
// 设置消息物理偏移量
requestHeader.setOffset(msg.getCommitLogOffset());
// 设置延迟级别
requestHeader.setDelayLevel(delayLevel);
// 设置消息ID
requestHeader.setOriginMsgId(msg.getMsgId());
// 设置最大消费次数
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
// 向Broker发送请求
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
default:
break;
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
Broker对请求的处理
Broker对CONSUMER_SEND_MSG_BACK类型的请求在SendMessageProcessor中,处理逻辑如下:
根据消费组获取订阅信息配置,如果获取为空,记录错误信息,直接返回
获取消费组的重试主题,然后从重试队列中随机选取一个队列,并创建TopicConfig主题配置信息
根据消息的物理偏移量从commitlog中获取消息
判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:
如果条件满足,表示需要把消息放入到死信队列DLQ中,此时设置DLQ队列ID
如果不满足,判断延迟级别是否为0,如果为0,使用3 + 消息的消费次数作为新的延迟级别
新建消息MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),会重新添加到CommitLog中,消息主题的设置有两种情况:
达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中
未达到DLQ队列的条件,此时主题为重试主题(%RETRY% + 消费组名称),之后重新进行消费
调用asyncPutMessage添加消息,详细过程可参考之前的文章【消息的存储】
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 处理请求
public CompletableFuture RemotingCommand asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
// 处理请求
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// ...
private CompletableFuture RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// ...
// 根据消费组获取订阅信息配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
// 如果为空,直接返回
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return CompletableFuture.completedFuture(response);
// ...
// 获取消费组的重试主题
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 从重试队列中随机选取一个队列
int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
// 创建TopicConfig主题配置信息
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE PermName.PERM_READ, topicSysFlag);
//...
// 根据消息物理偏移量从commitLog文件中获取消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
// 获取消息的重试主题
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
msgExt.setWaitStoreMsgOK(false);
// 延迟等级获取
int delayLevel = requestHeader.getDelayLevel();
// 获取最大消费重试次数
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() = MQVersion.Version.V3_4_9.ordinal()) {
Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
// 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0
if (msgExt.getReconsumeTimes() = maxReconsumeTimes
delayLevel 0) {
// 获取DLQ主题
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
// 选取一个队列
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
// 创建DLQ的topicConfig
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE PermName.PERM_READ, 0);
// ...
} else {
// 如果延迟级别为0
if (0 == delayLevel) {
// 更新延迟级别
delayLevel = 3 + msgExt.getReconsumeTimes();
// 设置延迟级别
msgExt.setDelayTimeLevel(delayLevel);
// 新建消息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic); // 设置主题
msgInner.setBody(msgExt.getBody()); // 设置消息
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 设置消息属性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt); // 设置队列ID
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 设置消费次数
// 原始的消息ID
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
// 设置消息ID
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
// 添加重试消息
CompletableFuture PutMessageResult putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) - {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
// ...
return response;
default:
break;
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
延迟消息处理
由【消息的存储】文章可知,消息添加会进入到asyncPutMessage方法中,首先获取了事务类型,如果未使用事务或者是提交事务的情况下,对延迟时间级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:
获取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:
public class TopicValidator {
// ...
public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,放在对应的延迟队列中进行处理
public class CommitLog {
public CompletableFuture PutMessageResult asyncPutMessage(final MessageExtBrokerInner msg) {
// ...
// 获取事务类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果未使用事务或者提交事务
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 判断延迟级别
if (msg.getDelayTimeLevel() 0) {
// 如果超过了最大延迟级别
if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
// 获取RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根据延迟级别选取对应的队列
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 备份之前的TOPIC和队列ID
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置SCHEDULE_TOPIC
msg.setTopic(topic);
msg.setQueueId(queueId);
// ...
拉取进度持久化
RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。
LocalFileOffsetStore中使用了一个ConcurrentMap类型的变量offsetTable存储消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,从offsetTable中获取当前消息队列的拉取偏移量,如果为空,则新建并保存到offsetTable中,否则获取之前已经保存的偏移量,对值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘,持久化的操作在persistAll方法中:
public class LocalFileOffsetStore implements OffsetStore {
// offsetTable:KEY为消息队列,value为该消息队列的拉取偏移量
private ConcurrentMap MessageQueue, AtomicLong offsetTable =
new ConcurrentHashMap MessageQueue, AtomicLong
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 获取之前的拉取进度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
// 如果之前不存在,进行创建
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
// 如果不为空
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
// 更新拉取偏移量
offsetOld.set(offset);
由于广播模式下消费进度保存在消费者端,所以需要从本地磁盘加载之前保存的消费进度文件。
LOCAL_OFFSET_STORE_DIR:消费进度文件所在的根路径
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
在LocalFileOffsetStore的构造函数中可以看到,对拉取偏移量的保存文件路径进行了设置,为LOCAL_OFFSET_STORE_DIR + 客户端ID + 消费组名称 + offsets.json,从名字上看,消费进度的数据格式是以JSON的形式进行保存的:
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator + "offsets.json";
在load方法中,首先从本地读取 offsets.json文件,并序列化为OffsetSerializeWrapper对象,然后将保存的消费进度加入到offsetTable中:
public class LocalFileOffsetStore implements OffsetStore {
// 文件路径
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final String storePath;
// ...
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
// 设置拉取进度文件的路径
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
@Override
public void load() throws MQClientException {
// 从本地读取拉取偏移量
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null offsetSerializeWrapper.getOffsetTable() != null) {
// 加入到offsetTable中
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (Entry MessageQueue, AtomicLong mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
AtomicLong offset = mqEntry.getValue();
log.info("load consumers offset, {} {} {}",
this.groupName,
mqEntry.getKey(),
offset.get());
// 从本地加载文件
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
String content = null;
try {
// 读取文件
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
log.warn("Load local offset store file exception", e);
if (null == content content.length() == 0) {
return this.readLocalOffsetBak();
} else {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
// 序列化
offsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
return offsetSerializeWrapper;
OffsetSerializeWrapper
OffsetSerializeWrapper中同样使用了ConcurrentMap,从磁盘的offsets.json文件中读取数据后,将JSON转为OffsetSerializeWrapper对象,就可以通过OffsetSerializeWrapper的offsetTable获取到之前保存的每个消息队列的消费进度,然后加入到LocalFileOffsetStore的offsetTable中:
public class OffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentMap MessageQueue, AtomicLong offsetTable =
new ConcurrentHashMap MessageQueue, AtomicLong
public ConcurrentMap MessageQueue, AtomicLong getOffsetTable() {
return offsetTable;
public void setOffsetTable(ConcurrentMap MessageQueue, AtomicLong offsetTable) {
this.offsetTable = offsetTable;
持久化进度
updateOffset更新只是将内存中的数据进行了更改,并未保存到磁盘中,持久化的操作是在persistAll方法中实现的:
创建OffsetSerializeWrapper对象
遍历LocalFileOffsetStore的offsetTabl。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。