RocketMQ 读写分离机制
7.1节主要介绍了 RocketMQ 主从服务的实现原理,本节主要介绍从服务器如何参与消息拉取负载。消息消费是基于消息消费队列 MessageQueue 实现的,回顾一下 MessageQueue 的类图,如图 7-8 所示。

接下来重点分析RocketMQ根据brokerName查找Broker地址的过程。
RocketMQ根据MessageQueue查找Broker地址的唯一依据是brokerName,从RocketMQ的Broker组织结构中得知,同一组Broker(主从)服务器,它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0,RocketMQ提供MQClientFactory.findBrokerAddressInSubscribe来实现根据brokerName、brokerId查找Broker地址,如代码清单7-19所示。
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
返回结果类图如图7-9、代码清单7-20所示。

public FindBrokerResult findBrokerAddressInSubscribe(
String brokerName, long brokerId, boolean onlyThisBroker) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
HashMap<Long/* brokerId */, String/* address */>
map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
brokerAddr = map.get(brokerId);
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null;
if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = entry.getKey() != MixAll.MASTER_ID;
found = true;
}
}
if (found) {
return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}
下面解释一下该方法的参数。
-
brokerName:Broker名称。
-
brokerId:BrokerID。
-
onlyThisBroker:是否必须返回与brokerId的Broker对应的服务器信息。
从 ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId */, String/*address */>> brokerAddrTable 地址缓存表中根据 brokerName 获取所有的 Broker 信息。
根据 brokerId 从 Broker 主从缓存表中获取指定的 Broker 名称,如果根据 brokerId 未找到相关条目,且 onlyThisBroker 为 false,则随机返回 Broker 中任意一个 broker,否则返回 null。
组装 FindBrokerResult 时,需要设置节点属性是否是 slave。如果 brokerId=0,表示返回的 Broker 是主节点,否则返回的是从节点。
根据消息消费队列获取 brokerId 的实现如代码清单 7-21 所示。
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
return MixAll.MASTER_ID;
}
从ConcurrentMap<MessageQueue, AtomicLong/* brokerId */>pullFromWhichNodeTable 缓存表中获取该消息消费队列的 brokerId,如果找到,则返回,否则返回 brokerName 的主节点。pullFromWhichNodeTable消息从何而来呢?原来消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后,会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新pullFromWhichNodeTable,消息消费者线程更新pullFromWhichNodeTable,如代码清单7-22所示。
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
public void updatePullFromWhichNode (MessageQueue mq,long brokerId){
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) {
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
} else {
suggest.set(brokerId);
}
}
消息服务端是根据何种规则来建议哪个消息消费队列该从哪台Broker服务器上拉取消息呢?请看代码清单7-23。
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
1)maxOffsetPy:代表当前主服务器消息存储文件的最大偏移量。 2)maxPhyOffsetPulling:此次拉取消息的最大偏移量。 3)diff:对于PullMessageService线程来说,当前未被拉取到消息消费端的消息长度。 4)TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ所在服务器的总内存大小。AccessMessageInMemoryMaxRatio表示RocketMQ所能使用的最大内存比例,超过该比例,消息将被置换出内存。memory表示RocketMQ消息常驻内存的大小,超过该大小,RocketMQ会将旧的消息置换回磁盘。
如果 diff 大于 memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取消息。
如代码清单7-24所示,如果主服务器繁忙则建议下一次从从服务器拉取消息,设置 suggestWhichBrokerId 为配置文件中的 whichBrokerWhenConsumeSlowly 属性,默认为 1。如果一个主服务器拥有多台从服务器,参与消息拉取负载的从服务器只会是其中一个。
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}