如何采集监控指标

RocketMQ Broker端虽然会使用类似滑动窗口机制将监控指标存储在内存中,但只会保留一个采样周期,例如1min、1h。如果需要将监控数据进行可视化展现,就需要定时将监控数据进行持久化存储,该如何做呢?

其实RocketMQ官方已经提供了示例。RocketMQ提供了statsAll命令,用来查看统计数据。接下来,我们通过源码分析statsAll命令的实现类来展示如何查询RocketMQ中的监控数据。

statsAll命令的实现类为StatsAllSubCommand,我们直奔主题进入StatsAllSubCommand的execute()方法。接下来详细介绍该命令的执行流程,如代码清单10-8所示。

代码清单10-8 StatsAllSubCommand#execute
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
        throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
    try {
        defaultMQAdminExt.start();                // 省略部分代码
    }
}

第一步:构建DefaultMQAdminExt对象,并设置实例名称为当前时间戳,调用start()方法启动,RocketMQ的官方运维命令的执行都是基于该对象。

TopicList topicList = defaultMQAdminExt.fetchAllTopicList();

第二步:通过DefaultMQAdminExt对象的fetchAllTopicList方法获取集群中所有的topic。在RocketMQ中所有与运维管理相关的方法都定义在该类中。接着遍历主题列表,从服务器依次获取相关的监控数据,如代码清单10-9所示。

代码清单10-9 StatsAllSubCommand#execute
for (String topic : topicList.getTopicList()) {
    try {
        printTopicDetail(defaultMQAdminExt, topic, activeTopic);
    } catch (Exception e) {
    }
}

第三步:遍历topic的路由信息,向所有的Broker查看关于该topic的统计信息,具体由DefaultMQAdminExt的viewBrokerStatsData()方法实现,如代码清单10-10所示。

代码清单10-10 DefaultMQAdminExt#viewBrokerStatsData
public BrokerStatsData viewBrokerStatsData (String brokerAddr, String statsName, String statsKey){
    return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
}

第四步:调用上述方法向Broker发送VIEW_BROKER_STATS_DATA查询监控数据,各个参数的含义如下。

1)String brokerAddr:broker的IP与端口号。 2)String statsName:监控指标类型,对应TOPIC_PUT_NUMS(主题写入数量)。 3)String statsKey:具体的统计key,如果statsName为TOPIC_PUT_NUMS,则对应的key为topic。

通过对StatsAllSubCommand命令进行简单的梳理,可以得出一个非常重要的结论:运维相关的命令可使用DefaultMQAdminExt提供的各种方法,将它们进行组合,可以实现特定的需求。

现在,我们可以单独编写监控数据持久化功能,采用定时任务的方式定时向RocketMQBroker查询内存中的统计数据,并将其存储在关系型数据库或时序数据库中,提供报表的方式结合图形化进行可视化展示,实现丰富多彩的监控系统。