XPENDING:显示待处理消息的相关信息

用户可以通过XPENDING命令,获取指定流的指定消费者组目前的待处理消息的相关信息:

XPENDING stream group [start stop count] [consumer]

这些信息包括待处理消息的数量、待处理消息队列中的首条消息和最后一条消息的ID(前者是队列中ID最小的消息,而后者则是队列中ID最大的消息),以及该组名下各个消费者正在处理的消息数量(没有在处理消息的消费者将被省略)。

举个例子,如果我们想要知道流cgs的all-message消费者组目前的待处理消息相关信息,那么可以执行以下命令:

redis> XPENDING cgs all-message
1) (integer) 2 -- 待处理消息的数量
2) 1534435172217-0 -- 首条消息的ID
3) 1534435256529-0 -- 最后一条消息的ID
4) 1) 1) "worker1" -- 各个消费者目前正在处理的消息数量
2) "1"
2) 1) "worker2"
2) "1"

从命令返回的结果可以看到,这个消费者组目前有两条待处理消息,其中首条消息的ID为1534435172217-0,最后一条消息的ID为 1534435256529-0。至于消费者方面,这个消费者组目前有两个正在处理消息的消费者,分别为worker1和worker2,这两个消费者各自都在处理一条消息。

为了进一步检阅待处理消息的相关细节,用户可以在执行XPENDING命令的时候,提供可选的start、stop和count这3个参数。其中start和 stop两个参数用于指定消息的ID范围区间,而count参数则用于限制被检阅的消息数量。值得一提的是,XPENDING命令的这3个参数与XRANGE 命令的3个同名参数具有相同的作用,因此所有使用XRANGE命令能够执行的区间操作在XPENDING命令中都可以执行。

比如,通过执行以下命令,我们可以获取ID为1534435172217-0的待处理消息的更详细消息:

redis> XPENDING cgs all-message 1534435172217-0 1534435172217-0 1
1) 1) 1534435172217-0 -- 消息ID
2) "worker1" -- 所属消费者
3) (integer) 52490194 -- 消息最后一次递送给消费者之后,过去了多少毫秒
4) (integer) 1 -- 消息被递送的次数

从命令返回的结果可以看到,这条ID为1534435172217-0的消息正在由消费者worker1进行处理,这条消息只递送了一次,并且从递送到现在已经过去了52490194ms。

再举个例子,如果我们不想限制ID的范围,只想将命令返回消息的最大数量限制为5条,那么可以将起始ID设置为-,结束ID设置为+,并执行以下命令:

-- 取出最多5条待处理消息的相关信息,但不限制具体的消息ID
redis> XPENDING cgs all-message - + 5
1) 1) 1534435172217-0
2) "worker1"
3) (integer) 52656386
4) (integer) 1
2) 1) 1534435256529-0
2) "worker2"
3) (integer) 52572179
4) (integer) 1

最后,在使用上述 3 个参数的时候,用户还可以再提供一个可选的 consumer参数,这样命令就会只列出与给定消费者相关联的待处理消息。

比如,通过执行以下命令,我们可以只列出消费者worker1的待处理消息的相关信息:

redis> XPENDING cgs all-message - + 5 worker1
1) 1) 1534435172217-0
2) "worker1"
3) (integer) 52924618
4) (integer) 1

其他信息

  • 复杂度:执行 XPENDING stream group 格式的 XPENDING 的复杂度为 O(N),其中N为消费者组目前拥有的消费者数量;执行带有start、stop和count参数的XPENDING命令的复杂度为O(log(N)+M),其中N 为消费者组目前拥有的待处理消息总数量,而M则是命令返回的消息数量;执行带有consumer参数的XPENDING命令的复杂度为O(log(N)+M),其中N为该消费者目前拥有的待处理消息数量,而M则为命令返回的消息数量。

  • 版本要求:XPENDING命令从Redis 5.0.0版本开始可用。