监控数据采样机制
10.2节已详细介绍了采集原始数据的内容,那么如何实现TPS的计算呢?奥妙就包含在StatsItemSet中。在构造init()方法的时候定义了6个定时任务,用来对监控数据进行采样计算,接下来详细介绍采样计算实现的原理。
监控数据采样
从StatsItemSet的init()方法得知,RocketMQ实现了按分钟级、小时级、天级别的数据采样。分钟级采样由samplingInSeconds()方法实现,该方法每隔10s执行一次,具体实现如代码清单10-4所示。
private void samplingInSeconds() {
Iterator < Entry < String, StatsItem >> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
next.getValue().samplingInSeconds();
}
}
该方法依次遍历指标集中的所有指标StatsItem,并调用samplingInSeconds()方法,如代码清单10-5所示。
public void samplingInSeconds() {
synchronized (this.csListMinute) {
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(
), this.times.get(), this.value.get()));
if (this.csListMinute.size() > 7) {
this.csListMinute.removeFirst();
}
}
}
首先根据当前的时间戳、变更次数、调用次数创建一个快照,将其存入csListMinute变量,如果该容器中的元素超过 7 个,则将其头部元素移除,即确保csListMinute最多存储7个元素,这样做有什么好处呢?其实现原理如图10-3所示。

图10-3是在分钟级采样容器中存储最近1min的采样数据,每隔10s采集1次快照,在计算TPS等统计指标时,只须用两个快照之差除以两个快照之间的时间。
samplingInMinutes与samplingInHour的采样方式与samplingInSeconds方法的实现一样,就不重复讲解了。值得注意的是,samplingInMinutes每10min采集1次快照,而samplingInHour每1h采集1次,内部维护的LinkedList<CallSnapshot> csListDay保存的元素个数为25,即最后一个元素与第一个元素相隔时间为1天。 |
根据采样计算统计指标
在RocketMQ中由StatsItem类的printAtMinutes方法实现秒级别的统计信息,具体实现如代码清单10-6所示。
public void printAtMinutes() {
StatsSnapshot ss = computeStatsData(this.csListMinute);
log.info(String.format("[%s] [%s]Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f ",this.statsName, this.statsKey, ss.getSum(), ss.getTps(), ss.getAvgpt()));
}
该方法每分钟执行1次,将计算出来的监控指标以日志文件的形式输出在RocketMQ的日志文件中,其路径默认为 ${user.home}/logs/rocketmqlogs/stats.log
。下面重点分析其指标计算的逻辑,实现逻辑如代码清单10-7所示。
private static StatsSnapshot computeStatsData(
final LinkedList<CallSnapshot> csList) {
StatsSnapshot statsSnapshot = new StatsSnapshot();
synchronized (csList) {
double tps = 0;
double avgpt = 0;
long sum = 0;
if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst();
CallSnapshot last = csList.getLast();
sum = last.getValue() - first.getValue();
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
long timesDiff = last.getTimes() - first.getTimes();
if (timesDiff > 0) {
avgpt = (sum * 1.0d) / timesDiff;
}
}
statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt);
}
return statsSnapshot;
}
该方法接受一个采样容器,最终的目标是计算TPS、总数与平均变更次数。
1)sum的计算逻辑是将两个采样快照的快照值相减,表示这段时间发生的采样次数。 2)tps的计算逻辑是用sum的值除以变化的时间,单位换算成秒。 3)avgpt表示单位时间内从一个快照值到另外一个快照值发生变化的速率。
为了便于理解,举例说明如下。 在1min内,消息发送者使用了RocketMQ的批量消息,一次发送10条,共调用批量接口发送了6次消息,那上面各个值是怎么计算的呢?
1)sum:60。 2)tps:1tps,用sum除以时间即可。 3)avgpt:用sum除以count,最终计算结果为10,表示一次调用改变的平均数值。