实现原理
本节详细介绍 RocketMQ 监控的实现原理。
监控相关类图
RocketMQ 监控相关的类体系如图10-2所示。

下面逐一介绍图10-2中涉及的核心类。
1)BrokerStatsManager:RocketMQ Broker端的监控数据采集实现类,其内部主要的数据结构如下。
InternalLogger log:RocketMQ服务端数据监控统计相关的日志文件为stats.log。 HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>():服务端监控数据采集核心数据结构,用来存储Broker端的统计数据。statsTable的key为统计指标,即统计维度,例如10.1节提到的TOPIC_PUT_NUMS、TOPIC_PUT_SIZE等。其Value值为StatsItemSet,即数据采集项的数据集合。以TOPIC_PUT_NUMS为例,StatsItemSet中需要按照topic进行数据采集,即存储各个topic的统计数据。 String clusterName:集群名称。
2)StatsItemSet:一类统计指标集合,其内部主要维护的数据结构为ConcurrentMap<String/* key */, StatsItem>statsItemTable。以指标TOPIC_PUT_NUMS对应的StatsItemSet为例,StatsItemSet存储的是主题写入数量(消息发送数量),内部维护的statsItemTable的key为主题的名称,StatsItem为该主题对应的统计信息。
3)StatItem:具体统计数据的载体,其主要属性如下。
AtomicLong value:当前统计的数量值。 AtomicLong times:当前value值变化的次数。 LinkedList<CallSnapshot> csListMinute:近1min内的调用快照信息,每10s采集一次,并且超过6个则淘汰最早入队的原生快照信息,故其长度不会超过6。 LinkedList<CallSnapshot> csListHour:近1h内的调用快照信息,每10min采集一次,同样不会超过6个元素。 LinkedList<CallSnapshot> csListDay:近一天的调用快照信息,每1h采集一次,该队列长度不会超过24,超过则会丢弃最早入队的。 String statsName:统计项的名称,与StatsItemSet中的statsName相同。 String statsKey:统计项Key。如果statsName统计各topic的写入数量,则statsKey为每一个具体的topic名称。
4)StatsSnapshot:统计快照,主要包含如下3个字段。 long timestamp:快照生成时间戳。 long times:快照生成时value值发生变化的次数。 long value:快照生成时当前的统计数量。
监控原始数据采集流程
本节介绍消息发送相关流程,从源码的角度分析采集数据的实现原理。消息发送在Broker端的处理逻辑主要由SendMessageProcessor实现,如代码清单10-1所示。
if (sendOK) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1
);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult
().getMsgNum());
// 省略部分代码
}
java
从代码片段可以看出,消息发送成功会调用BrokerStatsManager的incTopicPutNums()方法,增加topic的写入个数,即统计调用信息,如代码清单10-2所示。
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
}
java
在解读这个方法之前,先介绍一下各个参数的含义。
1)String topic:主题的名称。 2)int num:本次写入的消息条数。 3)int times:默认传入1,表示消息incTopicPutNums被调用的次数,也体现了消息写入数量发生变化的次数。
incTopicPutNams()方法的实现比较简单,从HashMap<String, StatsItemSet> statsTable表中按照TOPIC_PUT_NUMS获取消息写入统计的StatsItemSet对象,然后调用其addValue()方法。
接下来继续探究StatsItemSet的addValue()方法,如代码清单10-3所示。
public void addValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue);
statsItem.getTimes().addAndGet(incTimes);
}
java
该方法的实现比较简单:根据统计key,例如topic的名称,从ConcurrentMap<String/* key*/, StatsItem> statsItemTable中获取该topic的统计信息,即在RocketMQ中,topic的写入总数用StatsItem表示,如果statsItemTable中未包含该topic对应的StatsItem,则创建一个新的对象,然后通过原子的方式新增Value与Times这两个属性的值。
经过上面的步骤,就完成了topic写入数量的收集,接下来介绍如何计算TPS等统计信息。