ConsumeQueue 与 Index 文件恢复

RocketMQ是将消息全量存储在CommitLog文件中,并异步生成转发任务更新ConsumeQueue文件、Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,就会导致CommitLog文件、ConsumeQueue文件、Index文件中的数据不一致。如果不加以人工修复,会有一部分消息即便在CommitLog文件中存在,由于并没有转发到ConsumeQueue文件,也永远不会被消费者消费。那RocketMQ是如何使文件达到最终一致性的呢?下面详细分析RocketMQ关于存储文件的加载流程来一窥端倪,如代码清单4-58所示。

代码清单4-58 DefaultMessageStore#load
boolean lastExitOK = !this.isTempFileExist();

private boolean isTempFileExist() {
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    return file.exists();
}

第一步:判断上一次退出是否正常。其实现机制是Broker在启动时创建 ${ROCKET_HOME}/store/abort 文件,在退出时通过注册JVM钩子函数删除abort文件。如果下一次启动时存在abort文件。说明Broker是异常退出的,CommitLog与ConsumeQueue数据有可能不一致,需要进行修复,如代码清单4-59所示。

代码清单4-59 DefaultMessageStore#load
if (null != scheduleMessageService) {
    result = result && this.scheduleMessageService.load();
}

第二步:加载延迟队列,如代码清单4-60所示。

代码清单4-60 MappedFileQueue#load
Arrays.sort(files);

for (File file : files) {
    if (file.length() != this.mappedFileSize) {
        return true;
    }
    try {
        MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
        mappedFile.setWrotePosition(this.mappedFileSize);
        mappedFile.setFlushedPosition(this.mappedFileSize);
        mappedFile.setCommittedPosition(this.mappedFileSize);
        this.mappedFiles.add(mappedFile);
        log.info("load " + file.getPath() + " OK");
    } catch (IOException e) {
        log.error("load file " + file + " error", e);
        return false;
    }
}

第三步:加载CommitLog文件,加载${ROCKET_HOME}/store/commitlog目录下所有文件并按照文件名进行排序。如果文件与配置文件的单个文件大小不一致,将忽略该目录下的所有文件,然后创建MappedFile对象。注意load()方法将wrotePosition、flushedPosition、committedPosition三个指针都设置为文件大小。

第四步:加载消息消费队列,调用DefaultMessageStore#loadConsumeQueue,其思路与CommitLog大体一致,遍历消息消费队列根目录,获取该Broker存储的所有主题,然后遍历每个主题目录,获取该主题下的所有消息消费队列,最后分别加载每个消息消费队列下的文件,构建ConsumeQueue对象,主要初始化ConsumeQueue的topic、queueId、storePath、mappedFileSize属性,如代码清单4-61所示。

代码清单4-61 DefaultMessageStore#load
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.
getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

第五步:加载并存储checkpoint文件,主要用于记录CommitLog文件、ConsumeQueue文件、Inde文件的刷盘点,如代码清单4-62所示。

代码清单4-62 IndexService#load
for (File file : files) {
    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
    f.load();
    if (!lastExitOK) {
        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
            f.destroy(0);
            continue;
        }
    }
}
// 省略异常代码

第六步:加载Index文件,如果上次异常退出,而且Index文件刷盘时间小于该文件最大的消息时间戳,则该文件将立即销毁,如代码清单4-63所示。

代码清单4-63 DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
    this.recoverConsumeQueue();
    if (lastExitOK) {
        this.commitLog.recoverNormally();
    } else {
        this.commitLog.recoverAbnormally();
    }
    this.recoverTopicQueueTable();
}

第七步:根据Broker是否为正常停止,执行不同的恢复策略,下文将分别介绍异常停止、正常停止的文件恢复机制,如代码清单4-64所示。

代码清单4-64 DefaultMessageStore#recoverTopicQueueTable
private void recoverTopicQueueTable() {
    HashMap<String /* topic-queueid */, Long /* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();

    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }

    this.commitLog.setTopicQueueTable(table);
}

第八步:恢复ConsumeQueue文件后,将在CommitLog实例中保存每个消息消费队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID还存储了消息队列偏移量的关键所在。

Broker正常停止文件恢复

从图4-20可以看出,Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息,其结构如下。

代码清单4-65 CommitLog#recoverNormally
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();

if (!mappedFiles.isEmpty()) {
    int index = mappedFiles.size() - 3;
    if (index < 0) {
        index = 0;
    }
    // 省略部分代码
}

第一步:Broker正常停止再重启时,从倒数第3个文件开始恢复,如果不足3个文件,则从第一个文件开始恢复。checkCRCOnRecover参数用于在进行文件恢复时查找消息是否验证CRC,如代码清单4-66所示。

代码清单4-66 CommitLog#recoverNormally
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;

第二步:解释一下两个局部变量,mappedFileOffset为当前文件已校验通过的物理偏移量,processOffset为CommitLog文件已确认的物理偏移量,等于mappedFile.getFileFromOffset加上mappedFileOffset,如代码清单4-67所示。

代码清单4-67 CommitLog#recoverNormally
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();

if (dispatchRequest.isSuccess() && size > 0) {
    mappedFileOffset += size;
} else if (dispatchRequest.isSuccess() && size == 0) {
    index++;
    if (index >= mappedFiles.size()) {
        break;
    } else {
        mappedFile = mappedFiles.get(index);
        byteBuffer = mappedFile.sliceByteBuffer();
        processOffset = mappedFile.getFileFromOffset();
        mappedFileOffset = 0;
        log.info("recover next physics file, " + mappedFile.getFileName());
    }
} else if (!dispatchRequest.isSuccess()) {
    log.info("recover physics file end, " + mappedFile.getFileName());
    break;
}

第三步:遍历CommitLog文件,每次取出一条消息,如果查找结果为true并且消息的长度大于0,表示消息正确,mappedFileOffset指针向前移动本条消息的长度。如果查找结果为true并且消息的长度等于0,表示已到该文件的末尾,如果还有下一个文件,则重置processOffset、mappedFileOffset并重复上述步骤,否则跳出循环;如果查找结果为false,表明该文件未填满所有消息,则跳出循环,结束遍历文件,如代码清单4-68所示。

processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);

第四步:更新MappedFileQueue的flushedWhere和committedPosition指针,如代码清单4-69所示。

代码清单4-69 MappedFileQueue#truncateDirtyFiles
public void truncateDirtyFiles(long offset) {
    List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
    for (MappedFile file : this.mappedFiles) {
        long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
        if (fileTailOffset > offset) {
            if (offset >= file.getFileFromOffset()) {
                file.setWrotePosition((int) (offset % this.mappedFileSize));
                file.setCommittedPosition((int) (offset % this.mappedFileSize));
                file.setFlushedPosition((int) (offset % this.mappedFileSize));
            } else {
                file.destroy(1000);
                willRemoveFiles.add(file);
            }
        }
    }
    this.deleteExpiredFile(willRemoveFiles);
}

第五步:删除offset之后的所有文件。遍历目录下的文件,如果文件的尾部偏移量小于offset则跳过该文件,如果尾部的偏移量大于offset,则进一步比较offset与文件的开始偏移量。如果offset大于文件的起始偏移量,说明当前文件包含了有效偏移量,设置MappedFile的flushedPosition和committedPosition。如果offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,则调用MappedFile#destory方法释放MappedFile占用的内存资源(内存映射与内存通道等),然后加入待删除文件列表中,最终调用deleteExpiredFile将文件从物理磁盘上删除。过期文件的删除将在4.9节详细介绍。

Broker异常停止文件恢复

Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复与正常停止文件恢复的步骤基本相同,主要差别有两个:首先,Broker正常停止默认从倒数第三个文件开始恢复,而异常停止则需要从最后一个文件倒序推进,找到第一个消息存储正常的文件;其次,如果CommitLog目录没有消息文件,在ConsuneQueue目录下存在的文件则需要销毁。

如何判断一个消息文件是否正确呢?请看代码清单4-70。

代码清单4-70 CommitLog#isMappedFileMatchedRecover
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
    return false;
}

第一步:判断文件的魔数,如果不是MESSAGE_MAGIC_CODE,则返回false,表示该文件不符合CommitLog文件的存储格式,如代码清单4-71所示。

代码清单4-71 CommitLog#isMappedFileMatchedRecover
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
    return false;
}

第二步:如果文件中第一条消息的存储时间等于0,则返回false,说明该消息的存储文件中未存储任何消息,如代码清单4-72所示。

代码清单4-72 CommitLog#isMappedFileMatchedRecover
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
    this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
    if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
        return true;
    }
} else if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
    return true;
}

第三步:对比文件第一条消息的时间戳与检测点。如果文件第一条消息的时间戳小于文件检测点,说明该文件的部分消息是可靠的,则从该文件开始恢复。checkpoint文件中保存了CommitLog、ConsumeQueue、Index的文件刷盘点,RocketMQ默认选择CommitLog文件与ConsumeQueue这两个文件的刷盘点中较小值与CommitLog文件第一条消息的时间戳做对比,如果messageIndexEnable为true,表示Index文件的刷盘时间点也参与计算。

第四步:如果根据前3步算法找到MappedFile,则遍历MappedFile中的消息,验证消息的合法性,并将消息重新转发到ConsumeQueue与Index文件,该步骤在4.7.1节中已详细说明。

第五步:如果未找到有效的MappedFile,则设置CommitLog目录的flushedWhere、committedWhere指针都为0,并销毁ConsumeQueue文件,如代码清单4-73所示。

代码清单4-73 ConsumeQueue#destroy
public void destroy() {
    this.maxPhysicOffset = -1;
    this.minLogicOffset = 0;
    this.mappedFileQueue.destroy();
    if (isExtReadEnable()) {
        this.consumeQueueExt.destroy();
    }
}

重置ConsumeQueue的maxPhysicOffset与minLogicOffset,然后调用MappedFileQueue的destory()方法将ConsumeQuene目录下的文件全部删除。 存储启动时所谓的文件恢复主要完成flushedPosition、committedWhere指针的设置、将消息消费队列最大偏移量加载到内存,并删除flushedPosition之后所有的文件。如果Broker异常停止,在文件恢复过程中,RocketMQ会将最后一个有效文件中的所有消息重新转发到ConsumeQueue和Index文件中,确保不丢失消息,但同时会带来消息重复的问题。纵观RocktMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。