消费者组
Redis 流的消费者组(consumer group)允许用户将一个流从逻辑上划分为多个不同的流,并让消费者组属下的消费者去处理组中的消息。
创建消费者组
创建消费者组的操作可以通过执行 XGROUP CREATE 命令来完成,该命令是 XGROUP 命令的一个子命令:
XGROUP CREATE stream group start_id
命令中的 stream 参数用于指定流的名字,group 参数用于指定将要创建的消费者组的名字。此外,start_id 参数用于指定消费者组在流中的起始 ID,这个 ID 决定了消费者组要从流的哪个 ID 之后开始进行读取。举个例子,如果用户将 0 用作 start_id 参数的值,那么说明用户希望从流的开头进行读取;而如果用户将 10000000 用作 start_id 参数的值,那么说明用户希望读取流中 ID 大于 10000000 的消息。
作为例子,图10-17展示了一个拥有 3 个消费者组的流,其中:
-
消费者组g1以ID 0为起点,该组的消费者能够读取流中ID从1到9在内的所有消息。
-
消费者组g2以ID 4为起点,该组的消费者能够读取流中ID从5到9在内的所有消息。
-
消费者组g3以ID$为起点,该组的消费者能够读取流中ID大于9的新消息。

通过为不同的消费者组设置不同的起点 ID,我们把一个流从逻辑上划分成了 3 个不同的流,它们包含各不相同的元素,如图10-18所示。

同一个流的消息在不同消费者组之间是共享而不是独占的,换句话说,流中的同一条消息可以被多个不同组的消费者读取,并且来自不同消费者组的读取操作不会对其他消费者组的读取操作产生任何影响。比如对于图10-17所示的3个消费者组来说,不仅g1的消费者可以读取到ID为5的消息,g2的消费者同样也可以读取到ID为5的消息。
读取消费者组
客户端可以通过执行 XREADGROUP 命令来读取消费者组中的消息:
XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS stream [stream ...] id [id ...]
这个命令的基本参数及作用与 XREAD 命令大同小异,主要区别在于新增的 GROUP group consumer 选项,该选项的两个参数分别用于指定被读取的消费者组以及负责处理消息的消费者。
消费者组在创建之后就会跟踪并维护一系列信息和数据结构,其中包括:
-
该组属下的消费者名单。
-
一个队列,记录了该组目前处于“待处理”状态的所有消息,简称待处理消息队列。
-
该组最后递送的消息的ID。
当用户调用 XREADGROUP 命令对消费者组进行读取之后,命令就会按需更新上述3项信息。比如,如果用户执行的是以下命令:
XREADGROUP GROUP g1 c1 STREAMS msgs 0
并且读取出了一条ID为 10086 的消息,那么命令将对消费者组的相关信息执行以下更新:
-
如果消费者 c1 是第一次读取这个消费者组,那么将该消费者添加到该组的消费者名单中。
-
将被读取的消息添加到该组的待处理消息队列中。
-
将 10086 设置为该组的最后递送消息 ID。
对于创建之后还未执行过任何读取操作的新消费者组来说,该组的最后递送消息 ID 就是用户创建消费者组时给定的起始 ID,这就是为什么用户在读取消费者组的时候只能够读取到大于起始 ID 的消息。
消费者
从逻辑上来说,消费者就是负责处理消息的客户端。与创建消费者组不一样,消费者不用显式地创建,用户只要在执行 XREADGROUP 命令时给定消费者的名字,Redis 就会自动为新出现的消费者创建相应的数据结构。
与消费者组一样,消费者也会维护一个属于自己的待处理消息队列:每当用户使用 XREADGROUP 命令读取出一条消息,并将这条消息指派给一个消费者处理时,该消费者就会把所指派的消息添加到自己的待处理消息队列中。
需要注意的是,与多个消费者组能够共享同一个流中的元素不一样,同一消费者组中的每条消息只能有一个消费者,换句话说,不同的消费者将独占组中的不同消息:当一个消费者读取了组中的一条消息之后,同组的其他消费者将无法读取这条消息。
消息的状态转换
当消费者处理完一条消息之后,它需要向 Redis 发送一条针对该消息的 XACK 命令:
XACK stream group id [id id ...]
当 Redis 接收到消费者发来的 XACK 命令之后,就会从消费者组的待处理消息队列以及消费者的待处理消息队列中移除指定的消息。这样一来,这些消息的状态就会从 “待处理” 转换为 “已确认”,以此来表示消费者已经处理完这些消息了。
综合起来,一条消费者组消息从出现到处理完毕,需要经历以下阶段:
-
首先,当一个生产者通过 XADD 命令向流中添加一条消息时,该消息就从原来的 “不存在” 状态转换成了 “未递送” 状态。
-
然后,当一个消费者通过 XREADGROUP 命令从流中读取一条消息时,该消息就从原来的 “未递送” 状态转换成了 “待处理” 状态。
-
最后,当消费者完成了对消息的处理,并通过 XACK 命令向服务器进行确认时,该消息就从原来的 “待处理” 状态转换成了 “已确认” 状态。
图10-19展示了消费者组消息的状态转换过程。

实际示例
关于消费者组我们已经了解得足够多了,现在是时候来实际地创建一个消费者组并尝试执行相关的操作了。首先,通过执行 XGROUP CREATE 命令,并将流名 cgs、消费者组名 all-message 和起始 ID 0 用作参数,我们可以创建出相应的消费者组:
redis> XGROUP CREATE cgs all-message 0
O
通过执行以下 XREADGROUP 命令,我们可以以消费者 worker1 的身份,从消费者组 all-message 中读取出相应的消息:
redis> XREADGROUP GROUP all-message worker1 STREAMS cgs 0
1) 1) "cgs" -- 来源流
2) 1) 1) 1535875626221-0 -- 消息
2) 1) "k1"
2) "v1"
2) 1) 1535875628970-0 -- 消息
2) 1) "k2"
2) "v2"
在执行读取操作之后,我们可以通过执行 XPENDING 命令以及 XINFO GROUPS 命令查看消费者组的相关信息,其中 XPENDING 命令用于列出消费者组目前待处理消息的相关信息,XINFO GROUPS 命令则用于列出与给定流相关联消费者组的相关信息:
redis> XPENDING cgs all-message
1) (integer) 2 -- 消费者组目前处于待处理状态的消息数量
2) 1535875626221-0 -- 最小的待处理消息ID
3) 1535875628970-0 -- 最大的待处理消息ID
4) 1) 1) "worker1" -- 消费者的名字
2) "2" -- 该消费者正在处理的消息数量
redis> XINFO GROUPS cgs
1) 1) name -- 消费者组的名字
2) "all-message"
3) consumers -- 属下消费者的数量
4) (integer) 1
5) pending -- 该组目前的待处理消息数量
6) (integer) 2
7) last-delivered-id -- 该组目前的最后递送消息ID
8) 1535875628970-0
在消费者处理完 ID 为 1535875626221 的消息之后,我们可以使用以下命令对其进行确认:
redis> XACK cgs all-message 1535875626221-0
(integer) 1
正如之前所说,被确认的消息将从消费者组的待处理消息队列中消失,这一点可以通过再次执行 XPENDING 命令来确认:
redis> XPENDING cgs all-message
1) (integer) 1
2) 1535875628970-0
3) 1535875628970-0
4) 1) 1) "worker1"
2) "1"
从命令的执行结果可以看出,这个消费者组现在只有一条待处理消息了。
关于消费者组的基本介绍至此就结束了,本章后续的内容将对消费者组的相关命令做更详细的介绍。