消息拉取

本节将基于推模式详细分析消息拉取机制。消息消费有两种模式:广播模式与集群模式,广播模式比较简单,每一个消费者需要拉取订阅主题下所有消费队列的消息。本节主要基于集群模式进行介绍。在集群模式下,同一个消费组内有多个消息消费者,同一个主题存在多个消费队列,那么消费者如何进行消息队列负载呢?从5.3节介绍的启动流程可知,每一个消费组内维护一个线程池来消费消息,那么这些线程又是如何分工合作的呢? 消息队列负载通常的做法是一个消息队列在同一时间只允许被一个消息消费者消费,一个消息消费者可以同时消费多个消息队列,那么RocketMQ是如何实现消息队列负载的呢?带着上述问题,我们开始RocketMQ消息消费机制的探讨。

从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService执行消息的拉取。

PullMessageService实现机制

PullMessageService继承的是ServiceThread,从名称来看,它是服务线程,通过run()方法启动,如代码清单5-6所示。

代码清单5-6 PullMessageService#run
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
        } catch (InterruptedException e) {
            // Handle InterruptedException if needed
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

PullMessageService消息拉取服务线程,run()方法是其核心逻辑,如代码清单5-7所示。run()方法的核心要点如下。

1)while(!this.isStopped())是一种通用的设计技巧,Stopped声明为volatile,每执行一次业务逻辑,检测一下其运行状态,可以通过其他线程将Stopped设置为true,从而停止该线程。 2)从pullRequestQueue中获取一个PullRequest消息拉取任务,如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入。 3)调用pullMessage方法进行消息拉取。思考一下,PullRequest是什么时候添加的呢?

代码清单5-7 PullMessageService#executePullRequestLaterexecutePullRequestImmediately
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    this.scheduledExecutorService.schedule(new Runnable() {
        public void run() {
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

原来,PullMessageService提供了延迟添加与立即添加两种方式将PullRequest放入pullRequestQueue。那么PullRequest是在什么时候创建的呢?executePullRequestImmediately方法调用链如图5-6所示。

image 2025 01 18 15 03 43 915
Figure 1. 图5-6 executePullRequestImmediately调用链

通过跟踪发现,主要有两个地方会调用executePullRequestImmediately:一个是在RocketMQ根据PullRequest拉取任务执行完一次消息拉取任务后,又将PullRequest对象放入pullRequestQueue;另一个是在RebalanceImpl中创建的。RebalanceImpl是5.5节要重点介绍的消息队列负载机制,也就是PullRequest对象真正创建的地方。

从上面的分析可知,PullMessageService只有在得到PullRequest对象时才会执行拉取任务,那么PullRequest究竟是什么呢?其类图如图5-7所示。

image 2025 01 18 15 04 17 186
Figure 2. 图5-7 PullRequest类图

如代码清单5-8所示,下面逐一介绍PullRequest的核心属性。 1)String consumerGroup:消费者组。 2)MessageQueue messageQueue:待拉取消费队列。 3)ProcessQueue processQueue:消息处理队列,从Broker中拉取到的消息会先存入ProccessQueue,然后再提交到消费者消费线程池进行消费。 4)long nextOffset:待拉取的MessageQueue偏移量。 5)boolean lockedFirst:是否被锁定。

代码清单5-8 PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

根据消费组名从MQClientInstance中获取消费者的内部实现类MQConsumerInner,令人意外的是,这里将consumer强制转换为DefaultMQPushConsumerImpl,也就是PullMessageService,该线程只为推模式服务,那拉模式如何拉取消息呢?其实细想也不难理解,对于拉模式,RocketMQ只需要提供拉取消息API,再由应用程序调用API。

ProcessQueue实现机制

ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按消息队列偏移量的顺序存放在ProcessQueue中,PullMessageService将消息提交到消费者消费线程池,消息成功消费后,再从ProcessQueue中移除。ProcessQueue的类图如图5-8所示。

image 2025 01 18 15 05 51 659
Figure 3. 图5-8 PocessQueue类图

下面逐一介绍ProccessQueue的核心属性。

1)ReadWriteLock lockTreeMap:读写锁,控制多线程并发修改msgTreeMap、msgTreeMapTemp。 2)TreeMap msgTreeMap:消息存储容器,键为消息在ConsumeQueue中的偏移量。 3)AtomicLong msgCount:ProcessQueue中总消息数。 4)TreeMap msgTreeMapTemp:消息临时存储容器,键为消息在ConsumeQueue中的偏移量。该结构用于处理顺序消息,消息消费线程从ProcessQueue的msgTreeMap中取出消息前,先将消息临时存储在msgTreeMapTemp中。 5)volatile long queueOffsetMax:当前ProcessQueue中包含的最大队列偏移量。 6)volatile boolean dropped = false:当前ProccesQueue是否被丢弃。 7)volatile long lastPullTimestamp:上一次开始拉取消息的时间戳。 8)volatile long lastConsumeTimestamp:上一次消费消息的时间戳。

继续介绍ProcessQueue的核心方法。

1)public boolean isLockExpired():判断锁是否过期,锁超时时间默认为30s,通过系统参数rocketmq.client. rebalance.lockMaxLiveTime进行设置。 2)public boolean isPullExpired():判断PullMessageService是否空闲,空闲时间默认120s,通过系统参数rocketmq.client. pull.pullMaxIdleTime进行设置。 3)public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer):移除消费超时的消息,默认超过15min未消费的消息将延迟3个延迟级别再消费。 4)public boolean putMessage(final List msgs):添加消息,PullMessageService拉取消息后,调用该方法将消息添加到ProcessQueue。 5)public long getMaxSpan():获取当前消息的最大间隔。getMaxSpan()并不能说明ProceQueue包含的消息个数,但是能说明当前处理队列中第一条消息与最后一条消息的偏移量已经超过的消息个数。 6)public long removeMessage(final List<MessageExt> msgs):移除消息。 7)public void rollback():将msgTreeMapTmp中的所有消息重新放入msgTreeMap并清除msgTreeMapTmp。 8)public long commit():将msgTreeMapTmp中的消息清除,表示成功处理该批消息。 9)public void makeMessageToCosumeAgain(List msgs):重新消费该批消息。 10)public List takeMessags(final int batchSize):从ProcessQueue中取出batchSize条消息。

消息拉取基本流程

本节将以并发消息消费来探讨整个消息消费的流程,顺序消息的实现原理将在5.9节单独分析。

消息拉取分为3个主要步骤。

1)拉取客户端消息拉取请求并封装。 2)消息服务器查找消息并返回。 3)消息拉取客户端处理返回的消息。

  1. 客户端封装消息拉取请求

    消息拉取入口为DefaultMQPushConsumerImpl#pullMessage,如代码清单5-9所示。

    代码清单5-9 DefaultMQPushConsumerImpl#pullMessage
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
    
    if (this.isPause()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    第一步:从PullRequest中获取ProcessQueue,如果处理队列当前状态未被丢弃,则更新ProcessQueue的lastPullTimestamp为当前时间戳。如果当前消费者被挂起,则将拉取任务延迟1s再放入PullMessageService的拉取任务队列中,最后结束本次消息拉取,如代码清单5-10所示。

    代码清单5-10 DefaultMQPushConsumerImpl#pullMessage
    long size = processQueue.getMsgCount().get();
    if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((flowControlTimes1++ % 1000) == 0) {
            // 省略流控输出语句
        }
        return;
    }
    
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((flowControlTimes2++ % 1000) == 0) {
            // 省略流控输出语句
        }
        return;
    }

    第二步:进行消息拉取流控。从消息消费数量与消费间隔两个维度进行控制,如代码清单5-11所示。

    1)消息处理总数,如果ProcessQueue当前处理的消息条数超过了pullThresholdForQueue=1000,将触发流控,放弃本次拉取任务,并且该队列的下一次拉取任务将在50ms后才加入拉取任务队列。每触发1000次流控后输出提示语:the consumer message buffer is full, sodo flow control, minOffset={队列最小偏移量}, maxOffset={队列最大偏移量}, size={消息总条数}, pullRequest={拉取任务}, flowControlTimes={流控触发次数}。

    2)ProcessQueue中队列最大偏移量与最小偏离量的间距不能超过consumeConcurrentlyMaxSpan,否则触发流控。每触发1000次流控后输出提示语:the queue’s messages, span toolong, so do flow control, minOffset={队列最小偏移量}, maxOffset={队列最大偏移量},maxSpan={间隔}, pullRequest={拉取任务信息}, flowControlTimes={流控触发次数}。这里主要的考量是担心因为一条消息堵塞,使消息进度无法向前推进,可能会造成大量消息重复消费。

    代码清单5-11 DefaultMQPushConsumerImpl#pullMessage
    final SubscriptionData subscriptionData = this.rebalanceImpl
        .getSubscriptionInner()
        .get(pullRequest.getMessageQueue().getTopic());
    
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }

    第三步:拉取该主题的订阅信息,如果为空则结束本次消息拉取,关于该队列的下一次拉取任务将延迟3s执行,如代码清单5-12所示。

    代码清单5-12 DefaultMQPushConsumerImpl#pullMessage
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(
            pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY
        );
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
    
    String subExpression = null;
    boolean classFilter = false;
    
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }
    
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commit offset
        true,
        subExpression != null,
        classFilter // class filter
    );

    第四步:构建消息拉取系统标记,如代码清单5-13所示,拉消息系统标记如图5-9所示。

    image 2025 01 18 15 10 57 740
    Figure 4. 图5-9 PullSysFlag类图

    下面逐一介绍PullSysFlag的枚举值含义。

    1)FLAG_COMMIT_OFFSET:表示从内存中读取的消费进度大于0,则设置该标记位。 2)FLAG_SUSPEND:表示消息拉取时支持挂起。 3)FLAG_SUBSCRIPTION:消息过滤机制为表达式,则设置该标记位。 4)FLAG_CLASS_FILTER:消息过滤机制为类过滤模式。

    代码清单5-13 DefaultMQPushConsumerImpl#pullMessage
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );

    第五步:调用PullAPIWrapper.pullKernelImpl方法后与服务端交互,如代码清单5-14所示,调用pullKernelImpl方法之前,我们先了解一下其参数含义。

    1)MessageQueue mq:从哪个消息消费队列拉取消息。 2)String subExpression:消息过滤表达式。 3)String expressionType:消息表达式类型,分为TAG、SQL92。 4)long offset:消息拉取偏移量。 5)int maxNums:本次拉取最大消息条数,默认32条。 6)int sysFlag:拉取系统标记。 7)long commitOffset:当前MessageQueue的消费进度(内存中)。 8)long brokerSuspendMaxTimeMillis:消息拉取过程中允许Broker挂起的时间,默认15s。 9)long timeoutMillis:消息拉取超时时间。 10)CommunicationMode communicationMode:消息拉取模式,默认为异步拉取。 11)PullCallback pullCallback:从Broker拉取到消息后的回调方法。

    代码清单5-14 PullAPIWrapper#pullKernelImpl
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(
            mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq),
            false
        );
    
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(
            mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq),
            false
        );
    }

    第六步:根据brokerName、BrokerId从MQClientInstance中获取Broker地址,在整个RocketMQ Broker的部署结构中,相同名称的Broker构成主从结构,其BrokerId会不一样,在每次拉取消息后,会给出一个建议,下次是从主节点还是从节点拉取,如代码清单5-15所示,其类图如图5-10所示。

    image 2025 01 18 15 13 17 050
    Figure 5. 图5-10 Find BrokerResult类图

    下面让我们一一来介绍FindBrokerResult的核心属性。

    1)String brokerAddr:Broker地址。 2)bollean slave:是否是从节点。 3)int brokerVersion:Broker版本。

    代码清单5-15 PullAPIWrapper#pullKernelImpl
    String brokerAddr = findBrokerResult.getBrokerAddr();
    
    if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
        brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
    }

    第七步:如果消息过滤模式为类过滤,则需要根据主题名称、broker地址找到注册在Broker上的FilterServer地址,从FilterServer上拉取消息,否则从Broker上拉取消息。上述步骤完成后,RocketMQ通过MQClientAPIImpl#pullMessageAsync方法异步向Broker拉取消息。

  2. 消息服务端Broker组装消息

    根据消息拉取命令RequestCode.PULL_MESSAGE,很容易找到Brokder端处理消息拉取的入口:org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest。

    第一步:根据订阅信息构建消息过滤器,如代码清单5-16所示。

    代码清单5-16 PullMessageProcessor#processRequest
    final GetMessageResult getMessageResult = this.brokerController
        .getMessageStore()
        .getMessage(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getQueueId(),
                    requestHeader.getQueueOffset(),
                    requestHeader.getMaxMsgNums(),
                    messageFilter);

    第二步:调用MessageStore.getMessage查找消息,如代码清单5-17所示,该方法参数含义如下。

    1)String group:消费组名称。 2)String topic:主题名称。 3)int queueId:队列ID。 4)long offset:待拉取偏移量。 5)int maxMsgNums:最大拉取消息条数。 6)MessageFilter messageFilter:消息过滤器。

    代码清单5-17 DefaultMessageStore#getMessage
    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0;
    long maxOffset = 0;
    GetMessageResult getResult = new GetMessageResult();
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

    第三步:根据主题名称与队列编号获取消息消费队列,如代码清单5-18所示。

    1)nextBeginOffset:待查找队列的偏移量。 2)minOffset:当前消息队列的最小偏移量。 3)maxOffset:当前消息队列的最大偏移量。 4)maxOffsetPy:当前CommitLog文件的最大偏移量。

    代码清单5-18 DefaultMessageStore#getMessage
    minOffset = consumeQueue.getMinOffsetInQueue();
    maxOffset = consumeQueue.getMaxOffsetInQueue();
    
    if (maxOffset == 0) {
        status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    } else if (offset < minOffset) {
        status = GetMessageStatus.OFFSET_TOO_SMALL;
        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
    } else if (offset == maxOffset) {
        status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
        nextBeginOffset = nextOffsetCorrection(offset, offset);
    } else if (offset > maxOffset) {
        status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
        if (0 == minOffset) {
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        } else {
            nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
        }
    }

    第四步:消息偏移量异常情况校对下一次拉取偏移量。

    1)maxOffset=0:表示当前消费队列中没有消息,拉取结果为NO_MESSAGE_IN_QUEUE。如果当前Broker为主节点,下次拉取偏移量为0。如果当前Broker为从节点并且offsetCheckInSlave为true,设置下次拉取偏移量为0。其他情况下次拉取时使用原偏移量。 2)offset<minOffset:表示待拉取消息偏移量小于队列的起始偏移量,拉取结果为OFFSET_TOO_SMALL。如果当前Broker为主节点,下次拉取偏移量为队列的最小偏移量。如果当前Broker为从节点并且offsetCheckInSlave为true,下次拉取偏移量为队列的最小偏移量。其他情况下次拉取时使用原偏移量。 3)offset==maxOffset:如果待拉取偏移量等于队列最大偏移量,拉取结果为OFFSET_OVERFLOW_ONE,则下次拉取偏移量依然为offset。 4)offset>maxOffset:表示偏移量越界,拉取结果为OFFSET_OVERFLOW_BADLY。此时需要考虑当前队列的偏移量是否为0,如果当前队列的最小偏移量为0,则使用最小偏移量纠正下次拉取偏移量。如果当前队列的最小偏移量不为0,则使用该队列的最大偏移量来纠正下次拉取偏移量。纠正逻辑与1)、2)相同。

    第五步:如果待拉取偏移量大于minOffset并且小于maxOffset,从当前offset处尝试拉取32条消息,在第4章详细介绍了根据消息队列偏移量(ConsumeQueue)从CommitLog文件中查找消息的过程,在这里就不重复介绍了。

    第六步:根据PullResult填充responseHeader的NextBeginOffset、MinOffset、MaxOffset,如代码清单5-19所示。

    代码清单5-19 PullMessageProcessor#processRequest
    response.setRemark(getMessageResult.getStatus().name());
    responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
    responseHeader.setMinOffset(getMessageResult.getMinOffset());
    responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

    第七步:根据主从同步延迟,如果从节点数据包含下一次拉取的偏移量,则设置下一次拉取任务的brokerId。

    第八步:如代码清单5-20所示,GetMessageResult与Response进行状态编码转换,具体转换说明如表5-1所示。

    image 2025 01 18 15 18 40 190
    Figure 6. 表5-1 GetMessageResult与Response状态编码转换
    代码清单5-20 PullMessageProcessor#processRequest
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(),
            requestHeader.getTopic(),
            requestHeader.getQueueId(),
            requestHeader.getCommitOffset()
        );
    }

    第九步:如果CommitLog标记为可用并且当前节点为主节点,则更新消息消费进度,消息消费进度的详情将在5.6节重点讨论。

    服务端消息拉取处理完毕,将返回结果拉取到消息调用方。在调用方,需要重点关注PULL_RETRY_IMMEDIATELY、PULL_OFFSET_MOVED、PULL_NOT_FOUND等情况下如何校正拉取偏移量。

  3. 消息拉取客户端处理消息

    回到消息拉取客户端调用入口:MQClientAPIImpl#pullMessageAsync,NettyRemotingClient在收到服务端响应结构后,会回调PullCallback的onSuccess或onException,PullCallBack对象在DefaultMQPushConsumerImpl#pullMessage中创建,如代码清单5-21所示。

    代码清单5-21 MQClientAPIImpl#processPullResponse
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
    
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }
    
    return new PullResultExt(); // (省略PullResultExt拉取结果的解码)

    第一步:根据响应结果解码成PullResultExt对象,此时只是从网络中读取消息列表中的byte[]messageBinary属性,如代码清单5-22所示。先重点看一下拉取状态码的转换,如表5-2所示。

    image 2025 01 18 15 22 41 602
    Figure 7. 表5-2 PullStatus与Response状态编码转换
    代码清单5-22 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);

    第二步:调用pullAPIWrapper的processPullResult,将消息字节数组解码成消息列表并填充msgFoundList,对消息进行消息过滤(TAG模式)。PullResult类图如图5-11所示。

    image 2025 01 18 15 23 46 737
    Figure 8. 图5-11 PullResult类图

    下面逐一分析PullResult的核心属性。

    1)pullStatus:拉取结果。 2)nextBeginOffset:下次拉取的偏移量。 3)minOffset:消息队列的最小偏移量。 4)maxOffset:消息队列的最大偏移量。 5)msgFoundList:具体拉取的消息列表。

    接下来按照正常流程,即分析拉取结果为PullStatus.FOUND(找到对应的消息)的情况来分析整个消息拉取过程,如代码清单5-23所示。

    代码清单5-23 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    long prevRequestOffset = pullRequest.getNextOffset();
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }

    第三步:更新PullRequest的下一次拉取偏移量,如果msgFoundList为空,则立即将PullReqeuest放入PullMessageService的pullRequestQueue,以便PullMessageSerivce能及时唤醒并再次执行消息拉取,如代码清单5-24所示。为什么PullStatus.msgFoundList还会为空呢?因为RocketMQ根据TAG进行消息过滤时,在服务端只是验证了TAG的哈希码,所以客户端再次对消息进行过滤时,可能会出现msgFoundList为空的情况。更多有关消息过滤的知识将在5.8节重点介绍。

    代码清单5-24 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume
    );

    第四步:首先将拉取到的消息存入ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中供消费者消费,如代码清单5-25所示。该方法是一个异步方法,也就是PullCallBack将消息提交到ConsumeMessageService中就会立即返回,至于这些消息如何消费,PullCallBack不会关注。

    代码清单5-25 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }

    第五步:将消息提交给消费者线程之后,PullCallBack将立即返回,可以说本次消息拉取顺利完成。然后查看pullInterval参数,如果pullInterval>0,则等待pullInterval毫秒后将PullRequest对象放入PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果。

    再来分析消息拉取异常处理是如何校对拉取偏移量的。

    1)NO_NEW_MSG、NO_MATCHED_MSG

    如果返回NO_NEW_MSG(没有新消息)、NO_MATCHED_MSG(没有匹配消息),则直接使用服务器端校正的偏移量进行下一次消息的拉取,如代码清单5-26所示。

    代码清单5-26 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    case NO_NEW_MSG:
    case NO_MATCHED_MSG:
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
        break;

    再来看服务端如何校正Offset。

    NO_NEW_MSG对应GetMessageResult.OFFSET_FOUND_NULL、GetMessageResult.OFFSET_OVERFLOW_ONE。

    OFFSET_OVERFLOW_ONE表示待拉取消息的物理偏移量等于消息队列最大的偏移量,如果有新的消息到达,此时会创建一个新的ConsumeQueue文件,因为上一个ConsueQueue文件的最大偏移量就是下一个文件的起始偏移量,所以可以按照该物理偏移量第二次拉取消息。

    OFFSET_FOUND_NULL表示根据ConsumeQueue文件的偏移量没有找到内容,使用偏移量定位到下一个ConsumeQueue文件,其实就是offset +(一个ConsumeQueue文件包含多少个条目=MappedFileSize / 20)。

    2)OFFSET_ILLEGAL

    如果拉取结果显示偏移量非法,首先将ProcessQueue的dropped设为true,表示丢弃该消费队列,意味着ProcessQueue中拉取的消息将停止消费,然后根据服务端下一次校对的偏移量尝试更新消息消费进度(内存中),然后尝试持久化消息消费进度,并将该消息队列从RebalacnImpl的处理队列中移除,意味着暂停该消息队列的消息拉取,等待下一次消息队列重新负载。如代码清单5-27所示,OFFSET_ILLEGAL对应服务端GetMessageResult状态的NO_MATCHED_LOGIC_QUEUE、NO_MESSAGE_IN_QUEUE、OFFSET_OVERFLOW_BADLY、OFFSET_TOO_SMALL,这些状态服务端偏移量校正基本上使用原偏移量,在客户端更新消息消费进度时只有当消息进度比当前消费进度大才会覆盖,以此保证消息进度的准确性。

    代码清单5-27 DefaultMQPushConsumerImpl$PullCallBack#onSuccess
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    pullRequest.getProcessQueue().setDropped(true);
    
    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
        public void run() {
            try {
                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(
                    pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false
                );
                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
            } catch (Throwable e) {
                // Handle the exception if necessary
            }
        }
    }, 10000);

    RocketMQ的消息拉取过程比较复杂,其核心流程如图5-12所示。

    image 2025 01 18 15 30 53 504
    Figure 9. 图5-12 RocketMQ消息拉取流程图
  4. 消息拉取长轮询机制分析

    RocketMQ并没有真正实现推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发送消息拉取请求,如果消息消费者向RocketMQ发送消息拉取时,消息并未到达消费队列,且未启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起),再去判断消息是否已到达消息队列。如果消息未到达,则提示消息拉取客户端PULL_NOT_FOUND(消息不存在),如果开启长轮询模式,RocketMQ一方面会每5s轮询检查一次消息是否可达,同时一有新消息到达后,立即通知挂起线程再次验证新消息是否是自己感兴趣的,如果是则从CommitLog文件提取消息返回给消息拉取客户端,否则挂起超时,超时时间由消息拉取方在消息拉取时封装在请求参数中,推模式默认为15s,拉模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis进行设置。RocketMQ通过在Broker端配置longPollingEnable为true来开启长轮询模式。消息拉取时服务端从CommitLog文件中未找到消息的处理逻辑,如代码清单5-28所示。

    代码清单5-28 PullMessageProcessor#processRequest
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
        // 省略相关代码
    
        case ResponseCode.PULL_NOT_FOUND:
            if (brokerAllowSuspend && hasSuspendFlag) {
                long pollingTimeMills = suspendTimeoutMillisLong;
                if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                }
    
                String topic = requestHeader.getTopic();
                long offset = requestHeader.getQueueOffset();
                int queueId = requestHeader.getQueueId();
    
                PullRequest pullRequest = new PullRequest(
                    request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(),
                    offset, subscriptionData, messageFilter
                );
    
                this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                response = null;
                break;
            }
    }

    1)Channel channel:网络通道,通过该通道向消息拉取客户端发送响应结果。 2)RemotingCommand request:消息拉取请求。 3)boolean brokerAllowSuspend:Broker端是否支持挂起,处理消息拉取时默认传入true,表示如果未找到消息则挂起,如果该参数为false,未找到消息时直接返回客户端消息未找到。 如果brokerAllowSuspend为true,表示支持挂起,则将响应对象response设置为null,不会立即向客户端写入响应,hasSuspendFlag参数在拉取消息时构建的拉取标记默认为true。

    默认支持挂起,根据是否开启长轮询决定挂起方式。如果开启长轮询模式,挂起超时时间来自请求参数,推模式默认为15s,拉模式通过DefaultMQPullConsumer#brokerSuspenMaxTimeMillis进行设置,默认20s。然后创建拉取任务PullRequest并提交到PullRequestHoldService线程中。

    RocketMQ轮询机制由两个线程共同完成。

    1)PullRequestHoldService:每隔5s重试一次。 2)DefaultMessageStore#ReputMessageService:每处理一次重新拉取,线程休眠1s,继续下一次检查。

  5. PullRequestHoldService线程详解

PullRequestHoldService线程如代码清单5-29所示。

代码清单5-29 PullRequestHoldService#suspendPullRequest
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);

    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }

    mpr.addPullRequest(pullRequest);
}

根据消息主题与消息队列构建key,从ConcurrentMap<String/* topic@queueId */,ManyPullRequest> pullRequestTable中获取该主题队列对应的ManyPullRequest,通过ConcurrentMap的并发特性,维护主题队列的ManyPullRequest,然后将PullRequest放入ManyPullRequest。ManyPullRequest对象内部持有一个PullRequest列表,表示同一主题队列的累积拉取消息任务,如代码清单5-30所示。

代码清单5-30 PullRequestHoldService#run
public void run() {
    log.info("{} service started", this.getServiceName());

    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;

            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

如果开启长轮询,每5s判断一次新消息是否到达。如果未开启长轮询,则默认等待1s再次判断,可以通过BrokerConfig#shortPollingTimeMills改变等待时间。

PullRequestHold Service的核心逻辑如代码清单5-31所示。

代码清单5-31 PullRequestHoldService#checkHoldRequest
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);

        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);

            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
代码清单5-32 PullRequestHoldService#notifyMessageArriving
List<PullRequest> requestList = mpr.cloneListAndClear();

public synchronized List<PullRequest> cloneListAndClear() {
    if (!this.pullRequestList.isEmpty()) {
        List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
        this.pullRequestList.clear();
        return result;
    }
    return null;
}

第一步:首先从ManyPullRequest中获取当前该主题队列所有的挂起拉取任务。值得注意的是,该方法使用了synchronized,说明该数据结构存在并发访问,该属性是PullRequestHoldService线程的私有属性。下文重点提到的ReputMessageService内部将持有PullRequestHoldService,也会唤醒挂起线程,从而执行消息拉取尝试,如代码清单5-33所示。

代码清单5-33 PullRequestHoldService#notifyMessageArriving
if (newestOffset > request.getPullFromThisOffset()) {
    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(
        tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)
    );

    if (match && properties != null) {
        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
    }

    if (match) {
        try {
            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                request.getClientChannel(), request.getRequestCommand()
            );
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
        continue;
    }
}

第二步:如果消息队列的最大偏移量大于待拉取偏移量,且消息匹配,则调用executeRequest WhenWakeup将消息返回给消息拉取客户端,否则等待下一次尝试,如代码清单5-34所示。

代码清单5-34 PullRequestHoldService#notifyMessageArriving
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
    try {
        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
            request.getClientChannel(), request.getRequestCommand()
        );
    } catch (Throwable e) {
        log.error("execute request when wakeup failed.", e);
    }
    continue;
}

第三步:如果挂起超时,则不继续等待,直接返回客户消息未找到,如代码清单5-35所示。

代码清单5-35 PullMessageProcessor#executeRequestWhenWakeup
final RemotingCommand response = PullMessageProcessor.this.
processRequest(channel, request, false);

第四步:这里的核心又回到长轮询的入口代码了,其核心是设置brokerAllowSuspend为false,表示不支持拉取线程挂起,即当根据偏移量无法获取消息时,将不挂起线程并等待新消息,而是直接返回告诉客户端本次消息拉取未找到消息。

回想一下,如果开启了长轮询机制,PullRequestHoldService线程每隔5s被唤醒,尝试检测是否有新消息到来,直到超时才停止,如果被挂起,需要等待5s再执行。消息拉取的实时性比较差,为了避免这种情况,RocketMQ引入另外一种机制:当消息到达时唤醒挂起线程,触发一次检查。

  1. DefaultMessageStore#ReputMessageService详解

ReputMessageService线程主要是根据CommitLog文件将消息转发到ConsumeQueue、Index等文件,该部分已经在第4章进行了详细解读,本节关注doReput()方法关于长轮询的相关实现,如代码清单5-36所示。

代码清单5-36 DefaultMessageStore#start
if (this.getMessageStoreConfig().isDuplicationEnable()) {
    this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
    this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();

如果允许消息重复,将重新推送偏移量设置为CommitLog文件的提交偏移量,如果不允许重复推送,则设置重新推送偏移为CommitLog的当前最大偏移量,如代码清单5-37所示。

代码清单5-37 DefaultMessageStore#ReputMessageService#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(
        dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(),
        dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(),
        dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(),
        dispatchRequest.getPropertiesMap()
    );
}

当新消息达到CommitLog文件时,ReputMessageService线程负责将消息转发给ConsumeQueue文件和Index文件,如果Broker端开启了长轮询模式并且当前节点角色主节点,则将调用PullRequestHoldService线程的notifyMessageArriving()方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式实现了准实时消息拉取。