RocketMQ 读写分离机制

7.1节主要介绍了 RocketMQ 主从服务的实现原理,本节主要介绍从服务器如何参与消息拉取负载。消息消费是基于消息消费队列 MessageQueue 实现的,回顾一下 MessageQueue 的类图,如图 7-8 所示。

image 2025 02 06 11 46 54 798
Figure 1. 图7-8 MessageQueue类图

接下来重点分析RocketMQ根据brokerName查找Broker地址的过程。

RocketMQ根据MessageQueue查找Broker地址的唯一依据是brokerName,从RocketMQ的Broker组织结构中得知,同一组Broker(主从)服务器,它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0,RocketMQ提供MQClientFactory.findBrokerAddressInSubscribe来实现根据brokerName、brokerId查找Broker地址,如代码清单7-19所示。

代码清单7-19 PullAPIWrapper#pullKernelImpl
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),    this.recalculatePullFromWhichNode(mq), false);

返回结果类图如图7-9、代码清单7-20所示。

image 2025 02 06 11 48 25 218
Figure 2. 图7-9 FindBrokerResult类图
代码清单7-20 MQClientInstance#findBrokerAddressInSubscribe
public FindBrokerResult findBrokerAddressInSubscribe(
        String brokerName, long brokerId, boolean onlyThisBroker) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;
    HashMap<Long/* brokerId */, String/* address */>
            map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);
        slave = brokerId != MixAll.MASTER_ID;
        found = brokerAddr != null;
        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }
    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }
    return null;
}

下面解释一下该方法的参数。

  • brokerName:Broker名称。

  • brokerId:BrokerID。

  • onlyThisBroker:是否必须返回与brokerId的Broker对应的服务器信息。

从 ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId */, String/*address */>> brokerAddrTable 地址缓存表中根据 brokerName 获取所有的 Broker 信息。

根据 brokerId 从 Broker 主从缓存表中获取指定的 Broker 名称,如果根据 brokerId 未找到相关条目,且 onlyThisBroker 为 false,则随机返回 Broker 中任意一个 broker,否则返回 null。

组装 FindBrokerResult 时,需要设置节点属性是否是 slave。如果 brokerId=0,表示返回的 Broker 是主节点,否则返回的是从节点。

根据消息消费队列获取 brokerId 的实现如代码清单 7-21 所示。

代码清单7-21 PullAPIWrapper#recalculatePullFromWhichNode
public long recalculatePullFromWhichNode(final MessageQueue mq) {
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }
    return MixAll.MASTER_ID;
}

从ConcurrentMap<MessageQueue, AtomicLong/* brokerId */>pullFromWhichNodeTable 缓存表中获取该消息消费队列的 brokerId,如果找到,则返回,否则返回 brokerName 的主节点。pullFromWhichNodeTable消息从何而来呢?原来消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后,会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新pullFromWhichNodeTable,消息消费者线程更新pullFromWhichNodeTable,如代码清单7-22所示。

代码清单7-22 PullAPIWrapper#processPullResult
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
public void updatePullFromWhichNode (MessageQueue mq,long brokerId){
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
    } else {
        suggest.set(brokerId);
    }
}

消息服务端是根据何种规则来建议哪个消息消费队列该从哪台Broker服务器上拉取消息呢?请看代码清单7-23。

代码清单7-23 DefaultMessageStore#getMessage
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE *    (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

1)maxOffsetPy:代表当前主服务器消息存储文件的最大偏移量。 2)maxPhyOffsetPulling:此次拉取消息的最大偏移量。 3)diff:对于PullMessageService线程来说,当前未被拉取到消息消费端的消息长度。 4)TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ所在服务器的总内存大小。AccessMessageInMemoryMaxRatio表示RocketMQ所能使用的最大内存比例,超过该比例,消息将被置换出内存。memory表示RocketMQ消息常驻内存的大小,超过该大小,RocketMQ会将旧的消息置换回磁盘。

如果 diff 大于 memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取消息。

如代码清单7-24所示,如果主服务器繁忙则建议下一次从从服务器拉取消息,设置 suggestWhichBrokerId 为配置文件中的 whichBrokerWhenConsumeSlowly 属性,默认为 1。如果一个主服务器拥有多台从服务器,参与消息拉取负载的从服务器只会是其中一个。

代码清单7-24 PullMessageProcessor#processRequest
if (getMessageResult.isSuggestPullingFromSlave()) {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}