消息轨迹的引入目的和使用方法

不知道大家在项目中有没有遇到发送方与消费方相互 “扯皮” 的情况,发送方说消息已经发送成功,而消费方说没有消费到,发送方与消费方各执一词,谁也无法说服谁,遇到这种情况该怎么办呢?

这个时候,我们迫切希望能记录一条消息的流转轨迹,即消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?这就是本章要重点探讨的内容:消息轨迹。

下面详细介绍如何使用消息轨迹。

第一步:首先在Broker服务端配置traceTopicEnable,并将其值设置为true,该值默认为false,修改该值需要重启Broker服务端。

第二步:使用如下方法构建消息发送者。

public DefaultMQProducer(String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(String producerGroup, boolean enableMsgTrace, String traceTopic)

参数说明如下。

  • String producerGroup:消息生产组名称。

  • boolean enableMsgTrace:是否启用跟踪消息轨迹。

  • String traceTopic:用于记录消息轨迹的 topic,默认为 RMQ_SYS_TRACE_TOPIC。

第三步:使用如下方法构建消息消费者。

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
public DefaultMQPushConsumer(String croup, boolean enableMsgTrace, String traceTopic)

参数说明如下。

  • String consumerGroup:消息消费组名称。

  • boolean enableMsgTrace:是否启用消息轨迹。

  • String traceTopic:用于记录消息轨迹的 topic。

下面用一个简单的示例来展示消息轨迹的使用。生产者发送示例如代码清单8-1所示。

代码清单8-1 生产者发送示例代码
package org.apache.rocketmq.example.tracemessage;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("pGroup", true);
        producer.setNamesrvAddr("192.168.0.166:9876; 192.168.0.168:9876; ");
        producer.start();
        try {
            Message msg = new Message(" TopicTest ", null, "OrderID06", "OrderID06".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

通常,如果启用了消息轨迹,在消息发送的时候尽量为消息指定 Key 属性,这样便于对消息进行高性能的查询。启用消息轨迹进行消息发送后,我们可以在消息轨迹菜单对消息进行查询,如图 8-1 所示。

image 2025 02 06 12 13 56 847
Figure 1. 图8-1 消息轨迹查询界面

当前版本的消息轨迹只记录了消息的存储时间、存储服务器IP、发送耗时与跟踪类型,Pub表示消息已成功发送,暂未被消费。消费者开启消息轨迹示例如代码清单 8-2 所示。

代码清单8-2 消费者消息轨迹示例代码
package org.apache.rocketmq.example.tracemessage;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class TracePushConsumer {
    public static void main(String[] args) throws InterruptedException, MQCl

    ientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
        consumer.setNamesrvAddr("192.168.0.166:9876; 192.168 .0 .168:9876;");
        consumer.subscribe(" TopicTest ", " * ");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
                                                                    msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s % n ",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

当消息被成功消费后,其记录的消息轨迹如图8-2所示。

image 2025 02 06 12 17 21 305
Figure 2. 图8-2 消息轨迹查询界面