【RocketMQ】消息的存储(rocketmq消息存多久)

  本篇文章为你整理了【RocketMQ】消息的存储(rocketmq消息存多久)的详细内容,包含有rocketmq broker queue消息存储 rocketmq消息存多久 rocketmq的消息类型 rocketmq消息堆积怎么处理 【RocketMQ】消息的存储,希望能帮助你了解 【RocketMQ】消息的存储。

  Broker对消息的处理

  BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:

  

public class BrokerController {

 

   // 初始化

   public boolean initialize() throws CloneNotSupportedException {

   // ...

   // 注册处理器

   this.registerProcessor();

   // ...

   // 注册处理器

   public void registerProcessor() {

   * 发送消息处理器

   SendMessageProcessor sendProcessor = new SendMessageProcessor(this);

   // ...

   // 注册消息发送处理器

   this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);

   this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);

   // 省略其他注册...

  

 

  在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessor的processRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:

  

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

 

   // 处理请求

   @Override

   public RemotingCommand processRequest(ChannelHandlerContext ctx,

   RemotingCommand request) throws RemotingCommandException {

   RemotingCommand response = null;

   try {

   // 处理请求

   response = asyncProcessRequest(ctx, request).get();

   } catch (InterruptedException ExecutionException e) {

   log.error("process SendMessage error, request : " + request.toString(), e);

   return response;

   // 异步处理请求

   public CompletableFuture RemotingCommand asyncProcessRequest(ChannelHandlerContext ctx,

   RemotingCommand request) throws RemotingCommandException {

   final SendMessageContext mqtraceContext;

   switch (request.getCode()) {

   case RequestCode.CONSUMER_SEND_MSG_BACK:

   return this.asyncConsumerSendMsgBack(ctx, request);

   default:

   // 解析请求头

   SendMessageRequestHeader requestHeader = parseRequestHeader(request);

   // ...

   if (requestHeader.isBatch()) {

   // 批量消息发送处理

   return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);

   } else {

   // 单个消息发送处理

   return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);

   // 单个消息发送处理

   private CompletableFuture RemotingCommand asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,

   SendMessageContext mqtraceContext,

   SendMessageRequestHeader requestHeader) {

   // ...

   CompletableFuture PutMessageResult putMessageResult = null;

   String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);

   // 是否使用事务

   if (transFlag != null Boolean.parseBoolean(transFlag)) {

   if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {

   response.setCode(ResponseCode.NO_PERMISSION);

   response.setRemark(

   "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()

   + "] sending transaction message is forbidden");

   return CompletableFuture.completedFuture(response);

   // 事务处理

   putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);

   } else {

   // 消息持久化

   putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

   return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

  

 

  以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:

  创建MessageExtBrokerInner对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中

  判断是否使用了事务,如果未使用事务调用brokerController的getMessageStore方法获取MessageStore对象,然后调用asyncPutMessage方法对消息进行持久化存储

  返回消息的存储结果

  

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

 

   // 单个消息发送处理

   private CompletableFuture RemotingCommand asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,

   SendMessageContext mqtraceContext,

   SendMessageRequestHeader requestHeader) {

   // ...

   // 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息

   MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

   // 设置主题

   msgInner.setTopic(requestHeader.getTopic());

   // 设置消息所在的队列ID

   msgInner.setQueueId(queueIdInt);

   if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {

   return CompletableFuture.completedFuture(response);

   // 设置消息内容

   msgInner.setBody(body);

   msgInner.setFlag(requestHeader.getFlag());

   // 设置属性

   Map String, String origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());

   MessageAccessor.setProperties(msgInner, origProps);

   // 设置发送消息时间

   msgInner.setBornTimestamp(requestHeader.getBornTimestamp());

   // 设置发送消息的主机地址

   msgInner.setBornHost(ctx.channel().remoteAddress());

   // 设置存储消息的主机地址

   msgInner.setStoreHost(this.getStoreHost());

   msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

   String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();

   // 属性中添加集群名称

   MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);

   // 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK

   if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {

   String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);

   // 设置消息属性

   msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

   origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);

   } else {

   msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

   CompletableFuture PutMessageResult putMessageResult = null;

   String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);

   // 是否使用事务

   if (transFlag != null Boolean.parseBoolean(transFlag)) {

   if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {

   response.setCode(ResponseCode.NO_PERMISSION);

   response.setRemark(

   "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()

   + "] sending transaction message is forbidden");

   return CompletableFuture.completedFuture(response);

   // 事务处理

   putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);

   } else {

   // 消息写入

   putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

   // 返回消息持久化结果

   return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

  

 

  MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:

  

public class BrokerController {

 

   private MessageStore messageStore;

   public boolean initialize() throws CloneNotSupportedException {

   boolean result = this.topicConfigManager.load();

   // ...

   if (result) {

   try {

   // 创建DefaultMessageStore

   this.messageStore =

   new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,

   this.brokerConfig);

   // ...

   } catch (IOException e) {

   result = false;

   log.error("Failed to initialize", e);

   // 获取MessageStore

   public MessageStore getMessageStore() {

   return messageStore;

  

 

  DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog,DLedgerCommitLog是CommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。

  在DefaultMessageStore的asyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog的asyncPutMessage进行消息写入:

  

public class DefaultMessageStore implements MessageStore {

 

   private final CommitLog commitLog; // CommitLog

   public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,

   final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {

   // ...

   // 如果启用了Dleger

   if (messageStoreConfig.isEnableDLegerCommitLog()) {

   // 使用DLedgerCommitLog

   this.commitLog = new DLedgerCommitLog(this);

   } else {

   // 否则使用CommitLog

   this.commitLog = new CommitLog(this);

   // ...

   @Override

   public CompletableFuture PutMessageResult asyncPutMessage(MessageExtBrokerInner msg) {

   // 校验存储状态

   PutMessageStatus checkStoreStatus = this.checkStoreStatus();

   if (checkStoreStatus != PutMessageStatus.PUT_OK) {

   return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));

   // 校验消息合法性

   PutMessageStatus msgCheckStatus = this.checkMessage(msg);

   if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {

   return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));

   // 进行一系列校验

   PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);

   if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {

   return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));

   long beginTime = this.getSystemClock().now();

   // 调用CommitLog的asyncPutMessage方法写入消息

   CompletableFuture PutMessageResult putResultFuture = this.commitLog.asyncPutMessage(msg);

   putResultFuture.thenAccept((result) - {

   long elapsedTime = this.getSystemClock().now() - beginTime;

   if (elapsedTime 500) {

   log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);

   this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

   if (null == result !result.isOk()) {

   this.storeStatsService.getPutMessageFailedTimes().add(1);

   return putResultFuture;

  

 

  合法性校验

  Broker存储检查

  checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:

  MessageStore是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储

  Broker是否是从节点,从节点只能读不能写

  Broker是否有写权限,如果没有写入权限,不能进行写入操作

  操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作

  

 private PutMessageStatus checkStoreStatus() {

 

   // 是否处于停止状态

   if (this.shutdown) {

   log.warn("message store has shutdown, so putMessage is forbidden");

   return PutMessageStatus.SERVICE_NOT_AVAILABLE;

   // 是否SLAVE角色

   if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {

   long value = this.printTimes.getAndIncrement();

   if ((value % 50000) == 0) {

   log.warn("broke role is slave, so putMessage is forbidden");

   return PutMessageStatus.SERVICE_NOT_AVAILABLE;

   // 是否可写

   if (!this.runningFlags.isWriteable()) {

   long value = this.printTimes.getAndIncrement();

   if ((value % 50000) == 0) {

   log.warn("the message store is not writable. It may be caused by one of the following reasons: " +

   "the brokers disk is full, write to logic queue error, write to index file error, etc");

   return PutMessageStatus.SERVICE_NOT_AVAILABLE;

   } else {

   this.printTimes.set(0);

   // 操作系统是否处于PAGECACHE繁忙状态

   if (this.isOSPageCacheBusy()) {

   return PutMessageStatus.OS_PAGECACHE_BUSY;

   return PutMessageStatus.PUT_OK;

  

 

  消息长度检查

  checkMessage方法主要是对主题的长度校验和消息属性的长度校验:

  

 private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {

 

   // 如果主题的长度大于最大值

   if (msg.getTopic().length() Byte.MAX_VALUE) {

   log.warn("putMessage message topic length too long " + msg.getTopic().length());

   return PutMessageStatus.MESSAGE_ILLEGAL;

   // 如果消息属性长度大于最大值

   if (msg.getPropertiesString() != null msg.getPropertiesString().length() Short.MAX_VALUE) {

   log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());

   return PutMessageStatus.MESSAGE_ILLEGAL;

   return PutMessageStatus.PUT_OK;

  

 

  checkLmqMessage

  checkLmqMessage主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量:

  

 private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {

 

   // 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量

   if (msg.getProperties() != null

   StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))

   this.isLmqConsumeQueueNumExceeded()) {

   return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;

   return PutMessageStatus.PUT_OK;

   private boolean isLmqConsumeQueueNumExceeded() {

   // 开启了LMQ 开启了多个队列分发 消费数量大于了限定值

   if (this.getMessageStoreConfig().isEnableLmq() this.getMessageStoreConfig().isEnableMultiDispatch()

   this.lmqConsumeQueueNum.get() this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {

   return true;

   return false;

  

 

  对消息进行校验完毕之后,调用了CommitLog的asyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:

  
消息内容的CRC校验和

  如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识

  
获取当前线程绑定的PutMessageThreadLocal对象,里面有一个MessageExtEncoder类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner的setEncodedBuff方法将buffer设置到encodedBuff中

  
加锁,从mappedFileQueue中获取上一次使用的映射文件mappedFile,并更新消息的存储时间, 如果mappedFile为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果

  mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1G

  mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录

  
调用mappedFile的appendMessage方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中

  
调用submitFlushRequest方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘

  
private final ThreadLocal PutMessageThreadLocal putMessageThreadLocal;

   // 写入消息的回调函数

   private final AppendMessageCallback appendMessageCallback;

   public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数

   //...

   // 创建回调函数

   this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());

   //...

   public CompletableFuture PutMessageResult asyncPutMessage(final MessageExtBrokerInner msg) {

   // 设置存储时间

   msg.setStoreTimestamp(System.currentTimeMillis());

   // 设置消息的CRC值

   msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

   // 写入结果

   AppendMessageResult result = null;

   // 获取存储统计服务

   StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

   // 获取主题

   String topic = msg.getTopic();

   // 获取事务类型

   final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

   if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE

   tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

   // 省略事务相关处理

   // 获取发送消息的主机地址

   InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();

   if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6

   msg.setBornHostV6Flag(); // 设置IPV6标识

   // 获取存储消息的主机地址

   InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();

   if (storeSocketAddress.getAddress() instanceof Inet6Address) {

   msg.setStoreHostAddressV6Flag(); // 设置IPV6标识

   // 获取当前线程绑定的PutMessageThreadLocal对象

   PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();

   // 调用encode方法对消息进行编码,并写入buffer

   PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);

   if (encodeResult != null) {

   return CompletableFuture.completedFuture(encodeResult);

   // 将存储编码消息的buffer设置到msg中

   msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);

   // 创建PutMessageContext

   PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

   long elapsedTimeInLock = 0;

   MappedFile unlockMappedFile = null;

   // 加锁

   putMessageLock.lock();

   try {

   // 获取上一次写入的文件

   MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

   // 获取系统时间戳

   long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();

   this.beginTimeInLock = beginLockTimestamp;

   // 再次更新存储时间戳,保证全局顺序

   msg.setStoreTimestamp(beginLockTimestamp);

   // 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件

   if (null == mappedFile mappedFile.isFull()) {

   // 使用偏移量0创建一个新的文件

   mappedFile = this.mappedFileQueue.getLastMappedFile(0);

   // 如果依旧为空

   if (null == mappedFile) {

   // 提示错误

   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());

   return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));

   // 写入消息

   result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

   // ...

   elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;

   } finally {

   beginTimeInLock = 0;

   putMessageLock.unlock();

   // ...

   PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

   // 统计相关

   storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);

   storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

   // 执行刷盘

   CompletableFuture PutMessageStatus flushResultFuture = submitFlushRequest(result, msg);

   CompletableFuture PutMessageStatus replicaResultFuture = submitReplicaRequest(result, msg);

   return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) - {

   if (flushStatus != PutMessageStatus.PUT_OK) {

   putMessageResult.setPutMessageStatus(flushStatus);

   if (replicaStatus != PutMessageStatus.PUT_OK) {

   putMessageResult.setPutMessageStatus(replicaStatus);

   // 返回结果

   return putMessageResult;

  

 

 

  写入内存Buffer

  MessageExtEncoder是CommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:

  

public class CommitLog {

 

   // ThreadLocal

   private final ThreadLocal PutMessageThreadLocal putMessageThreadLocal;

   // 添加消息的ThreadLocal对象

   static class PutMessageThreadLocal {

   private MessageExtEncoder encoder; // 引用MessageExtEncoder

   private StringBuilder keyBuilder;

   PutMessageThreadLocal(int size) {

   // 创建MessageExtEncoder,size用来指定分配内存的大小

   encoder = new MessageExtEncoder(size);

   keyBuilder = new StringBuilder();

   // ...

  

 

  MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区:

  对消息属性数据的长度进行校验判断是否超过限定值

  对总消息内容长度进行校验,判断是否超过最大的长度限制

  根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域

  将消息相关信息写入buffer:

  写入消息长度

  写入消息体CRC校验和

  写入队列ID

  队列的偏移量, 需要注意这里还没达到偏移量的值,先占位稍后写入

  文件的物理偏移量, 先占位稍后写入

  写入系统标识

  写入发送消息的时间戳

  写入发送消息的主机地址

  写入存储时间戳

  写入存储消息的主机地址

  RECONSUMETIMES

  Prepared Transaction Offset

  写入消息体长度和消息内容

  写入主题长度

  写入属性长度和属性内容

  
// 对消息进行编码并写入buffer

   protected PutMessageResult encode(MessageExtBrokerInner msgInner) {

   // 消息属性数据

   final byte[] propertiesData =

   msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

   // 属性数据长度

   final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

   // 校验长度是否超过最大值

   if (propertiesLength Short.MAX_VALUE) {

   log.warn("putMessage message properties length too long. length={}", propertiesData.length);

   return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);

   // 获取主题数据

   final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);

   final int topicLength = topicData.length;// 主题数据长度

   // 获取消息体内容长度

   final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

   // 总消息内容长度

   final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

   // 是否超过最大长度限制

   if (msgLen this.maxMessageSize) {

   CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength

   + ", maxMessageSize: " + this.maxMessageSize);

   return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);

   // 初始化

   this.resetByteBuffer(encoderBuffer, msgLen);

   // 1 写入消息长度

   this.encoderBuffer.putInt(msgLen);

   // 2 写入魔数

   this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);

   // 3 写入消息体CRC校验和

   this.encoderBuffer.putInt(msgInner.getBodyCRC());

   // 4 写入队列ID

   this.encoderBuffer.putInt(msgInner.getQueueId());

   // 5 写入标识

   this.encoderBuffer.putInt(msgInner.getFlag());

   // 6 队列的偏移量, 稍后写入

   this.encoderBuffer.putLong(0);

   // 7 文件的物理偏移量, 稍后写入

   this.encoderBuffer.putLong(0);

   // 8 写入系统标识

   this.encoderBuffer.putInt(msgInner.getSysFlag());

   // 9 写入发送消息的时间戳

   this.encoderBuffer.putLong(msgInner.getBornTimestamp());

   // 10 写入发送消息的主机地址

   socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);

   // 11 写入存储时间戳

   this.encoderBuffer.putLong(msgInner.getStoreTimestamp());

   // 12 写入存储消息的主机地址

   socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);

   // 13 RECONSUMETIMES

   this.encoderBuffer.putInt(msgInner.getReconsumeTimes());

   // 14 Prepared Transaction Offset

   this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());

   // 15 写入消息体长度

   this.encoderBuffer.putInt(bodyLength);

   if (bodyLength 0)

   this.encoderBuffer.put(msgInner.getBody());// 写入消息内容

   // 16 写入主题长度

   this.encoderBuffer.put((byte) topicLength);

   // 写入主题

   this.encoderBuffer.put(topicData);

   // 17 写入属性长度

   this.encoderBuffer.putShort((short) propertiesLength);

   if (propertiesLength 0)

   this.encoderBuffer.put(propertiesData); // 写入属性数据

   encoderBuffer.flip();

   return null;

  

 

 

  写入内存映射文件

  前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:
 

  ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。

  MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。

  MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:

  第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。

  第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。

  综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。

  

public class MappedFile extends ReferenceResource {

 

   // 记录文件的写入位置

   protected final AtomicInteger wrotePosition = new AtomicInteger(0);

   // 文件大小

   protected int fileSize;

   // 字节buffer

   protected ByteBuffer writeBuffer = null;

   // 文件映射

   private MappedByteBuffer mappedByteBuffer;

   // 暂存池,类似线程池,只不过池中存放的是申请的内存

   protected TransientStorePool transientStorePool = null;

   // 初始化

   public void init(final String fileName, final int fileSize,

   final TransientStorePool transientStorePool) throws IOException {

   init(fileName, fileSize);

   // 从暂存池中获取一块内存

   this.writeBuffer = transientStorePool.borrowBuffer();

   this.transientStorePool = transientStorePool;

   // 初始化

   private void init(final String fileName, final int fileSize) throws IOException {

   // ...

   try {

   // 获取文件

   this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();

   // 进行文件映射

   this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

   TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);

   TOTAL_MAPPED_FILES.incrementAndGet();

   ok = true;

   } catch (FileNotFoundException e) {

   // ...

   } catch (IOException e) {

   // ...

   } finally {

   if (!ok this.fileChannel != null) {

   this.fileChannel.close();

  // 暂存池

  public class TransientStorePool {

   private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

   private final int poolSize; // 暂存池大小

   private final int fileSize; // 申请的每一块内存大小

   private final Deque ByteBuffer availableBuffers; // 双端队列,存放申请的内存

   private final MessageStoreConfig storeConfig; // 存储配置

   public TransientStorePool(final MessageStoreConfig storeConfig) {

   this.storeConfig = storeConfig;

   this.poolSize = storeConfig.getTransientStorePoolSize();

   this.fileSize = storeConfig.getMappedFileSizeCommitLog();

   this.availableBuffers = new ConcurrentLinkedDeque ();

   * 初始化

   public void init() {

   // 根据暂存池大小申请内存

   for (int i = 0; i poolSize; i++) {

   // 申请直接内存

   ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

   final long address = ((DirectBuffer) byteBuffer).address();

   Pointer pointer = new Pointer(address);

   LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

   // 放入到暂存池中

   availableBuffers.offer(byteBuffer);

  

 

  经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:

  
MappedFile中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息

  
如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的slice方法创建一个与MappedFile共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中
 

  
调用回调函数的doAppend方法进行写入,前面可知回调函数是DefaultAppendMessageCallback类型的

  
// 记录文件的写入位置

   protected final AtomicInteger wrotePosition = new AtomicInteger(0);

   // 文件大小

   protected int fileSize;

   // 字节buffer

   protected ByteBuffer writeBuffer = null;

   // 文件映射

   private MappedByteBuffer mappedByteBuffer;

   // 写入消息

   public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,

   PutMessageContext putMessageContext) {

   // 调用appendMessagesInner

   return appendMessagesInner(msg, cb, putMessageContext);

   public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,

   PutMessageContext putMessageContext) {

   assert messageExt != null;

   assert cb != null;

   // 获取写入位置

   int currentPos = this.wrotePosition.get();

   // 如果写指针小于文件大小

   if (currentPos this.fileSize) {

   // 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer

   ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();

   // 设置共享内存区的写入位置

   byteBuffer.position(currentPos);

   AppendMessageResult result;

   if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理

   // 通过共享内存区byteBuffer写入数据

   result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,

   (MessageExtBrokerInner) messageExt, putMessageContext);

   } else if (messageExt instanceof MessageExtBatch) { // 批量消息

   // 通过共享内存区byteBuffer写入数据

   result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,

   (MessageExtBatch) messageExt, putMessageContext);

   } else {

   return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

   // 更新MappedFile的写入位置

   this.wrotePosition.addAndGet(result.getWroteBytes());

   this.storeTimestamp = result.getStoreTimestamp();

   return result;

   log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);

   return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

  

 

 

  进入到DefaultAppendMessageCallback的doAppend方法中,首先来看方法的入参:

  fileFromOffset:文件的起始位置偏移量

  byteBuffer:缓冲。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: