RocketMQ DLedger主从切换之日志追加
Raft协议负责组主要包含两个步骤:Leader选举和日志复制。使用Raft协议的集群在向外提供服务之前需要先在集群中进行Leader选举,推举一个主节点接受客户端的读写请求。Raft协议负责组的其他节点只需要复制数据,不对外提供服务。当Leader节点接受客户端的写请求后,先将数据存储在Leader节点上,然后将日志数据广播给它的从节点,只有超过半数的节点都成功存储了该日志,Leader节点才会向客户端返回写入成功。
本节将详细探讨Leader节点在接受客户端请求后是如何存储数据的,9.6节将详细介绍日志复制过程。
日志追加流程概述
在详细探讨RocketMQ DLedger日志追加流程之前,我们先了解一下客户端发送日志的请求协议字段,其类图如图9-9所示。
下面逐一介绍上述核心类及核心属性。
-
String group:Raft 复制组所属组名。
-
String remoteId:请求目的节点ID。
-
String localId:发起请求节点ID。
-
int code:请求响应字段,表示返回响应码。
-
String leaderId:集群中的 Leader 节点ID。
-
long term:集群当前选举轮次。
-
byte[] body:待发送的数据。

Leader节点处理日志写入请求的入口为DLedgerServer的handleAppend()方法。接下来我们详细介绍该方法的实现逻辑,如代码清单9-26所示。
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()),
DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(),
memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
第一步:验证请求的合理性,如代码清单9-27所示。
-
如果请求目的节点不是当前节点,则抛出异常。
-
如果请求的集群不是当前节点所在的集群,则抛出异常。
-
如果当前节点不是Leader节点,则抛出异常。
long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setGroup(memberState.getGroup());
appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
return dLedgerEntryPusher.waitAck(resEntry);
}
第二步:消息的追加是一个异步过程,会将内容暂存到内存队列中。首先检查内存队列是否已满,如果已满则向客户端返回错误码,表示本次消息发送失败。如果队列未满,则先将数据追加到Leader节点的PageCache中,然后转发给Leader的所有从节点,最后Leader节点等待从节点日志复制的结果。上述消息追加主要包括如下步骤。
1)判断推送队列是否已满(DLedgerEntryPusher的isPendingFull()方法)。 2)Leader节点日志存储(DLedgerStore的appendAsLeader()方法)。 3)Leader节点等待从节点日志复制响应ACK。
判断 Push 队列是否已满
判断推送队列是否已满,如代码清单9-28所示。
public boolean isPendingFull(long currTerm) {
checkTermForPendingMap(currTerm, "isPendingFull");
return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
}
pendingAppendResponsesByTerm的数据存储格式为Map< Long/* 投票轮次*/,ConcurrentMap <Long/日志序号/, TimeoutFuture< AppendEntryResponse>>>,该方法表示每一个投票轮次积压(未提交)的日志数量默认不能超过10000条,可通过配置参数maxPendingRequestsNum来改变默认值,即队列的长度默认为10000。
Leader节点日志存储
Leader节点的数据存储主要由DLedgerStore的appendAsLeader()方法实现。DLedger提供了基于内存和基于文件两种持久化实现,本节重点关注基于文件的存储实现方法,其实现类为DLedgerMmapFileStore。
下面重点分析数据存储流程,入口为DLedgerMmapFileStore的appendAsLeader()方法,如代码清单9-29所示。
PreConditions.check(memberState.isLeader(),DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
第一步:判断是否可以追加数据,如代码清单9-30所示,其判断依据有如下两点。
-
当前节点的角色是否为Leader,如果不是,则抛出异常。
-
当前节点磁盘是否已满,判断依据是DLedger的根目录或数据文件目录的使用率是否超过了允许使用的最大值,默认值为85%。
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();
第二步:从本地线程变量获取一个数据Buffer与索引Buffer。其中用于存储数据的ByteBuffer容量固定为4MB,索引的ByteBuffer为两个索引条目的长度,固定为64字节,如代码清单9-31所示。
DLedgerEntryCoder.encode(entry,dataBuffer);
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
byteBuffer.clear();
int size = entry.computSizeInBytes();
byteBuffer.putInt(entry.getMagic());
byteBuffer.putInt(size);
byteBuffer.putLong(entry.getIndex());
byteBuffer.putLong(entry.getTerm());
byteBuffer.putLong(entry.getPos());
byteBuffer.putInt(entry.getChannel());
byteBuffer.putInt(entry.getChainCrc());
byteBuffer.putInt(entry.getBodyCrc());
byteBuffer.putInt(entry.getBody().length);
byteBuffer.put(entry.getBody());
byteBuffer.flip();
}
第三步:对客户端发送的日志进行编码,按照RocketMQ DLedger存储协议进行封装,如代码清单9-32所示。
synchronized (memberState) {
PreConditions.check(memberState.isLeader(),DLedgerResponseCode.NOT_LEADER, null);
//省略部分代码
}
第四步:锁定状态机,再一次检测节点的状态是否是Leader,如代码清单9-33所示。
long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex,memberState.currTerm(), CURRENT_MAGIC);
第五步:为当前日志条目设置序号,即设置 entryIndex 与 entryTerm(投票轮次),并将魔数、entryIndex、entryTerm等写入byteBuffer,如代码清单9-34所示。
关于日志存储有一个非常重要的概念,即Raft会为Leader节点收到的每一条数据在服务端维护一个递增的日志序号,即为每一条数据生成了一个唯一的标记。 |
long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);
第六步:计算消息的起始物理偏移量,与CommitLog文件的物理偏移量设计思想相同,将该偏移量写入日志的byteBuffer,如代码清单9-35所示。关于dataFileList的preAppend实现后续将详细介绍。
for (AppendHook writeHook : appendHooks) {
writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}
第七步:执行日志追加的钩子函数,如代码清单9-36所示。
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
第八步:调用DataFileList的append方法,将日志先追加到PageCache中,如代码清单9-37所示。
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
第九步:构建索引条目并追加到PageCache中,如代码清单9-38所示。
ledgerEndIndex++;
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();
第十步:一条消息被追加后,日志序号增加1,并更新当前节点状态机的 leaderEndIndex 与当前投票轮次。
日志追加到Leader节点的PageCache后,将异步转发给它所有的从节点,然后等待各从节点的反馈,并对这些反馈结果进行仲裁,只有集群内超过半数的节点存储了该条日志,Leader节点才可以向客户端返回日志写入成功,日志的复制将在9.6节详细介绍,在介绍Leader节点如何等待从节点复制、响应ACK之前,我们再介绍一下与存储相关的两个核心方法:DataFileList的preAppend()与append()方法。
DataFileList的preAppend()方法详解
DataFileList的preAppend()方法为预写入,主要是根据当前日志的长度计算该条日志的物理偏移量,如代码清单9-39所示。
public long preAppend(int len, boolean useBlank)
先介绍一下参数的含义。
-
int len:需要申请的长度。
-
boolean useBlank:是否需要填充,默认为true。
第一步:获取最后一个文件,其机制与RocketMQ存储模块的MappedFileQueue类似,就不再展开讲述了,如代码清单9-40所示。
MmapFile mappedFile = getLastMappedFile();
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = getLastMappedFile(0);
}
第二步:如果当前文件剩余空间已不足以存放一条消息的处理逻辑,则执行代码清单9-41所示的处理,其实现的关键点如下。
if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {
if (blank < MIN_BLANK_LEN) {
logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
return -1;
} else {
ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize()
- mappedFile.getWrotePosition());
byteBuffer.putInt(BLANK_MAGIC_CODE);
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
if (mappedFile.appendMessage(byteBuffer.array())) {
mappedFile.setWrotePosition(mappedFile.getFileSize());
} else {
logger.error("Append blank error for {}", storePath);
return -1;
}
mappedFile = getLastMappedFile(0);
if (null == mappedFile) {
logger.error("Create mapped file for {}", storePath);
return -1;
}
}
}
1)如果当前文件剩余的空间少于 MIN_BLANK_LEN,将返回 -1,表示存储错误,需要人工干预,正常情况下是不会出现这种情况的,因为写入一条消息之前会确保能容纳待写入的消息,并且还需要空余 MIN_BLANK_LEN 个字节,因为一个独立的物理文件,默认会填充文件结尾魔数(BLANK_MAGIC_CODE)。
2)如果空余空间大于 MIN_BLANK_LEN,会首先写入文件结尾魔数(4字节),然后将该文件剩余的字节数写入接下来的4个字节,表示该文件全部用完。
第三步:如果当前文件有剩余的空间容纳当前日志,则返回待写入消息的物理起始偏移量,如代码清单9-42所示。
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
DataFileList的append()方法详解
DataFileList的append()方法主要实现将消息写入pagecache,其关键点就是先根据待写入偏移量获取待写入的文件,用MmapFile表示,然后向该文件追加消息,接下来我们主要看一下其追加实现逻辑,如代码清单9-43所示。
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
if ((currentPos + length) <= this.fileSize) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
byteBuffer.put(data, offset, length);
this.wrotePosition.addAndGet(length);
return true;
}
return false;
}
该方法的主要工作是将消息写入 mappedByteBuffer,该对象是通过 FileChannel 的 map() 方法创建的,即日志首先写入 pageCache。
Leader节点等待从节点日志复制响应ACK
Leader节点等待日志复制主要由DLedgerEntryPusher的waitAck()方法实现,如代码清单9-44所示。
updatePeerWaterMark(entry.getTerm(),m.getSelfId(),entry.getIndex());
private void updatePeerWaterMark(long term, String peerId, long index) {
synchronized (peerWaterMarksByTerm) {
checkTermForWaterMark(term, "updatePeerWaterMark");
if (peerWaterMarksByTerm.get(term).get(peerId) < index) {
peerWaterMarksByTerm.get(term).put(peerId, index);
}
}
}
第一步:Leader 节点首先更新自身的水位线,我们先来看一下 peerWaterMarksByTerm 的数据结构:Map<Long/team,投票轮次/, ConcurrentMap<String/节点编号/, Long/日志序号/>>,即以投票轮次为维度,存储复制组内每一个节点当前已存储(已追加到 PageCache))的数据的日志序号,其主要作用是判断已提交日志的序号,该数据结构用于计算一条日志是否已被超过半数的节点存储,如代码清单9-45所示。
if (memberState.getPeerMap().size() == 1) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getSelfId()); response.setIndex(entry.getIndex());
response.setTerm(entry.getTerm());
response.setPos(entry.getPos());
return AppendFuture.newCompletedFuture(entry.getPos(), response);
}
第二步:如果一个复制集群中只有一台机器,则直接返回成功响应,如代码清单9-46所示。
checkTermForPendingMap(entry.getTerm(), "waitAck");
AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
future.setPos(entry.getPos());
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
第三步:这里是一种通用的异步编程技巧,Leader节点需要等待从节点复制完数据,通常会返回给客户端一个Future对象,客户端可以调用该Future的get()方法同步等待结果,而服务端会将日志序号作为键,Future作为值存储在内存(ConcurrentMap)中。当结果异步返回后,需要根据日志序号找到对应的Future对象,最后填充结果,以便客户端被唤醒,从而得到响应结果,如代码清单9-47所示。
wakeUpDispatchers();
public void wakeUpDispatchers () {
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
dispatcher.wakeup();
}
}
第四步:唤醒日志转发线程,即将Leader节点中的数据推送到各个从节点,Leader节点在启动时会为每一个从节点单独创建一个EntryDispatcher线程,单独异步转发日志到从节点,这部分内容将在9.6节详细介绍。