RocketMQ 元数据同步
7.1节和7.2节详细介绍了RocketMQ高可用主从数据同步和读写分离机制,本节将详细介绍RocketMQ元数据的同步机制。所谓RocketMQ元数据主要是指topic、消费组订阅信息、消费组消费进度、延迟队列进度。
RocketMQ高可用机制虽不能提供主从切换,即当主节点宕机后,从节点无法接管写入请求,但能承担读请求,即不影响消费。集群模式下消费进度是存储在Broker服务器上的,主节点宕机后,消费进度会存储在从服务器上,如果主节点恢复了,消费进度在主从节点又是如何同步的呢? |
从节点主动同步元数据
在RocketMQ的设计中,元数据的同步是单向的,即元数据只能由从节点向主节点发起同步请求,而主节点不能向从节点同步元数据,即使主节点宕机后重启,也不会向从节点同步数据。
Broker在启动的时候,如果节点角色为从节点,会调用handleSlaveSynchronize方法,如图7-10所示。

如果Broker节点的角色为从节点,会开启一个定时任务,每隔10s执行一次元数据同步任务,同步任务的实现逻辑封装在SlaveSynchronize中,如代码清单7-25所示。
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
从节点会定时执行上述同步任务,从主节点同步 topic 路由信息、消息消费进度、延迟队列调度进度、消费组信息。
接下来主要以消费组消费进度同步为例进行介绍,如代码清单 7-26 所示。
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
} catch (Exception e) {
}
}
}
如果主节点的地址不为空,则向主节点发送 GET_ALL_CONSUMER_OFFSET 命令,查询主节点中所有存储的消息消费进度,然后直接覆盖从服务器中存储的消费进度。
主节点消息拉取主动同步消费进度
消息消费进度比起其他元数据来说比较特殊,因为消费进度的变化频率非常快,并且与消费端的行为息息相关,为了解答本章开头的疑问,接下来重点介绍消息消费进度额外的同步机制。关于消息消费进度,RocketMQ 还引入了另外一种同步机制:根据消息拉取的偏移量来更新消息消费进度,如代码清单7-27所示。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.GetMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
//省略部分代码
int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
第一步:消费者在向 Broker 发送拉取消息请求时,会先将客户端存储的消费进度提交到Broker端,如代码清单7-28所示。
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
第二步:Broker 收到消息客户端消息拉取请求后,如果拉取请求中有包含消费端的消息消费进度,则使用该进度更新 Broker 端的消费进度,即提供了根据消费端覆盖服务端消费进度的机制。