[RocketMQ] DLedger Log Replication Source Code Analysis


  As mentioned in the article on RocketMQ message storage, after a Broker receives a message it calls CommitLog's asyncPutMessage method to write it. In DLedger mode the DLedgerCommitLog subclass is used. Its asyncPutMessages method does the following:

  Calls the serialize method to serialize the message data;

  Builds a batch append request, BatchAppendEntryRequest, and sets the serialized message data on it;

  Calls handleAppend to submit the append request and write the messages.

  

public class DLedgerCommitLog extends CommitLog {

    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        // ...
        AppendMessageResult appendResult;
        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
        EncodeResult encodeResult;
        // Serialize the message data
        encodeResult = this.messageSerializer.serialize(messageExtBatch);
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
                new AppendMessageResult(encodeResult.status)));
        }
        putMessageLock.lock();
        msgIdBuilder.setLength(0);
        long elapsedTimeInLock;
        long queueOffset;
        int msgNum = 0;
        try {
            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
            encodeResult.setQueueOffsetKey(queueOffset, true);
            // Build the batch append request
            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
            request.setGroup(dLedgerConfig.getGroup()); // set the group
            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
            // Take the serialized message data from the EncodeResult
            request.setBatchMsgs(encodeResult.batchData);
            // Call handleAppend to write the data
            AppendFuture<AppendEntryResponse> appendFuture =
                (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
            if (appendFuture.getPos() == -1) {
                log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,
                    new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
            }
            // ...
        } catch (Exception e) {
            log.error("Put message error", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR,
                new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
        } finally {
            beginTimeInDledgerLock = 0;
            putMessageLock.unlock();
        }
        // ...
    }
}

  The serialize method serializes the message data into a memory buffer. Since a batch may contain multiple messages, it loops over the buffer and serializes each one:

  Reads the total size, the magic code and the CRC checksum; these three reads exist only to advance the buffer's read position;

  Reads the FLAG, recording it in the flag variable;

  Reads the body length, recording it in the bodyLen variable;

  The read position is now at the start of the message body, which is recorded in the bodyPos variable;

  Computes the CRC checksum over the message body, starting at bodyPos;

  Moves the buffer's read position from bodyPos forward by bodyLen, i.e. skips over the body so the next fields can be read;

  Reads the properties length and records the position where the properties start;

  Gets the topic and computes the length of its byte representation;

  Calculates the total message length and allocates memory of that size;

  Checks whether the message length exceeds the limit;

  Resets the allocated buffer and writes the message fields into it one by one;

  Returns the result as an EncodeResult.

  

class MessageSerializer {

    public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
        // Key: topic + queueId
        String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
        int totalMsgLen = 0;
        // Get the message data
        ByteBuffer messagesByteBuff = messageExtBatch.wrap();
        List<byte[]> batchBody = new LinkedList<>();
        // Get the system flag
        int sysFlag = messageExtBatch.getSysFlag();
        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // Allocate buffers for the host addresses
        ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
        ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
        // While there is still unread data
        while (messagesByteBuff.hasRemaining()) {
            // Read the total size
            messagesByteBuff.getInt();
            // Read the magic code
            messagesByteBuff.getInt();
            // Read the CRC checksum
            messagesByteBuff.getInt();
            // Read the FLAG
            int flag = messagesByteBuff.getInt();
            // Read the body length
            int bodyLen = messagesByteBuff.getInt();
            // Record the position where the body starts
            int bodyPos = messagesByteBuff.position();
            // Compute the CRC checksum over the body, starting at bodyPos
            int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
            // Move the read position from bodyPos forward by bodyLen, skipping the body
            messagesByteBuff.position(bodyPos + bodyLen);
            // Read the properties length
            short propertiesLen = messagesByteBuff.getShort();
            // Record the position where the properties start
            int propertiesPos = messagesByteBuff.position();
            // Skip the properties
            messagesByteBuff.position(propertiesPos + propertiesLen);
            // Get the topic
            final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            // Length of the topic byte array
            final int topicLength = topicData.length;
            // Calculate the message length
            final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
            // Allocate memory according to the message length
            ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
            // If the maximum message size is exceeded
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
                    + ", maxMessageSize: " + this.maxMessageSize);
                throw new RuntimeException("message size exceeded");
            }
            // Update the total length
            totalMsgLen += msgLen;
            // If the maximum message size is exceeded
            if (totalMsgLen > maxMessageSize) {
                throw new RuntimeException("message size exceeded");
            }
            // Reset the buffer
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 Write the total size
            msgStoreItemMemory.putInt(msgLen);
            // 2 Write the magic code
            msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
            // 3 Write the CRC checksum
            msgStoreItemMemory.putInt(bodyCrc);
            // 4 Write the QUEUEID
            msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
            // 5 Write the FLAG
            msgStoreItemMemory.putInt(flag);
            // 6 Write the queue offset QUEUEOFFSET
            msgStoreItemMemory.putLong(0L);
            // 7 Write the physical offset
            msgStoreItemMemory.putLong(0);
            // 8 Write the system flag SYSFLAG
            msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
            // 9 Write the born timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
            // 10 BORNHOST
            resetByteBuffer(bornHostHolder, bornHostLength);
            msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
            // 11 Write the store timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            resetByteBuffer(storeHostHolder, storeHostLength);
            msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            msgStoreItemMemory.putLong(0);
            // 15 Write the body length
            msgStoreItemMemory.putInt(bodyLen);
            if (bodyLen > 0) {
                // Write the message body
                msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
            }
            // 16 Write the topic
            msgStoreItemMemory.put((byte) topicLength);
            msgStoreItemMemory.put(topicData);
            // 17 Write the properties length
            msgStoreItemMemory.putShort(propertiesLen);
            if (propertiesLen > 0) {
                msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
            }
            // Copy the serialized message into a byte array
            byte[] data = new byte[msgLen];
            msgStoreItemMemory.clear();
            msgStoreItemMemory.get(data);
            // Add it to the batch
            batchBody.add(data);
        }
        // Return the result
        return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
    }
}
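The cursor arithmetic above (record bodyPos, checksum the body through the backing array, then jump position() past it) can be exercised on its own. The sketch below is not DLedger code; it walks a toy record layout of [totalSize][bodyLen][body] — an assumed format far simpler than the real one — using the same ByteBuffer technique:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

public class BufferWalk {

    // Encode one record as [totalSize int][bodyLen int][body bytes] (assumed toy layout).
    static void writeRecord(ByteBuffer buf, byte[] body) {
        buf.putInt(4 + 4 + body.length); // total size
        buf.putInt(body.length);         // body length
        buf.put(body);                   // body
    }

    // Walk the buffer the way serialize() does: read the header, note where the
    // body starts, CRC it via the backing array, then move position() past it.
    static List<Long> walk(ByteBuffer buf) {
        List<Long> crcs = new ArrayList<>();
        while (buf.hasRemaining()) {
            buf.getInt();                    // total size, read only to advance the cursor
            int bodyLen = buf.getInt();      // body length
            int bodyPos = buf.position();    // body start position
            CRC32 crc = new CRC32();
            crc.update(buf.array(), bodyPos, bodyLen);
            crcs.add(crc.getValue());
            buf.position(bodyPos + bodyLen); // skip the body, on to the next record
        }
        return crcs;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeRecord(buf, "hello".getBytes());
        writeRecord(buf, "world".getBytes());
        buf.flip();
        System.out.println(walk(buf).size()); // 2 records
    }
}
```

The point is that absolute access through the backing array (for the CRC) combined with an explicit position(bodyPos + bodyLen) lets the loop skip a variable-length field without copying it.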

  

 

  After the message data has been serialized, it is wrapped in an append request and handleAppend is called to write the messages. The logic is as follows:

  Gets the current Term and checks whether the number of pending write requests for this Term has exceeded the maximum. If not, processing continues; if it has, the response code is set to LEADER_PENDING_FULL, meaning too many append requests are in flight, and the current request is rejected;

  Checks whether the request is a batch request:

  If it is: iterates over the messages, builds a DLedgerEntry for each one, calls appendAsLeader to write it to the Leader node, and calls waitAck to create an asynchronous response object for the last entry;

  If it is not: builds a DLedgerEntry for the message, calls appendAsLeader to write it to the Leader node and calls waitAck to create the asynchronous response object.

  
@Override
public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
    try {
        PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
        PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
        // Check that this node is the Leader; if not, throw NOT_LEADER
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
        // Get the current Term
        long currTerm = memberState.currTerm();
        // Check the number of pending requests
        if (dLedgerEntryPusher.isPendingFull(currTerm)) {
            AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
            appendEntryResponse.setGroup(memberState.getGroup());
            // Set the response code to LEADER_PENDING_FULL
            appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
            // Set the Term
            appendEntryResponse.setTerm(currTerm);
            appendEntryResponse.setLeaderId(memberState.getSelfId()); // set the Leader ID
            return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
        } else {
            if (request instanceof BatchAppendEntryRequest) { // batch
                BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
                if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
                    long[] positions = new long[batchRequest.getBatchMsgs().size()];
                    DLedgerEntry resEntry = null;
                    int index = 0;
                    // Iterate over the messages
                    Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
                    while (iterator.hasNext()) {
                        // Build a DLedgerEntry
                        DLedgerEntry dLedgerEntry = new DLedgerEntry();
                        // Set the message body
                        dLedgerEntry.setBody(iterator.next());
                        // Write the message
                        resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                        positions[index++] = resEntry.getPos();
                    }
                    // Create the asynchronous response object for the last dLedgerEntry
                    BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
                        (BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
                    batchAppendFuture.setPositions(positions);
                    return batchAppendFuture;
                }
                throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
                    " with empty bodys");
            } else { // single message
                DLedgerEntry dLedgerEntry = new DLedgerEntry();
                // Set the message body
                dLedgerEntry.setBody(request.getBody());
                // Write the message
                DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                // Create the asynchronous response object and wait for acknowledgement
                return dLedgerEntryPusher.waitAck(resEntry, false);
            }
        }
    } catch (DLedgerException e) {
        // ...
    }
}
  

 

 

  pendingAppendResponsesByTerm

  DLedgerEntryPusher has a member variable pendingAppendResponsesByTerm. Its KEY is the Term, and its VALUE is a ConcurrentMap whose KEY is the message's index (each message is numbered, starting from 0, as discussed later) and whose VALUE is the AppendEntryResponse future for that message's write request:

  When isPendingFull is called, it first checks whether the current Term already has an entry in pendingAppendResponsesByTerm; if not, a new ConcurrentHashMap is created to initialize it. Otherwise the size of the existing ConcurrentHashMap is compared against MaxPendingRequestsNum to decide whether the maximum has been exceeded:

  

public class DLedgerEntryPusher {

    // The outer KEY is the Term, the VALUE is a ConcurrentMap;
    // the ConcurrentMap's KEY is the message index, and its VALUE is the asynchronous
    // response object (AppendEntryResponse future) for that message's write request
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();

    public boolean isPendingFull(long currTerm) {
        // Make sure currTerm is present in pendingAppendResponsesByTerm
        checkTermForPendingMap(currTerm, "isPendingFull");
        // Check whether the number of pending write requests for this Term exceeds the maximum
        return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
    }

    private void checkTermForPendingMap(long term, String env) {
        // If pendingAppendResponsesByTerm does not contain the term
        if (!pendingAppendResponsesByTerm.containsKey(term)) {
            logger.info("Initialize the pending append map in {} for term={}", env, term);
            // Create a ConcurrentHashMap and add it to pendingAppendResponsesByTerm
            pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap<>());
        }
    }
}
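To make the bookkeeping concrete, here is a minimal self-contained model of the two-level map. This is not the real DLedgerEntryPusher; the limit of 3 and the String payload type are assumptions for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PendingMapModel {

    // Stand-in for maxPendingRequestsNum (assumed small value for the demo)
    static final int MAX_PENDING = 3;

    // term -> (entry index -> pending response future)
    final ConcurrentMap<Long, ConcurrentMap<Long, CompletableFuture<String>>> pendingByTerm =
        new ConcurrentHashMap<>();

    // Mirrors checkTermForPendingMap: lazily create the per-term map.
    ConcurrentMap<Long, CompletableFuture<String>> forTerm(long term) {
        return pendingByTerm.computeIfAbsent(term, t -> new ConcurrentHashMap<>());
    }

    // Mirrors isPendingFull: compare the per-term map's size against the limit.
    boolean isPendingFull(long term) {
        return forTerm(term).size() > MAX_PENDING;
    }

    public static void main(String[] args) {
        PendingMapModel m = new PendingMapModel();
        for (long index = 0; index <= MAX_PENDING; index++) {
            m.forTerm(1L).put(index, new CompletableFuture<>());
        }
        System.out.println(m.isPendingFull(1L)); // true: 4 futures exceed the limit of 3
        System.out.println(m.isPendingFull(2L)); // false: a new term starts empty
    }
}
```

Each pending future is removed and completed once a quorum acknowledges the entry, so the per-term size is exactly the number of writes still waiting for replication.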

  

 

  When are entries added to pendingAppendResponsesByTerm?

  After a message is written to the Leader node, DLedgerEntryPusher's waitAck method (covered later) is called. If the cluster has more than one node, an AppendFuture<AppendEntryResponse> response object is created for the request and put into pendingAppendResponsesByTerm, so the number of response objects stored there tells how many write requests are waiting in the current Term:

  

 // Create the response object
    AppendFuture<AppendEntryResponse> future;
    // Create the AppendFuture
    if (isBatchWait) {
        // batch
        future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
    } else {
        future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
    }
    future.setPos(entry.getPos());
    // Put the created AppendFuture into pendingAppendResponsesByTerm
    CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);

  

 

  Writing to the Leader

  DLedgerStore has two implementations: DLedgerMemoryStore (in-memory storage) and DLedgerMmapFileStore (Mmap file-mapped storage):

  As createDLedgerStore shows, which one is used depends on the configured store type:
  

public class DLedgerServer implements DLedgerProtocolHander {

    public DLedgerServer(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = new MemberState(dLedgerConfig);
        // Create the DLedgerStore according to the configured StoreType
        this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
        // ...
    }

    // Create the DLedgerStore
    private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
        if (storeType.equals(DLedgerConfig.MEMORY)) {
            return new DLedgerMemoryStore(config, memberState);
        } else {
            return new DLedgerMmapFileStore(config, memberState);
        }
    }
}
  

 

  appendAsLeader

  Next, taking DLedgerMmapFileStore as the example, let's look at what appendAsLeader does:

  Gets the log data buffer (dataBuffer) and the index data buffer (indexBuffer); content is first written into these buffers and then flushed from the buffers to the files;

  Sets the entry's index (every entry is numbered) to ledgerEndIndex + 1. ledgerEndIndex starts at -1 and grows by one with each appended entry, so after a successful write it is updated and always records the index of the last successfully written entry;

  Calls dataFileList's append method to write the dataBuffer content into the log file, returning the data's offset within the file;

  Calls updateLedgerEndIndexAndTerm to update the LedgerEndIndex and LedgerEndTerm values recorded in MemberState. LedgerEndIndex is persisted to file at FLUSH time.

  
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
    // Check that the current node is the Leader
    PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
    // Check that the disk is not full
    PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
    // Get the log data buffer
    ByteBuffer dataBuffer = localEntryBuffer.get();
    // Get the index data buffer
    ByteBuffer indexBuffer = localIndexBuffer.get();
    // Encode the entry's content into the dataBuffer
    DLedgerEntryCoder.encode(entry, dataBuffer);
    int entrySize = dataBuffer.remaining();
    synchronized (memberState) {
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
        // The entry's index is ledgerEndIndex + 1
        long nextIndex = ledgerEndIndex + 1;
        // Set the entry's index
        entry.setIndex(nextIndex);
        // Set the Term
        entry.setTerm(memberState.currTerm());
        // Set the magic code
        entry.setMagic(CURRENT_MAGIC);
        // Write the index, Term and magic code into the dataBuffer
        DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
        long prePos = dataFileList.preAppend(dataBuffer.remaining());
        entry.setPos(prePos);
        PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.setPos(dataBuffer, prePos);
        for (AppendHook writeHook : appendHooks) {
            writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
        }
        // Write the dataBuffer content into the log file, returning the data's position
        long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
        PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
        PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
        // Write the index information into the indexBuffer
        DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
        // Write the indexBuffer content into the index file
        long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
        PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
        if (logger.isDebugEnabled()) {
            logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
        }
        // Increment ledgerEndIndex
        ledgerEndIndex++;
        // Set ledgerEndTerm to the current Term
        ledgerEndTerm = memberState.currTerm();
        if (ledgerBeginIndex == -1) {
            // Update ledgerBeginIndex
            ledgerBeginIndex = ledgerEndIndex;
        }
        // Update LedgerEndIndex and LedgerEndTerm
        updateLedgerEndIndexAndTerm();
        return entry;
    }
}
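The index bookkeeping above can be modeled in a few lines. The sketch below is a toy in-memory stand-in, not DLedgerMmapFileStore; the INDEX_UNIT_SIZE value of 32 is an assumption used only to illustrate the fixed-size index-record invariant checked by the PreConditions call:

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderAppendModel {

    static final int INDEX_UNIT_SIZE = 32; // assumed fixed size of one index record

    long ledgerEndIndex = -1;   // index of the last successfully appended entry
    long ledgerBeginIndex = -1; // index of the first entry in the ledger
    long indexFilePos = 0;      // next write position in the (modeled) index file
    final List<String> log = new ArrayList<>();

    // Mirrors the bookkeeping in appendAsLeader: assign index = ledgerEndIndex + 1,
    // append, verify the index-file position invariant, then advance ledgerEndIndex.
    long append(String body) {
        long nextIndex = ledgerEndIndex + 1;
        log.add(body);
        long indexPos = indexFilePos;
        indexFilePos += INDEX_UNIT_SIZE;
        // Same invariant the real code asserts: the nth entry's index record
        // must sit at offset n * INDEX_UNIT_SIZE in the index file.
        if (indexPos != nextIndex * INDEX_UNIT_SIZE) {
            throw new IllegalStateException("index file out of sync");
        }
        ledgerEndIndex++;
        if (ledgerBeginIndex == -1) {
            ledgerBeginIndex = ledgerEndIndex;
        }
        return nextIndex;
    }

    public static void main(String[] args) {
        LeaderAppendModel store = new LeaderAppendModel();
        System.out.println(store.append("a")); // 0
        System.out.println(store.append("b")); // 1
        System.out.println(store.ledgerEndIndex); // 1
    }
}
```

Because indexes are dense and index records are fixed size, a lookup by index is a single offset calculation, which is why the position check matters.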

  

 

 

  Updating LedgerEndIndex and LedgerEndTerm

  After the message has been written to the Leader, the getLedgerEndIndex and getLedgerEndTerm methods are called to fetch the LedgerEndIndex and LedgerEndTerm values recorded in DLedgerMmapFileStore, which are then propagated to MemberState:

  

public abstract class DLedgerStore {

    protected void updateLedgerEndIndexAndTerm() {
        if (getMemberState() != null) {
            // Delegate to MemberState's updateLedgerIndexAndTerm
            getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
        }
    }
}

public class MemberState {

    private volatile long ledgerEndIndex = -1;
    private volatile long ledgerEndTerm = -1;

    // Update ledgerEndIndex and ledgerEndTerm
    public void updateLedgerIndexAndTerm(long index, long term) {
        this.ledgerEndIndex = index;
        this.ledgerEndTerm = term;
    }
}
  

 

  waitAck

  After a message is written to the Leader node, the Leader still has to forward the log to the Follower nodes, and that forwarding is asynchronous; waitAck therefore creates an asynchronous response object for the write. Its main logic:

  Calls updatePeerWaterMark to update the water mark. Because the Leader forwards the log to each Follower, the water mark records each node's replication progress, i.e. the index of the latest message replicated to it. What is updated here is the index of the Leader's own latest written message; the Follower-side update appears later;

  If the cluster has only one node, creates an AppendEntryResponse and returns a completed response;

  If the cluster has multiple nodes, since log forwarding is asynchronous, creates an asynchronous response object AppendFuture<AppendEntryResponse> and puts it into pendingAppendResponsesByTerm; this is where pendingAppendResponsesByTerm's data is added;

  To distinguish pendingAppendResponsesByTerm from peerWaterMarksByTerm once more:

  pendingAppendResponsesByTerm records the asynchronous response object (AppendEntryResponse future) of each message's write request; the write has to wait for acknowledgement from a majority of the cluster, hence the asynchronous handling, with the result collected afterwards.

  peerWaterMarksByTerm records each node's replication progress, namely the index of the last message successfully written to that node.
  

public class DLedgerEntryPusher {

    public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
        // Update the latest written message index of the current node
        updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());
        // If the cluster has only one node
        if (memberState.getPeerMap().size() == 1) {
            // Build the response
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setLeaderId(memberState.getSelfId());
            response.setIndex(entry.getIndex());
            response.setTerm(entry.getTerm());
            response.setPos(entry.getPos());
            if (isBatchWait) {
                return BatchAppendFuture.newCompletedFuture(entry.getPos(), response);
            }
            return AppendFuture.newCompletedFuture(entry.getPos(), response);
        } else {
            // Make sure the Term is present in pendingAppendResponsesByTerm
            checkTermForPendingMap(entry.getTerm(), "waitAck");
            // The response object
            AppendFuture<AppendEntryResponse> future;
            // Create the AppendFuture
            if (isBatchWait) {
                // batch
                future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            } else {
                future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            }
            future.setPos(entry.getPos());
            // Put the created AppendFuture into pendingAppendResponsesByTerm
            CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
            if (old != null) {
                logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
            }
            return future;
        }
    }
}
  

 

  After a message is written to the Leader, the Leader node forwards it to the other Follower nodes; this happens asynchronously. Let's now look at the replication process.

  DLedgerEntryPusher's startup method starts the following threads:

  EntryDispatcher: used by the Leader node to forward logs to the Follower nodes;

  EntryHandler: used by a Follower node to process the logs sent by the Leader;

  QuorumAckChecker: used by the Leader node to wait for the Followers to catch up;

  Note that the Leader creates one EntryDispatcher per Follower node; each EntryDispatcher handles log forwarding for one node, and the nodes are processed in parallel.
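A reduced model of that per-peer setup, with plain strings standing in for the dispatcher thread objects (hypothetical names, not the real constructor):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DispatcherSetup {

    // One dispatcher per remote peer; the Leader never dispatches to itself.
    static Map<String, String> buildDispatchers(String selfId, Set<String> peers) {
        Map<String, String> dispatcherMap = new HashMap<>();
        for (String peer : peers) {
            if (!peer.equals(selfId)) {
                dispatcherMap.put(peer, "EntryDispatcher-" + peer); // stand-in for the real thread object
            }
        }
        return dispatcherMap;
    }

    public static void main(String[] args) {
        Map<String, String> m = buildDispatchers("n0", Set.of("n0", "n1", "n2"));
        System.out.println(m.keySet()); // n1 and n2 only
    }
}
```

One dispatcher per peer means a slow Follower only stalls its own dispatcher loop, not replication to the other nodes.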

  

public class DLedgerEntryPusher {

    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
        DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        for (String peer : memberState.getPeerMap().keySet()) {
            if (!peer.equals(memberState.getSelfId())) {
                // Create an EntryDispatcher for every node in the cluster other than the current one
                dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
            }
        }
        // Create the EntryHandler
        this.entryHandler = new EntryHandler(logger);
        // Create the QuorumAckChecker
        this.quorumAckChecker = new QuorumAckChecker(logger);
    }

    public void startup() {
        // Start the EntryHandler
        entryHandler.start();
        // Start the QuorumAckChecker
        quorumAckChecker.start();
        // Start the EntryDispatchers
        for (EntryDispatcher dispatcher : dispatcherMap.values()) {
            dispatcher.start();
        }
    }
}
  

 

  EntryDispatcher (log forwarding)

  EntryDispatcher is used by the Leader to forward logs to the Followers. It extends ShutdownAbleThread, so a thread is started to handle the forwarding; the entry point is the doWork method.

  doWork first calls checkAndFreshState to check the node's state. This mainly verifies that the current node is the Leader and, if needed, changes the push type. If the node is not the Leader, processing stops; if it is, the push type is examined:

  APPEND: message append, used to forward messages to a Follower; batch messages go through doBatchAppend, otherwise doAppend is called;

  COMPARE: message comparison, which generally occurs when data is inconsistent; doCompare is then called to compare entries;

  

public class DLedgerEntryPusher {

    // The log forwarding thread
    private class EntryDispatcher extends ShutdownAbleThread {

        @Override
        public void doWork() {
            try {
                // Check the state
                if (!checkAndFreshState()) {
                    waitForRunning(1);
                    return;
                }
                // If the push type is APPEND
                if (type.get() == PushEntryRequest.Type.APPEND) {
                    // If batch push is enabled
                    if (dLedgerConfig.isEnableBatchPush()) {
                        doBatchAppend();
                    } else {
                        doAppend();
                    }
                } else {
                    // Compare
                    doCompare();
                }
                Thread.yield();
            } catch (Throwable t) {
                DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
                // On error, switch to COMPARE
                changeState(-1, PushEntryRequest.Type.COMPARE);
                DLedgerUtils.sleep(500);
            }
        }
    }
}
  

 

  State check (checkAndFreshState)

  If the Term differs from the one recorded in memberState, or the LeaderId is null, or the LeaderId does not match memberState's, the changeState method is called, switching the push type to COMPARE and resetting compareIndex to -1:

  

public class DLedgerEntryPusher {

    private class EntryDispatcher extends ShutdownAbleThread {

        private long term = -1;
        private String leaderId = null;

        private boolean checkAndFreshState() {
            // If the current node is not the Leader
            if (!memberState.isLeader()) {
                return false;
            }
            // If the Term differs from the one in memberState, or the LeaderId is null or does not match memberState's
            if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {
                synchronized (memberState) { // lock
                    if (!memberState.isLeader()) {
                        return false;
                    }
                    PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                    term = memberState.currTerm();
                    leaderId = memberState.getSelfId();
                    // Switch the state to COMPARE
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                }
            }
            return true;
        }

        private synchronized void changeState(long index, PushEntryRequest.Type target) {
            logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
            switch (target) {
                case APPEND:
                    compareIndex = -1;
                    updatePeerWaterMark(term, peerId, index);
                    quorumAckChecker.wakeup();
                    writeIndex = index + 1;
                    if (dLedgerConfig.isEnableBatchPush()) {
                        resetBatchAppendEntryRequest();
                    }
                    break;
                case COMPARE:
                    // If the CAS from APPEND to COMPARE succeeds
                    if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                        compareIndex = -1; // reset compareIndex to -1
                        if (dLedgerConfig.isEnableBatchPush()) {
                            batchPendingMap.clear();
                        } else {
                            pendingMap.clear();
                        }
                    }
                    break;
                case TRUNCATE:
                    compareIndex = -1;
                    break;
                default:
                    break;
            }
            type.set(target);
        }
    }
}
  

 

  Message forwarding on the Leader node

  In the APPEND state, the Leader sends Append requests to a Follower node to forward messages to it. doAppend works as follows:

  writeIndex is the index of the next message to forward, with a default of -1. If it is greater than LedgerEndIndex, doCommit is called to send a COMMIT request to the Follower to update its committedIndex (more on this later);

  So the forwarding loop also keeps a cursor, writeIndex, recording the index of the next message to forward; each round fetches the message at writeIndex from the log, forwards it, and on success increments writeIndex to point at the next entry.

  If the size of pendingMap exceeds the limit maxPendingSize, or the last check was more than 1000 ms ago (cleanup is overdue), expired data is cleaned up (this step exists purely to reclaim space):

  pendingMap is a ConcurrentMap whose KEY is a message index and whose VALUE is the time that message was forwarded to the Follower node (doAppendInner adds the entries to pendingMap);

  As seen earlier, peerWaterMark records each node's replication progress; here the progress for this Term and node ID (the index of the latest successfully replicated message) is fetched into the peerWaterMark variable;

  pendingMap is iterated and each key compared against peerWaterMark; messages up to peerWaterMark have already been written successfully, so any index below peerWaterMark is stale and can be cleaned up, and is removed from pendingMap to free space;

  The check time lastCheckLeakTimeMs is updated to the current time;
  


public class DLedgerEntryPusher {

    private class EntryDispatcher extends ShutdownAbleThread {

        // Index of the next message to forward, default -1
        private long writeIndex = -1;
        // KEY is a message index, VALUE is the time that message was forwarded to the Follower node
        private ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>();

        private void doAppend() throws Exception {
            while (true) {
                // Check the state
                if (!checkAndFreshState()) {
                    break;
                }
                // If the state is no longer APPEND, stop
                if (type.get() != PushEntryRequest.Type.APPEND) {
                    break;
                }
                // Check whether the index of the next message to forward is greater than LedgerEndIndex
                if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
                    doCommit(); // send a COMMIT request to the Follower to update it
                    doCheckAppendResponse();
                    break;
                }
                // If pendingMap has grown beyond maxPendingSize, or the last check was more than 1000 ms ago
                if (pendingMap.size() >= maxPendingSize || DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
                    // Get the replication progress for this peerId
                    long peerWaterMark = getPeerWaterMark(term, peerId);
                    // Iterate over pendingMap
                    for (Long index : pendingMap.keySet()) {
                        // If the index is below peerWaterMark
                        if (index < peerWaterMark) {
                            // Remove it
                            pendingMap.remove(index);
                        }
                    }
                    // Update the check time
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (pendingMap.size() >= maxPendingSize) {
                    doCheckAppendResponse();
                    break;
                }
                // Forward the message
                doAppendInner(writeIndex);
                // Advance writeIndex
                writeIndex++;
            }
        }
    }
}
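The cleanup step can be isolated into a short sketch. This is a plain map walk, not the real dispatcher; the timestamps stored as values are dummies, since only the keys (message indexes) matter for the water-mark comparison:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PendingCleanup {

    // pendingMap: entry index -> time the entry was pushed to the follower.
    // Entries with an index below the follower's water mark have already been
    // acknowledged, so their pending records can be dropped.
    static int cleanup(ConcurrentMap<Long, Long> pendingMap, long peerWaterMark) {
        int removed = 0;
        for (Long index : pendingMap.keySet()) {
            if (index < peerWaterMark) {
                pendingMap.remove(index);
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, Long> pending = new ConcurrentHashMap<>();
        for (long i = 0; i < 10; i++) {
            pending.put(i, System.currentTimeMillis());
        }
        System.out.println(cleanup(pending, 6L)); // removes indexes 0..5
        System.out.println(pending.size());       // 4 entries remain
    }
}
```

Iterating keySet() while removing is safe on a ConcurrentHashMap (its iterators are weakly consistent), which is why the real code can clean up in place without extra locking.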

  

 

  getPeerWaterMark

  peerWaterMarksByTerm

  peerWaterMarksByTerm records log forwarding progress. Its KEY is the Term and its VALUE is a ConcurrentMap whose KEY is a Follower node's ID (peerId) and whose VALUE is the index of the latest message that node has fully replicated:

  When getPeerWaterMark is called, it first calls checkTermForWaterMark to check whether peerWaterMarksByTerm has data for the Term. If not, it creates a ConcurrentMap and iterates over the cluster's nodes, adding one entry per node with the node's ID as KEY and -1 as the default value. When a message is later successfully written to a Follower node, updatePeerWaterMark is called to update that node's progress:

  

public class DLedgerEntryPusher {

    // Records the Follower nodes' replication progress. The KEY is the Term, the VALUE is a ConcurrentMap;
    // the ConcurrentMap's KEY is a Follower node's ID (peerId), and its VALUE is the index of the
    // latest message that node has successfully replicated
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
}
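A minimal model of this initialize-to--1-then-update behavior follows. It is not the real class; merge with Math::max stands in for the compare-and-update logic of updatePeerWaterMark, which only ever moves a node's mark forward:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WaterMarkModel {

    // term -> (peerId -> latest replicated entry index)
    final Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
    final Set<String> peers;

    WaterMarkModel(Set<String> peers) {
        this.peers = peers;
    }

    // Mirrors checkTermForWaterMark: lazily create the per-term map with -1 for every peer.
    ConcurrentMap<String, Long> forTerm(long term) {
        return peerWaterMarksByTerm.computeIfAbsent(term, t -> {
            ConcurrentMap<String, Long> marks = new ConcurrentHashMap<>();
            for (String peer : peers) {
                marks.put(peer, -1L);
            }
            return marks;
        });
    }

    // Mirrors updatePeerWaterMark: only ever move the mark forward.
    void update(long term, String peerId, long index) {
        forTerm(term).merge(peerId, index, Math::max);
    }

    public static void main(String[] args) {
        WaterMarkModel wm = new WaterMarkModel(Set.of("n0", "n1"));
        System.out.println(wm.forTerm(1L).get("n1")); // -1 before any ack
        wm.update(1L, "n1", 5L);
        wm.update(1L, "n1", 3L); // a stale ack must not move the mark back
        System.out.println(wm.forTerm(1L).get("n1")); // 5
    }
}
```

Keeping the marks monotonic is what lets the QuorumAckChecker later take the majority position across peers to decide which entries are committed.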

