RocketMQ DLedger主从切换之日志复制

Leader节点首先将客户端发送过来的日志按照指定格式存储在Leader节点上,但此时并不会向客户端返回写入成功,而是需要将日志转发给它的所有从节点,只有超过半数的节点都存储了该条日志,Leader节点才会向客户端返回日志写入成功。

日志的复制主要包括如下3个步骤。

1)Leader节点将日志推送到从节点。 2)从节点收到Leader节点推送的日志并存储,然后向Leader节点汇报日志复制结果。 3)Leader节点对日志复制进行仲裁,如果成功存储该条日志的节点超过半数,则向客户端返回写入成功。

在介绍日志复制流程之前,先介绍日志复制的一些设计理念。

日志复制设计理念

日志编号

为了方便对日志进行管理与辨别,Raft协议对每条日志进行编号,每一条消息到达主节点时会生成一个全局唯一的递增号,这样可以根据日志序号来快速判断日志中的数据在主从复制过程中是否保持一致,在DLedger的实现中对应DLedgerMemoryStore中的ledgerBeginIndex、ledgerEndIndex,分别表示当前节点最小的日志序号与最大的日志序号,下一条日志的序号为ledgerEndIndex+1。

日志追加与提交机制

Leader节点收到客户端的数据写入请求后,先通过解析请求提取数据,构建日志对象,并生成日志序号,用seq表示。然后将日志存储到Leader节点内,将日志广播(推送)给其所有从节点。这个过程存在网络延时,如果客户端向主节点查询日志序号为seq的日志,日志已经存储在Leader节点中了,直接返回给客户端显然是有问题的,这是因为网络等原因导致从节点未能正常存储该日志,导致数据不一致,该如何避免出现这个问题呢?

为了解决上述问题,DLedger引入了已提交指针(committedIndex)。当主节点收到客户端的请求时,先将数据进行存储,此时数据是未提交的,这一过程被称为日志追加,此时该条日志对客户端不可见,只有当集群内超过半数的节点都将日志追加完成后,才会更新committedIndex指针,该条日志才会向客户端返回写入成功。一条日志被提交成功的充分必要条件是已超过集群内半数节点成功追加日志。

保证日志一致性

一个拥有3个节点的Raft集群,只需要主节点和其中一个从节点成功追加日志,就可以认为是成功提交了日志,客户端即可通过主节点访问该日志。因为部分数据存在延迟,所以在DLedger的实现中,读写请求都将由Leader节点负责。那么落后的从节点如何再次跟上集群的进度呢?

DLedger的实现思路是按照日志序号向从节点源源不断地转发日志,从节点接收日志后,将这些待追加的数据放入一个待写队列。从节点并不是从挂起队列中处理一个个追加请求的,而是先查找从节点当前已追加的最大日志序号,用ledgerEndIndex表示,然后尝试追加ledgerEndIndex+1的日志,根据日志序号从待写队列中查找日志,如果该队列不为空,并且待写日志不在待写队列中,说明从节点未接收到这条日志,发生了数据缺失。从节点在响应主节点的append请求时会告知数据不一致,然后主节点的日志转发线程状态变更为COMPARE,向该从节点发送COMPARE命令,用来比较主从节点的数据差异。根据比较出的差异重新从主节点同步数据或删除从节点上多余的数据,最终达到一致。同时,主节点也会对推送超时的消息发起重推,尽最大可能帮助从节点及时更新到主节点的数据。

日志复制类设计体系

日志复制(日志转发)由DLedgerEntryPusher实现,具体类图如图9-10所示。

image 2025 02 06 16 33 58 531
Figure 1. 图9-10 日志复制类体系

DledgerEntryPusher是DLedger日志转发与处理核心类,该类构建如下3个对象,每一个对象对应一个线程。

1)EntryDispatcher:日志转发线程,当前节点为主节点时追加。 2)QuorumAckChecker:日志追加ACK投票仲裁线程,当前节点为主节点时激活。 3)EntryHandler:日志接收处理线程,当节点为从节点时激活。

DLedger的日志复制使用推送模式,其核心入口为DLedgerEntryPusher,接下来我们探究其实现细节。DLedgerEntryPusher的详细类图如图9-11所示。

image 2025 02 06 16 34 39 253
Figure 2. 图9-11 DLedgerEntryPusher类图

下面逐一介绍上述核心类及核心属性。

  • DLedgerConfig dLedgerConfig:DLedger多副本相关配置。

  • DLedgerStore dLedgerStore:存储实现类。

  • MemberState memberState:节点状态机。

4)DLedgerRpcService dLedgerRpcService:RPC服务实现类,用于与集群内的其他节点进行网络通信。 5)Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm:每个节点基于投票轮次的水位线标记。键为投票轮次,值为ConcurrentMap<String/ 节点ID*/, Long/ 节点对应的日志序号*/>。 6)Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>>pendingAppendResponsesByTerm:用于存放日志追加请求的响应结果(Future模式)。 7)EntryHandler entryHandler:从节点上开启的线程,用于接收主节点的推送请求(append、commit、append)。 8)QuorumAckChecker quorumAckChecker:主节点上的日志复制结果仲裁器,用于判断日志是否可提交。 9)Map<String, EntryDispatcher> dispatcherMap:日志请求转发器,负责向从节点转发日志,主节点为每一个从节点构建一个EntryDispatcher。

通常了解一个类需要从其构造函数开始,我们先看一下DLedgerEntryPusher的构造函数,如代码清单9-48所示。

代码清单9-48 DLedgerEntryPusher构造函数
public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState
        memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) {
    this.dLedgerConfig = dLedgerConfig;
    this.memberState = memberState;
    this.dLedgerStore = dLedgerStore;
    this.dLedgerRpcService = dLedgerRpcService;
    for (String peer : memberState.getPeerMap().keySet()) {
        if (!peer.equals(memberState.getSelfId())) {
            dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
        }
    }
}

这里主要是根据集群的配置,为每一个从节点创建一个EntryDispatcher转发线程,即每一个从节点的日志转发相互不干扰。

接下来看一下用来开启日志推送的 startup() 方法,如代码清单9-49所示。

public void startup() {
    entryHandler.start();
    quorumAckChecker.start();
    for (EntryDispatcher dispatcher:
    dispatcherMap.values()){
        dispatcher.start();
    }
}

启动 EntryDispatcher、EntryHandler、QuorumAckChecker 线程,对应日志复制的 3 个核心流程。

  • Leader节点将日志推送到从节点。

  • 从节点收到Leader节点推送的日志并存储,然后向Leader节点汇报日志复制结果。

  • Leader节点对日志复制进行仲裁,如果成功存储该条日志的节点超过半数节点,则向客户端返回写入成功。

日志转发

EntryDispatcher类图

日志转发由EntryDispatcher实现,先来看一下类图,如图9-12所示。

image 2025 02 06 16 46 33 009
Figure 3. 图9-12 EntryDispatcher类图

下面逐一介绍上述核心类及核心属性。

  1. AtomicReference<PushEntryRequest.Type>type=new AtomicReference <>(PushEntryReque st.Type.COMPARE):向从节点发送命令的类型,可选值为COMPARE、TRUNCATE、APPEND、COMMIT。

  2. long lastPushCommitTimeMs =-1:上一次发送 commit 请求的时间戳。

  3. String peerId:目标节点 ID。

  4. long compareIndex = -1:已完成 COMPARE 的日志序号。

  5. long writeIndex = -1:已写入的日志序号。

  6. int maxPendingSize = 1000:允许的最大挂起日志数量。

  7. long term = -1:Leader节点当前的投票轮次。

  8. String leaderId = null:Leader节点ID。

  9. long lastCheckLeakTimeMs = System.currentTimeMillis():上次检测泄漏的时间,所谓泄漏,指的是挂起的日志请求数量超过了maxPendingSize。

  10. ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>():记录日志的挂起时间,key表示日志的序列(entryIndex),value表示挂起时间戳。

  11. Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()):配额。

推送请求类型

在详细介绍日志转发流程之前,先介绍一下主节点向从节点发送推送请求的类型,在PushEntryRequest.Type中定义,可选值如下。

  • COMPARE:如果Leader节点发生变化,新的Leader节点需要与它的从节点日志条目进行比较,以便截断从节点多余的数据。

  • TRUNCATE:如果Leader节点通过索引完成日志对比后,发现从节点存在多余的数据(未提交的数据),则Leader节点将发送TRUNCATE给它的从节点,删除多余的数据,实现主从节点数据一致性。

  • APPEND:将日志条目追加到从节点。

  • COMMIT:通常Leader节点会将提交的索引附加到append请求,如果append请求很少且分散,Leader节点将发送一个单独的请求来通知从节点提交索引。

日志转发流程

EntryDispatcher是一个线程类,继承自ShutdownAbleThread,其run()方法会循环执行doWork()方法,即doWork()方法为EntryDispatcher的核心入口,如代码清单9-50所示。

代码清单9-50 EntryDispatcher#doWork
public void doWork() {
    try {
        if (!checkAndFreshState()) {
            waitForRunning(1);
            return;
        }
        if (type.get() == PushEntryRequest.Type.APPEND) {
            doAppend();
        } else {
            doCompare();
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerUtils.sleep(500);
    }
}

该方法主要完成如下两件事。

  • 检查当前节点的状态,确定当前节点状态是否可以发送append或compare请求。

  • 根据当前转发器的状态向从节点发送append或compare请求。

checkAndFreshState()方法不只是简单地检测一下状态,而是会根据运行状态改变日志转发器的状态,从而驱动转发器是发送append请求还是发送compare请求,下面详细看一下该方法的实现细节,如代码清单9-51所示。

代码清单9-51 EntryDispatcher#checkAndFreshState
private boolean checkAndFreshState() {
    if (!memberState.isLeader()) {
        return false;
    }
    if (term != memberState.currTerm()
            || leaderId == null || !leaderId.equals(member
            State.getLeaderId())) {
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                return false;
            }
            PreConditions.check(memberState.getSelfId().equals(memberState.GetLeaderId()), DLedgerResponseCode.UNKNOWN);
            term = memberState.currTerm();
            leaderId = memberState.getSelfId();
            changeState(-1, PushEntryRequest.Type.COMPARE);
        }
    }
    return true;
}

如果当前节点的状态不是Leader则直接返回。如果日志转发器(EntryDispatcher)的投票轮次为空或与状态机的投票轮次不相等,将日志转发器的term、leaderId与状态机同步,即发送compare请求。这种情况通常是由于集群触发了重新选举,当前节点刚被选举成Leader节点。

changeState改变日志转发器的状态,该方法非常重要,我们来看一下状态转换过程中需要处理的核心逻辑,如代码清单9-52所示。

代码清单9-52 EntryDispatcher#changeState
private synchronized void changeState(long index, PushEntryRequest.Type target) {
    logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
    switch (target) {
        case APPEND:
            compareIndex = -1;
            updatePeerWaterMark(term, peerId, index);
            quorumAckChecker.wakeup();
            writeIndex = index + 1;
            break;
        case COMPARE:
            if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                compareIndex = -1;
                pendingMap.clear();
            }
            break;
        case TRUNCATE:
            compareIndex = -1;
            break;
        default:
            break;
    }
    type.set(target);
}

先介绍一下该方法两个参数的含义。

  • Long index:已写入日志序号。

  • PushEntryRequest.Type target:日志转发器即将进入的状态。

接下来说明一下进入各个状态后的初始化逻辑。

  • APPEND:日志转发器即将进入APPEND状态,该状态下主节点将向从节点转发日志,重置compareIndex指针,更新当前节点已追加日志序号为index,并唤醒QuorumAckChecker线程,以便对append响应结果进行仲裁。最后更新待追加日志序号(writeIndex)。

  • COMPARE:日志转发器即将进入COMPARE状态,该状态下先重置compareIndex为-1,然后清除已挂起的日志转发请求,与从节点进行协商,以确保主从节点数据一致。

  • TRUNCATE:日志转发器即将进入TRUNCATE状态,该状态下先重置compareIndex为-1,然后向从节点发起truncate请求,清除从节点未提交且在主节点上不存在的数据,确保主从节点数据一致。

compare请求

日志转发器EntryDispatcher的初始状态为COMPARE,当一个节点被选举为Leader后,日志转发器的状态同样会先设置为COMPARE,Leader节点先向从节点发送该请求的目的是比较主、从节点之间数据的差异,以此确保发送主从切换时不会丢失数据,并且重新确定待转发的日志序号。

通过EntryDispatcher的doWork()方法可知,如果节点状态为COMPARE,会调用doCompare()方法。doCompare()方法内部代码都是while(true)包裹,在查看其代码时注意其退出条件,如代码清单9-53所示。

代码清单9-53 EntryDispatcher#doCompare
if (!checkAndFreshState()) {
    break;
}
if (type.get() != PushEntryRequest.Type.COMPARE && type.get() != PushEntryRequest.Type.TRUNCATE){
    break;
}
if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1){
    break;
}

第一步:再次验证当前状态下是否可以发送 compare 请求,如代码清单9-54所示,其关键点如下。

  • 如果当前节点不是Leader节点,则直接跳出。

  • 如果请求类型不是compare或truncate,则直接跳出。

  • 如果compareIndex和ledgerEndIndex都为 -1,表示这是一个新的集群,没有存储任何数据,故无须比较主从是否一致。

代码清单9-54 EntryDispatcher#doCompare
if (compareIndex == -1) {
    compareIndex = dLedgerStore.getLedgerEndIndex();
} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()){
    compareIndex = dLedgerStore.getLedgerEndIndex();
}

第二步:重置compareIndex,其实现逻辑为如果compareIndex为-1或compareIndex不在有效范围内,则重置compareIndex为Leader节点当前存储的最大日志序号,如代码清单9-55所示。

代码清单9-55 EntryDispatcher#doCompare
DLedgerEntry entry = dLedgerStore.get(compareIndex);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);

第三步:根据待比较的日志序号查询日志,并向从节点发起compare请求,默认超时时间为3s,如代码清单9-56所示。这里会涉及网络通信,从节点对compare请求的响应将在下文详细介绍。

代码清单9-56 EntryDispatcher#doCompare
long truncateIndex = -1;
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
    if (compareIndex == response.getEndIndex()) {
        changeState(compareIndex, PushEntryRequest.Type.APPEND);
        break;

    } else {
        truncateIndex = compareIndex;
    }
} else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex() {
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex < response.getBeginIndex()) {
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex > response.getEndIndex()) {
    compareIndex = response.getEndIndex();
} else {
    compareIndex--;
}
if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
}
if (truncateIndex != -1) {
    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    doTruncate(truncateIndex);
    break;
}

第四步:根据从节点的响应结果计算truncateIndex(需要截断的日志序号),即计算从节点中多余的数据,如代码清单9-57所示,其实现关键点如下。

1)如果主从节点已存储的日志序号相同,则无须截断,日志转发器的状态将变更为APPEND,主节点将开始向从节点转发日志,否则设置truncateIndex为从节点返回的compareIndex,将向从节点发送truncate请求。 2)如果从节点存储的最大日志序号小于主节点的最小序号,或者从节点的最小日志序号大于主节点的最大日志序号,即两者不相交,则设置truncateIndex为主节点的ledgerBeginIndex,即主节点目前最小的偏移量,这样意味着将会删除从节点所有的数据,然后从truncateIndex开始向从节点重新转发日志。这种情况通常发生在从节点崩溃很长一段时间,而主节点删除了过期的条目时。 3)如果compareIndex小于从节点的开始日志序号,则从主节点最小日志序号开始同步。 4)如果compareIndex大于从节点的最大日志序号,并且不小于Leader节点的最小存储日志序号,则将compareIndex设置为从节点最大的日志序号,继续发起compare请求。 5)如果compareIndex大于从节点的开始日志序号,但小于从节点的最大日志序号,表示主节点与从节点数据有相交的情况,故将compareIndex减1,继续比较,直到找到需要截断的日志序号。 6)如果compareIndex小于主节点的最小日志序号,则将truncateIndex设置为主节点的最小日志序号。

代码清单9-57 EntryDispatcher#doCompare
if (truncateIndex != -1) {
    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    doTruncate(truncateIndex);
    break;
}

第五步:如果 truncateIndex 不等于 -1,则日志转发器状态设置为 TRUNCATE,然后向从节点发送 truncate 请求,具体由 doTruncate() 方法实现。

truncate请求

Leader节点在发送compare请求后,得知与从节点的数据存在差异,将向从节点发送truncate请求,指示从节点应该将truncateIndex及以后的日志删除,如代码清单9-58所示。

代码清单9-58 EntryDispatcher#doTruncate
private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
    lastPushCommitTimeMs = System.currentTimeMillis();
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}

该方法的实现比较简单,主节点构建 truncate 请求包并通过网络向从节点发送请求,从节点在收到请求后会清理多余的数据,使主从节点数据保持一致。日志转发器在处理完 truncate 请求后,状态将变更为 APPEND,开始向从节点转发日志。

append请求

Leader节点在确认主从数据一致后,开始将新的消息转发到从节点。doAppend()方法内部的逻辑被包裹在while(true)中,故在查看其代码时应注意退出条件,如代码清单9-59所示。

代码清单9-59 EntryDispatcher#doAppend
if (!checkAndFreshState()) {
    break;
}
if (type.get() != PushEntryRequest.Type.APPEND){
    break;
}

第一步:再次判断节点状态,确保当前节点是Leader节点并且日志转发器内部的状态为APPEND,如代码清单9-60所示。

代码清单9-60 EntryDispatcher#doAppend
if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
    doCommit();
    doCheckAppendResponse();
    break;
}

第二步:writeIndex表示当前已追加到从节点的日志序号。通常情况下,主节点向从节点发送append请求时会带上主节点已提交的指针,但如果append请求发送不频繁,pending请求超过其队列长度(默认为1万字节)时,会阻止数据的追加,此时有可能会出现writeIndex大于leaderEndIndex的情况,需要单独发送commit请求,并检查append请求响应,如代码清单9-61所示,稍后详细介绍。

代码清单9-61 EntryDispatcher#doAppend
if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {
    long peerWaterMark = getPeerWaterMark(term, peerId);
    for (Long index : pendingMap.keySet()) {
        if (index < peerWaterMark) {
            pendingMap.remove(index);
        }
    }
    lastCheckLeakTimeMs = System.currentTimeMillis();
}

第三步:检测pendingMap(挂起的请求数量)是否发生泄露,即挂起队列的容量是否超过了最大挂起阈值。获取当前节点的水位线(已成功append请求的日志序号),如果挂起请求的日志序号小于水位线,则丢弃,并记录最后一次检查的时间戳,如代码清单9-62所示。

代码清单9-62 EntryDispatcher#doAppend
if (pendingMap.size() >= maxPendingSize) {
    doCheckAppendResponse();
    break;
}

第四步:如果挂起的请求(等待从节点追加结果)大于maxPendingSize,并且下一条日志的推送请求已经超时,则重新发起推送请求,即重新发送日志,避免网络丢包等异常,如代码清单9-63所示。

代码清单9-63 EntryDispatcher#doAppend
doAppendInner(writeIndex);
writeIndex++;

第五步:调用doAppendInner方法将日志转发到从节点,如代码清单9-64所示。

代码清单9-64 EntryDispatcher#doAppendInner
DLedgerEntry entry = dLedgerStore.get(index);
PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
checkQuotaAndWait(entry);

第六步:如代码清单9-65所示,根据日志序号查询对应的日志内容,并检查是否超出配额,如果超出会触发限流,触发规则如下。

  1. append挂起请求数已超过最大允许挂起数,默认为1000。

  2. 主从同步差异超过300MB,可通过peerPushThrottlePoint进行配置。

  3. 每秒追加的日志超过20MB(可通过peerPushQuota进行配置),则会休眠1s中后再追加。

代码清单9-65 EntryDispatcher#doAppendInner
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
pendingMap.put(index, System.currentTimeMillis());

第七步:构建append请求包,通过网络异步发送并返回Future,然后保存该条日志的发送时间戳,用于区分推送请求是否发送超时,从而触发重推,如代码清单9-66所示。关于append在从节点的响应处理逻辑将在下文详细介绍,在这里先把其当成“黑盒”。

代码清单9-66 EntryDispatcher#doAppendInner
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
    case SUCCESS:
        pendingMap.remove(x.getIndex());
        updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
        quorumAckChecker.wakeup();
        break;
    case INCONSISTENT_STATE:
        changeState(-1, PushEntryRequest.Type.COMPARE);
        break;
    default:
        logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
        break;
}

第八步:Leader节点收到从节点的append响应结果后,对结果进行处理,其核心逻辑如下。

1)从节点返回SUCCESS。 移除pendingMap中对应的日志条目,表示已经成功收到响应结果。 更新已成功追加的日志序号(按投票轮次组织,并且每个从服务器一个键值对)。 唤醒quorumAckChecker线程(主要用于仲裁append结果),后续会详细介绍。 2)从节点返回INCONSISTENT_STATE。 3)如果append请求出现状态不一致的情况,Leader节点将发送compare请求到从节点,以便对比主从节点的数据是否一致。

append重推机制

如果因网络等原因,主节点在向从节点追加日志时失败,该如何保证从节点与主节点一致呢?从上文我们可以得知,Leader节点在向从节点转发日志后,会存储该日志的推送时间戳到pendingMap,其存储结构为ConcurrentMap<Long/日志序号/, Long/PUSH时间戳/>pendingMap。当pendingMap的积压超过1000ms时会触发重推机制,该逻辑封装在doCheckAppendResponse()方法中,如代码清单9-67所示。

代码清单9-67 EntryDispatcher#doCheckAppendResponse
private void doCheckAppendResponse() throws Exception {
    long peerWaterMark = getPeerWaterMark(term, peerId);
    Long sendTimeMs = pendingMap.get(peerWaterMark + 1);
    if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) {
        logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
        doAppendInner(peerWaterMark + 1);
    }
}

首先从Map<Long/term/, ConcurrentMap<String/从节点ID/, Long/日志序号/>>peerWaterMarksByTerm中获取从节点已复制的日志序号peerWaterMark,然后用在pendingMap中尝试查找该日志序号加1的记录,如果能找到,说明从服务下一条需要追加的消息已经存储在主节点中,接着在尝试推送,如果该条推送已经超时,默认超时时间为1s,调用doAppendInner重新推送。

日志转发流程图

为了更加直观地展示日志转发的全过程,整理其流程如图9-13所示。

image 2025 02 06 17 29 15 033
Figure 4. 图9-13 日志转发流程图

日志复制流程

Leader节点实时向从节点转发消息,从节点接收到日志后进行存储,然后向Leader节点反馈复制进度,从节点的日志接收主要由EntryHandler实现。

EntryHandler类图

EntryHandler的类图如图9-14所示。

image 2025 02 06 17 32 05 087
Figure 5. 图9-14 EntryHandler类图

下面逐一介绍上述核心类及核心属性。

  • long lastCheckFastForwardTimeMs:上一次检查主服务器是否有推送消息的时间戳。

  • ConcurrentMap<Long,Pair<PushEntryRequest,CompletableFuture<PushEntryResponse>>> writeRequestMap:append请求处理队列。

  • BlockingQueue<Pair<PushEntryRequest,CompletableFuture<PushEntryResponse>>>compareOrTruncateRequests:COMMIT、COMPARE、TRUNCATE相关请求的处理队列。

请求处理等待队列

从上文得知,Leader节点会主动向从节点传播日志,从节点通过网络接收请求数据并处理,其调用链如图9-15所示。

image 2025 02 06 17 33 05 213
Figure 6. 图9-15 从节点日志处理调用链

从节点收到Leader节点的推送请求后(无论是APPEND、COMMIT、COMPARE、TRUNCATE),由EntryHandler的handlePush()方法执行,如代码清单9-68所示。

代码清单9-68 EntryHandler#handlePush
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
    CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
    switch (request.getType()) {
        case APPEND:
            long index = request.getEntry().getIndex();
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
            if (old != null) {
                future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
            }
            break;
        case COMMIT:
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        case COMPARE:
        case TRUNCATE:
            writeRequestMap.clear();
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        default:
            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            break;
    }
    return future;
}

handlePush()方法的主要职责是将处理请求放入队列,由doWork()方法从处理队列中拉取任务进行处理。

1)如果是append请求,将请求放入writeRequestMap集合,如果已存在该条日志的推送请求,表示Leader重复推送,则返回状态码REPEATED_PUSH。 2)如果是commit请求,将请求存入compareOrTruncateRequests请求处理队列。 3)如果是compare或truncate请求,将待追加队列writeRequestMap清空,并将请求放入compareOrTruncateRequests请求队列,由doWork()方法进行异步处理。

EntryHandler任务分发机制

EntryHandler的handlePush()方法主要是接收请求并将其放入队列的处理队列,而doWork()方法是从指定队列中获取待执行任务。

第一步:如果当前节点的状态不是从节点,则跳出,如代码清单9-69所示。

代码清单9-69 EntryHandler#doWork
if (!memberState.isFollower()) {
    waitForRunning(1);
    return;
}

第二步:如果compareOrTruncateRequests队列不为空,说明优先处理COMMIT、COMPARE、TRUNCATE等请求,如代码清单9-70所示。值得注意的是,这里使用的是peek、poll等非阻塞方法,然后根据请求的类型,调用对应的方法。

代码清单9-70 EntryHandler#doWork
if (compareOrTruncateRequests.peek() != null) {
    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
    PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
    switch (pair.getKey().getType()) {
        case TRUNCATE:
            handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
            break;
        case COMPARE:
            handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
            break;
        case COMMIT:
            handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
            break;
        default:
            break;
    }
}

如代码清单9-71所示,处理日志追加append请求,根据当前节点已存储的最大日志序号计算下一条待写日志的日志序号,从待写队列中获取日志的处理请求。如果能查找到对应日志的追加请求,则执行doAppend()方法追加日志;如果从待写队列中没有找到对应的追加请求,则调用checkAbnormalFuture检查追加请求是否丢失,详细逻辑将在下文介绍。

代码清单9-71 EntryHandler#doWork
long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
Pair < PushEntryRequest, CompletableFuture < PushEntryResponse >> pair = writeRequestMap.remove(nextIndex);
if (pair == null) {
    checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
    waitForRunning(1);
    return;
}
PushEntryRequest request = pair.getKey();
handleDoAppend(nextIndex, request, pair.getValue());

compare请求响应

从上文得知,Leader节点首先会向从节点发送compare请求,以此比较两者的数据是否存在差异,这一步由EntryHandler的handleDoCompare()方法实现,如代码清单9-72所示。

代码清单9-72 EntryHandler#handleDoCompare
private CompletableFuture<PushEntryResponse> handleDoCompare(
        long compareIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
    try {
        DLedgerEntry local = dLedgerStore.get(compareIndex);
        PreConditions.check(request.getEntry().equals(local),
                DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
    } catch (Throwable t) {
        logger.error("[HandleDoCompare] compareIndex={}", compareIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
    return future;
}

这一步的逻辑实现较为简单,其核心思想是判断Leader节点传来的日志序号在从节点中是否存在,如果存在则返回状态码SUCCESS,否则返回状态码INCONSISTENT。同时将当前从节点已存储的最小日志序号、最大日志序号、当前投票轮次返回给Leader节点,方便Leader节点进行比较,从而计算出应该截断的日志序号。

truncate请求响应

Leader节点与从节点进行数据对比后,如果发现数据有差异,将计算出需要截断的日志序号,发送truncate请求给从节点,从节点对多余的日志进行截断,由EntryHandler的handleDoTruncate()方法实现,如代码清单9-73所示。

代码清单9-73 EntryHandler#handleDoTruncate
private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
    try {
        long index = dLedgerStore.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
    return future;
}

handleDoTruncate()方法的实现比较简单,删除节点上truncateIndex日志序号之后的所有日志,会调用dLedgerStore的truncate()方法,根据日志序号定位到日志文件。如果命中具体的文件,则修改相应的读写指针、刷盘指针等,并将所在物理文件之后的所有文件删除。因为其实现原理与RocketMQ存储、删除过期日志文件类似,所以这里不再深入展开。

append请求响应

Leader节点与从节点进行差异对比,截断从节点多余的数据文件后,会实时转发日志到从节点,具体由EntryHandler的handleDoAppend()方法实现,如代码清单9-74所示。

代码清单9-74 EntryHandler#handleDoAppend
private void handleDoAppend(long writeIndex, PushEntryRequest request,
                            CompletableFuture<PushEntryResponse> future) {
    try {
        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}

将从Leader节点的日志追加到从节点,具体调用DLedgerStore的appendAsFollower()方法实现,其实现细节与服务端追加日志的流程基本类似,只是少了日志转发这个流程。然后使用Leader节点的已提交指针更新从节点的已提交指针,即append请求会附带有commit请求的效果。

从节点日志复制异常检测机制

收到Leader节点的append请求后,从节点首先会将这些写入请求存储在writeRequestMap处理队列中,从节点并不是直接从该队列中获取一个待写入处理请求进行数据追加,而是查找当前节点已存储的最大日志序号leaderEndIndex,然后加1得出下一条待追加的日志序号nextIndex。如果该日志序号在writeRequestMap中不存在日志推送请求,则有可能是因为发生了推送请求丢失,在这种情况下,需要进行异常检测,以便尽快恢复异常,使主节点与从节点最终保持一致性。从节点的日志复制异常检测由checkAbnormalFuture()方法实现,如代码清单9-75所示。

代码清单9-75 EntryHandler#checkAbnormalFuture
if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {
    return;
}
lastCheckFastForwardTimeMs = System.currentTimeMillis();
if (writeRequestMap.isEmpty()) {
    return;
}

第一步:检测是否需要进行异常检测,如代码清单9-76所示,以下两种情况无须进行检测。

  • 上一次检查距现在不到1s。

  • 当前没有积压的append请求,因此可以同样明确地判断出主节点没有推送新的日志。

代码清单9-76 EntryHandler#checkAbnormalFuture
for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
    long index = pair.getKey().getEntry().getIndex();
    if (index <= endIndex) {
        try {
            DLedgerEntry local = dLedgerStore.get(index);

            PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
            logger.warn("[PushFallBehind]The leader pushed an entry index = {}smaller than current ledgerEndIndex = {}, maybe the last ack is missed", index, endIndex);
        } catch (Throwable t) {
            logger.error(" [PushFallBehind]The leader pushed an entry index = {}smaller than current ledgerEndIndex = {}, maybe the last ack is missed", index, endIndex, t);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }
        writeRequestMap.remove(index);
        continue;
    }    //省略下一步代码
}

第二步:遍历所有待写请求,如果待追加的日志序号小于从节点已经存储的最大日志序号,且从节点存储的该条日志内容与推送请求中包含的日志内容相同,说明从节点已成功存储该条日志,则向Leader节点返回SUCCESS。如果从节点存储的该条日志与推送请求中的不一样,则向Leader节点返回INCONSISTENT_STATE,告知从节点从这条日志开始与Leader节点的数据不一致,需要发送compare请求和truncate请求修正数据,如代码清单9-77所示。

代码清单9-77 EntryHandler#checkAbnormalFuture
for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
    //省略上一步代码
    if (index == endIndex + 1) {
        return;
    }
}

第三步:如果待追加的日志序号等于endIndex+1,即从节点当前存储的最大日志序号加1,表示从节点下一条期望追加的日志Leader节点已经推送过来了,如代码清单9-78所示。这种情况非常正常,故可结束异常检测逻辑。

代码清单9-78 EntryHandler#checkAbnormalFuture
TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue();
if (!future.isTimeOut()) {
    continue;
}
if (index < minFastForwardIndex) {
    minFastForwardIndex = index;
}

第四步:主要处理待追加日志的序号大于endIndex+1的情况,可以认为有追加积压,处理要点如下。

1)如果挂起时间(排队时间)未超时,则继续检查下一条待追加日志。 2)如果已经超时,说明该日志没有正常写入从节点,则记录其日志序号,然后向主节点汇报,因为这里是在遍历检查每一条待追加日志,所以最终需要反馈的是最小超时的日志序号,如代码清单9-79所示。

代码清单9-79 EntryHandler#checkAbnormalFuture
if (minFastForwardIndex == Long.MAX_VALUE) {
    return;
}
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);
if (pair == null) {
    return;
}
logger.warn("[PushFastForward] ledgerEndIndex = {}entryIndex = {}", endIndex, minFastForwardIndex);
pair.getValue().complete(buildResponse(pairgetKey(), DLedgerResponseCode.INCONSISTENT_STATE.g etCode()));

第五步:如果找到已经超时的待追加请求,则向Leader节点返回错误码INCONSISTENT_STATE,即快速失败机制,尽快通知Leader节点与从节点进行数据比对,使主从数据保持一致性。

日志复制流程如图9-16所示。

image 2025 02 06 17 53 47 729
Figure 7. 图9-16 从节点日志复制流程图

日志复制仲裁

Raft协议判断一条日志写入成功的标准是集群中超过半数的节点存储了该日志,Leader节点首先存储数据,然后异步向它所有的从节点推送日志。不需要所有的从节点都返回日志追加成功才认为是成功写入,故Leader节点需要对返回结果进行仲裁,这部分功能主要由QuorumAckChecker实现。

QuorumAckChecker类图

QuorumAckChecker的类图如图9-17所示。

image 2025 02 06 18 08 23 942
Figure 8. 图9-17 QuorumAckChecker类图

下面逐一介绍上述核心类及核心属性。

  • long lastPrintWatermarkTimeMs:上次打印水位线的时间戳,单位为ms。

  • long lastCheckLeakTimeMs:上次检测泄露的时间戳,单位为ms。

  • long lastQuorumIndex:已投票仲裁的日志序号。

日志仲裁流程

日志仲裁流程主要封装在QuorumAckChecker的doWork()方法中,如代码清单9-80所示。

代码清单9-80 QuorumAckChecker#doWork
if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCo
            mmittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
    lastPrintWatermarkTimeMs = System.currentTimeMillis();
}

第一步:主要是输出日志相关日志。如果距上一次打印日志的时间超过3s,则输出当前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 等日志,如代码清单 9-81 所示。

代码清单9-81 QuorumAckChecker#doWork
if (!memberState.isLeader()) {
    waitForRunning(1);
    return;
}

第二步:如果当前节点不是Leader,直接返回,如代码清单9-82所示。

代码清单9-82 QuorumAckChecker#doWork
if (pendingAppendResponsesByTerm.size() > 1) {
    for (Long term : pendingAppendResponsesByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
            futureEntry.getValue().complete(response);
        }
        pendingAppendResponsesByTerm.remove(term);
    }
}

第三步:清除已过期被挂起的请求,向客户端返回错误码TERM_CHANGED,所谓的过期请求就是投票轮次与当前投票轮次不同,如代码清单9-83所示。

代码清单9-83 QuorumAckChecker#doWork
if (peerWaterMarksByTerm.size() > 1) {
    for (Long term : peerWaterMarksByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        peerWaterMarksByTerm.remove(term);
    }
}

第四步:清除已过期的日志复制水位线,即投票轮次不为当前投票轮次的所有复制水位线都是过期数据,及时清除以避免内存泄露,如代码清单9-84所示。内存结构为Map<Long/投票轮次/, ConcurrentMap<String/节点nodeId/, Long/从节点对应已存储日志序号/>>,这个数据结构是进行日志复制仲裁的关键,即主节点收到从节点的日志复制响应后,会将复制进度存储在该数据结构中。

代码清单9-84 QuorumAckChecker#doWork
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {
    int num = 0;
    for (Long another : peerWaterMarks.values()) {
        if (another >= index) {
            num++;
        }
    }
    if (memberState.isQuorum(num) && index > quorumIndex) {
        quorumIndex = index;
    }
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);

第五步:根据各个节点的日志复制进度进行仲裁,确定已提交日志的序号,日志一旦提交,就可以向客户端返回写入成功。为了更容易理解仲裁逻辑,我们先以3个节点集群为例,展示 peerWaterMarksByTerm 的内存结构,如图9-18所示。

image 2025 02 06 18 16 57 975
Figure 9. 图9-18 peerWaterMarksByTerm内存结构图

结合图9-18来看一下日志仲裁的实现关键点,如代码清单9-85所示。

1)遍历peerWaterMarks的value集合,即{101, 101,100},用临时变量index来记录待投票的日志序号,集群内超过半数节点的已复制序号超过该值,则该日志能被确认提交。 2)遍历peerWaterMarks中value集合,与index进行比较,如果已提交序号大于、等于待投票的日志序号(index),则num加1,表示该日志进度的已复制节点数加1。 3)对index进行仲裁,如果超过半数节点已成功负责仲裁,并且index大于quorumIndex,更新quorumIndex的值为index。quorumIndex是peerWaterMarks最终最大的已提交日志序号,如图9-18所示的数据,quorumIndex的值为101。 4)更新committedIndex索引,方便DLedgerStore定时将committedIndex写入checkpoint。

代码清单9-85 QuorumAckChecker#doWork
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses =
        pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex >= 0) {
    for (Long i = quorumIndex; i >= 0; i--) {
        try {
            CompletableFuture<AppendEntryResponse> future = responses.remove(i);
            if (future == null) {
                needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) {
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) future).getPos());
                future.complete(response);
            }
            ackNum++;
        } catch (Throwable t) {
            logger.error("Error in ack to index={} term={}", i, currTerm, t);
        }
    }
}

第六步:处理quorumIndex(已提交指针)之前的挂起请求。唤醒该日志序号之前挂起的请求,向客户端发送成功响应,如代码清单9-86所示,这里是Future模式,实现关键点如下。

1)从quorumIndex开始倒推,逐条向客户端返回写入成功请求。 2)从ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>中移除对应的条目,并得到待通知的Future对象。 3)如果未找到quorumIndex挂起请求,说明前面挂起的请求已经全部处理完毕,则结束本次通知。结束之前再判断一次是否需要进行泄露检测,依据如下(三个条件必须同时满足)。

本次不是第一次进行日志复制仲裁。 上一次仲裁的日志序号与本次仲裁的日志序号不相同,即本次执行新的日志。 本次最后一条唤醒的日志序号与上一次仲裁的日志序号不相同,说明两次仲裁的日志不连续,需要对小于已仲裁日志序号的日志进行响应。

4)调用future.complete方法向客户端返回响应结果,并将ackNum加1,表示本次仲裁向客户端返回响应结果的数量。

代码清单9-86 QuorumAckChecker#doWork
if (ackNum == 0) {
    for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
        TimeoutFuture<AppendEntryResponse> future = responses.get(i);
        if (future == null) {
            break;
        } else if (future.isTimeOut()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.get Code());
            response.setTerm(currTerm);
            response.setIndex(i);
            response.setLeaderId(memberState.getSelfId());
            future.complete(response);
        } else {
            break;
        }
    }
    waitForRunning(1);
}

第七步:如果本次日志仲裁没有日志被成功追加,则检查被挂起的追加请求,判断其日志序号大于已仲裁的日志序号是否超时,如果超时,向客户端返回错误码WAIT_QUORUM_ACK_TIMEOUT,如代码清单9-87所示。

代码清单9-87 QuorumAckChecker#doWork
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry :responses.entrySet()){
        if (futureEntry.getKey() < quorumIndex) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setTerm(currTerm);
            response.setIndex(futureEntry.getKey());
            response.setLeaderId(memberState.getSelfId());
            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
            futureEntry.getValue().complete(response);
            responses.remove(futureEntry.getKey());
        }
    }
    lastCheckLeakTimeMs = System.currentTimeMillis();
}

第八步:进行日志追加挂起请求泄露检测,主要是遍历已挂起的请求,如果日志序号小于已仲裁的序号,向客户端返回成功,将其移出待挂起队列。

日志仲裁的流程就分析到这里了,为了更加清晰的展示实现原理,流程图如图9-19所示。

image 2025 02 06 18 24 17 317
Figure 10. 图9-19 日志仲裁流程图