RocketMQ监控与运维命令

搭建 RocketMQ 监控平台 rocketmq-console

rocketmq-console 下载路径为 https://github.com/875279177/incubator-rocketmq-externals ,这是一个标准的Maven项目,基于Spring Boot并内嵌了Web服务器。

第一步:修改配置文件application.properties,主要修改端口号、rocketmq.config.dataPath,如代码清单11-22所示。

代码清单11-22 rocketmq-console配置文件
server.contextPath=
# Web服务器端口号
server.port=8080
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#NameServer地址
rocketmq.config.namesrvAddr=127.0.0.1:9876
rocketmq.config.isVIPChannel=
#rocketmq-console’s data path:dashboard/monitor
rocketmq.config.dataPath=D:/rocketmq/logs/data
rocketmq.config.enableDashBoardCollect=true
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
rocketmq.config.loginRequired=false
rocketmq.config.accessKey=rocketadmin
rocketmq.config.secretKey=12345678

第二步:修改日志的存放位置。

第三步:运行org.apache.rocketmq.console.App,启动监控程序,在浏览器中输入http://url:serverport,打开监控界面。

监控图表界面如图11-3所示。

image 2025 02 06 22 32 57 527
Figure 1. 图11-3 rocketmq-console监控图表界面

集群信息界面如图11-4所示。

image 2025 02 06 22 33 22 718
Figure 2. 图11-4 rocketmq-console集群信息界面

Topic 主题管理界面如图11-5所示。

image 2025 02 06 22 33 47 516
Figure 3. 图11-5 rocketmq-console主题管理界面

消息消费界面如图11-6所示。

image 2025 02 06 22 34 20 981
Figure 4. 图11-6 rocketmq-console消息消费界面

生产者界面如图11-7所示。

image 2025 02 06 22 34 43 912
Figure 5. 图11-7 rocketmq-console生产者界面

消息查询界面如图11-8所示。

image 2025 02 06 22 35 06 375
Figure 6. 图11-8 rocketmq-console消息查询界面

对rocketmq-console进行打包与部署。修改application.properties与logback.xml中服务器的端口号、NameServer地址、rocketmq-console监控数据目录、日志文件路径,然后执行如下命令进行打包。

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-2.0.0.jar

rocketmq-console监控平台的内部调用了RocketMQ提供的运维管理命令,下面进行详细介绍。

RocketMQ管理命令

${ROCKETMQ_HOME}/bin/mqadmin 中执行 RocketMQ 运维命令,RocketMQ 命令客户端目前支持的命令如代码清单11-23所示。

public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());
    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());
    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());
    initCommand(new CloneGroupOffsetCommand());
    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());
    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());
    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());
    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());
    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());
    initCommand(new AllocateMQSubCommand());
    initCommand(new CheckMsgSendRTCommand());
    initCommand(new ClusterSendMsgRTCommand());
    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());
    initCommand(new QueryConsumeQueueCommand());
}

创建或更新主题

创新或更新主题的实现类为org.apache.rocketmq.tools.command.topic.updateTopicSub-Command,参数说明如表11-1所示。

image 2025 02 06 22 39 19 718
Figure 7. 表11-1 updateTopic命令参数一览表

1)-b:通过Broker地址直接定位Broker服务器。 2)-c:需要向NameServer发送GET_BROKER_CLUSTER_INFO命令获取BrokerMaster服务器地址,然后向Broker发送UPDATE_AND_CREATE_TOPIC命令。Broker存储主题配置信息的默认路径为${ROCKETMQ_HOME}/store/config/topic.json,Broker通过心跳包将主题和Broker队列信息上报给NameServer,即topic的路由信息。

使用示例为$sh ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -tORDER_TOPIC。

示例返回结果如图11-9所示。

image 2025 02 06 22 40 00 858
Figure 8. 图11-9 updateTopic示例返回结果

返回字段说明如表11-2所示。

image 2025 02 06 22 40 25 548
Figure 9. 表11-2 updateTopic命令返回字段一览表

删除主题

删除主题的实现类为org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand,参数说明如表11-3所示。

image 2025 02 06 22 43 32 895
Figure 10. 表11-3 deleteTopic命令参数一览表

从NameServer获取当前所有Broker,向它们发送DELETE_TOPIC_IN_NAMESRV,从Broker中删除topic的配置信息,然后通过NameServer的心跳机制更新NameServer中关于主题的路由信息。

使用示例为 sh ./mqadmin deleteTopic -n 127.0.0.1:9876 -c DefaultCluster -tORDER_TOPIC。

示例返回结果如图11-10所示。

image 2025 02 06 22 44 12 977
Figure 11. 图11-10 deleteTopic示例返回结果

获取topic所在Broker的集群信息

获取topic所在Broker的集群信息的实现类为org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand,参数说明如表11-4所示。

image 2025 02 06 22 44 49 561
Figure 12. 表11-4 topicClusterList命令参数一览表

向NameServer发送GET_BROKER_CLUSTER_INFO命令,获取topic的集群信息,从路由表中返回Broker地址路由表与集群地址列表,然后从这两个表格中抽取集群名称,去重后返回。

使用示例为 sh ./mqadmin topicClusterList -n 127.0.0.1:9876 -t TopicTest。

示例返回结果如图11-11所示。

image 2025 02 06 22 48 20 746
Figure 13. 图11-11 topicClusterList示例返回结果

查看所有主题信息

查看所有主题信息的实现类为 org.apache.rocketmq.tools.command.topic.TopicListSubCommand,参数说明如表11-5所示。

image 2025 02 06 22 48 55 985
Figure 14. 表11-5 topicList命令参数一览表

如果不使用 -c 参数,则直接向 NameServer 节点发送GET_ALL_TOPIC_LIST_FROM_NAMESERVER命令,从NameServer服务器返回所有的主题列表并输出。如果使用-c参数,首先向NameServer发送GET_BROKER_CLUSTER_INFO命令,获取该NameServer服务器上所有的集群信息,然后从NameServer获取所有的主题列表。为了组装成“集群名称+主题名称+订阅消费组”的格式,依次遍历topic列表,查询topic的路由信息,最后取第一个Broker名称,找到该Broker所在集群中充当topic的集群名称。

使用示例为sh ./mqadmin topicList -n 127.0.0.1:9876 -c。

示例返回结果如图11-12所示。

image 2025 02 06 22 49 32 401
Figure 15. 图11-12 topicList示例返回结果

返回字段说明如表11-6所示。

image 2025 02 06 22 49 54 064
Figure 16. 表11-6 topicList命令返回字段一览表

查看topic路由信息

查看topic路由信息的实现类为org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand,参数说明如表11-7所示。

image 2025 02 06 22 50 29 843
Figure 17. 表11-7 topicRoute命令参数一览表

向NameServer发送GET_ROUTEINTO_BY_TOPIC命令,NameServer将内存中的topic队列信息按指定格式组装后返回。

使用示例为sh ./mqadmin topicRoute-n 127.0.0.1:9876 -t TopicTest。

示例返回结果如图11-13所示。

image 2025 02 06 22 51 05 533
Figure 18. 图11-13 topicRoute示例返回结果

返回字段说明如表11-8所示,路由信息的返回结果为JSON格式。

image 2025 02 06 22 51 34 244
Figure 19. 表11-8 topicList命令返回字段一览表

更新topic的读写权限

更新topic的读写权限的实现类为org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand,参数说明如表11-9所示。

image 2025 02 06 22 52 07 912
Figure 20. 表11-9 updateTopicPerm命令参数一览表

根据-b、-c定位Broker地址,然后发送UPDATE_AND_CREATE_TOPIC命令,Broker在收到该命令后会验证broker集群信息并立即返回,然后异步将主题的配置信息(读写权限)更新到内存并持久化到本地文件,最后向NameServer发送心跳包,以此更新NameServer中的路由信息,稍后会更新到各个消息发送端与消息消费端。

使用示例为sh./mqadmin updateTopicPerm -n 127.0.0.1:9876 -c DefaultCluster -tTopicTest -p 4。关闭主题TopicTest的写权限,即该topic只能被消费,通常在删除一个topic时,需要先关闭其写权限,等消息全部消费完毕后再删除该topic,避免消息丢失。

示例返回结果如图11-14所示。

image 2025 02 06 22 52 37 987
Figure 21. 图11-14 updateTopicPerm示例返回结果

查看topic队列状态

查看topic队列状态的实现类为org.apache.rocketmq.tools.command.topic.TopicStatusSub Command,参数说明如表11-10所示。

image 2025 02 06 22 53 22 890
Figure 22. 表11-10 topicStatus命令参数一览表

首先获取主题的路由信息,然后向Broker发送GET_TOPIC_STATS_INFO命令,获取该主题在每一个Broker上的配置信息并返回主题的队列信息。

使用示例为sh ./mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest。

示例返回结果如图11-15所示。

image 2025 02 06 22 53 54 127
Figure 23. 图11-15 topicStatus示例返回结果

返回字段说明如表11-11所示。

image 2025 02 06 22 54 21 082
Figure 24. 表11-11 topicStatus命令返回字段一览表

查看消息队列负载情况

查看消息队列负载情况的实现类为org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand,参数说明如表11-12所示。

image 2025 02 06 22 54 56 502
Figure 25. 表11-12 allocateMQ命令参数一览表

该命令主要是根据topic的路由信息与消息消费者列表,输出各个消息消费者队列的分配情况,分配算法为平均分配(AllocateMessageQueueAveragely)。这个命令有个缺陷,应该再增加一个参数用来指定分配算法。

使用示例为sh ./mqadmin allocateMQ -n 127.0.0.1:9876 -t TopicTest -i192.168.0.1,192.168.0.2。

示例返回结果如图11-16所示。

image 2025 02 06 22 55 35 134
Figure 26. 图11-16 allocateMQ示例返回结果

创建、更新、删除顺序消息KV配置

创建、更新、删除顺序消息KV配置的实现类为org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand,参数说明如表11-13所示。

image 2025 02 06 22 56 10 472
Figure 27. 表11-13 updateOrderConf命令参数一览表

向NameServer发送PUT_KV_CONFIG、DELETE_KV_CONFIG或GET_KV_CONFIG命令,可以更新、删除或查询该主题的配置信息(顺序消息)。

消息查找、消息发送、消息消费三合一接口

消息查找、消息发送、消息消费的实现类为org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand,参数说明如表11-14所示。

image 2025 02 06 22 56 48 800
Figure 28. 表11-14 queryMsgById命令参数一览表

该命令集合了消息查找、消息发送、消息消费三个功能。

使用示例为sh ./mqadmin queryMsgById -n 127.0.0.1:9876 -iC0A800A800002A9F0000000000000ABC。

示例返回结果如图11-17所示。

image 2025 02 06 22 57 20 479
Figure 29. 图11-17 queryMsgById示例返回结果

返回字段说明如表11-15所示。

image 2025 02 06 22 57 47 036
Figure 30. 表11-15 字段说明

根据消息索引键查询消息

根据消息索引键查询消息的实现类为org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand,参数说明如表11-16所示。

image 2025 02 06 22 58 22 583
Figure 31. 表11-16 queryMsgByKey命令参数一览表

从NameServer获取主题的路由信息,然后并发向所有Broker发送QUERY_MESSAGE查询消息,Broker服务端在收到该消息后,会从Index文件中根据key查询消息,然后返回查询结果。在发送消息时可以通过为消息指定消息key,快速检索消息。

使用示例为sh ./mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TopicTest -k OD2020031221210301。

示例返回结果如图11-18所示。

image 2025 02 06 22 58 56 266
Figure 32. 图11-18 queryMsgByKey示例返回结果

该命令虽然只返回了消息的全局唯一ID、队列ID和在消息消费队列中的偏移量,其实也能得到queryMsgById命令返回的所有字段,只是该命令没有输出而已。

根据消息唯一ID查询消息

根据消息唯一ID查询消息的实现类为org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand,参数说明如表11-17所示。

image 2025 02 06 22 59 39 209
Figure 33. 表11-17 queryMsgByUniqueKey命令参数一览表

根据消息的全局唯一ID(消息发送端在发送消息时会生成一个全局的消息ID)查询消息,如果 -d、-g 不同时为空,将直接消费该条消息。根据消息唯一ID查询消息的实现逻辑与 queryMsgByKey 一样,这是因为消息的唯一全局ID会存储在 Index 文件中。

使用示例为 sh ./mqadmin queryMsgByUniqueKey -n 127.0.0.1:9876 -t TopicTest -i00000000000000000000000000000001000018B4AAC23D3B35710000。

示例返回结果如图11-19所示。

image 2025 02 06 23 00 34 784
Figure 34. 图11-19 queryMsgByUniqueKey示例返回结果

根据消息消费队列偏移量查找消息

根据消息消费队列偏移量查找消息的实现类为org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand,参数说明如表11-18所示。

image 2025 02 06 23 01 09 433
Figure 35. 表11-18 queryMsgByOffset命令参数一览表

首先根据消息主题获取路由表,然后从路由表中根据Broker名称获取Broker的IP地址,向Broker服务器发送PULL_MESSAGE请求,Broker服务器收到请求后,根据消息队列ID与消息偏移量,从消息消费队列中读取一个条目,从而得到该消息的物理偏移量与消息长度,最后从CommitLog文件中找到对应的消息。

使用示例为sh ./mqadmin queryMsgByUniqueKey -n 127.0.0.1:9876 -t TopicTest -i00000000000000000000000000000001000018B4AAC23D3B35710000。

示例返回结果如图11-20所示。

image 2025 02 06 23 01 42 028
Figure 36. 图11-20 queryMsgByOffset示例返回结果

打印消息

打印消息的实现类为org.apache.rocketmq.tools.command.message.PrintMessageSubCommand,参数说明如表11-19所示。

image 2025 02 06 23 02 15 241
Figure 37. 表11-19 printMsg命令参数一览表

该命令其实是实现基于时间戳的消息查询,根据主题的路由信息获取该主题的消息消费队列,循环处理每一个消息消费队列,如果-b、-e为空,则从消息消费队列的最小偏移量开始查找,否则根据-b、-e时间戳查询最小偏移量和最大偏移量,每次从每个消息消费队列中最多拉取32条消息并打印输出。

使用示例为sh ./mqadmin printMsg -n 127.0.0.1:9876 -t TopicTest -c UTF-8 -b 2020-03-01#00:00:00:000 -e 2020-03-12#00:00:00:000 -d true。

示例返回结果如图11-21所示。

image 2025 02 06 23 02 49 904
Figure 38. 图11-21 printMsg示例返回结果

返回消息的列表,主要会返回当前消息所在消息消费队列的最小偏移量、最大偏移量、消息ID,以及消息的全量信息(MessageExt)和消息体字符串(通过-c编码成字符串)。

根据消息队列打印消息

根据消息队列打印消息的实现类为org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand,参数说明如表11-20所示。

image 2025 02 06 23 03 37 287
Figure 39. 表11-20 printMsgByQueue命令参数一览表

根据主题、Broker名称、消息消费队列构建MessageQueue对象,从消息服务器根据拉取偏移量循环从服务器拉取消息,如果使用-b、-e参数,则基于时间戳进行消息查询。

使用示例为sh ./mqadmin printMsgByQueue -n 127.0.0.1:9876 -t TopicTest -a broker-a-i 2 -c UTF-8 -b 2020-01-01#00:00:00:000 -e 2020-03-12#00:00:00:000 -d true -p true。

示例返回结果如图11-22所示。

image 2025 02 06 23 04 23 353
Figure 40. 图11-22 printMsgByQueue示例返回结果

返回结果与 printMsg 相同,该命令主要是根据消息消费队列进行查询。

发送消息

发送消息的实现类为org.apache.rocketmq.tools.command.message.SendMessageCommand,参数说明如表11-21所示。

image 2025 02 06 23 05 05 358
Figure 41. 表11-21 sendMessage命令参数一览表

该命令的实现比较简单,就是调用消息发送API发送消息。

使用示例为sh ./mqadmin sendMessage -n 127.0.0.1:9876 -t TopicTest -p 'Hello sendMsg1' -k 'testKey'。

示例返回结果如图11-23所示。

image 2025 02 06 23 05 37 443
Figure 42. 图11-23 sendMessage示例返回结果

返回消息存储Broker名称、消息队列、发送结果和全局唯一消息ID。

检测消息发送响应时间

检测消息发送响应时间的实现类为org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand,参数说明如表11-22所示。

image 2025 02 06 23 06 16 544
Figure 43. 表11-22 checkMsgSendRT命令参数一览表

构建测试消息,默认测试消息长度为128个字节,然后循环调用消息发送API向Broker服务端发送消息,记录每次消息的响应时间,最终算出一个平均响应时间(剔除第一次消息发送的响应时间)。

使用示例为sh ./mqadmin checkMsgSendRT -n 127.0.0.1:9876 -t TopicTest -a 10 -s128。

示例返回结果如图11-24所示。

image 2025 02 06 23 06 52 020
Figure 44. 图11-24 checkMsgSendRT示例返回结果

查询消息消费队列

查询消息消费队列的实现类为 org.apache.rocketmq.tools.command.queue.QueryConsume QueueCommand,参数说明如表11-23所示。

image 2025 02 06 23 07 24 439
Figure 45. 表11-23 queryCq命令参数一览表

查询消息消费队列的信息,建议指定-b参数,否则会选择路由表中的第一个broker。向消息服务器发送QUERY_CONSUME_QUEUE命令,broker返回该消息消费队列的最大、最小偏移量,如果指定了消费组,则返回消费组的订阅信息。

使用示例为sh ./mqadmin queryCq -n 127.0.0.1:9876 -t TopicTest -q 1 -i 1 -b192.168.0.166:10911 -c 2 -g CID_CONSUMER_TEST。

示例返回结果如图11-25所示。

image 2025 02 06 23 08 06 552
Figure 46. 图11-25 queryCq示例返回结果

返回结果主要包含如下3部分。

1)Subscription Data:如果指定了-g参数,则返回消费组的订阅信息。 2)Filter Data:如果订阅信息为空,则尝试返回消费组的消息过滤配置信息。 3)Queue data:消费队列信息,主要返回该消息消费队列的最新偏移量、最大偏移量和消息条目的信息,默认会返回10条,可以通过-c参数指定返回调试,通过-i参数指定从哪条消息开始返回,主题包含其该消息的物理偏移量、消息长度、消息tag的哈希码等数据。

重置消费进度(resetOffsetByTime)

实现类为org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand,参数说明如表11-24所示。

image 2025 02 06 23 08 50 782
Figure 47. 表11-24 resetOffsetByTime命令参数一览表

根据主题的路由信息找出所有 Master broker 地址,逐一发送 INVOKE_BROKER_TO_RESET_OFFSET 命令,Broker 服务端收到请求后,首先根据指定的时间找到待重置时间点的消息所在的位置信息,然后向该消费组所有在线的客户端发送 RESET_CONSUMER_CLIENT_OFFSET 命令。客户端在收到命令后,更新服务端返回的待重置位点信息,并将原处理队列丢弃,将位点信息存入客户端内存,下次消息拉取将从重置后的位点开始消费,从而更新服务端的信息,实现重置位点。如果消费组没有在线的消费者,则会使用 resetOffsetByTimeOld 命令更新位点。

使用示例为sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -gCID_CONSUMER_TEST -t TopicTest -s now。

示例返回结果如图11-26所示。

image 2025 02 06 23 09 41 263
Figure 48. 图11-26 resetOffsetByTime示例返回结果

重置消费进度(resetOffsetByTimeOld)

实现类为org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeOldCommand,参数说明如表11-25所示,在客户端不在线情况使用。

image 2025 02 06 23 10 11 008
Figure 49. 表11-25 resetOffsetByTimeOld命令参数一览表

旧版本的重置消费位点命令先根据时间戳查询该主题下对应的消费进度,然后向Broker发送UPDATE_CONSUMER_OFFSET命令直接更新服务端的消费进度。

使用示例为sh ./mqadmin resetOffsetByTimeOld -n 127.0.0.1:9876 -gCID_CONSUMER_TEST -t TopicTest -s now。

返回结果与resetOffsetByTime命令类似,在RocketMQ4.6版移除了该命令,将其整合到resetOffsetByTime命令中。

对应低版本的RocketMQ,如果resetOffsetByTime无法重置消费位点,可以停掉消费者,然后执行resetOffsetByTimeOld命令继续执行重置位点。

复制消费组进度

复制消费组进度的实现类为org.apache.rocketmq.tools.command.offset.CloneGroupOffset Command,参数如表11-26所示。