RocketMQ DLedger主从切换之存储实现

RocketMQ DLedger的存储实现思路与RocketMQ的存储实现思路相似,本节不会详细介绍其实现原理,只是点到为止,抛砖引玉。

RocketMQ DLedger核心类及核心属性

RocketMQ DLedger类图如图9-6所示。

image 2025 02 06 14 50 52 496
Figure 1. 图9-6 RocketMQ DLedger存储类图

下面逐一介绍上述核心类及核心属性。

  1. DLedgerStore:存储抽象类,定义如下核心方法。

    • public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry):向主节点追加日志(数据)。

    • public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm,String leaderId):向从节点广播日志。

    • public abstract DLedgerEntry get(Long index):根据日志下标查找日志。

    • public abstract long getCommittedIndex():获取已提交的日志序号。

    • public abstract long getLedgerEndTerm():获取Leader节点当前最大的投票轮次。

    • public abstract long getLedgerEndIndex():获取Leader节点下一条日志写入的日志序号(最新日志的下标)。

    • public abstract long getLedgerBeginIndex():获取Leader节点第一条消息的日志序号。

    • public void updateCommittedIndex(long term, long committedIndex):更新commitedIndex的值,为空实现,由具体的存储子类实现。

    • protected void updateLedgerEndIndexAndTerm():更新Leader节点维护的ledgerEndIndex和ledgerEndTerm。

    • public void flush():刷盘,空方法,由具体子类实现。

    • public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId):删除日志,空方法,由具体子类实现。

    • public void startup():启动存储管理器,空方法,由具体子类实现。

    • public void shutdown():关闭存储管理器,空方法,由具体子类实现。

  2. DLedgerMemoryStore:DLedger基于内存实现的日志存储实现类。

  3. DLedgerMmapFileStore:基于文件内存映射机制的存储实现,核心属性如下。

    • long ledgerBeginIndex = -1:日志的起始序号,默认为-1。

    • long ledgerEndIndex = -1:下一条日志下标,默认为-1。

    • long committedIndex = -1:已提交的日志序号,默认为-1。

    • long ledgerEndTerm:当前最大的投票轮次。

  4. DLedgerConfig dLedgerConfig:DLedger的配置信息。

    • MemberState memberState:状态机。

    • MmapFileList dataFileList:日志文件(数据文件)的内存映射队列。

    • MmapFileList indexFileList:索引文件的内存映射文件集合。

    • ThreadLocal<ByteBuffer> localIndexBuffer:本地线程变量,用来缓存索引ByteBuffer。

    • ThreadLocal<ByteBuffer> localEntryBuffer:本地线程变量,用来缓存数据索引ByteBuffer。

    • FlushDataService flushDataService:数据文件刷盘线程。

    • CleanSpaceService cleanSpaceService:清除过期日志文件线程。

    • boolean isDiskFull = false:磁盘是否已满。

    • long lastCheckPointTimeMs:上一次检测点(时间戳)。

    • AtomicBoolean hasLoaded:是否已经加载,主要用来避免重复加载(初始化)日志文件。

    • AtomicBoolean hasRecovered:是否已恢复。

RocketMQ DLedger的上述核心类与RocketMQ存储模块的对应关系如表9-1所示。

image 2025 02 06 14 53 43 089
Figure 2. 表9-1 RocketMQ存储模块与DLedger模块

RocketMQ DLedger数据存储协议

RocketMQ DLedger数据存储协议如图9-7所示。

image 2025 02 06 14 54 10 508
Figure 3. 图9-7 RocketMQ DLedger数据存储格式

存储协议各个字段的含义如下。

1)magic:魔数,4字节。 2)size:条目总长度,包含header(协议头)+body(消息体),占4字节。 3)entryIndex:当前条目的日志序号,占8字节。 4)entryTerm:条目所属的投票轮次,占8字节。 5)pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。 6)channel:保留字段,当前版本未使用,占4字节。 7)chain crc:当前版本未使用,占4字节。 8)body crc:消息体的CRC校验和,用来区分数据是否损坏,占4字节。 9)body size:用来存储消息体的长度,占4个字节。 10)body:消息体的内容。

RocketMQ DLedger索引存储协议

RocketMQ DLedger索引的存储协议如图9-8所示。

image 2025 02 06 14 54 46 485
Figure 4. 图9-8 RocketMQ DLedger索引存储格式

存储协议中各个字段的含义如下。

  • magic:魔数。

  • pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。

  • size:条目长度。

  • entryIndex:当前条目的日志序号,占8字节。

  • entryTerm:条目所属的投票轮次,占8字节。