消息轨迹实现原理
寻找消息轨迹入口
因为开启消息轨迹的关键是通过构建DefaultMQProducer或DefaultMQPushConsumer时,在其构造函数中指定enableMsgTrace参数,所以我们将以DefaultMQProducer的构造函数为入口探究其实现细节,如代码清单8-3所示。
public DefaultMQProducer( final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic){
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {// 忽略打印日志
}
}
}
从构造函数可知与消息轨迹相关的关键信息如下。
-
SendMessageTraceHookImpl:消息发送用于跟踪消息轨迹的钩子函数,与此对应的消息发送用于消息轨迹跟踪的钩子函数实现类为 ConsumeMessageTraceHookImpl。
-
AsyncTraceDispatcher:消息轨迹异步转发器。在详细介绍上述两个关键类之前,先来介绍一下其类层次结构图,如图8-4所示。

下面逐一介绍上述核心类及核心属性。
1)SendMessageHook:消息发送钩子函数,用于在消息发送之前、发送之后执行一定的业务逻辑,是记录消息轨迹的最佳扩展点。
2)TraceDispatcher:消息轨迹转发处理器,其默认实现类为AsyncTraceDispatcher,异步实现消息轨迹数据的发送。下面对其属性进行简单的介绍。
int queueSize:异步转发队列长度,默认为2048,当前版本不能修改。 int batchSize:批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。 int maxMsgSize:消息轨迹一次发送的最大消息大小,默认为128KB,当前版本不能修改。 DefaultMQProducer traceProducer:用来发送消息轨迹的消息发送者。 ThreadPoolExecutor traceExecuter:线程池,用来异步执行消息发送。 AtomicLong discardCount:记录丢弃的消息个数。 Thread worker:工作线程,主要负责从追加队列中获取一批待发送的消息轨迹数据,将其提交到线程池中执行。 ArrayBlockingQueue<TraceContext> traceContextQueue:消息轨迹TraceContext队列,用来存放待发送到服务端的消息。 ArrayBlockingQueue<Runnable> appenderQueue:线程池内部队列,默认长度为1024。 DefaultMQPushConsumerImpl hostConsumer:消费者信息,记录消息消费时的轨迹信息。 String traceTopicName:用于跟踪消息轨迹的topic名称。
消息发送轨迹数据
消息发送轨迹的数据是由SendMessageTraceHookImpl钩子函数实现的,本文将主要介绍sendMessageBefore()和sendMessageAfter()方法的实现细节。
sendMessageBefore()方法详解
sendMessageBefore() 方法定义如代码清单 8-4 所示。
public void sendMessageBefore(SendMessageContext context) {
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProduce rGroup()));
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
sendMessageBefore()方法是在发送消息之前被调用的,在消息发送之前先收集消息的topic、tag、key、存储Broker的IP地址、消息体的长度等基础信息,并将消息轨迹数据先存储在调用上下文中。
sendMessageAfter()方法详解
sendMessageAfter() 方法是在客户端收到服务端消息发送响应请求后被调用的,如代码清单8-5所示。
if (context == null || context.getMessage().getTopic().startsWith((
(AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) {
return;
}
第一步:如果调用的时候上下文环境为空,那么发送消息的topic和消息轨迹存储的topic,如果服务端未开启消息轨迹跟踪配置,则直接返回,即不记录消息轨迹数据,如代码清单8-6所示。
TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
第二步:从MqTraceContext中获取跟踪的TraceBean,虽然设计成List结构体,但在消息发送场景,这里的数据永远只有一条,即使是批量发送也不例外。然后设置costTime(消息发送耗时)、success(是否发送成功)、regionId(发送到Broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime。注意这个存储时间并没有取消息的实际存储时间,而是取一个估算值,即客户端发送时间一半的耗时来表示消息的存储时间,如代码清单8-7所示。
localDispatcher.append(tuxeContext);
第三步:使用AsyncTraceDispatcher异步将消息轨迹数据发送到消息服务器(Broker)上。
消息轨迹异步转发实现机制
通过SendMessageTraceHookImpl收集每次消息发送的轨迹数据,然后异步转发到消息服务器,尽最大可能减少消息发送的性能损耗,异步消息发送的实现类为TraceDispatcher。
TraceDispatcher构造函数
AsyncTraceDispatcher 构造函数如代码清单8-8所示。
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(10, 20, 1000 * 60, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
初始化AsyncTraceDispatcher的核心属性如下。注意,目前这些属性无法改变。
1)queueSize:队列长度,默认为2048,表示异步线程池能够积压的消息轨迹数量。 2)batchSize:一次向Broker批量发送的消息条数,默认为100。 3)maxMsgSize:向Broker汇报消息轨迹时,消息体的大小不能超过该值,默认为128KB。 4)discardCount:整个运行过程中丢弃的消息轨迹数据,这里要说明一点,如果消息TPS发送过大,异步转发线程处理不过来就会主动丢弃消息轨迹数据。 5)traceContextQueue:traceContext积压队列,客户端(消息发送者、消息消费者)在收到处理结果后,将消息轨迹提交到这个队列中并立即返回。 6)appenderQueue:提交到Broker线程池中的队列。 7)traceTopicName:用于接收消息轨迹的topic,默认为RMQ_SYS_TRANS_HALF_TOPIC。 8)traceExecuter:用于发送到Broker服务的异步线程池,核心线程数默认为10,最大线程池为20,队列堆积长度为2048,线程名称为MQTraceSendThread_。 9)traceProducer:发送消息轨迹的Producer,通过getAndCreateTraceProducer()方法创建,其所属的消息发送者组名为_INNER_TRACE_PRODUCER。在实践中可以通过该组名查看启用了消息轨迹的客户端信息。
启动异步处理任务
通过调用AsyncTraceDispatcher的start()方法来启动后台异步处理线程,在构建用于发送消息轨迹的发送者后被调用,启动异步处理消息轨迹数据的发送,如代码清单8-9所示。
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
首先使用CAS机制避免start()方法重复执行,然后启动一个后台线程,其执行逻辑被封装在AsyncRunnable中,我们接着看AsyncRunnable的实现细节,学习一种后台异步任务的编程技巧。
转发消息轨迹数据
RocketMQ通过AsyncRunnable实现一个异步任务,从待发送队列中不断获取消息轨迹的数据,并将其异步发送到消息服务器,如代码清单8-10所示。
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
先介绍一种编程技巧,一个线程要“孜孜不倦”地处理任务,通常是在run()方法中加上一个while(!stopped)结构,然后从一个阻塞队列中不断获取任务。 AsyncRunnable为了提高消息的发送效率引入批量机制,即一次从队列中获取一批消息,然后封装成AsyncAppenderRequest任务并提交到线程池中异步执行,即真正的发送消息轨迹数据的逻辑被封装在AsyncAppenderRequest的run()方法中。
发送消息轨迹数据
通过上面的转发线程将消息轨迹数据批量提交到线程池中异步执行,具体发送消息轨迹由AsyncAppenderRequest的run()方法执行,该方法中直接调用sendTraceData()方法,如代码清单8-11所示。
public void sendTraceData(List<TraceContext> contextList) {}
其参数是一个消息轨迹批次列表,如代码清单8-12所示。
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
}
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
第一步:将本批消息按照原始消息的topic组装成Map<String, List<TraceTransferBean>>,如代码清单8-13所示,其实现关键点如下。
-
如果待发送的消息轨迹数据列表为空,则直接返回,表明没有待发送的消息。
-
临时将原主题的名称与所属regionId存入key属性。
-
按照消息轨迹的存储协议对消息轨迹内容进行编码,当前版本使用的是字符串追加模式,实现比较简单,对扩展不太友好。
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
}
flushData(entry.getValue(), dataTopic, regionId);
}
第二步:按照topic分批调用flushData()方法将消息发送到Broker中,完成消息轨迹数据的存储。