XREADGROUP:读取消费者组中的消息

XREADGROUP 命令是消费者组版本的 XREAD 命令,用户可以使用这个命令读取消费者组中的消息:

XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS stream [stream ...] id [id ...]
bash

XREADGROUP 命令的格式与 XREAD 命令的格式基本相同,主要的区别在于前者多了一个用于指定消费者组和消费者的 GROUP 选项:

GROUP group consumer
bash

通过以上两个参数,用户可以在执行读取操作的同时,说明自己想要读取的消费者组以及执行该操作的消费者。

举个例子,如果我们想要以消费者worker1的身份,从流cgs的allmessage消费者组中读取第一条ID大于10086的消息,那么可以执行以下命令:

redis> XREADGROUP GROUP all-message worker1 COUNT 1 STREAMS cgs 10086
1) 1) "cgs"
2) 1) 1) 1534752640195-0
2) 1) "k1"
2) "v1"
bash

XREADGROUP命令在读取消息的同时,还会将该消息分别添加到消费者组的待处理消息队列以及消费者的待处理消息队列中,从而使得被读取消息的状态从原来的“未递送”转变成“待处理”,这一点可以通过以下两条命令来确认:

-- 查看消费者组all-message的待处理消息队列
redis> XPENDING cgs all-message
1) (integer) 1
2) 1534752640195-0
3) 1534752640195-0
4) 1) 1) "worker1"
2) "1"
-- 查看消费者worker1的待处理消息队列
redis> XPENDING cgs all-message - + 1 worker1
1) 1) 1534752640195-0
2) "worker1"
3) (integer) 1791952
4) (integer) 1
bash

XREADGROUP 命令除了会把被读取的消息添加到上述两个队列之外,还会将最后一条被读取的消息的ID设置成消费者组的最后递送消息ID,这一点可以通过以下命令来确认:

redis> XINFO groups cgs
1) 1) name
2) "all-message"
3) consumers
4) (integer) 1
5) pending
6) (integer) 1
7) last-delivered-id
8) 1534752640195-0
bash

消费者组的待处理消息队列记录了所有已经被递送但是尚未被确认的待处理消息,而每个消费者各自专属的待处理消息队列则记录了各个消费者所属的待处理消息,这两个队列的存在使得Redis不会错误地将同一条消息递送给不同的消费者,也可以让用户在处理完一条消息之后,通过XACK命令对其进行确认。至于最后递送消息ID的存在则保证了消费者组只会向消费者递送新出现的消息,而不会重复地递送已经递送过的旧消息(除非用户显式地要求进行这一操作)。

读取未递送过的新消息

前面在介绍XREAD命令时曾经提到过,用户可以通过将id参数的值设置为$,在不知道最后一条消息的ID的情况下,获取新出现的消息。 XREADGROUP命令同样可以执行类似的操作,只要将id参数的值设置为特殊符号>,命令就会自动地向消费者返回尚未递送过的新消息。

举个例子,如果我们想要在不知道消费者组all-message最后递送消息 ID 的情况下,获取第一条尚未递送过的消息,那么只需要执行以下命令即可:

redis> XREADGROUP GROUP all-message worker1 COUNT 1 STREAMS cgs >
1) 1) "cgs"
2) 1) 1) 1534752642829-0
2) 1) "k2"
2) "v2"
bash

其他信息

  • 复杂度:对于用户给定的每个流,从流中获取消息的复杂度为 O(log(N)+M),其中N为流包含的消息数量,而M则为被获取消息的数量。因此对于用户给定的I个流,获取这些流消息的总复杂度为 O((log(N)+M)*I)。

  • 版本要求:XREADGROUP命令从Redis 5.0.0版本开始可用。