RocketMQ 元数据同步

7.1节和7.2节详细介绍了RocketMQ高可用主从数据同步和读写分离机制,本节将详细介绍RocketMQ元数据的同步机制。所谓RocketMQ元数据主要是指topic、消费组订阅信息、消费组消费进度、延迟队列进度。

RocketMQ高可用机制虽不能提供主从切换,即当主节点宕机后,从节点无法接管写入请求,但能承担读请求,即不影响消费。集群模式下消费进度是存储在Broker服务器上的,主节点宕机后,消费进度会存储在从服务器上,如果主节点恢复了,消费进度在主从节点又是如何同步的呢?

从节点主动同步元数据

在RocketMQ的设计中,元数据的同步是单向的,即元数据只能由从节点向主节点发起同步请求,而主节点不能向从节点同步元数据,即使主节点宕机后重启,也不会向从节点同步数据。

Broker在启动的时候,如果节点角色为从节点,会调用handleSlaveSynchronize方法,如图7-10所示。

image 2025 02 06 11 57 41 112
Figure 1. 图7-10 Broker启动时按照角色启动同步元数据任务示例图

如果Broker节点的角色为从节点,会开启一个定时任务,每隔10s执行一次元数据同步任务,同步任务的实现逻辑封装在SlaveSynchronize中,如代码清单7-25所示。

代码清单7-25 SlaveSynchronize#syncAll
public void syncAll() {
    this.syncTopicConfig();
    this.syncConsumerOffset();
    this.syncDelayOffset();
    this.syncSubscriptionGroupConfig();
}

从节点会定时执行上述同步任务,从主节点同步 topic 路由信息、消息消费进度、延迟队列调度进度、消费组信息。

接下来主要以消费组消费进度同步为例进行介绍,如代码清单 7-26 所示。

代码清单7-26 SlaveSynchronize#syncConsumerOffset
private void syncConsumerOffset() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
        } catch (Exception e) {
        }
    }
}

如果主节点的地址不为空,则向主节点发送 GET_ALL_CONSUMER_OFFSET 命令,查询主节点中所有存储的消息消费进度,然后直接覆盖从服务器中存储的消费进度。

主节点消息拉取主动同步消费进度

消息消费进度比起其他元数据来说比较特殊,因为消费进度的变化频率非常快,并且与消费端的行为息息相关,为了解答本章开头的疑问,接下来重点介绍消息消费进度额外的同步机制。关于消息消费进度,RocketMQ 还引入了另外一种同步机制:根据消息拉取的偏移量来更新消息消费进度,如代码清单7-27所示。

代码清单7-27 DefaultMQPushConsumerImpl#pullMessage
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.GetMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}
//省略部分代码
int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
);

第一步:消费者在向 Broker 发送拉取消息请求时,会先将客户端存储的消费进度提交到Broker端,如代码清单7-28所示。

代码清单7-28 PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

第二步:Broker 收到消息客户端消息拉取请求后,如果拉取请求中有包含消费端的消息消费进度,则使用该进度更新 Broker 端的消费进度,即提供了根据消费端覆盖服务端消费进度的机制。