消息消费过程

我们先回顾一下消息拉取的过程:PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后存入 ProcessQueue 消息处理队列中,然后调用 ConsumeMessageService#submitConsumeRequest 方法进行消息消费。使用线程池消费消息,确保了消息拉取与消息消费的解耦。RocketMQ 使用 ConsumeMessageService 来实现消息消费的处理逻辑。RocketMQ 支持顺序消费与并发消费,本节将重点关注并发消费的流程,顺序消费将在 5.9 节详细分析。ConsumeMessageService 核心类图如图 5-14 所示。

image 2025 02 05 09 41 29 342
Figure 1. 图5-14 消息消费类图

核心方法描述如下。

  1. ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg,String brokerName):直接消费消息,主要用于通过管理命令接收消费消息。

    • MessageExt msg:消息。

    • brokerName:Broker名称。

  2. void submitConsumeRequest(Listmsgs, ProcessQueue processQueue,MessageQueue messageQueue, boolean dispathToConsume):提交消息消费。

    • Listmsgs:消息列表,默认一次从服务器最多拉取32条消息。

    • ProcessQueue processQueue:消息处理队列。

    • MessageQueue messageQueue:消息所属消费队列。

    • boolean dispathToConsume:是否转发到消费线程池,并发消费时忽略该参数。

ConsumeMessageConcurrentlyService 并发消息消费核心参数解释如下。

  1. DefaultMQPushConsumerImpl defaultMQPushConsumerImpl:消息推模式实现类。

  2. DefaultMQPushConsumer defaultMQPushConsumer:消费者对象。

  3. MessageListenerConcurrently messageListener:并发消息业务事件类。

  4. BlockingQueue consumeRequestQueue:消息消费任务队列。

  5. ThreadPoolExecutor consumeExecutor:消息消费线程池。

  6. String consumerGroup:消费组。

  7. ScheduledExecutorService scheduledExecutorService:添加消费任务到 consumeExecutor 延迟调度器。

  8. ScheduledExecutorService cleanExpireMsgExecutors:定时删除过期消息线程池。为了揭示消息消费的完整过程,从服务器拉取到消息后,回调 PullCallBack 方法,先将消息放入 ProcessQueue 中,然后把消息提交到消费线程池中执行,也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入消息消费的世界。

消息消费

消费者消息消费服务 ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消费请求,具体逻辑如代码清单 5-50 所示。

代码清单5-50 ConsumeMessageConcurrentlyService#submitConsumeRequest
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        this.submitConsumeRequestLater(consumeRequest);
    }
}

第一步:consumeMessageBatchMaxSize 表示消息批次,也就是一次消息消费任务 ConsumeRequest 中包含的消息条数,默认为 1。msgs.size() 默认最多为 32 条消息,受 DefaultMQPushConsumer.pullBatchSize 属性控制,如果 msgs.size() 小于 consumeMessage BatchMaxSize,则直接将拉取到的消息放入 ConsumeRequest,然后将 consumeRequest 提交到消息消费者线程池中。如果提交过程中出现拒绝提交异常,则延迟 5s 再提交,如代码清单 5-51 所示。这里其实是给出一种标准的拒绝提交实现方式,实际上,由于消费者线程池使用的任务队列 LinkedBlockingQueue 为无界队列,故不会出现拒绝提交异常。

代码清单5-51 ConsumeMessageConcurrentlyService#submitConsumeRequest
if (msgs.size() > consumeBatchSize) {
    for (int total = 0; total < msgs.
            size(); ) {
        List<MessageExt> msgThis = ne
        w ArrayList<MessageExt > (consumeBatchSize);
        for (int i = 0; i < consumeBa
        tchSize;
        i++, total++){
            if (total < msgs.size()) {
                msgThis.add(msgs.get(
                        total));
            } else {
                break;
            }
        }
        ConsumeRequest consumeRequest
                = new ConsumeRequest(msgThis, processQueue, messageQueue
        );
        try {
            this.consumeExecutor.subm
            it(consumeRequest);
        } catch (RejectedExecutionExc
                eption e){
            for (total < msgs.size();
                 total++)
                ; {
                msgThis.add(msgs.get(
                        total));
            }
            this.submitConsumeRequest
            Later(consumeRequest);
        }
    }
}

第二步:如果拉取的消息条数大于 consumeMessageBatchMaxSize,则对拉取消息进行分页,每页 consumeMessageBatchMaxSize 条消息,创建多个 ConsumeRequest 任务并提交到消费线程池。ConsumeRequest 的 run() 方法封装了消息消费的具体逻辑,如代码清单 5-52 所示。

代码清单5-52 ConsumeMessageConcurrentlyService$ConsumeRequest#run
if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}

第三步:进入具体的消息消费队列时,会先检查 processQueue 的 dropped,如果设置为 true,则停止该队列的消费。在进行消息重新负载时,如果该消息队列被分配给消费组内的其他消费者,需要将 droped 设置为 true,阻止消费者继续消费不属于自己的消息队列。

第四步:执行消息消费钩子函数 ConsumeMessageHook#consumeMessageBefore。通过 consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook) 方法消息消费执行钩子函数,如代码清单 5-53 所示。

代码清单5-53 ConsumeMessageConcurrentlyService#resetRetryTopic
public void resetRetryTopic(final List<MessageExt> msgs) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }
    }
}

第五步:恢复重试消息主题名。这是为什么呢?这是由消息重试机制决定的,RocketMQ 将消息存入 CommitLog 文件时,如果发现消息的延时级别 delayTimeLevel 大于 0,会先将重试主题存入消息的属性,然后将主题名称设置为 SCHEDULE_TOPIC_XXXX,以便之后重新参与消息消费,如代码清单 5-54 所示。

代码清单5-54 ConsumeMessageConcurrentlyService$ConsumeRequest#run
try {
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    hasException = true;
}

第六步:执行具体的消息消费,调用应用程序消息监听器的 consumeMessage 方法,进入具体的消息消费业务逻辑,返回该批消息的消费结果,即 CONSUME_SUCCESS(消费成功)或 RECONSUME_LATER(需要重新消费)。

第七步:执行消息消费钩子函数 ConsumeMessageHook#consumeMessageAfter,如代码清单 5-55 所示。

代码清单5-55 ConsumeMessageConcurrentlyService$ConsumeRequest#run
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status,context, this);
}

第八步:执行业务消息消费后,在处理结果前再次验证一次 ProcessQueue 的 isDroped 状态值。如果状态值为 true,将不对结果进行任何处理。也就是说,在消息消费进入第四步时,如果因新的消费者加入或原先的消费者出现宕机,导致原先分配给消费者的队列在负载之后分配给了别的消费者,那么消息会被重复消费,如代码清单 5-56 所示。

代码清单5-56 ConsumeMessageConcurrentlyService#processConsumeResult
switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        break;
    default:
        break;
}

第九步:根据消息监听器返回的结果计算 ackIndex,如果返回 CONSUME_SUCCESS,则将 ackIndex 设置为 msgs.size()-1,如果返回 RECONSUME_LATER,则将 ackIndex 设置为 -1,这是为下文发送 msg back(ACK)消息做的准备,如代码清单 5-57 所示。

代码清单5-57 ConsumeMessageConcurrentlyService#processConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

第十步:如果是广播模式,业务方会返回 RECONSUME_LATER,消息并不会被重新消费,而是以警告级别输出到日志文件中。如果是集群模式,消息消费成功,因为 ackIndex=consumeRequest.getMsgs().size()-1,所以 i=ackIndex+1 等于 consumeRequest.getMsgs().size(),并不会执行 sendMessageBack。只有在业务方返回 RECONSUME_LATER 时,该批消息都需要发送 ACK 消息,如果消息发送失败,则直接将本批 ACK 消费发送失败的消息再次封装为 ConsumeRequest,然后延迟 5s 重新消费。如果 ACK 消息发送成功,则该消息会延迟消费,如代码清单 5-58 所示。

代码清单5-58 ConsumeMessageConcurrentlyService#processConsumeResult
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

第十一步:从 ProcessQueue 中移除这批消息,这里返回的偏移量是移除该批消息后最小的偏移量。然后用该偏移量更新消息消费进度,以便消费者重启后能从上一次的消费进度开始消费,避免消息重复消费。值得注意的是,当消息监听器返回 RECONSUME_LATER 时,消息消费进度也会向前推进,并用 ProcessQueue 中最小的队列偏移量调用消息消费进度存储器 OffsetStore 更新消费进度。这是因为当返回 RECONSUME_LATER 时,RocketMQ 会创建一条与原消息属性相同的消息,拥有一个唯一的新 msgId,并存储原消息 ID,该消息会存入 CommitLog 文件,与原消息没有任何关联,所以该消息也会进入 ConsuemeQueue,并拥有一个全新的队列偏移量。

并发消息消费的整体流程就介绍到这里,下文会对消息消费的其中两个重要步骤进行详细分析。

消息确认

如果消息监听器返回的消费结果为 RECONSUME_LATER,则需要将这些消息发送给 Broker 以延迟消息。如果发送 ACK 消息失败,将延迟 5s 后提交线程池进行消费。ACK 消息发送的网络客户端入口为 MQClientAPIImpl#consumerSendMessageBack,命令编码为 RequestCode.CONSUMER_SEND_MSG_BACK,协议头部如图 5-15 所示。

image 2025 02 05 10 48 46 723
Figure 2. 图5-15 ACK消息请求头部类图

下面逐一介绍 ConsumerSendMsgBackRequestHeader 的核心属性。

  • offset:消息物理偏移量。

  • group:消费组名。

  • delayLevel:延迟级别。RocketMQ 不支持精确的定时消息调度,而是提供几个延时级别,MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m9m 10m 20m 30m 1h 2h",delayLevel=1,表示延迟5s,delayLevel=2,表示延迟10s。

  • originMsgId:消息ID。

  • originTopic:消息主题。

  • maxReconsumeTimes:最大重新消费次数,默认16次。

客户端以同步方式发送 RequestCode.CONSUMER_SEND 到服务端。服务端命令处理器为 org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack。

第一步:获取消费组的订阅配置信息,如果配置信息为空,返回配置组信息不存在错误,如果重试队列数量小于 1,则直接返回成功,说明该消费组不支持重试。消费组核心类图如图 5-16 所示。

image 2025 02 05 10 51 43 883
Figure 3. 图5-16 消息订阅组配置信息

下面逐一介绍 SubscriptionGroupConfig 的核心属性。

  1. String groupName:消费组名。

  2. consumeEnable:是否可以消费,默认为 true,如果 consumeEnable=false,该消费组无法拉取消息,因而无法消费消息。

  3. consumeFromMinEnable:是否允许从队列最小偏移量开始消费,默认为 true,目前未使用该参数。

  4. consumeBroadcastEnable:设置该消费组是否能以广播模式消费,默认为 true,如果设置为 false,表示只能以集群模式消费。

  5. retryQueueNums:重试队列个数,默认为 1,每一个 Broker 上有一个重试队列。

  6. retryMaxTimes:消息最大重试次数,默认 16 次。

  7. brokerId:主节点 ID。

  8. whichBrokerWhenConsumeSlowly:如果消息堵塞(主节点),将转向该 brokerId 的服务器上拉取消息,默认为 1。

  9. notifyConsumerIdsChangedEnable:当消费发生变化时,是否立即进行消息队列重新负载。消费组订阅信息配置信息存储在 Broker 的 ${ROCKET_HOME}/store/config/subscriptionGroup.json 中。BrokerConfig.autoCreateSubscriptionGroup 默认为 true,表示在第一次使用消费组配置信息时如果不存在消费组,则使用上述默认值自动创建一个,如果为 false,则只能通过客户端命令 mqadmin updateSubGroup 创建消费组后再修改相关参数,如代码清单 5-59 所示。

代码清单5-59 SendMessageProcessor#consumerSendMsgBack
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

第二步:创建重试主题,重试主题名称为 %RETRY%+消费组名称,从重试队列中随机选择一个队列,并构建 TopicConfig 主题配置信息,如代码清单 5-60 所示。

代码清单5-60 SendMessageProcessor#consumerSendMsgBack
MessageExt msgExt = this.brokerController.getMessageStore()
        .lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " +
            requestHeader.getOffset());
    return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
    MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

第三步:根据消息物理偏移量从 CommitLog 文件中获取消息,同时将消息的主题存入属性。

第四步:设置消息重试次数,如果消息重试次数已超过 maxReconsumeTimes,再次改变 newTopic 主题为 DLQ("%DLQ%"),该主题的权限为只写,说明消息一旦进入 DLQ 队列,RocketMQ 将不负责再次调度消费了,需要人工干预,如代码清单 5-61 所示。

代码清单5-61 SendMessageProcessor#consumerSendMsgBack
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.g etProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

第五步:根据原先的消息创建一个新的消息对象,重试消息会拥有一个唯一消息 ID(msgId)并存入 CommitLog 文件。这里不会更新原先的消息,而是会将原先的主题、消息 ID 存入消息属性,主题名称为重试主题,其他属性与原消息保持一致。

第六步:将消息存入 CommitLog 文件。这里想再重点突出消息重试机制,该机制的实现依托于定时任务,如代码清单 5-62 所示。

代码清单5-62 CommitLog#putMessage
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMess
    ageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getPro perties()));
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

在存入 CommitLog 文件之前,如果消息的延迟级别 delayTimeLevel 大于 0,将消息的主题与队列替换为定时任务主题 “SCHEDULE_TOPIC_XXXX”,队列 ID 为延迟级别减 1。再次将消息主题、队列存入消息属性,键分别为 PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID。

ACK 消息存入 CommitLog 文件后,将依托 RocketMQ 定时消息机制在延迟时间到期后,再次拉取消息,提交至消费线程池,定时任务机制的细节将在 5.7 节进行分析。ACK 消息是同步发送的,如果在发送过程中出现错误,将记录所有发送 ACK 消息失败的消息,然后再次封装成 ConsumeRequest,延迟 5s 执行。

消费进度管理

消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,否则当消费者重新启动时,又要从消息消费队列最开始消费。从 5.6.1 节也可以看到,一次消息消费后会从 ProcessQueue 处理队列中移除该批消息,返回 ProcessQueue 的最小偏移量,并存入消息进度表。那么消息进度文件存储在哪里合适呢?

  • 广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。

  • 集群模式:同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化而重新负载,因此消费进度需要保存在每个消费者都能访问到的地方。

RocketMQ 消息消费进度接口如图 5-17 所示。

image 2025 02 05 11 12 59 548
Figure 4. 图5-17 消息进度OffsetStore类图
  • void load():从消息进度存储文件加载消息进度到内存。

  • void updateOffset(MessageQueue mq, long offset, boolean increaseOnly):更新内存中的消息消费进度。

    • MessageQueue mq:消息消费队列。

    • long offset:消息消费偏移量。

    • increaseOnly:true表示offset必须大于内存中当前的消费偏移量才更新。

  • long readOffset(final MessageQueue mq, final ReadOffsetType type):读取消息消费进度。

    • mq:消息消费队列。

    • ReadOffsetType type:读取方式,可选值包括 READ_FROM_MEMORY,即从内存中读取,READ_FROM_STORE,即从磁盘中读取,MEMORY_FIRST_THEN_STORE,即先从内存中读取,再从磁盘中读取。

  • void persistAll(final Set messageQueue)持久化指定消息队列进度到磁盘。

Set messageQueue:消息队列集合。

  • void removeOffset(messageQueue mq):将消息队列的消息消费进度从内存中移除。

  • Map cloneOffsetTable(String topic):复制该主题下所有消息队列的消息消费进度。

  • void updateConsumeOffsetToBroker(MessageQueue mq,long offset,booleanisOneway):使用集群模式更新存储在 Broker 端的消息消费进度。

广播模式消费进度存储

广播模式消息消费进度存储在消费者本地,其实现类为 org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore,如代码清单 5-63 所示。

代码清单5-63 LocalFileOffsetStore
public final static String LOCAL_OFFSET_STORE_DIR =
        System.getProperty("rocketmq.client.localOffset StoreDir", System.getProperty(" user.home") + File.separator + ".rocketmq_offsets");
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
  • LOCAL_OFFSET_STORE_DIR:消息进度存储目录,可以通过 -Drocketmq.client.localOffsetStoreDir 指定,如果未指定,则默认为用户主目录 /.rocketmq_offsets。

  • MQClientInstance mQClientFactory:消息客户端。

  • groupName:消息消费组。

  • storePath:消息进度存储文件 LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json。

  • ConcurrentMap<MessageQueue, AtomicLong> offsetTable:消息消费进度(内存)。

下面对 LocalFileOffsetStore 核心方法进行简单介绍。

load() 方法如代码清单 5-64 所示。

代码清单5-64 LocalFileOffsetStore#load
public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null){
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()){
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get());
        }
    }
}

OffsetSerializeWrapper 内部就是 ConcurrentMap<MessageQueue,AtomicLong>offsetTable 数据结构的封装,readLocalOffset 方法首先从 storePath 中尝试加载内容,如果读取的内容为空,尝试从 storePath + ".bak" 中加载,如果还是未找到内容,则返回 null。消息进度文件存储内容如图 5-18 所示。

image 2025 02 05 11 27 32 345
Figure 5. 图5-18 消息进度文件内容

广播模式下消费进度与消费组没什么关系,直接保存 MessageQueue:Offset。

persistAll(Set<MessageQueue> mqs) 持久化消息进度如代码清单 5-65 所示。

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty()) return;
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }
    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

持久化消息进度就是将 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 序列化到磁盘文件中。代码不容易理解,我们只需要知道是什么时候持久化消息消费进度的。原来在 MQClientInstance 中会启动一个定时任务,默认每 5s 持久化消息消费进度一次,可通过 persistConsumerOffsetInterval 进行设置,如代码清单 5-66 所示。

代码清单5-66 LocalFileOffsetStore#persistAll
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

对广播模式的消息消费进度进行存储、更新、持久化还是比较容易的,本书就简单介绍到这里,接下来重点分析集群模式下的消息进度管理。

集群模式消费进度存储

集群模式消息进度存储文件存放在消息服务端。消息消费进度集群模式实现类 org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore 的实现原理如图 5-19 所示。

image 2025 02 05 11 32 38 285
Figure 6. 图5-19 集群模式消息消费进度实现原理图

集群模式下消息消费进度的读取、持久化与广播模式的实现细节差不多,集群模式下如果从内存中读取消费进度,则是从 RemoteBrokerOffsetStore 的 ConcurrentMap<MessageQueue,Atomic-Long> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>() 中根据消息消费队列获取其消息消费进度。如果从磁盘读取,则发送网络请求,请求命令为 QUERY_CONSUMER_OFFSET 。持久化消息进度的请求命令为 UPDATE_CONSUMER_OFFSET,更新 ConsumerOffsetManager 的 ConcurrentMap<String/* topic@group /,ConcurrentMap< Integer/*消息队列ID/, Long/* 消息消费进度 */>> offsetTable,Broker 端默认 10s 持久化一次消息进度,存储文件名为 ${RocketMQ_HOME}/store/config/consumerOffset.json。存储内容如图 5-20 所示。

image 2025 02 05 11 34 05 471
Figure 7. 图5-20 集群模式消息消费进度存储

消费进度设计思考

广播模式下,消息消费进度的存储与消费组无关,集群模式下则以主题与消费组为键,保存该主题所有队列的消费进度。我们结合并发消息消费的整个流程,思考一下并发消息消费关于消息进度更新的问题,顺序消息消费将在 5.9 节重点讨论。

  1. 消费者线程池每处理完一个消息消费任务(ConsumeRequest),会从 ProcessQueue 中移除本批消费的消息,并返回 ProcessQueue 中最小的偏移量,用该偏移量更新消息队列消费进度,也就是说更新消费进度与消费任务中的消息没有关系。例如现在有两个消费任务 task1(queueOffset 分别为 20、40)和 task2(queueOffset 分别为 50、70),并且 ProcessQueue 中当前包含最小消息偏移量为 10 的消息,则 task2 消费结束后,将使用 10 更新消费进度,而不是 70。

    当 task1 消费结束后,还是以 10 更新消息队列消费进度,消息消费进度的推进取决于 ProcessQueue 中偏移量最小的消息消费速度。如果偏移量为 10 的消息消费成功,且 ProcessQueue 中包含消息偏移量为 100 的消息,则消息偏移量为 10 的消息消费成功后,将直接用 100 更新消息消费进度。如果在消费消息偏移量为 10 的消息时发生了死锁,会导致消息一直无法被消费,岂不是消息进度无法向前推进了?是的,为了避免这种情况,RocketMQ 引入了一种消息拉取流控措施:DefaultMQPushConsumer#consumeConcurrentlyMaxSpan=2000,消息处理队列 ProcessQueue 中最大消息偏移与最小偏移量不能超过该值,如果超过该值,将触发流控,延迟该消息队列的消息拉取。

  2. 在进行消息负载时,如果消息消费队列被分配给其他消费者,会将该 ProcessQueue 状态设置为 droped,持久化该消息队列的消费进度,并从内存中将其移除。