消息轨迹的引入目的和使用方法
不知道大家在项目中有没有遇到发送方与消费方相互 “扯皮” 的情况,发送方说消息已经发送成功,而消费方说没有消费到,发送方与消费方各执一词,谁也无法说服谁,遇到这种情况该怎么办呢?
这个时候,我们迫切希望能记录一条消息的流转轨迹,即消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?这就是本章要重点探讨的内容:消息轨迹。
下面详细介绍如何使用消息轨迹。
第一步:首先在Broker服务端配置traceTopicEnable,并将其值设置为true,该值默认为false,修改该值需要重启Broker服务端。
第二步:使用如下方法构建消息发送者。
public DefaultMQProducer(String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(String producerGroup, boolean enableMsgTrace, String traceTopic)
参数说明如下。
-
String producerGroup:消息生产组名称。
-
boolean enableMsgTrace:是否启用跟踪消息轨迹。
-
String traceTopic:用于记录消息轨迹的 topic,默认为 RMQ_SYS_TRACE_TOPIC。
第三步:使用如下方法构建消息消费者。
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
public DefaultMQPushConsumer(String croup, boolean enableMsgTrace, String traceTopic)
参数说明如下。
-
String consumerGroup:消息消费组名称。
-
boolean enableMsgTrace:是否启用消息轨迹。
-
String traceTopic:用于记录消息轨迹的 topic。
下面用一个简单的示例来展示消息轨迹的使用。生产者发送示例如代码清单8-1所示。
package org.apache.rocketmq.example.tracemessage;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TraceProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("pGroup", true);
producer.setNamesrvAddr("192.168.0.166:9876; 192.168.0.168:9876; ");
producer.start();
try {
Message msg = new Message(" TopicTest ", null, "OrderID06", "OrderID06".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
通常,如果启用了消息轨迹,在消息发送的时候尽量为消息指定 Key 属性,这样便于对消息进行高性能的查询。启用消息轨迹进行消息发送后,我们可以在消息轨迹菜单对消息进行查询,如图 8-1 所示。

当前版本的消息轨迹只记录了消息的存储时间、存储服务器IP、发送耗时与跟踪类型,Pub表示消息已成功发送,暂未被消费。消费者开启消息轨迹示例如代码清单 8-2 所示。
package org.apache.rocketmq.example.tracemessage;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException, MQCl
ientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
consumer.setNamesrvAddr("192.168.0.166:9876; 192.168 .0 .168:9876;");
consumer.subscribe(" TopicTest ", " * ");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s % n ",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
当消息被成功消费后,其记录的消息轨迹如图8-2所示。
