消息队列负载与重新分布机制

因为在启动PullMessageService时,LinkedBlockingQueue<PullRequest>pullRequestQueue中没有PullRequest对象,所以PullMessageService线程将阻塞。

  • 问题1:PullRequest对象在什么时候创建并加入pullRequestQueue,可以唤醒PullMessageService线程?

  • 问题2:集群内多个消费者是如何负载主题下多个消费队列的?如果有新的消费者加入,消息队列又会如何重新分布?

RocketMQ消息队列重新分布是由RebalanceService线程实现的,如代码清单5-38所示。一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。接下来我们带着上面两个问题,了解一下RebalanceService的run()方法。

代码清单5-38 RebalanceService#run
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

RebalanceService线程默认每隔20s执行一次mqClientFactory.doRebalance()方法,如代码清单5-39所示。可以使用Drocketmq.client.rebalance.waitInterval=interval改变默认值。

代码清单5-39 MQClientInstance#doRebalance
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

MQClientIinstance遍历已注册的消费者,对消费者执行doRebalance()方法,如代码清单5-40所示。

代码清单5-40 RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}

每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象,该方法主要遍历订阅信息对每个主题的队列进行重新负载。RebalanceImpl的Map<String,SubscriptionData>subTable在调用消费者DefaultMQPushConsumerImpl#subscribe方法时填充。如果订阅信息发生变化,例如调用了unsubscribe()方法,则需要将不关心的主题消费队列从processQueueTable中移除。接下来重点分析RebalanceImpl#rebalanceByTopic,了解RocketMQ如何针对单个主题进行消息队列重新负载(以集群模式),如代码清单5-41所示。

代码清单5-41 RebalanceImpl#rebalanceByTopic
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic,consumerGroup);

第一步:从主题订阅信息缓存表中获取主题的队列信息。发送请求从Broker中获取该消费组内当前所有的消费者客户端ID,主题的队列可能分布在多个Broker上,那么请求该发往哪个Broker呢?RocketeMQ从主题的路由信息表中随机选择一个Broker。Broker为什么会存在消费组内所有消费者的信息呢?我们不妨回忆一下,消费者在启动的时候会向MQClientInstance中注册消费者,然后MQClientInstance会向所有的Broker发送心跳包,心跳包中包含MQClientInstance的消费者信息,如代码清单5-42所示。如果mqSet、cidAll任意一个为空,则忽略本次消息队列负载。

代码清单5-42 RebalanceImpl#rebalanceByTopic
Collections.sort(mqAll);
Collections.sort(cidAll);

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;

try {
    allocateResult = strategy.allocate(
        this.consumerGroup,
        this.mQClientFactory.getClientId(),
        mqAll,
        cidAll
    );
} catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e);
    return;
}

第二步:对cidAll、mqAll进行排序。这一步很重要,同一个消费组内看到的视图应保持一致,确保同一个消费队列不会被多个消费者分配。RocketMQ消息队列分配算法接口,如代码清单5-43所示。

代码清单5-43 AllocateMessageQueueStrategy
/**
 * Strategy Algorithm for message allocating between consumers
 */
public interface AllocateMessageQueueStrategy {

    List<MessageQueue> allocate(
        String consumerGroup,
        String currentCID,
        List<MessageQueue> mqAll,
        List<String> cidAll
    );

    /**
     * Algorithm name
     *
     * @return The strategy name
     */
    String getName();
}

RocketMQ默认提供5种分配算法。 1)AllocateMessageQueueAveragely:平均分配,推荐使用。 举例来说,如果现在有8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,那么根据该负载算法,消息队列分配如下。

c1:q1、q2、q3。
c2:q4、q5、q6。
c3:q7、q8。

2)AllocateMessageQueueAveragelyByCircle:平均轮询分配,推荐使用。 举例来说,如果现在有8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,那么根据该负载算法,消息队列分配如下。

c1:q1、q4、q7。
c2:q2、q5、q8。
c3:q3、q6。

3)AllocateMessageQueueConsistentHash:一致性哈希。因为消息队列负载信息不容易跟踪,所以不推荐使用。 4)AllocateMessageQueueByConfig:根据配置,为每一个消费者配置固定的消息队列。 5)AllocateMessageQueueByMachineRoom:根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。

消息负载算法如果没有特殊的要求,尽量使用AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle,这是因为分配算法比较直观。消息队列分配原则为一个消费者可以分配多个消息队列,但同一个消息队列只会分配给一个消费者,故如果消费者个数大于消息队列数量,则有些消费者无法消费消息。

对比消息队列是否发生变化,主要思路是遍历当前负载队列集合,如果队列不在新分配队列的集合中,需要将该队列停止消费并保存消费进度;遍历已分配的队列,如果队列不在队列负载表中(processQueueTable),则需要创建该队列拉取任务PullRequest,然后添加到PullMessageService线程的pullRequestQueue中,PullMessageService才会继续拉取任务,如代码清单5-44所示。

代码清单5-44 RebalanceImpl#updateProcessQueueTableInRebalance
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();
    if (mq.getTopic().equals(topic)) {
        if (!mqSet.contains(mq)) {
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
            }
        }
    }
}

第三步:ConcurrentMap〈MessageQueue, ProcessQueue〉 processQueueTable是当前消费者负载的消息队列缓存表,如果缓存表中的MessageQueue不包含在mqSet中,说明经过本次消息队列负载后,该mq被分配给其他消费者,需要暂停该消息队列消息的消费。方法是将ProccessQueue的状态设置为droped=true,该ProcessQueue中的消息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是否将MessageQueue、ProccessQueue从缓存表中移除。removeUnnecessaryMessageQueue在RebalanceImple中定义为抽象方法。removeUnnecessaryMessageQueue方法主要用于持久化待移除MessageQueue的消息消费进度。在推模式下,如果是集群模式并且是顺序消息消费,还需要先解锁队列,如代码清单5-45,关于顺序消息将在5.9节详细讨论。

代码清单5-45 RebalanceImpl#updateProcessQueueTableInRebalance
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

第四步:遍历本次负载分配到的队列集合,如果processQueueTable中没有包含该消息队列,表明这是本次新增加的消息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用DefaultMQPushConsumer#setConsumeFromWhere方法进行设置。 PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere。 1)ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列最新偏移量开始消费,如代码清单5-46所示。

代码清单5-46 RebalancePushImpl#computePullFromWhere
case CONSUME_FROM_LAST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    if (lastOffset >= 0) {
        result = lastOffset;
    } else if (-1 == lastOffset) {
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            result = 0L;
        } else {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}

offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)返回-1表示该消息队列刚创建。从磁盘中读取消息队列的消费进度,如果大于0则直接返回,如果等于-1,在CONSUME_FROM_LAST_OFFSET模式下获取该消息队列当前最大的偏移量,如果小于-1,表示该消息进度文件中存储了错误的偏移量,则返回-1。 2)CONSUME_FROM_FIRST_OFFSET:从头开始消费,如代码清单5-47所示。

代码清单5-47 RebalancePushImpl#computePullFromWhere
case CONSUME_FROM_FIRST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    if (lastOffset >= 0) {
        result = lastOffset;
    } else if (-1 == lastOffset) {
        result = 0L;
    } else {
        result = -1;
    }
    break;
}

从磁盘中读取消息队列的消费进度,如果大于0则直接返回,如果等于-1,在CONSUME_FROM_FIRST_OFFSET模式下直接返回0,从头开始消费,如果小于-1,表示该消息进度文件中存储了错误的偏移量,则返回-1。 3)CONSUME_FROM_TIMESTAMP:从消费者启动时间戳对应消费进度开始消费,如代码清单5-48所示。

代码清单5-48 RebalancePushImpl#computePullFromWhere
try {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    if (lastOffset >= 0) {
        result = lastOffset;
    } else if (-1 == lastOffset) {
        try {
            long timestamp = UtilAll.parseDate(
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                UtilAll.YYYYMMDDHHMMSS
            ).getTime();
            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
        } catch (MQClientException e) {
            result = -1;
        }
    } else {
        result = -1;
    }
} catch (Exception e) {
    // Handle outer exception if needed
}

从磁盘中读取消息队列的消费进度,如果大于0则直接返回。如果等于-1,在CONSUME_FROM_TIMESTAMP模式下会尝试将消息存储时间戳更新为消费者启动的时间戳,如果能找到则返回找到的偏移量,否则返回0。如果小于-1,表示该消息进度文件中存储了错误的偏移量,则返回-1,如代码清单5-49所示。

ConsumeFromWhere相关消费进度校正策略只有在从磁盘中获取消费进度返回-1时才会生效,如果从消息进度存储文件中返回的消费进度小于-1,表示偏移量非法,则使用偏移量-1去拉取消息,那么会发生什么呢?首先第一次去消息服务器拉取消息时无法取到消息,但是会用-1去更新消费进度,然后将消息消费队列丢弃,在下一次消息队列负载时再次消费。

代码清单5-49 RebalancePushImpl#dispatchPullRequest
this.dispatchPullRequest(pullRequestList);

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

第五步:将PullRequest加入PullMessageService,以便唤醒PullMessageService线程。消息队列负载机制就介绍到这里,回到本节的两个问题。

问题1:PullRequest对象在什么时候创建并加入pullRequestQueue,可以唤醒PullMessageService线程?

RebalanceService线程每隔20s对消费者订阅的主题进行一次队列重新分配,每一次分配都会获取主题的所有队列、从Broker服务器实时查询当前该主题该消费组内的消费者列表,对新分配的消息队列会创建对应的PullRequest对象。在一个JVM进程中,同一个消费组同一个队列只会存在一个PullRequest对象。

问题2:集群内多个消费者是如何负载主题下多个消费队列的?如果有新的消费者加入,消息队列又会如何重新分布?

每次进行队列重新负载时,会从Broker实时查询当前消费组内所有的消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列,从而消费消息。

本节分析了消息队列的负载机制,RocketMQ消息拉取由PullMessageService与Rebalance-Service共同协作完成,如图5-13所示。

image 2025 01 18 15 55 27 260
Figure 1. 图5-13 PullMessageService线程与RebalanceService线程交互图