ConsumeQueue 与 Index 文件恢复
RocketMQ是将消息全量存储在CommitLog文件中,并异步生成转发任务更新ConsumeQueue文件、Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,就会导致CommitLog文件、ConsumeQueue文件、Index文件中的数据不一致。如果不加以人工修复,会有一部分消息即便在CommitLog文件中存在,由于并没有转发到ConsumeQueue文件,也永远不会被消费者消费。那RocketMQ是如何使文件达到最终一致性的呢?下面详细分析RocketMQ关于存储文件的加载流程来一窥端倪,如代码清单4-58所示。
boolean lastExitOK = !this.isTempFileExist();
private boolean isTempFileExist() {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
}
第一步:判断上一次退出是否正常。其实现机制是Broker在启动时创建 ${ROCKET_HOME}/store/abort 文件,在退出时通过注册JVM钩子函数删除abort文件。如果下一次启动时存在abort文件。说明Broker是异常退出的,CommitLog与ConsumeQueue数据有可能不一致,需要进行修复,如代码清单4-59所示。
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
第二步:加载延迟队列,如代码清单4-60所示。
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
return true;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
第三步:加载CommitLog文件,加载${ROCKET_HOME}/store/commitlog目录下所有文件并按照文件名进行排序。如果文件与配置文件的单个文件大小不一致,将忽略该目录下的所有文件,然后创建MappedFile对象。注意load()方法将wrotePosition、flushedPosition、committedPosition三个指针都设置为文件大小。
第四步:加载消息消费队列,调用DefaultMessageStore#loadConsumeQueue,其思路与CommitLog大体一致,遍历消息消费队列根目录,获取该Broker存储的所有主题,然后遍历每个主题目录,获取该主题下的所有消息消费队列,最后分别加载每个消息消费队列下的文件,构建ConsumeQueue对象,主要初始化ConsumeQueue的topic、queueId、storePath、mappedFileSize属性,如代码清单4-61所示。
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.
getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
第五步:加载并存储checkpoint文件,主要用于记录CommitLog文件、ConsumeQueue文件、Inde文件的刷盘点,如代码清单4-62所示。
for (File file : files) {
IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
f.load();
if (!lastExitOK) {
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
f.destroy(0);
continue;
}
}
}
// 省略异常代码
第六步:加载Index文件,如果上次异常退出,而且Index文件刷盘时间小于该文件最大的消息时间戳,则该文件将立即销毁,如代码清单4-63所示。
private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue();
if (lastExitOK) {
this.commitLog.recoverNormally();
} else {
this.commitLog.recoverAbnormally();
}
this.recoverTopicQueueTable();
}
第七步:根据Broker是否为正常停止,执行不同的恢复策略,下文将分别介绍异常停止、正常停止的文件恢复机制,如代码清单4-64所示。
private void recoverTopicQueueTable() {
HashMap<String /* topic-queueid */, Long /* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
第八步:恢复ConsumeQueue文件后,将在CommitLog实例中保存每个消息消费队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID还存储了消息队列偏移量的关键所在。
Broker正常停止文件恢复
从图4-20可以看出,Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息,其结构如下。
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
int index = mappedFiles.size() - 3;
if (index < 0) {
index = 0;
}
// 省略部分代码
}
第一步:Broker正常停止再重启时,从倒数第3个文件开始恢复,如果不足3个文件,则从第一个文件开始恢复。checkCRCOnRecover参数用于在进行文件恢复时查找消息是否验证CRC,如代码清单4-66所示。
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
第二步:解释一下两个局部变量,mappedFileOffset为当前文件已校验通过的物理偏移量,processOffset为CommitLog文件已确认的物理偏移量,等于mappedFile.getFileFromOffset加上mappedFileOffset,如代码清单4-67所示。
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
} else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
} else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
第三步:遍历CommitLog文件,每次取出一条消息,如果查找结果为true并且消息的长度大于0,表示消息正确,mappedFileOffset指针向前移动本条消息的长度。如果查找结果为true并且消息的长度等于0,表示已到该文件的末尾,如果还有下一个文件,则重置processOffset、mappedFileOffset并重复上述步骤,否则跳出循环;如果查找结果为false,表明该文件未填满所有消息,则跳出循环,结束遍历文件,如代码清单4-68所示。
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
第四步:更新MappedFileQueue的flushedWhere和committedPosition指针,如代码清单4-69所示。
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
第五步:删除offset之后的所有文件。遍历目录下的文件,如果文件的尾部偏移量小于offset则跳过该文件,如果尾部的偏移量大于offset,则进一步比较offset与文件的开始偏移量。如果offset大于文件的起始偏移量,说明当前文件包含了有效偏移量,设置MappedFile的flushedPosition和committedPosition。如果offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,则调用MappedFile#destory方法释放MappedFile占用的内存资源(内存映射与内存通道等),然后加入待删除文件列表中,最终调用deleteExpiredFile将文件从物理磁盘上删除。过期文件的删除将在4.9节详细介绍。
Broker异常停止文件恢复
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复与正常停止文件恢复的步骤基本相同,主要差别有两个:首先,Broker正常停止默认从倒数第三个文件开始恢复,而异常停止则需要从最后一个文件倒序推进,找到第一个消息存储正常的文件;其次,如果CommitLog目录没有消息文件,在ConsuneQueue目录下存在的文件则需要销毁。
如何判断一个消息文件是否正确呢?请看代码清单4-70。
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
第一步:判断文件的魔数,如果不是MESSAGE_MAGIC_CODE,则返回false,表示该文件不符合CommitLog文件的存储格式,如代码清单4-71所示。
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
return false;
}
第二步:如果文件中第一条消息的存储时间等于0,则返回false,说明该消息的存储文件中未存储任何消息,如代码清单4-72所示。
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
return true;
}
} else if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
return true;
}
第三步:对比文件第一条消息的时间戳与检测点。如果文件第一条消息的时间戳小于文件检测点,说明该文件的部分消息是可靠的,则从该文件开始恢复。checkpoint文件中保存了CommitLog、ConsumeQueue、Index的文件刷盘点,RocketMQ默认选择CommitLog文件与ConsumeQueue这两个文件的刷盘点中较小值与CommitLog文件第一条消息的时间戳做对比,如果messageIndexEnable为true,表示Index文件的刷盘时间点也参与计算。
第四步:如果根据前3步算法找到MappedFile,则遍历MappedFile中的消息,验证消息的合法性,并将消息重新转发到ConsumeQueue与Index文件,该步骤在4.7.1节中已详细说明。
第五步:如果未找到有效的MappedFile,则设置CommitLog目录的flushedWhere、committedWhere指针都为0,并销毁ConsumeQueue文件,如代码清单4-73所示。
public void destroy() {
this.maxPhysicOffset = -1;
this.minLogicOffset = 0;
this.mappedFileQueue.destroy();
if (isExtReadEnable()) {
this.consumeQueueExt.destroy();
}
}
重置ConsumeQueue的maxPhysicOffset与minLogicOffset,然后调用MappedFileQueue的destory()方法将ConsumeQuene目录下的文件全部删除。 存储启动时所谓的文件恢复主要完成flushedPosition、committedWhere指针的设置、将消息消费队列最大偏移量加载到内存,并删除flushedPosition之后所有的文件。如果Broker异常停止,在文件恢复过程中,RocketMQ会将最后一个有效文件中的所有消息重新转发到ConsumeQueue和Index文件中,确保不丢失消息,但同时会带来消息重复的问题。纵观RocktMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。