消息轨迹设计原理

RocketMQ 消息轨迹主要用于跟踪消息发送、消息消费的轨迹,详细记录消息各个处理环节的日志,从设计上至少需要解决如下 3 个核心问题。

  • 消息轨迹数据格式。

  • 采集轨迹数据。

  • 存储消息轨迹数据。

消息轨迹数据格式

RocketMQ 4.6 版的消息轨迹数据主要包含如下信息。

  • traceType:跟踪类型,可选值为Pub(消息发送)、SubBefore(消息拉取到客户端,在执行业务定义的消费逻辑之前)、SubAfter(消费后)。

  • timeStamp:当前时间戳。

  • regionId:Broker所在的区域ID,取自BrokerConfig#regionId。

  • groupName:组名称,traceType为Pub时表示生产者组的名称,traceType为subBefore或subAfter时表示消费组名称。

  • requestId:在traceType为subBefore、subAfter时使用,消费端的请求ID。

  • topic:消息主题。

  • msgId:消息唯一ID。

  • tags:消息标志。

  • keys:消息索引key,根据该key可快速检索消息。

  • storeHost:跟踪类型为Pub时存储该消息的Broker服务器IP,跟踪类型为subBefore、subAfter时存储消费者IP。

  • bodyLength:消息体的长度。

  • costTime:耗时。

  • msgType:消息的类型,可选值为Normal_Msg(普通消息)、Trans_Msg_Half(预提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延迟消息)。

  • offsetMsgId:消息偏移量ID,该ID中包含了Broker的IP以及偏移量。

  • success:发送成功。

  • contextCode:消费状态码,可选值为SUCCESS、TIME_OUT、EXCEPTION、RETURN-NULL、FAILED。

如何采集轨迹数据

消息中间件的两大核心主题是消息发送和消息消费,核心载体是消息。消息轨迹(消息的流转)主要是记录消息何时发送到哪台Broker、发送耗时是多少、在什么时候被哪个消费者消费等信息。

要记录消息发送的相关信息,最方便的时机就是在消息发送前后将本次调用的信息进行采集。同样,消息消费数据的采集也是在消费处理逻辑的前后进行的。相信各位读者会马上想到RocketMQ的钩子机制,RocketMQ提供了两个接口分别表示消息发送、消费消费的钩子函数,如图8-3所示。

image 2025 02 06 12 19 59 232
Figure 1. 图8-3 RocketMQ RPC Hook

通过实行图8-3所示的两个接口,可以实现在消息发送、消息消费前后记录消息轨迹,为了不明显增加消息发送与消息消费的时延,最好使用异步发送模式记录消息轨迹。

如何存储消息轨迹数据

消息轨迹需要存储什么内容以及如何采集消息轨迹都已解决,接下来就要思考消息轨迹数据存储在哪里?如果存储在数据库或其他存储媒介中,会加重消息中间件的负担,使其依赖外部组件,最佳的选择还是存储在Broker服务器中,将消息轨迹数据当作一条消息。 既然把消息轨迹当作消息存储在Broker服务器中,那么存储消息轨迹的topic如何确定呢?RocketMQ提供了两种方法来定义消息轨迹的topic。

  • 系统默认topic:如果Broker的traceTopicEnable配置项设为true,表示在该Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1,默认该值为false。

  • 自定义topic:在创建消息生产者或消息消费者时,可以通过参数自定义用于记录消息轨迹的topic名称,不过要注意的是,RokcetMQ控制台(rocketmq-console)中只支持配置一个消息轨迹topic,故自定义topic在目前这个阶段或许还不是一个最佳实践,建议使用系统默认的topic。

通常为了避免消息轨迹的数据与正常的业务数据混在一起,官方建议在Broker集群中新增一台机器,只在这台机器上开启消息轨迹跟踪,这样该集群内的消息轨迹数据只会发送到这一台Broker服务器上,并不会增加集群内原先业务Broker的负载压力。