监控数据采样机制

10.2节已详细介绍了采集原始数据的内容,那么如何实现TPS的计算呢?奥妙就包含在StatsItemSet中。在构造init()方法的时候定义了6个定时任务,用来对监控数据进行采样计算,接下来详细介绍采样计算实现的原理。

监控数据采样

从StatsItemSet的init()方法得知,RocketMQ实现了按分钟级、小时级、天级别的数据采样。分钟级采样由samplingInSeconds()方法实现,该方法每隔10s执行一次,具体实现如代码清单10-4所示。

代码清单10-4 StatsItemSet#samplingInSeconds
private void samplingInSeconds() {
    Iterator < Entry < String, StatsItem >> it = this.statsItemTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, StatsItem> next = it.next();
        next.getValue().samplingInSeconds();
    }
}

该方法依次遍历指标集中的所有指标StatsItem,并调用samplingInSeconds()方法,如代码清单10-5所示。

代码清单10-5 StatsItemSet#samplingInSeconds
public void samplingInSeconds() {
    synchronized (this.csListMinute) {
        this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(
        ), this.times.get(), this.value.get()));
        if (this.csListMinute.size() > 7) {
            this.csListMinute.removeFirst();
        }
    }
}

首先根据当前的时间戳、变更次数、调用次数创建一个快照,将其存入csListMinute变量,如果该容器中的元素超过 7 个,则将其头部元素移除,即确保csListMinute最多存储7个元素,这样做有什么好处呢?其实现原理如图10-3所示。

image 2025 02 06 20 51 35 476
Figure 1. 图10-3 分钟级采样设计原理图

图10-3是在分钟级采样容器中存储最近1min的采样数据,每隔10s采集1次快照,在计算TPS等统计指标时,只须用两个快照之差除以两个快照之间的时间。

samplingInMinutes与samplingInHour的采样方式与samplingInSeconds方法的实现一样,就不重复讲解了。值得注意的是,samplingInMinutes每10min采集1次快照,而samplingInHour每1h采集1次,内部维护的LinkedList<CallSnapshot> csListDay保存的元素个数为25,即最后一个元素与第一个元素相隔时间为1天。

根据采样计算统计指标

在RocketMQ中由StatsItem类的printAtMinutes方法实现秒级别的统计信息,具体实现如代码清单10-6所示。

代码清单10-6 StatsItem#printAtMinutes
public void printAtMinutes() {
    StatsSnapshot ss = computeStatsData(this.csListMinute);
    log.info(String.format("[%s] [%s]Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f ",this.statsName, this.statsKey, ss.getSum(), ss.getTps(), ss.getAvgpt()));
}

该方法每分钟执行1次,将计算出来的监控指标以日志文件的形式输出在RocketMQ的日志文件中,其路径默认为 ${user.home}/logs/rocketmqlogs/stats.log。下面重点分析其指标计算的逻辑,实现逻辑如代码清单10-7所示。

代码清单10-7 指标计算逻辑代码
private static StatsSnapshot computeStatsData(
        final LinkedList<CallSnapshot> csList) {
    StatsSnapshot statsSnapshot = new StatsSnapshot();
    synchronized (csList) {
        double tps = 0;
        double avgpt = 0;
        long sum = 0;
        if (!csList.isEmpty()) {
            CallSnapshot first = csList.getFirst();
            CallSnapshot last = csList.getLast();
            sum = last.getValue() - first.getValue();
            tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
            long timesDiff = last.getTimes() - first.getTimes();
            if (timesDiff > 0) {
                avgpt = (sum * 1.0d) / timesDiff;
            }
        }
        statsSnapshot.setSum(sum);
        statsSnapshot.setTps(tps);
        statsSnapshot.setAvgpt(avgpt);
    }
    return statsSnapshot;
}

该方法接受一个采样容器,最终的目标是计算TPS、总数与平均变更次数。

1)sum的计算逻辑是将两个采样快照的快照值相减,表示这段时间发生的采样次数。 2)tps的计算逻辑是用sum的值除以变化的时间,单位换算成秒。 3)avgpt表示单位时间内从一个快照值到另外一个快照值发生变化的速率。

为了便于理解,举例说明如下。 在1min内,消息发送者使用了RocketMQ的批量消息,一次发送10条,共调用批量接口发送了6次消息,那上面各个值是怎么计算的呢?

1)sum:60。 2)tps:1tps,用sum除以时间即可。 3)avgpt:用sum除以count,最终计算结果为10,表示一次调用改变的平均数值。