RocketMQ DLedger主从切换之日志追加

Raft协议负责组主要包含两个步骤:Leader选举和日志复制。使用Raft协议的集群在向外提供服务之前需要先在集群中进行Leader选举,推举一个主节点接受客户端的读写请求。Raft协议负责组的其他节点只需要复制数据,不对外提供服务。当Leader节点接受客户端的写请求后,先将数据存储在Leader节点上,然后将日志数据广播给它的从节点,只有超过半数的节点都成功存储了该日志,Leader节点才会向客户端返回写入成功。

本节将详细探讨Leader节点在接受客户端请求后是如何存储数据的,9.6节将详细介绍日志复制过程。

日志追加流程概述

在详细探讨RocketMQ DLedger日志追加流程之前,我们先了解一下客户端发送日志的请求协议字段,其类图如图9-9所示。

下面逐一介绍上述核心类及核心属性。

  • String group:Raft 复制组所属组名。

  • String remoteId:请求目的节点ID。

  • String localId:发起请求节点ID。

  • int code:请求响应字段,表示返回响应码。

  • String leaderId:集群中的 Leader 节点ID。

  • long term:集群当前选举轮次。

  • byte[] body:待发送的数据。

image 2025 02 06 15 07 07 088
Figure 1. 图9-9 RocketMQ日志追加请求类图

Leader节点处理日志写入请求的入口为DLedgerServer的handleAppend()方法。接下来我们详细介绍该方法的实现逻辑,如代码清单9-26所示。

代码清单9-26 DLedgerServer#handleAppend
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()),
        DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(),
        memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

第一步:验证请求的合理性,如代码清单9-27所示。

  • 如果请求目的节点不是当前节点,则抛出异常。

  • 如果请求的集群不是当前节点所在的集群,则抛出异常。

  • 如果当前节点不是Leader节点,则抛出异常。

代码清单9-27 DLedgerServer#handleAppend
long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {
    AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
    appendEntryResponse.setGroup(memberState.getGroup());
    appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
    appendEntryResponse.setTerm(currTerm);
    appendEntryResponse.setLeaderId(memberState.getSelfId());
    return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {
    DLedgerEntry dLedgerEntry = new DLedgerEntry();
    dLedgerEntry.setBody(request.getBody());
    DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
    return dLedgerEntryPusher.waitAck(resEntry);
}

第二步:消息的追加是一个异步过程,会将内容暂存到内存队列中。首先检查内存队列是否已满,如果已满则向客户端返回错误码,表示本次消息发送失败。如果队列未满,则先将数据追加到Leader节点的PageCache中,然后转发给Leader的所有从节点,最后Leader节点等待从节点日志复制的结果。上述消息追加主要包括如下步骤。

1)判断推送队列是否已满(DLedgerEntryPusher的isPendingFull()方法)。 2)Leader节点日志存储(DLedgerStore的appendAsLeader()方法)。 3)Leader节点等待从节点日志复制响应ACK。

判断 Push 队列是否已满

判断推送队列是否已满,如代码清单9-28所示。

代码清单9-28 DLedgerEntryPusher#isPendingFull
public boolean isPendingFull(long currTerm) {
    checkTermForPendingMap(currTerm, "isPendingFull");
    return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
}

pendingAppendResponsesByTerm的数据存储格式为Map< Long/* 投票轮次*/,ConcurrentMap <Long/日志序号/, TimeoutFuture< AppendEntryResponse>>>,该方法表示每一个投票轮次积压(未提交)的日志数量默认不能超过10000条,可通过配置参数maxPendingRequestsNum来改变默认值,即队列的长度默认为10000。

Leader节点日志存储

Leader节点的数据存储主要由DLedgerStore的appendAsLeader()方法实现。DLedger提供了基于内存和基于文件两种持久化实现,本节重点关注基于文件的存储实现方法,其实现类为DLedgerMmapFileStore。

下面重点分析数据存储流程,入口为DLedgerMmapFileStore的appendAsLeader()方法,如代码清单9-29所示。

代码清单9-29 DLedgerMmapFileStore#appendAsLeader
PreConditions.check(memberState.isLeader(),DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);

第一步:判断是否可以追加数据,如代码清单9-30所示,其判断依据有如下两点。

  • 当前节点的角色是否为Leader,如果不是,则抛出异常。

  • 当前节点磁盘是否已满,判断依据是DLedger的根目录或数据文件目录的使用率是否超过了允许使用的最大值,默认值为85%。

代码清单9-30 DLedgerMmapFileStore#appendAsLeader
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();

第二步:从本地线程变量获取一个数据Buffer与索引Buffer。其中用于存储数据的ByteBuffer容量固定为4MB,索引的ByteBuffer为两个索引条目的长度,固定为64字节,如代码清单9-31所示。

代码清单9-31 DLedgerMmapFileStore#appendAsLeader
DLedgerEntryCoder.encode(entry,dataBuffer);

public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
    byteBuffer.clear();
    int size = entry.computSizeInBytes();
    byteBuffer.putInt(entry.getMagic());
    byteBuffer.putInt(size);
    byteBuffer.putLong(entry.getIndex());
    byteBuffer.putLong(entry.getTerm());
    byteBuffer.putLong(entry.getPos());
    byteBuffer.putInt(entry.getChannel());
    byteBuffer.putInt(entry.getChainCrc());
    byteBuffer.putInt(entry.getBodyCrc());
    byteBuffer.putInt(entry.getBody().length);
    byteBuffer.put(entry.getBody());
    byteBuffer.flip();
}

第三步:对客户端发送的日志进行编码,按照RocketMQ DLedger存储协议进行封装,如代码清单9-32所示。

代码清单9-32 DLedgerMmapFileStore#appendAsLeader
synchronized (memberState) {
    PreConditions.check(memberState.isLeader(),DLedgerResponseCode.NOT_LEADER, null);
    //省略部分代码
}

第四步:锁定状态机,再一次检测节点的状态是否是Leader,如代码清单9-33所示。

代码清单9-33 DLedgerMmapFileStore#appendAsLeader
long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex,memberState.currTerm(), CURRENT_MAGIC);

第五步:为当前日志条目设置序号,即设置 entryIndex 与 entryTerm(投票轮次),并将魔数、entryIndex、entryTerm等写入byteBuffer,如代码清单9-34所示。

关于日志存储有一个非常重要的概念,即Raft会为Leader节点收到的每一条数据在服务端维护一个递增的日志序号,即为每一条数据生成了一个唯一的标记。

代码清单9-34 DLedgerMmapFileStore#appendAsLeader
long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);

第六步:计算消息的起始物理偏移量,与CommitLog文件的物理偏移量设计思想相同,将该偏移量写入日志的byteBuffer,如代码清单9-35所示。关于dataFileList的preAppend实现后续将详细介绍。

代码清单9-35 DLedgerMmapFileStore#appendAsLeader
for (AppendHook writeHook : appendHooks) {
    writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}

第七步:执行日志追加的钩子函数,如代码清单9-36所示。

代码清单9-36 DLedgerMmapFileStore#appendAsLeader
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);

第八步:调用DataFileList的append方法,将日志先追加到PageCache中,如代码清单9-37所示。

代码清单9-37 DLedgerMmapFileStore#appendAsLeader
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);

第九步:构建索引条目并追加到PageCache中,如代码清单9-38所示。

代码清单9-38 DLedgerMmapFileStore#appendAsLeader
ledgerEndIndex++;
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
    ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();

第十步:一条消息被追加后,日志序号增加1,并更新当前节点状态机的 leaderEndIndex 与当前投票轮次。

日志追加到Leader节点的PageCache后,将异步转发给它所有的从节点,然后等待各从节点的反馈,并对这些反馈结果进行仲裁,只有集群内超过半数的节点存储了该条日志,Leader节点才可以向客户端返回日志写入成功,日志的复制将在9.6节详细介绍,在介绍Leader节点如何等待从节点复制、响应ACK之前,我们再介绍一下与存储相关的两个核心方法:DataFileList的preAppend()与append()方法。

DataFileList的preAppend()方法详解

DataFileList的preAppend()方法为预写入,主要是根据当前日志的长度计算该条日志的物理偏移量,如代码清单9-39所示。

代码清单9-39 DataFileList#preAppend
public long preAppend(int len, boolean useBlank)

先介绍一下参数的含义。

  • int len:需要申请的长度。

  • boolean useBlank:是否需要填充,默认为true。

第一步:获取最后一个文件,其机制与RocketMQ存储模块的MappedFileQueue类似,就不再展开讲述了,如代码清单9-40所示。

代码清单9-40 DataFileList#preAppend
MmapFile mappedFile = getLastMappedFile();
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = getLastMappedFile(0);
}

第二步:如果当前文件剩余空间已不足以存放一条消息的处理逻辑,则执行代码清单9-41所示的处理,其实现的关键点如下。

代码清单9-41 DataFileList#preAppend
if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {
    if (blank < MIN_BLANK_LEN) {
        logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
        return -1;
    } else {
        ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize()
                - mappedFile.getWrotePosition());
        byteBuffer.putInt(BLANK_MAGIC_CODE);
        byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
        if (mappedFile.appendMessage(byteBuffer.array())) {
            mappedFile.setWrotePosition(mappedFile.getFileSize());
        } else {
            logger.error("Append blank error for {}", storePath);
            return -1;
        }
        mappedFile = getLastMappedFile(0);
        if (null == mappedFile) {
            logger.error("Create mapped file for {}", storePath);
            return -1;
        }
    }
}

1)如果当前文件剩余的空间少于 MIN_BLANK_LEN,将返回 -1,表示存储错误,需要人工干预,正常情况下是不会出现这种情况的,因为写入一条消息之前会确保能容纳待写入的消息,并且还需要空余 MIN_BLANK_LEN 个字节,因为一个独立的物理文件,默认会填充文件结尾魔数(BLANK_MAGIC_CODE)。

2)如果空余空间大于 MIN_BLANK_LEN,会首先写入文件结尾魔数(4字节),然后将该文件剩余的字节数写入接下来的4个字节,表示该文件全部用完。

第三步:如果当前文件有剩余的空间容纳当前日志,则返回待写入消息的物理起始偏移量,如代码清单9-42所示。

return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();

DataFileList的append()方法详解

DataFileList的append()方法主要实现将消息写入pagecache,其关键点就是先根据待写入偏移量获取待写入的文件,用MmapFile表示,然后向该文件追加消息,接下来我们主要看一下其追加实现逻辑,如代码清单9-43所示。

代码清单9-43 DataFileList#append
public boolean appendMessage(final byte[] data, final int offset, final int length) {
    int currentPos = this.wrotePosition.get();
    if ((currentPos + length) <= this.fileSize) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        byteBuffer.put(data, offset, length);
        this.wrotePosition.addAndGet(length);
        return true;
    }
    return false;
}

该方法的主要工作是将消息写入 mappedByteBuffer,该对象是通过 FileChannel 的 map() 方法创建的,即日志首先写入 pageCache。

Leader节点等待从节点日志复制响应ACK

Leader节点等待日志复制主要由DLedgerEntryPusher的waitAck()方法实现,如代码清单9-44所示。

代码清单9-44 DLedgerEntryPusher#waitAck
updatePeerWaterMark(entry.getTerm(),m.getSelfId(),entry.getIndex());
private void updatePeerWaterMark(long term, String peerId, long index) {
    synchronized (peerWaterMarksByTerm) {
        checkTermForWaterMark(term, "updatePeerWaterMark");
        if (peerWaterMarksByTerm.get(term).get(peerId) < index) {
            peerWaterMarksByTerm.get(term).put(peerId, index);
        }
    }
}

第一步:Leader 节点首先更新自身的水位线,我们先来看一下 peerWaterMarksByTerm 的数据结构:Map<Long/team,投票轮次/, ConcurrentMap<String/节点编号/, Long/日志序号/>>,即以投票轮次为维度,存储复制组内每一个节点当前已存储(已追加到 PageCache))的数据的日志序号,其主要作用是判断已提交日志的序号,该数据结构用于计算一条日志是否已被超过半数的节点存储,如代码清单9-45所示。

代码清单9-45 DLedgerEntryPusher#waitAck
if (memberState.getPeerMap().size() == 1) {
    AppendEntryResponse response = new AppendEntryResponse();
    response.setGroup(memberState.getGroup());
    response.setLeaderId(memberState.getSelfId()); response.setIndex(entry.getIndex());
    response.setTerm(entry.getTerm());
    response.setPos(entry.getPos());
    return AppendFuture.newCompletedFuture(entry.getPos(), response);
}

第二步:如果一个复制集群中只有一台机器,则直接返回成功响应,如代码清单9-46所示。

代码清单9-46 DLedgerEntryPusher#waitAck
checkTermForPendingMap(entry.getTerm(), "waitAck");
AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
future.setPos(entry.getPos());
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);

第三步:这里是一种通用的异步编程技巧,Leader节点需要等待从节点复制完数据,通常会返回给客户端一个Future对象,客户端可以调用该Future的get()方法同步等待结果,而服务端会将日志序号作为键,Future作为值存储在内存(ConcurrentMap)中。当结果异步返回后,需要根据日志序号找到对应的Future对象,最后填充结果,以便客户端被唤醒,从而得到响应结果,如代码清单9-47所示。

代码清单9-47 DLedgerEntryPusher#waitAck
wakeUpDispatchers();
public void wakeUpDispatchers () {
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
        dispatcher.wakeup();
    }
}

第四步:唤醒日志转发线程,即将Leader节点中的数据推送到各个从节点,Leader节点在启动时会为每一个从节点单独创建一个EntryDispatcher线程,单独异步转发日志到从节点,这部分内容将在9.6节详细介绍。