消息过滤机制

RocketMQ 支持表达式过滤与类过滤两种消息过滤机制。表达式模式分为 TAG 与 SQL92 模式,SQL92 模式以消息属性过滤上下文,实现 SQL 条件过滤表达式,而 TAG 模式就是简单为消息定义标签,根据消息属性 tag 进行匹配。消息过滤 API 如图 5-24 所示。

image 2025 02 05 12 34 36 738
Figure 1. 图5-24 MessageFilter类图

下面逐一分析 MessageFilter 的核心接口。

  1. boolean isMatchedByConsumeQueue(Long tagsCode,CqExtUnit cqExtUnit):根据 ConsumeQueue 判断消息是否匹配。

    • Long tagsCode:消息标志的哈希码。

    • CqExtUnit:ConsumeQueue 条目扩展属性。

  2. boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, final Mapproperties):根据存储在 CommitLog 文件中的内容判断消息是否匹配。

    • ByteBuffer msgBuffer:消息内容,如果为空,该方法返回true。

    • Map properties:消息属性,主要用于SQL92过滤模式。

本节重点探讨 RocketMQ 基于表达式的消息过滤机制。RocketMQ 消息过滤方式不同于其他消息中间件,是在订阅时进行过滤,从第 4 章的介绍中我们知道 ConsumeQueue 的存储格式如图 5-25 所示。

image 2025 02 05 12 38 12 822
Figure 2. 图5-25 ConsumeQueue存储格式

消息发送者在消息发送时如果设置了消息的标志属性,便会存储在消息属性中,将其从 CommitLog 文件转发到消息消费队列中,消息消费队列会用 8 个字节存储消息标志的哈希码。之所以不直接存储字符串,是因为将 ConsumeQueue 设计为定长结构,以加快消息消费的加载性能。在 Broker 端拉取消息时,遍历 ConsumeQueue,只对比消息标志的哈希码,如果匹配则返回,否则忽略该消息。消费端在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息标志的值而不是哈希码。

接下来从源码角度探究 RocketMQ 是如何实现消息过滤的,先看代码清单 5-77 所示的代码。

代码清单5-77 DefaultMQPushConsumerImpl#subscribe
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e)
                ;
    }
}

第一步:消费者订阅消息主题与消息过滤表达式。构建订阅信息并加入 RebalanceImpl,以便 RebalanceImpl 进行消息队列负载,订阅过滤数据类图如图 5-26 所示。

image 2025 02 05 12 41 06 365
Figure 3. 图5-26 订阅过滤数据类图

如代码清单 5-78 所示,下面逐一介绍 SubscriptionData 的核心属性。

  1. String SUB_ALL:过滤模式,默认全匹配。

  2. boolean classFilterMode:是否是类过滤模式,默认为 false。

  3. String topic:消息主题名称。

  4. String subString:消息过滤表达式,多个用双竖线隔开,例如 “TAGA || TAGB”。

  5. Set tagsSet:消息过滤标志集合,是消费端过滤时进行消息过滤的依据。

  6. Set codeSet:消息过滤标志哈希码集合。

  7. String expressionType:过滤类型,TAG 或 SQL92。

代码清单5-78 DefaultMQPushConsumerImpl#pullMessage
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()){
        subExpression = sd.getSubString();
    }
    classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
);

第二步:根据订阅消息属性构建消息属性拉取标记,设置 subExpression、classFilter 等与消息过滤相关参数,如代码清单 5-79 所示。

代码清单5-79 PullMessageProcessor#processRequest
subscriptionData = FilterAPI.build(
        requestHeader.getTopic(),
        requestHeader.getSubscription(),
        requestHeader.getExpressionType());
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(),
            requestHeader.getConsumerGroup(),
            requestHeader.getSubscription(),
            requestHeader.getExpressionType(),
            requestHeader.getSubVersion());
    assert consumerFilterData != null;
}

第三步:根据主题、消息过滤表达式构建订阅消息实体,如代码清单 5-80。如果不是 TAG 模式,构建过滤数据 ConsumeFilterData。

代码清单5-80 PullMessageProcessor#processRequest
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()){
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
} else{
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
}

第四步:构建消息过滤对象,ExpressionForRetryMessageFilter 支持对重试主题的过滤,ExpressionMessageFilter 表示不支持对重试主题的属性进行过滤,也就是如果是 TAG 模式,执行 isMatchedByCommitLog 方法将直接返回 true,如代码清单 5-81 所示。

代码清单5-81 DefaultMessageStore#getMessage
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(tagsCode,
        extRet ? cqExtUnit : null)) {
    if (getResult.getBufferTotalSize() == 0) {
        status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }
    continue;
}

第五步:根据偏移量拉取消息后,首先根据 ConsumeQueue 条目进行消息过滤,如果不匹配则直接跳过该条消息,继续拉取下一条消息,如代码清单 5-82 所示。

代码清单5-82 DefaultMessageStore#getMessage
if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
    if (getResult.getBufferTotalSize() == 0) {
        status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }
    selectResult.release();
    continue;
}

第六步:如果消息根据 ConsumeQueue 条目进行过滤,则需要从 CommitLog 文件中加载整个消息体,然后根据属性进行过滤。当然如果过滤方式是 TAG 模式,该方法默认返回 true,下文会对该方法进行详细讲解。

至此,在消费拉取服务端的消息过滤流程就基本结束了,RocketMQ 会在消息接收端再次进行消息过滤。在讲解消费端消息过滤之前,先以 ExpressionMessageFilter 为例分析一下消息过滤的实现细节,如代码清单 5-83 所示。

代码清单5-83 ExpressionMessageFilter#isMatchedByConsumeQueue
if (null == subscriptionData) {
    return true;
}
if (subscriptionData.isClassFilterMode()){
    return true;
}
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    if (tagsCode == null || tagsCode < 0L){
        return true;
    }
    if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)){
        return true;
    }
    return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

如果订阅消息为空,则返回 true,不过滤消息。如果是类过滤模式,则返回 true,如果是 TAG 模式,并且消息的 tagsCode 参数为空或小于 0,则返回 true,说明消息在发送时没有设置 tag。如果订阅消息的 TAG hashcodes 集合中包含消息的 tagsCode,则返回 true。基于 TAG 模式根据 ConsumeQueue 进行消息过滤时只对比 tag 的哈希码,所以还需要在消息消费端对消息标志进行精确匹配,如代码清单 5-84 所示。

代码清单5-84 ExpressionMessageFilter#isMatchedByCommitLog
if (subscriptionData == null) {
    return true;
}
if (subscriptionData.isClassFilterMode()){
    return true;
}
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    return true;
}

如代码清单 5-85 所示,如果订阅信息为空,则返回 true。如果是类过滤模式,则返回 true,如果是 TAG 模式,则返回 true。该方法主要是为 SQL92 表达式模式服务的,根据消息属性实现类似于数据库 SQL where 条件的过滤方式。本节不针对 SQL92 表达式模式消息过滤做详细讲解,在第 9 章会给出 SQL92 过滤实例。

代码清单5-85 PullAPIWrapper#processPullResult
if (PullStatus.FOUND == pullResult.getPullStatus()) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
    List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
    List<MessageExt> msgListFilterAgain = msgList;
    if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
        msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
        for (MessageExt msg : msgList) {
            if (msg.getTags() != null) {
                if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                    msgListFilterAgain.add(msg);
                }
            }
        }
    }
}

消息拉取线程 PullMessageService 默认会使用异步方式从服务器拉取消息,消息消费端会通过 PullAPIWrapper 从响应结果解析拉取到的消息。如果消息过滤模式为 TAG,并且订阅 TAG 集合不为空,则对消息的标志进行判断,如果集合中包含消息的 TAG,则返回给消费者消费,否则跳过。