消息过滤
消息过滤包括基于表达式和基于类两种过滤模式,其中表达式又分为TAG和SQL92模式,接下来分别介绍三种过滤模式的使用方法。
TAG过滤模式
我们先来看TAG模式消息过滤的示例,如代码清单11-3所示。
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
Message msg = new Message("TopicFilter7", "TOPICA_TAG_ALL"," OrderID001" , " Helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET)); System.out.printf("%s%n", producer.send(msg));
} else {
Message msg = new Message("TopicFilter7", "TOPICA_TAG_ORD", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.printf("%s%n", producer.send(msg));
}
}
第一步:在消息发送时,我们可以为每一条消息设置一个TAG标签,消息消费者订阅自己感兴趣的TAG。在一般使用场景下,对于同一类功能(数据同步)创建一个主题,该主题下不同的系统关心的数据可能不一样,各个系统的基础数据都需要同步,因此设置标签为TOPICA_TAG_ALL。而订单数据只有订单下游的子系统关心,其他系统并不关心,因此设置标签为TOPICA_TAG_ORD。库存子系统关注库存相关的数据,设置标签为TOPICA_TAG_CAPCITY,如代码清单11-4所示。
// 订单系统消费组
DefaultMQPushConsumer orderConsumer = new DefaultMQPushConsumer("Order_Data_Syn");
orderConsumer.subscribe("TopicFilter7", "TOPICA_TAG_ALL | TOPICA_TAG_ORD");// 库存子系统消费组
DefaultMQPushConsumer kuCunConsumer = newDefaultMQPushConsumer("Order_Data_Syn");
kuCunConsumer.subscribe("TopicFilter7", "TOPICA_TAG_ALL | TOPICA_TAG_CAPCITY");
第二步:消费者订阅相同主题不同的TAG,多个TAG用|分隔。注意,同一个消费者订阅的主题的TAG必须相同。
SQL过滤模式
SQL语句是开发者比较熟悉的,RocketMQ采用SQL表达式是为了降低开发者的学习成本,方便开发者直接上手使用,如代码清单11-5所示。
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus", "1");
msg.putUserProperty("sellerId", "21");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
第一步:基于SQL表达式进行消息过滤,其实是对消息的属性运用SQL过滤表达式进行条件匹配,因此消息发送时应该调用putUserProperty方法设置消息属性,如代码清单11-6所示。
consumer.subscribe("TopicTest", MessageSelector.bySql("(orderStatus is not null and orderStatus > 0 )"));
第二步:订阅模式为一条SQL条件过滤表达式,上下文环境为消息的属性。
类过滤模式
自定义消息过滤类的实现接口为 org.apache.rocketmq.common.filter.MessageFilter,如代码清单11-7所示。
package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg, FilterContext context) {
String property = msg.getProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
if (((id % 10) == 0) && (id > 100)) {
return true;
}
}
return false;
}
}
第一步:实现自定义消息过滤器,实现org.apache.rocketmq.common.filter.MessageFilter,MessageExt实例中封装了整体消息的所有信息,如代码清单11-8所示。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
String filterCode = MixAll.file2String(classFile);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl", filterCode);
第二步:消息消费者订阅主题,并上传自定义订阅类源码。 使用类过滤模式的前提是启动FilterServer。下面给出Eclipse调试FilterServer的方法,与在Linux环境中部署FilterServer的原理相同。
第一步:从distribution模块中将logback_filtersrv.xml复制到${ROCKETMQ_HOME}/conf 下,并新增filtersrv.properties文件,内容如代码清单11-9所示。
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
connectWhichBroker=127.0.0.1:10911
第二步:展开filterSrv模块,右键选中FiltersrvStartup.java并移动至Debug As,然后选中Debug Configurations,切换到Arguments选项卡,之后增加-c配置选项,指定FilterServer配置文件,如图11-1所示。更多FilterServer配置文件属性请参考附录。

第三步:切换到Environment选项卡,配置FilterServer运行主目录,如图11-2所示。

第四步:启动FiltersrvStartup。如果控制台输出如代码清单11-10所示日志消息,表示启动成功。如果启动不成功,可以到logback_broker.xml配置的日志文件中查看错误日志。
load config properties file OK, d:/rocketmq/conf/filtersrv.properties
The Filter Server boot success, 192.168.1.3:62832