实时更新 ConsumeQueue 与 Index 文件

因为ConsumeQueue文件、Index文件都是基于CommitLog文件构建的,所以当消息生产者提交的消息存储到CommitLog文件中时,ConsumeQueue文件、Index文件需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageServcie来准实时转发CommitLog文件的更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue文件、Index文件,如代码清单4-48所示。

代码清单4-48 DefaultMessageStore#start
if (this.getMessageStoreConfig().isDuplicationEnable()) {
    this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
    this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();

Broker服务器在启动时会启动ReputMessageService线程,并初始化一个非常关键的参数reputFromOffset,该参数的含义是ReputMessageService从哪个物理偏移量开始转发消息给ConsumeQueue和Index文件。如果允许重复转发,将reputFromOffset设置为CommitLog文件的提交指针。如果不允许重复转发,将reputFromOffset设置为CommitLog文件的内存中最大偏移量,如代码清单4-49所示。

代码清单4-49 DefaultMessageStore#run
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

ReputMessageService线程每执行一次任务推送,休息1ms后继续尝试推送消息到ConsumeQueue和Index文件中,消息消费转发由doReput()方法实现,如代码清单4-50所示。

代码清单4-50 DefaultMessageStore#doReput
SelectMappedBufferResult result =
        DefaultMessageStore.this.commitLog.getData(reputFromOffset);

第一步:返回reputFromOffset偏移量开始的全部有效数据(CommitLog文件)。然后循环读取每一条消息,如代码清单4-51所示。

代码清单4-51 DefaultMessageStore#doReput
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
        .checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
    if (size > 0) {
        DefaultMessageStore.this.doDispatch(dispatchRequest);
    }
}

第二步:从result返回的ByteBuffer中循环读取消息,一次读取一条,创建Dispatch Request对象。DispatchRequest类图如图4-20所示,如果消息长度大于0,则调用doDispatch()方法。最终将分别调用CommitLogDispatcherBuildConsumeQueue(构建消息消费队列)、CommitLogDispatcherBuildIndex(构建索引文件)。

image 2025 01 18 13 44 59 858
Figure 1. 图4-20 DispatchRequest类图

下面介绍DispatchRequest的核心属性。

1)String topic:消息主题名称。 2)int queueId:消息队列ID。 3)long commitLogOffset:消息物理偏移量。 4)int msgSize:消息长度。 5)long tagsCode:消息过滤tag哈希码。 6)long storeTimestamp:消息存储时间戳。 7)long consumeQueueOffset:消息队列偏移量。 8)String keys:消息索引key。多个索引key用空格隔开,例如key1 key2。 9)boolean success:是否成功解析到完整的消息。 10)String uniqKey:消息唯一键。 11)int sysFlag:消息系统标记。 12)long preparedTransactionOffset:消息预处理事务偏移量。 13)Map propertiesMap:消息属性。 14)byte bitMap[]:位图。

根据消息更新 ConsumeQueue 文件

消息消费队列转发任务实现类为CommitLogDispatcherBuildConsumeQueue,内部最终将调用putMessagePositionInfo()方法,如代码清单4-52所示。

代码清单4-52 DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(),
            dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

第一步:根据消息主题与队列ID,先获取对应的ConsumeQueue文件,其逻辑比较简单,因为每一个消息主题对应一个ConsumeQueue目录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后的ConsumeQueue文件即可,如代码清单4-53所示。

代码清单4-53 ConsumeQueue#putMessagePositionInfo
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

if (mappedFile != null) {
    return mappedFile.appendMessage(this.byteBufferIndex.array());
}

第二步:依次将消息偏移量、消息长度、tag哈希码写入ByteBuffer,并根据consume-QueueOffset计算ConsumeQueue中的物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘。

根据消息更新 Index 文件

哈希索引文件转发任务实现类为CommitLogDispatcherBuildIndex,如代码清单4-54所示。

代码清单4-54 CommitLogDispatcherBuildIndex#dispatch
public void dispatch(DispatchRequest request) {
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

如果messsageIndexEnable设置为true,则调用IndexService#buildIndex构建哈希索引,否则忽略本次转发任务,如代码清单4-55所示。

代码清单4-55 IndexService#buildIndex
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
    long endPhyOffset = indexFile.getEndPhyOffset();
    DispatchRequest msg = req;
    String topic = msg.getTopic();
    String keys = msg.getKeys();
    if (msg.getCommitLogOffset() < endPhyOffset) {
        return;
    }
    // 省略部分代码
}

第一步:获取或创建Index文件并获取所有文件最大的物理偏移量。如果该消息的物理偏移量小于Index文件中的物理偏移量,则说明是重复数据,忽略本次索引构建,如代码清单4-56所示。

代码清单4-56 IndexService#buildIndex
if (req.getUniqKey() != null) {
    indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
    if (indexFile == null) {
        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
        return;
    }
}

第二步:如果消息的唯一键不为空,则添加到哈希索引中,以便加速根据唯一键检索消息,如代码清单4-57所示。

代码清单4-57 IndexService#buildIndex
if (keys != null && keys.length() > 0) {
    String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
    for (int i = 0; i < keyset.length; i++) {
        String key = keyset[i];
        if (key.length() > 0) {
            indexFile = putKey(indexFile, msg, buildKey(topic, key));
            // 返回 topic + "#" + key
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }
    }
}

第三步:构建索引键,RocketMQ支持为同一个消息建立多个索引,多个索引键用空格分开。