[RocketMQ] DLedger Log Replication: Source Code Analysis
As mentioned in [RocketMQ] Message Storage, after a Broker receives a message it calls CommitLog's asyncPutMessage method to write it. In DLedger mode the DLedgerCommitLog implementation is used. Its asyncPutMessages method works as follows:
Call serialize to serialize the message data;
Build a batch append request, BatchAppendEntryRequest, carrying the serialized data from the previous step;
Call handleAppend to submit the append request and write the messages.
public class DLedgerCommitLog extends CommitLog {
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        // ...
        AppendMessageResult appendResult;
        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
        EncodeResult encodeResult;
        // Serialize the message data
        encodeResult = this.messageSerializer.serialize(messageExtBatch);
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
        }
        putMessageLock.lock();
        msgIdBuilder.setLength(0);
        long elapsedTimeInLock;
        long queueOffset;
        int msgNum = 0;
        try {
            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
            encodeResult.setQueueOffsetKey(queueOffset, true);
            // Build the batch append request
            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
            request.setGroup(dLedgerConfig.getGroup()); // set the group
            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
            // Take the serialized message data from the EncodeResult
            request.setBatchMsgs(encodeResult.batchData);
            // Call handleAppend to write the data
            AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
            if (appendFuture.getPos() == -1) {
                log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
            }
            // ...
        } catch (Exception e) {
            log.error("Put message error", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
        } finally {
            beginTimeInDledgerLock = 0;
            putMessageLock.unlock();
        }
        // ...
The serialize method serializes the message data into an in-memory buffer. Since there may be multiple messages, it loops over each one:
Read the total size, magic code and CRC checksum; these three reads only serve to advance the buffer's read pointer;
Read the FLAG into the flag variable;
Read the body length into the bodyLen variable;
The read pointer now sits at the start of the message body; record that position in the bodyPos variable;
Starting from bodyPos, read the body and compute its CRC checksum;
Move the buffer's read pointer forward from bodyPos by bodyLen bytes, skipping the body so the next field can be read;
Read the properties length and record the start position of the properties;
Get the topic data and compute its length;
Calculate the total message length and allocate a buffer of that size;
Check whether the message length exceeds the limit;
Initialize the buffer and write each field of the message in order;
Return the serialization result as an EncodeResult.
class MessageSerializer {
    public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
        // Build the key: topic + queueId
        String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
        int totalMsgLen = 0;
        // Get the message data
        ByteBuffer messagesByteBuff = messageExtBatch.wrap();
        List<byte[]> batchBody = new LinkedList<>();
        // Get the system flag
        int sysFlag = messageExtBatch.getSysFlag();
        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // Allocate buffers
        ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
        ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
        // While there is still unread data
        while (messagesByteBuff.hasRemaining()) {
            // Read the total size
            messagesByteBuff.getInt();
            // Read the magic code
            messagesByteBuff.getInt();
            // Read the CRC checksum
            messagesByteBuff.getInt();
            // Read the FLAG
            int flag = messagesByteBuff.getInt();
            // Read the body length
            int bodyLen = messagesByteBuff.getInt();
            // Record the start position of the message body
            int bodyPos = messagesByteBuff.position();
            // Starting at bodyPos, read the body and compute its CRC checksum
            int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
            // Move the read pointer forward from bodyPos by bodyLen bytes, skipping the body
            messagesByteBuff.position(bodyPos + bodyLen);
            // Read the properties length
            short propertiesLen = messagesByteBuff.getShort();
            // Record the start position of the properties
            int propertiesPos = messagesByteBuff.position();
            // Move the pointer past the properties
            messagesByteBuff.position(propertiesPos + propertiesLen);
            // Get the topic data
            final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            // Length of the topic byte array
            final int topicLength = topicData.length;
            // Calculate the message length
            final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
            // Allocate memory according to the message length
            ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
            // If the maximum message size is exceeded
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
                    + ", maxMessageSize: " + this.maxMessageSize);
                throw new RuntimeException("message size exceeded");
            }
            // Update the total length
            totalMsgLen += msgLen;
            // If the maximum message size is exceeded
            if (totalMsgLen > maxMessageSize) {
                throw new RuntimeException("message size exceeded");
            }
            // Initialize the buffer
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 write the total length
            msgStoreItemMemory.putInt(msgLen);
            // 2 write the magic code
            msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
            // 3 write the CRC checksum
            msgStoreItemMemory.putInt(bodyCrc);
            // 4 write the QUEUEID
            msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
            // 5 write the FLAG
            msgStoreItemMemory.putInt(flag);
            // 6 write the queue offset (QUEUEOFFSET)
            msgStoreItemMemory.putLong(0L);
            // 7 write the physical offset
            msgStoreItemMemory.putLong(0);
            // 8 write the system flag (SYSFLAG)
            msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
            // 9 write the born timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
            // 10 BORNHOST
            resetByteBuffer(bornHostHolder, bornHostLength);
            msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
            // 11 write the store timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            resetByteBuffer(storeHostHolder, storeHostLength);
            msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            msgStoreItemMemory.putLong(0);
            // 15 write the body length
            msgStoreItemMemory.putInt(bodyLen);
            if (bodyLen > 0) {
                // Write the message body
                msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
            }
            // 16 write the topic
            msgStoreItemMemory.put((byte) topicLength);
            msgStoreItemMemory.put(topicData);
            // 17 write the properties length
            msgStoreItemMemory.putShort(propertiesLen);
            if (propertiesLen > 0) {
                msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
            }
            // Copy into a byte array
            byte[] data = new byte[msgLen];
            msgStoreItemMemory.clear();
            msgStoreItemMemory.get(data);
            // Add it to the batch
            batchBody.add(data);
        }
        // Return the result
        return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
    }
}
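The calMsgLength method used above is not included in this excerpt. As a point of reference, here is a sketch of the length calculation, assuming it mirrors CommitLog's length formula and sums exactly the 17 fields written in the loop (field numbering matches the comments in the code; treat this as illustrative rather than verbatim source):

    // Sketch: total serialized length of one message, matching the 17 fields above
    private static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
        return 4                      // 1 TOTALSIZE
            + 4                       // 2 MAGICCODE
            + 4                       // 3 BODYCRC
            + 4                       // 4 QUEUEID
            + 4                       // 5 FLAG
            + 8                       // 6 QUEUEOFFSET
            + 8                       // 7 PHYSICALOFFSET
            + 4                       // 8 SYSFLAG
            + 8                       // 9 BORNTIMESTAMP
            + bornHostLength          // 10 BORNHOST
            + 8                       // 11 STORETIMESTAMP
            + storeHostLength         // 12 STOREHOSTADDRESS
            + 4                       // 13 RECONSUMETIMES
            + 8                       // 14 Prepared Transaction Offset
            + 4 + bodyLength          // 15 BODY length + content
            + 1 + topicLength         // 16 TOPIC length + content
            + 2 + propertiesLength;   // 17 PROPERTIES length + content
    }

This is also why bornHostHolder and storeHostHolder are sized 8 or 20 bytes depending on whether the V6 address flags are set.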
After serializing the message data, a message append request is built and handleAppend is called to write the messages. Its logic:
Get the current term and check whether the number of in-flight append requests for this term exceeds the maximum. If not, continue; if it does, set the response code to LEADER_PENDING_FULL, meaning too many append requests are being processed, and reject the current request;
Check whether the request is a batch:
If it is: iterate over the messages, create a DLedgerEntry for each one, call appendAsLeader to write it to the Leader node, and call waitAck for the last message to create the async response object;
If it is not: create a DLedgerEntry for the message, call appendAsLeader to write it to the Leader node, and call waitAck to create the async response object.
@Override
public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
    try {
        PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
        PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
        // Check whether this node is the Leader; if not, throw a NOT_LEADER exception
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
        // Get the current term
        long currTerm = memberState.currTerm();
        // Check the number of pending requests
        if (dLedgerEntryPusher.isPendingFull(currTerm)) {
            AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
            appendEntryResponse.setGroup(memberState.getGroup());
            // Set the response code to LEADER_PENDING_FULL
            appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
            // Set the term
            appendEntryResponse.setTerm(currTerm);
            appendEntryResponse.setLeaderId(memberState.getSelfId()); // set the leader ID
            return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
        } else {
            if (request instanceof BatchAppendEntryRequest) { // batch
                BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
                if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
                    long[] positions = new long[batchRequest.getBatchMsgs().size()];
                    DLedgerEntry resEntry = null;
                    int index = 0;
                    // Iterate over every message
                    Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
                    while (iterator.hasNext()) {
                        // Create a DLedgerEntry
                        DLedgerEntry dLedgerEntry = new DLedgerEntry();
                        // Set the message body
                        dLedgerEntry.setBody(iterator.next());
                        // Write the message
                        resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                        positions[index++] = resEntry.getPos();
                    }
                    // Create the async response object for the last dLedgerEntry
                    BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
                        (BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
                    batchAppendFuture.setPositions(positions);
                    return batchAppendFuture;
                }
                throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
                    " with empty bodys");
            } else { // single message
                DLedgerEntry dLedgerEntry = new DLedgerEntry();
                // Set the message body
                dLedgerEntry.setBody(request.getBody());
                // Write the message
                DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                // Create the async response object to wait for acks
                return dLedgerEntryPusher.waitAck(resEntry, false);
            }
        }
    } catch (DLedgerException e) {
        // ...
    }
}
pendingAppendResponsesByTerm
DLedgerEntryPusher has a member variable pendingAppendResponsesByTerm. Its KEY is the term, and its VALUE is a ConcurrentMap whose KEY is the message index (every message is numbered, starting from 0; more on this later) and whose VALUE is the async response object, an AppendEntryResponse future, for that message's write request:
When isPendingFull is called, it first checks whether the current term has an entry in pendingAppendResponsesByTerm, creating and registering a new ConcurrentHashMap if not. It then takes the size of the map for the term and compares it against MaxPendingRequestsNum to decide whether the maximum has been exceeded:
public class DLedgerEntryPusher {
    // The outer KEY is the term, the VALUE is a ConcurrentMap
    // whose KEY is the message index and whose VALUE is the async response future of that message's write request
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();

    public boolean isPendingFull(long currTerm) {
        // Make sure currTerm is present in pendingAppendResponsesByTerm
        checkTermForPendingMap(currTerm, "isPendingFull");
        // Check whether the number of pending write requests for the current term exceeds the maximum
        return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
    }

    private void checkTermForPendingMap(long term, String env) {
        // If pendingAppendResponsesByTerm does not contain the term
        if (!pendingAppendResponsesByTerm.containsKey(term)) {
            logger.info("Initialize the pending append map in {} for term={}", env, term);
            // Create a ConcurrentHashMap and register it in pendingAppendResponsesByTerm
            pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap<>());
        }
    }
}
So when are entries added to pendingAppendResponsesByTerm?
After a message is written to the Leader node, DLedgerEntryPusher's waitAck method is called (covered later). If the cluster has more than one node, an AppendFuture<AppendEntryResponse> is created for the current request and put into pendingAppendResponsesByTerm, so the number of response objects stored there tells how many write requests are waiting for acknowledgement in a given term:
// The response future
AppendFuture<AppendEntryResponse> future;
// Create the AppendFuture
if (isBatchWait) {
    // batch
    future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
} else {
    future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
}
future.setPos(entry.getPos());
// Put the created AppendFuture into pendingAppendResponsesByTerm
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
Writing to the Leader
DLedgerStore has two implementations: DLedgerMemoryStore (in-memory storage) and DLedgerMmapFileStore (mmap-based file storage).
As createDLedgerStore shows, which one is used depends on the configured store type:
public class DLedgerServer implements DLedgerProtocolHander {
    public DLedgerServer(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = new MemberState(dLedgerConfig);
        // Create the DLedgerStore according to the configured StoreType
        this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
        // ...
    }

    // Create the DLedgerStore
    private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
        if (storeType.equals(DLedgerConfig.MEMORY)) {
            return new DLedgerMemoryStore(config, memberState);
        } else {
            return new DLedgerMmapFileStore(config, memberState);
        }
    }
}
appendAsLeader
Taking DLedgerMmapFileStore as the example, appendAsLeader works as follows:
Get the log data buffer (dataBuffer) and the index data buffer (indexBuffer); content is first written into the buffers and then from the buffers into the files;
Set the message index (every message is numbered) to ledgerEndIndex + 1. ledgerEndIndex starts at -1 and grows by one with each appended message; it is updated after a successful write and always records the index of the last successfully written message;
Call dataFileList's append method to write the dataBuffer content to the log file, returning the offset of the data within the file;
Call updateLedgerEndIndexAndTerm to update the LedgerEndIndex and LedgerEndTerm recorded in MemberState. LedgerEndIndex is written to a file for persistence when a flush happens.
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
    // Check that this node is the Leader
    PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
    // Check whether the disk is full
    PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
    // Get the log data buffer
    ByteBuffer dataBuffer = localEntryBuffer.get();
    // Get the index data buffer
    ByteBuffer indexBuffer = localIndexBuffer.get();
    // Encode the entry content into the dataBuffer
    DLedgerEntryCoder.encode(entry, dataBuffer);
    int entrySize = dataBuffer.remaining();
    synchronized (memberState) {
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
        // The message index is ledgerEndIndex + 1
        long nextIndex = ledgerEndIndex + 1;
        // Set the message index
        entry.setIndex(nextIndex);
        // Set the term
        entry.setTerm(memberState.currTerm());
        // Set the magic code
        entry.setMagic(CURRENT_MAGIC);
        // Write index, term and magic into the dataBuffer
        DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
        long prePos = dataFileList.preAppend(dataBuffer.remaining());
        entry.setPos(prePos);
        PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.setPos(dataBuffer, prePos);
        for (AppendHook writeHook : appendHooks) {
            writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
        }
        // Write the dataBuffer content to the log file and return the position of the data
        long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
        PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
        PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
        // Write the index information into the indexBuffer
        DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
        // Write the indexBuffer content to the index file
        long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
        PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
        if (logger.isDebugEnabled()) {
            logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
        }
        // Increment ledgerEndIndex
        ledgerEndIndex++;
        // Set ledgerEndTerm to the current term
        ledgerEndTerm = memberState.currTerm();
        if (ledgerBeginIndex == -1) {
            // Update ledgerBeginIndex
            ledgerBeginIndex = ledgerEndIndex;
        }
        // Update LedgerEndIndex and LedgerEndTerm
        updateLedgerEndIndexAndTerm();
        return entry;
    }
}
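The precondition indexPos == entry.getIndex() * INDEX_UNIT_SIZE holds because every index entry has the same fixed size, so the file offset of an index entry is simply its index times the unit size. A sketch of what DLedgerEntryCoder.encodeIndex writes, assuming the fixed 32-byte unit layout (4-byte magic + 8-byte pos + 4-byte size + 8-byte index + 8-byte term = 32 = INDEX_UNIT_SIZE):

    // Sketch of the fixed-size index unit written for every entry
    public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) {
        byteBuffer.clear();
        byteBuffer.putInt(magic);   // magic code
        byteBuffer.putLong(pos);    // physical offset of the entry in the data file
        byteBuffer.putInt(size);    // size of the entry
        byteBuffer.putLong(index);  // message index
        byteBuffer.putLong(term);   // term in which the entry was written
        byteBuffer.flip();
    }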
Updating LedgerEndIndex and LedgerEndTerm
After a message is written to the Leader, getLedgerEndIndex and getLedgerEndTerm are called to read the LedgerEndIndex and LedgerEndTerm recorded in DLedgerMmapFileStore, and the values are copied into MemberState:
public abstract class DLedgerStore {
    protected void updateLedgerEndIndexAndTerm() {
        if (getMemberState() != null) {
            // Delegate to MemberState's updateLedgerIndexAndTerm
            getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
        }
    }
}

public class MemberState {
    private volatile long ledgerEndIndex = -1;
    private volatile long ledgerEndTerm = -1;

    // Update ledgerEndIndex and ledgerEndTerm
    public void updateLedgerIndexAndTerm(long index, long term) {
        this.ledgerEndIndex = index;
        this.ledgerEndTerm = term;
    }
}
waitAck
After a message is written to the Leader node, the Leader still has to forward the log to the Follower nodes, and that happens asynchronously, so waitAck creates an async response object for the write. Its main logic:
Call updatePeerWaterMark to update the water mark. Since the Leader forwards the log to every Follower, the water mark records each node's replication progress, i.e. the index of the last message replicated to it. Here the Leader records the index of its own latest written message; the Follower-side update is shown later;
If the cluster has only one node, build an AppendEntryResponse and return a completed response;
If the cluster has multiple nodes, log forwarding is asynchronous, so create an AppendFuture<AppendEntryResponse> and put it into pendingAppendResponsesByTerm; this is where pendingAppendResponsesByTerm gets populated.
To distinguish pendingAppendResponsesByTerm from peerWaterMarksByTerm once more:
pendingAppendResponsesByTerm holds the async response object (AppendEntryResponse future) of each message write request; since the write has to wait for a majority of the cluster to respond, it is handled asynchronously and the result is fetched later.
peerWaterMarksByTerm holds each node's replication progress, i.e. the index of the last message successfully written to each node.
public class DLedgerEntryPusher {
    public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
        // Update the latest written message index for the current node
        updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());
        // If there is only one node in the cluster
        if (memberState.getPeerMap().size() == 1) {
            // Build the response
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setLeaderId(memberState.getSelfId());
            response.setIndex(entry.getIndex());
            response.setTerm(entry.getTerm());
            response.setPos(entry.getPos());
            if (isBatchWait) {
                return BatchAppendFuture.newCompletedFuture(entry.getPos(), response);
            }
            return AppendFuture.newCompletedFuture(entry.getPos(), response);
        } else {
            // pendingAppendResponsesByTerm
            checkTermForPendingMap(entry.getTerm(), "waitAck");
            // The response future
            AppendFuture<AppendEntryResponse> future;
            // Create the AppendFuture
            if (isBatchWait) {
                // batch
                future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            } else {
                future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            }
            future.setPos(entry.getPos());
            // Put the created AppendFuture into pendingAppendResponsesByTerm
            CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
            if (old != null) {
                logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
            }
            return future;
        }
    }
}
Once a message is written to the Leader, the Leader forwards it to the other Follower nodes asynchronously. Let's look at the replication process.
DLedgerEntryPusher's startup method starts the following threads:
EntryDispatcher: used by the Leader node to forward logs to Follower nodes;
EntryHandler: used by a Follower node to process the logs pushed by the Leader;
QuorumAckChecker: used by the Leader node to wait for Follower nodes to synchronize;
Note that the Leader creates one EntryDispatcher for each Follower; every EntryDispatcher is responsible for forwarding logs to one node, and the nodes are handled in parallel.
public class DLedgerEntryPusher {
    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
        DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        for (String peer : memberState.getPeerMap().keySet()) {
            if (!peer.equals(memberState.getSelfId())) {
                // Create an EntryDispatcher for every node in the cluster except the current one
                dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
            }
        }
        // Create the EntryHandler
        this.entryHandler = new EntryHandler(logger);
        // Create the QuorumAckChecker
        this.quorumAckChecker = new QuorumAckChecker(logger);
    }

    public void startup() {
        // Start the EntryHandler
        entryHandler.start();
        // Start the QuorumAckChecker
        quorumAckChecker.start();
        // Start the EntryDispatchers
        for (EntryDispatcher dispatcher : dispatcherMap.values()) {
            dispatcher.start();
        }
    }
}
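The QuorumAckChecker's source is not walked through in this article, but its role ties the two maps together. A simplified sketch of the idea, assuming the member names introduced above (the real implementation batches this work over ranges of indexes; this is illustrative only): for a given term, count how many peers' water marks have reached an entry's index, and complete the pending response future once a majority has acknowledged:

    // Simplified sketch, not the actual QuorumAckChecker code
    private void checkQuorum(long term, long index) {
        ConcurrentMap<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(term);
        int ackNum = 0;
        for (Long waterMark : peerWaterMarks.values()) {
            if (waterMark >= index) {
                ackNum++; // this peer (the Leader included) has replicated up to index
            }
        }
        // memberState.isQuorum(n) is true once n reaches a majority of the cluster
        if (memberState.isQuorum(ackNum)) {
            TimeoutFuture<AppendEntryResponse> future = pendingAppendResponsesByTerm.get(term).remove(index);
            if (future != null) {
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(term);
                response.setIndex(index);
                response.setLeaderId(memberState.getSelfId());
                // Completing the future unblocks the producer waiting in asyncPutMessages
                future.complete(response);
            }
        }
    }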
EntryDispatcher (log forwarding)
EntryDispatcher is used by the Leader node to forward logs to a Follower. It extends ShutdownAbleThread, so a thread is started to handle the forwarding; the entry point is the doWork method.
doWork first calls checkAndFreshState to check the node's state. This mainly verifies that the current node is the Leader and, if needed, changes the push type. If the node is not the Leader, processing stops; if it is, the push type is examined:
APPEND: message append, used to forward messages to the Follower; batch messages go through doBatchAppend, otherwise doAppend;
COMPARE: message comparison, usually triggered when data is inconsistent, in which case doCompare compares the entries.
public class DLedgerEntryPusher {
    // Log forwarding thread
    private class EntryDispatcher extends ShutdownAbleThread {
        @Override
        public void doWork() {
            try {
                // Check the state
                if (!checkAndFreshState()) {
                    waitForRunning(1);
                    return;
                }
                // If the push type is APPEND
                if (type.get() == PushEntryRequest.Type.APPEND) {
                    // If batch push is enabled
                    if (dLedgerConfig.isEnableBatchPush()) {
                        doBatchAppend();
                    } else {
                        doAppend();
                    }
                } else {
                    // Compare
                    doCompare();
                }
                Thread.yield();
            } catch (Throwable t) {
                DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
                // On error, switch to COMPARE
                changeState(-1, PushEntryRequest.Type.COMPARE);
                DLedgerUtils.sleep(500);
            }
        }
    }
}
State check (checkAndFreshState)
If the term differs from the one recorded in memberState, or leaderId is null, or leaderId differs from memberState's leader ID, changeState is called to switch the push type to COMPARE and reset compareIndex to -1:
public class DLedgerEntryPusher {
    private class EntryDispatcher extends ShutdownAbleThread {
        private long term = -1;
        private String leaderId = null;

        private boolean checkAndFreshState() {
            // If this node is not the Leader
            if (!memberState.isLeader()) {
                return false;
            }
            // If the term differs from the one in memberState, or leaderId is null or inconsistent with memberState
            if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {
                synchronized (memberState) { // lock
                    if (!memberState.isLeader()) {
                        return false;
                    }
                    PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                    term = memberState.currTerm();
                    leaderId = memberState.getSelfId();
                    // Switch the state to COMPARE
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                }
            }
            return true;
        }

        private synchronized void changeState(long index, PushEntryRequest.Type target) {
            logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
            switch (target) {
                case APPEND:
                    compareIndex = -1;
                    updatePeerWaterMark(term, peerId, index);
                    quorumAckChecker.wakeup();
                    writeIndex = index + 1;
                    if (dLedgerConfig.isEnableBatchPush()) {
                        resetBatchAppendEntryRequest();
                    }
                    break;
                case COMPARE:
                    // If the switch to the COMPARE state succeeds
                    if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                        compareIndex = -1; // reset compareIndex to -1
                        if (dLedgerConfig.isEnableBatchPush()) {
                            batchPendingMap.clear();
                        } else {
                            pendingMap.clear();
                        }
                    }
                    break;
                case TRUNCATE:
                    compareIndex = -1;
                    break;
                default:
                    break;
            }
            type.set(target);
        }
    }
}
Message forwarding on the Leader
In the APPEND state the Leader node sends Append requests to the Follower to forward messages. doAppend works as follows:
writeIndex is the index of the next message to forward (default -1). If it is greater than LedgerEndIndex, call doCommit to send a COMMIT request to the Follower to update its committedIndex (covered later);
So forwarding also uses a counter, writeIndex, to track the next message to forward: each round reads the message at writeIndex from the log, forwards it, and on success increments writeIndex to point at the next entry.
If pendingMap holds more entries than the maxPendingSize limit, or the last check was more than 1000 ms ago (i.e. no cleanup for a long while), expired entries are cleaned up (this step exists purely to reclaim space):
pendingMap is a ConcurrentMap whose KEY is the message index and whose VALUE is the time that message was forwarded to the Follower node (doAppendInner puts the entries into pendingMap);
As seen earlier, peerWaterMark records each node's replication progress; here the progress for this term and peer ID (the index of the latest successfully replicated message) is read into the peerWaterMark variable;
Iterate over pendingMap and compare each index with peerWaterMark: everything at or before the water mark has already been written successfully, so any index below peerWaterMark is expired and can be removed from pendingMap to free space;
Update the check timestamp lastCheckLeakTimeMs to the current time.
public class DLedgerEntryPusher {
    private class EntryDispatcher extends ShutdownAbleThread {
        // Index of the next message to forward, default -1
        private long writeIndex = -1;
        // KEY is the message index, VALUE is the time the message was forwarded to the Follower node
        private ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>();

        private void doAppend() throws Exception {
            while (true) {
                // Check the state
                if (!checkAndFreshState()) {
                    break;
                }
                // If the state is no longer APPEND, stop
                if (type.get() != PushEntryRequest.Type.APPEND) {
                    break;
                }
                // If the index of the next message to forward is beyond LedgerEndIndex
                if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
                    doCommit(); // send a COMMIT request to the Follower node
                    doCheckAppendResponse();
                    break;
                }
                // If pendingMap exceeds maxPendingSize, or the last check was over 1000 ms ago
                if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {
                    // Get the replication progress of the peer
                    long peerWaterMark = getPeerWaterMark(term, peerId);
                    // Iterate over pendingMap
                    for (Long index : pendingMap.keySet()) {
                        // If the index is below the water mark
                        if (index < peerWaterMark) {
                            // Remove it
                            pendingMap.remove(index);
                        }
                    }
                    // Update the check timestamp
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (pendingMap.size() >= maxPendingSize) {
                    doCheckAppendResponse();
                    break;
                }
                // Forward the message
                doAppendInner(writeIndex);
                // Increment writeIndex
                writeIndex++;
            }
        }
    }
}
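doAppendInner itself is covered later in the series; for context, here is a simplified sketch of the behavior the steps above describe (read the entry at the given index, push it to the Follower asynchronously, record the send time in pendingMap, and on success advance the peer's water mark and wake the quorum checker; error handling is elided and the structure is illustrative, not verbatim):

    // Simplified sketch of doAppendInner, not the verbatim DLedger source
    private void doAppendInner(long index) throws Exception {
        // Read the entry to forward from the local store
        DLedgerEntry entry = dLedgerStore.get(index);
        PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
        // Build an APPEND push request and send it to the Follower asynchronously
        CompletableFuture<PushEntryResponse> responseFuture =
            dLedgerRpcService.push(buildPushRequest(entry, PushEntryRequest.Type.APPEND));
        // Record the send time so expired requests can be cleaned up in doAppend
        pendingMap.put(index, System.currentTimeMillis());
        responseFuture.whenComplete((response, ex) -> {
            if (ex == null && DLedgerResponseCode.valueOf(response.getCode()) == DLedgerResponseCode.SUCCESS) {
                // The Follower stored the entry: drop it from pendingMap,
                // advance the Follower's water mark and wake the quorum checker
                pendingMap.remove(response.getIndex());
                updatePeerWaterMark(response.getTerm(), peerId, response.getIndex());
                quorumAckChecker.wakeup();
            } else {
                // On inconsistency or errors the dispatcher falls back to COMPARE
                changeState(-1, PushEntryRequest.Type.COMPARE);
            }
        });
    }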
getPeerWaterMark
peerWaterMarksByTerm
peerWaterMarksByTerm records log forwarding progress. Its KEY is the term and its VALUE is a ConcurrentMap whose KEY is a Follower node's ID (peerId) and whose VALUE is the index of the latest message that node has finished replicating:
When getPeerWaterMark is called, it first calls checkTermForWaterMark to check whether peerWaterMarksByTerm has data for the term. If not, it creates a ConcurrentMap and iterates over the cluster's nodes, inserting each node ID as the KEY with a default value of -1. When a message is later written to a Follower node successfully, updatePeerWaterMark is called to advance that node's progress:
public class DLedgerEntryPusher {
    // Records each Follower node's replication progress. KEY is the term, VALUE is a ConcurrentMap
    // whose KEY is the Follower's ID (peerId) and whose VALUE is the index of the latest message
    // successfully replicated to that node.
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
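The helpers referenced above — checkTermForWaterMark, getPeerWaterMark and updatePeerWaterMark — work roughly as follows (a sketch consistent with the description above, not a verbatim copy):

    // Make sure the term has a water-mark map; every peer starts at -1
    private void checkTermForWaterMark(long term, String env) {
        if (!peerWaterMarksByTerm.containsKey(term)) {
            logger.info("Initialize the watermark in {} for term={}", env, term);
            ConcurrentMap<String, Long> waterMarks = new ConcurrentHashMap<>();
            for (String peer : memberState.getPeerMap().keySet()) {
                waterMarks.put(peer, -1L);
            }
            peerWaterMarksByTerm.putIfAbsent(term, waterMarks);
        }
    }

    // Read a peer's replication progress for the term
    private long getPeerWaterMark(long term, String peerId) {
        synchronized (peerWaterMarksByTerm) {
            checkTermForWaterMark(term, "getPeerWaterMark");
            return peerWaterMarksByTerm.get(term).get(peerId);
        }
    }

    // Advance a peer's progress after a message is successfully written to it
    private void updatePeerWaterMark(long term, String peerId, long index) {
        synchronized (peerWaterMarksByTerm) {
            checkTermForWaterMark(term, "updatePeerWaterMark");
            if (peerWaterMarksByTerm.get(term).get(peerId) < index) {
                peerWaterMarksByTerm.get(term).put(peerId, index);
            }
        }
    }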