顺序消息

RocketMQ 支持局部消息顺序消费,可以确保同一个消息消费队列中的消息按顺序消费,如果需要做到全局顺序消费,则可以将主题配置成一个队列,适用于数据库 BinLog 等严格要求顺序消息消费的场景。并发消息消费包含 4 个步骤:消息队列负载、消息拉取、消息消费、消息消费进度存储。

消息队列负载

RocketMQ 首先需要通过 RebalanceService 线程实现消息队列的负载,集群模式下同一个消费组内的消费者共同承担其订阅主题下消息队列的消费,同一个消息消费队列在同一时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列,如代码清单 5-86 所示。

代码清单5-86 RebalanceImpl#updateProcessQueueTableInRebalance
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        // 省略部分代码
        long nextOffset = this.computePullFromWhere(mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
    }
}

经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向 Broker 发起锁定该消息队列的请求,如果返回加锁成功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。

顺序消息消费与并发消息消费的一个关键区别是,顺序消息在创建消息队列拉取任务时,需要在 Broker 服务器锁定该消息队列。

消息拉取

RocketMQ 消息拉取由 PullMessageService 线程负责,根据消息拉取任务循环拉取消息,如代码清单 5-87 所示。

代码清单5-87 DefaultMQPushConsumerImpl#pullMessage
if (this.consumeOrderly) {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean broker
            Busy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from br oker. pullReques t: {} NewOffset: {} brokerBusy: {}"
                    , pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info(" [NOTIFYME]the first time to pull message, but pull request offset larger tha n broker consume offset. pullReques t: {} NewOffset:                {}", pullRe
                        quest, offset);
            }
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}

如果消息处理队列未被锁定,则延迟 3s 后再将 PullRequest 对象放入拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取偏移量,然后向消息服务端拉取消息。

消息消费

顺序消息消费实现类 org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderly Service 的核心类图如图 5-27 所示。

image 2025 02 05 13 48 17 978
Figure 1. 图5-27 ConsumeMessageOrderlyService类图

下面逐一介绍 ConsumeMessageOrderlyService 的核心属性。

  1. DefaultMQPushConsumerImpl defaultMQPushConsumerImpl:消息消费者实现类。

  2. DefaultMQPushConsumer defaultMQPushConsumer:消息消费者。

  3. MessageListenerOrderly messageListener:顺序消息消费监听器。

  4. BlockingQueue consumeRequestQueue:消息消费任务队列。

  5. ThreadPoolExecutor consumeExecutor:消息消费线程池。

  6. String consumerGroup:消息组名。

  7. MessageQueueLock messageQueueLock:消息消费端消息消费队列锁容器,内部持有 ConcurrentMap<MessageQueue, Object> mqLockTable =newConcurrentHashMap<MessageQueue, Object>()。

  8. ScheduledExecutorService scheduledExecutorService:调度任务线程池。

ConsumeMessageOrderlyService构造方法

ConsumeMessageOrderlyService 构造方法如代码清单 5-88 所示。

代码清单5-88 ConsumeMessageOrderlyService构造方法
this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeTh readMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

初始化实例参数,这里的关键是消息任务队列为 LinkedBlockingQueue,消息消费线程池最大运行时线程个数为 consumeThreadMin,consumeThreadMax 参数将失效。

ConsumeMessageOrderlyService启动方法

ConsumeMessageOrderlyService 启动方法如代码清单 5-89 所示。

代码清单5-89 ConsumeMessageOrderlyService#start
public void start() {

    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                     public void run() {
                         ConsumeMessageOrderlyService.this.lockMQPeriodically();
                     }
                 },
            1000 * 1,
            ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

如果消费模式为集群模式,启动定时任务,默认每隔 20s 锁定一次分配给自己的消息消费队列。通过 Drocketmq.client.rebalance.lockInterval=20000 设置间隔,该值建议与一次消息负载频率相同。从上文可知,集群模式下顺序消息消费在创建拉取任务时并未将 ProcessQueue 的 locked 状态设置为 true,在未锁定消息队列时无法执行消息拉取任务,ConsumeMessageOrderlyService 以 20s 的频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。接下来分析一下解锁的具体实现,如代码清单 5-90 所示。

代码清单5-90 RebalanceImpl#buildProcessQueueTableByBrokerName
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
    HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
    for (MessageQueue mq : this.processQueueTable.keySet()) {
        Set<MessageQueue> mqs = result.get(mq.getBrokerName());
        if (null == mqs) {
            mqs = new HashSet<MessageQueue>();
            result.put(mq.getBrokerName(), mqs);
        }
        mqs.add(mq);
    }
    return result;
}

第一步:ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 表示将消息队列按照 Broker 组织成 Map<String brokerName,Set<MessageQueue>>,方便下一步向 Broker 发送锁定消息队列请求,如代码清单 5-91 所示。

代码清单5-91 RebalanceImpl#lockAll
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

第二步:向 Broker(主节点)发送锁定消息队列,该方法会返回成功被当前消费者锁定的消息消费队列,如代码清单 5-92 所示。

代码清单5-92 RebalanceImpl#lockAll
for (MessageQueue mq : lockOKMQSet) {
    ProcessQueue processQueue = this.processQueueTable.get(mq);
    if (processQueue != null) {
        if (!processQueue.isLocked()) {
            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
        }
        processQueue.setLocked(true);
        processQueue.setLastLockTimestamp(System.currentTimeMillis());
    }
}

第三步:将成功锁定的消息消费队列对应的处理队列设置为锁定状态,同时更新加锁时间,如代码清单 5-93 所示。

代码清单5-93 RebalanceImpl#lockAll
for (MessageQueue mq : mqs) {
    if (!lockOKMQSet.contains(mq)) {
        ProcessQueue processQueue = this.processQueueTable.get(mq);
        if (processQueue != null) {
            processQueue.setLocked(false);
            log.warn("the mes sage queue locked Failed, Group:{} {}", this.consumerGroup, mq);
        }
    }
}

第四步:遍历当前处理队列中的消息消费队列,如果当前消费者不持该消息队列的锁,则将处理队列锁的状态设置为 false,暂停该消息消费队列的消息拉取与消息消费。

ConsumeMessageOrderlyService提交消费任务

ConsumeMessageOrderlyService 提交消费任务如代码清单 5-94 所示。

代码清单5-94 ConsumeMessageOrderlyService#submitConsumeRequest
public void submitConsumeRequest ( final List<MessageExt> msgs, final ProcessQueue processQueue,final MessageQueue messageQueue, final boolean dispathToConsume){
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

构建消费任务 ConsumeRequest 并提交到消费线程池中。ConsumeRequest 类图如图 5-28 所示。

image 2025 02 05 23 14 25 750
Figure 2. 图5-28 ConsumeMessageOrderlyService$ConsumeRequest类图

顺序消息的 ConsumeRequest 消费任务不会直接消费本次拉取的消息,而是在消息消费时从处理队列中拉取消息,接下来详细分析 ConsumeRequest 的 run() 方法,如代码清单 5-95 所示。

代码清单5-95 ConsumeMessageOrderlyService$ConsumeRequest#run
if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
}

第一步:如果消息处理队列为丢弃,则停止本次消费任务,如代码清单 5-96 所示。

代码清单5-96 ConsumeMessageOrderlyService$ConsumeRequest#run
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {

第二步:根据消息队列获取一个对象。消费消息时申请独占 objLock,顺序消息消费的并发度为消息队列,也就是一个消息消费队列同一时刻只会被一个消费线程池中的一个线程消费,如代码清单 5-97 所示。

代码清单5-97 ConsumeMessageOrderlyService$ConsumeRequest#run
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) ||(this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
    // 消息消费逻辑
} else {
    if (this.processQueue.isDropped()) {
        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}

第三步:如果是广播模式,则直接进入消费,无须锁定处理队列,因为相互之间无竞争。如果是集群模式,消息消费的前提条件是 proceessQueue 被锁定并且锁未超时。思考一下,如果消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,还未来得及将 processQueue 解除锁定,就被另外一个消费者添加进去,此时会不会出现多个消息消费者同时消费一个消息队列的情况?答案是不会的,因为当一个新的消费队列分配给消费者时,在添加其拉取任务之前必须先向 Broker 发送对该消息队列加锁的请求,只有加锁成功后,才能添加拉取消息,否则等到下一次负载后,只有消费队列被原先占有的消费者释放后,才能开始新的拉取任务。集群模式下,如果未锁定处理队列,则延迟该队列的消息消费,如代码清单 5-98 所示。

代码清单5-98 ConsumeMessageOrderlyService$ConsumeRequest#run
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
    // 省略相关代码
    long interval = System.currentTimeMillis() - beginTime;
    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
        break;
    }
}

第四步:顺序消息消费处理逻辑,每一个 ConsumeRequest 消费任务不是以消费消息条数来计算的,而是根据消费时间,默认当消费时长大于 MAX_TIME_CONSUME_CONTINUOUSLY 后,结束本次消费任务,由消费组内其他线程继续消费,如代码清单 5-99 所示。

代码清单5-99 ConsumeMessageOrderlyService$ConsumeRequest#run
final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);

第五步:每次从处理队列中按顺序取出 consumeBatchSize 消息,如果未取到消息,则设置 continueConsume 为 false,本次消费任务结束。消费顺序消息时,从 ProceessQueue 中取出的消息会临时存储在 ProcessQueue 的 consumingMsgOrderlyTreeMap 属性中。

第六步:执行消息消费钩子函数(消息消费之前 before 方法)。通过 DefaultMQPushConsumerImpl#registerConsumeMessageHook(ConsumeMessageHookconsumeMessagehook)注册消息消费钩子函数并可以注册多个,如代码清单 5-100 所示。

代码清单5-100 ConsumeMessageOrderlyService$ConsumeRequest#run
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
    this.processQueue.getLockConsume().lock();
    if (this.processQueue.isDropped()) {
        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        break;
    }
    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {
    hasException = true;
} finally {
    this.processQueue.getLockConsume().unlock();
}

第七步:申请消息消费锁,如果消息队列被丢弃,则放弃消费该消息消费队列,然后执行消息消费监听器,调用业务方具体的消息监听器执行真正的消息消费处理逻辑,并通知 RocketMQ 消息消费结果。

第八步:执行消息消费钩子函数,计算消息消费过程中应用程序抛出的异常,钩子函数的后处理逻辑也会被调用。

第九步:如果消息消费结果为 ConsumeOrderlyStatus.SUCCESS,执行 ProcessQueue 的 commit() 方法,并返回待更新的消息消费进度,如代码清单 5-101 所示。

代码清单5-101 ProcessQueue#commit
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.msgTreeMapTemp.lastKey();
            msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
            this.msgTreeMapTemp.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }
    return -1;
}

提交就是将该批消息从 ProcessQueue 中移除,维护 msgCount(消息处理队列中的消息条数)并获取消息消费的偏移量 offset,然后将该批消息从 msgTreeMapTemp 中移除,并返回待保存的消息消费进度(offset+1)。从中可以看出,offset 表示消息消费队列的逻辑偏移量,类似于数组的下标,代表第 n 个 ConsumeQueue 条目,如代码清单 5-102 所示。

代码清单5-102 ConsumeMessageOrderlyService#processConsumeResult
if (checkReconsumeTimes(msgs)) {
    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
    this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
    continueConsume = false;
} else {
    commitOffset = consumeRequest.getProcessQueue().commit();
}

检查消息的重试次数。如果消息重试次数大于或等于允许的最大重试次数,将该消息发送到 Broker 端。该消息在消息服务端最终会进入 DLQ(死信队列),也就是 RocketMQ 不会再次消费,需要人工干预。如果消息成功进入 DLQ 队列,checkReconsumeTimes 返回 false,将直接调用 ProcessQueue#commit 提交该批消息,表示消息消费成功,如果这批消息中有任意一条消息的重试次数小于允许的最大重试次数,将返回 true,执行消息重试。

对于 RocketMQ 顺序消费,失败重试次数为 Integer.MAX_VALUE,即一直重试,会组织消息进度向前推进,故应用需要在超过重试次数时,引入人为干预机制。特别是要区分业务异常与系统异常,业务异常通常是因为不满足某项业务规则,重试将注定无法成功,故一定要设置一定的规则,进行业务降级。

消息消费重试是先将该批消息重新放入 ProcessQueue 的 msgTreeMap,然后清除 consumingMsgOrderlyTreeMap,默认延迟 1s 再加入消费队列并结束此次消息消费。

可以通过 DefaultMQPushConsumer#setSuspendCurrentQueueTimeMillis 设置当前队列的重试挂起时间。执行消息重试时,如果消息消费进度并未向前推进,则将本次消费视为无效消费,将不更新消息消费进度。

第十步:存储消息消费进度。

消息队列锁实现

顺序消息消费的各个环节基本都是围绕消息消费队列(MessageQueue)与消息处理队列(ProcessQueue)展开的。拉取消息消费进度,要判断 ProcessQueue 的 locked 是否为 true,为 true 的前提条件是消息消费者(cid)向 Broker 端发送锁定消息队列的请求并返回加锁成功。服务端关于 MessageQueue 加锁处理类是 org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager。类图如图 5-29 所示。

image 2025 02 05 23 39 58 671
Figure 3. 图5-29 RebalanceLockManager类图

ConcurrentMap mqLockTable:锁容器,以消息消费组分组,每个消息队列对应一个锁对象,表示当前该消息队列被消费组中哪个消费者所持有。核心方法如下。

  1. public Set tryLockBatch(String group, Set mqs, String clientId):申请对 mqs 消息消费队列集合加锁。

    • String group:消息消费组名。

    • Set mqs:待加锁的消息消费队列集合。

    • String clientId:消息消费者(cid)。返回成功加锁的消息队列集合。

  2. public void unlockBatch(final String group, final Set mqs, final String clientId):申请对 mqs 消息消费队列集合解锁。

    • String group:消息消费组。

    • Set mqs:待解锁消息队列集合。

    • String clientId:持有锁的消息消费者。

上述方法都是对 ConcurrentMap<String/* group */,ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable 数据结构的维护,实现简单,不再对其进行源码分析。