实时更新 ConsumeQueue 与 Index 文件
因为ConsumeQueue文件、Index文件都是基于CommitLog文件构建的,所以当消息生产者提交的消息存储到CommitLog文件中时,ConsumeQueue文件、Index文件需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageServcie来准实时转发CommitLog文件的更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue文件、Index文件,如代码清单4-48所示。
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
Broker服务器在启动时会启动ReputMessageService线程,并初始化一个非常关键的参数reputFromOffset,该参数的含义是ReputMessageService从哪个物理偏移量开始转发消息给ConsumeQueue和Index文件。如果允许重复转发,将reputFromOffset设置为CommitLog文件的提交指针。如果不允许重复转发,将reputFromOffset设置为CommitLog文件的内存中最大偏移量,如代码清单4-49所示。
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
ReputMessageService线程每执行一次任务推送,休息1ms后继续尝试推送消息到ConsumeQueue和Index文件中,消息消费转发由doReput()方法实现,如代码清单4-50所示。
SelectMappedBufferResult result =
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
第一步:返回reputFromOffset偏移量开始的全部有效数据(CommitLog文件)。然后循环读取每一条消息,如代码清单4-51所示。
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
}
}
第二步:从result返回的ByteBuffer中循环读取消息,一次读取一条,创建Dispatch Request对象。DispatchRequest类图如图4-20所示,如果消息长度大于0,则调用doDispatch()方法。最终将分别调用CommitLogDispatcherBuildConsumeQueue(构建消息消费队列)、CommitLogDispatcherBuildIndex(构建索引文件)。

下面介绍DispatchRequest的核心属性。
1)String topic:消息主题名称。 2)int queueId:消息队列ID。 3)long commitLogOffset:消息物理偏移量。 4)int msgSize:消息长度。 5)long tagsCode:消息过滤tag哈希码。 6)long storeTimestamp:消息存储时间戳。 7)long consumeQueueOffset:消息队列偏移量。 8)String keys:消息索引key。多个索引key用空格隔开,例如key1 key2。 9)boolean success:是否成功解析到完整的消息。 10)String uniqKey:消息唯一键。 11)int sysFlag:消息系统标记。 12)long preparedTransactionOffset:消息预处理事务偏移量。 13)Map propertiesMap:消息属性。 14)byte bitMap[]:位图。
根据消息更新 ConsumeQueue 文件
消息消费队列转发任务实现类为CommitLogDispatcherBuildConsumeQueue,内部最终将调用putMessagePositionInfo()方法,如代码清单4-52所示。
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(),
dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
第一步:根据消息主题与队列ID,先获取对应的ConsumeQueue文件,其逻辑比较简单,因为每一个消息主题对应一个ConsumeQueue目录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后的ConsumeQueue文件即可,如代码清单4-53所示。
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
第二步:依次将消息偏移量、消息长度、tag哈希码写入ByteBuffer,并根据consume-QueueOffset计算ConsumeQueue中的物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘。
根据消息更新 Index 文件
哈希索引文件转发任务实现类为CommitLogDispatcherBuildIndex,如代码清单4-54所示。
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
如果messsageIndexEnable设置为true,则调用IndexService#buildIndex构建哈希索引,否则忽略本次转发任务,如代码清单4-55所示。
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
// 省略部分代码
}
第一步:获取或创建Index文件并获取所有文件最大的物理偏移量。如果该消息的物理偏移量小于Index文件中的物理偏移量,则说明是重复数据,忽略本次索引构建,如代码清单4-56所示。
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
第二步:如果消息的唯一键不为空,则添加到哈希索引中,以便加速根据唯一键检索消息,如代码清单4-57所示。
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
// 返回 topic + "#" + key
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
第三步:构建索引键,RocketMQ支持为同一个消息建立多个索引,多个索引键用空格分开。