定时消息机制

定时消息是指消息发送到 Broker 后,不会立即被消费者消费,而是要等到特定的时间后才能被消费,RocketMQ 并不支持任意的时间精度,因为如果要支持任意时间精度的定时调度,则不可避免地需要在 Broker 层做消息排序,再加上持久化方面的考量,将不可避免地带来具大的性能消耗,所以 RocketMQ 只支持特定级别的延迟消息。

消息延迟级别在 Broker 端通过 messageDelayLevel 进行配置,默认为 “1s 5s 10s 30s 1m 2m3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1 表示延迟 1s,delayLevel=2 表示延迟 5s,依次类推。说到定时任务,上文提到的消息重试正是借助定时任务实现的,在将消息存入 CommitLog 文件之前,需要判断消息的重试次数,如果重试次数大于 0,则将消息的主题设置为 SCHEDULE_TOPIC_XXXX。RocketMQ 定时消息实现类为 org.apache.rocketmq.store.schedule.ScheduleMessageService。该类的实例在 DefaultMessageStore 中创建,通过在 DefaultMessageStore 中调用 load() 方法加载并调用 start() 方法进行启动。接下来我们分析一下 ScheduleMessageService 的实现原理。

ScheduleMessageService 类图如图 5-21 所示。

image 2025 02 05 11 40 02 427
Figure 1. 图5-21 ScheduleMessageService类图

下面逐一分析 ScheduleMessageService 的核心属性。

  • SCHEDULE_TOPIC_XXXX:定时消息统一主题。

  • FIRST_DELAY_TIME:第一次调度时延迟的时间,默认为 1s。

  • DELAY_FOR_A_WHILE:每一个延时级别调度一次后,延迟该时间间隔后再放入调度池。

  • DELAY_FOR_A_PERIOD:消息发送异常后延迟该时间后再继续参与调度。

  • ConcurrentMap delayLevelTable:延迟级别,将 “1s 5s 10s 30s 1m 2m 3m 4m 5m6m 7m 8m 9m 10m 20m 30m 1h 2h” 字符串解析成 delayLevelTable,转换后的数据结构类似 {1:1000,2:5000,3:30000,...}

  • ConcurrentMap offsetTable:延迟级别消息消费进度。

  • DefaultMessageStore defaultMessageStore:默认消息存储器。

  • int maxDelayLevel:MessageStoreConfig#messageDelayLevel 中最大消息延迟级别。

ScheduleMessageService 方法的调用顺序为构造方法→load()方法→start()方法。

load()方法

ScheduleMessageService 继承自 ConfigManager,load() 方法如代码清单 5-67 所示。

代码清单5-67 ScheduleMessageService#load
public boolean load() {
    boolean result = super.load();
    result = result && this.parseDelayLevel();
    return result;
}

该方法主要完成延迟消息消费队列消息进度的加载与 delayLevelTable 数据的构造,延迟队列消息消费进度默认存储路径为 ${ROCKET_HOME}/store/config/delayOffset.json,存储格式如图 5-22 所示。

image 2025 02 05 11 51 24 799
Figure 2. 图5-22 延迟队列消息消费进度存储格式

同时解析 MessageStoreConfig#messageDelayLevel 定义的延迟级别并转换为 Map,执行延迟级别对应的延迟时间。

start()方法

start() 方法根据延迟级别创建对应的定时任务,启动定时任务持久化存储延迟消息队列进度,如代码清单 5-68 所示。

代码清单5-68 ScheduleMessageService#start
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level);
    if (null == offset) {
        offset = 0L;
    }
    if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}

第一步:根据延迟队列创建定时任务。遍历延迟级别,根据延迟级别从 offsetTable 中获取消息队列的消费进度,如果不存在,则使用 0。也就是说每个延迟级别对应一个消息消费队列。然后创建定时任务,每个定时任务第一次启动时,默认延迟 1s 后执行一次定时任务,从第二次调度开始,才使用相应的延迟时间执行定时任务。延迟级别与消息消费队列的映射关系为消息队列 ID=延迟级别-1,如代码清单 5-69 所示。

代码清单5-69 ScheduleMessageService#queueId2DelayLevel
public static int queueId2DelayLevel(final int queueId) {
    return queueId + 1;
}

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

定时消息的第一个设计关键点是,定时消息单独一个主题:SCHEDULE_TOPIC_XXXX,该主题下的队列数量等于 MessageStoreConfig#messageDelayLevel 配置的延迟级别,其对应关系为 queueId 等于延迟级别减 1。ScheduleMessageService 为每个延迟级别创建一个定时器,根据延迟级别对应的延迟时间进行延迟调度。在消息发送时,如果消息的延迟级别 delayLevel 大于 0,将消息的原主题名称、队列 ID 存入消息属性,然后改变消息的主题、队列与延迟主题所属队列,消息将最终转发到延迟队列的消费队列中,如代码清单 5-70 所示。

代码清单5-70 ScheduleMessageService#start
this.timer.scheduleAtFixedRate(new TimerTask() {
   public void run() {
       try {
           ScheduleMessageService.this.persist();
       } catch (Throwable e) {
           log.error("scheduleAtFixedRate flush exception", e);
       }
   }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

第二步:创建定时任务,每隔 10s 持久化一次延迟队列的消息消费进度(延迟消息调进度),持久化频率可以通过 flushDelayOffsetInterval 配置属性进行设置。

定时调度逻辑

ScheduleMessageService 的 start() 方法启动后,会为每一个延迟级别创建一个调度任务,每个延迟级别对应 SCHEDULE_TOPIC_XXXX 主题下的一个消息消费队列。定时调度任务的实现类为 DeliverDelayedMessageTimerTask,其核心实现为 executeOnTimeup,如代码清单 5-71 所示。

代码清单5-71 ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

第一步:根据队列 ID 与延迟主题查找消息消费队列,如果未找到,说明当前不存在该延时级别的消息,则忽略本次任务,根据延时级别创建下一次调度任务,如代码清单 5-72 所示。

代码清单5-72 ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

第二步:根据 offset 从消息消费队列中获取当前队列中所有有效的消息。如果未找到,则更新延迟队列的定时拉取进度并创建定时任务,待下一次继续尝试,如代码清单 5-73 所示。

代码清单5-73 ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferCQ.getByte
    Buffer().getLong();
    int sizePy = bufferCQ.getByteBuf
    fer().getInt();
    long tagsCode = bufferCQ.getByte
    Buffer().getLong();
    long now = System.currentTimeMillis();
    long deliverTimestamp = this.cor
    rectDeliverTimestamp(now, tagsCode);
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);    // 省略部分代码
}

第三步:遍历 ConsumeQueue 文件,每一个标准 ConsumeQueue 条目为 20 个字节。解析出消息的物理偏移量、消息长度、消息标志的哈希码,为从 CommitLog 文件加载具体的消息做准备,如代码清单 5-74 所示。

代码清单5-74 ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
MessageExt msgExt = ScheduleMessageService.this
    .defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

第四步:根据消息物理偏移量与消息大小从 CommitLog 文件中查找消息。如果未找到消息,则打印错误日志,根据延迟时间创建下一个定时器,如代码清单 5-75 所示。

代码清单5-75 ScheduleMessageService$DeliverDelayedMessageTimerTask#messageTimeup
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner,MessageConst.PROPERTY_DELAY_TIME_LEVEL);
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);

第五步:根据消息属性重新构建新的消息对象,清除消息的延迟级别属性(delayLevel),恢复消息原先的消息主题与消息消费队列,消息的消费次数 reconsumeTimes 并不会丢失,如代码清单 5-76 所示。

代码清单5-76 ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);

第六步:将消息再次存入 CommitLog 文件,并转发到主题对应的消息队列上,供消费者再次消费。

第七步:更新延迟队列的拉取进度。定时消息的第二个设计关键点是消息存储时,如果消息的延迟级别属性 delayLevel 大于 0,则会备份原主题、原队列到消息属性中,其键分别为 PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID,通过为不同的延迟级别创建不同的调度任务,到达延迟时间后执行调度任务。调度任务主要是根据延迟拉取消息消费进度从延迟队列中拉取消息,然后从 CommitLog 文件中加载完整消息,清除延迟级别属性并恢复原先的主题、队列,再次创建一条新的消息存入 CommitLog 文件并转发到消息消费队列中供消息消费者消费。

以上就是定时消息的实现原理,整个流程如图 5-23 所示。

image 2025 02 05 12 27 30 306
Figure 3. 图5-23 定时消息实现流程图
  1. 消息消费者发送消息,如果发送消息的 delayLevel 大于 0,则将消息主题变更为 SCHEDULE_TOPIC_XXXX,消息队列为 delayLevel 减 1。

  2. 消息经由 CommitLog 文件转发到消息消费队列 SCHEDULE_TOPIC_XXXX 中。

  3. 定时任务 Time 每隔 1s 根据上次拉取偏移量从消费队列中取出所有消息。

  4. 根据消息的物理偏移量与消息大小从 CommitLog 文件中拉取消息。

  5. 根据消息属性重新创建消息,恢复原主题 topicA、原队列 ID,清除 delayLevel 属性,并存入 CommitLog 文件。

  6. 将消息转发到原主题 topicA 的消息消费队列,供消息消费者消费。