XGROUP:管理消费者组

创建消费者组

通过执行 XGROUP CREATE 命令,用户可以为流创建一个具有指定名字的消费者组:

XGROUP CREATE stream group id
bash

命令的id参数指定了消费者组的最后递送消息ID,这个ID限定了消费者能够接收到的消息范围:消费者组属下的消费者只能接收到ID大于最后递送消息ID的消息,并且消费者组的最后递送消息ID还会随着消费者执行的读取操作而不断更新。

XGROUP CREATE命令目前只能为已经存在的流创建消费者组,如果用户给定的流不存在,那么命令将返回一个错误:

redis> XGROUP CREATE not-exists-stream all-message
(error) ERR no such key
bash

如果一切正常,那么 XGROUP CREATE 命令在成功执行之后将返回 OK。

作为例子,以下代码展示了如何为流cgs创建一个名为 all-message 的消费者组,并将该组的最后递送消息ID设置为0:

redis> XGROUP CREATE cgs all-message 0-0
OK
redis> XINFO GROUPS cgs
1) 1) name
2) "all-message"
3) consumers
4) (integer) 0
5) pending
6) (integer) 0
7) last-delivered-id
8) 0-0
bash

这样,流 cgs 中的所有消息都会成为消费者组 all-message 属下消费者的消费对象。

其他信息

  • 复杂度:O(1)。

  • 版本要求:XGROUP CREATE 命令从 Redis 5.0.0 版本开始可用。

修改消费者组的最后递送消息ID

对于一个已经存在的消费者组来说,用户可以通过执行 XGROUP SETID 命令来为消费者组设置新的最后递送消息 ID:

XGROUP SETID stream group id
bash

命令给定的 ID 可以是任意合法的消息 ID,ID 对应的消息不必实际存在,并且新 ID 可以大于、小于甚至等于当前 ID。

举个例子,对于以下这个名为 all-message 的消费者组:

redis> XINFO GROUPS cgs
1) 1) name
2) "all-message"
3) consumers
4) (integer) 0
5) pending
6) (integer) 0
7) last-delivered-id
8) 0-0
bash

我们可以通过执行以下命令,将该组的最后递送消息ID设置为10086:

redis> XGROUP SETID cgs all-message 10086
OK
redis> XINFO GROUPS cgs
1) 1) name
2) "all-message"
3) consumers
4) (integer) 0
5) pending
6) (integer) 0
7) last-delivered-id
8) 10086-0 -- ID已改变
bash

除了合法的消息ID之外,特殊符号$也可以用作id参数的值,这个符号可以把消费者组的最后递送消息ID设置为流最新消息的ID:

redis> XADD cgs * k v -- 向流插入一条新消息
1534670632240-0
redis> XGROUP SETID cgs all-message $ -- 执行修改命令
OK
redis> XINFO GROUPS cgs -- 最后递送ID已被修改
1) 1) name
2) "all-message"
3) consumers
4) (integer) 2
5) pending
6) (integer) 5
7) last-delivered-id
8) 1534670632240-0
bash

需要注意的是,使用 XGROUP SETID 命令显式地修改最后递送消息 ID 将对后续执行的 XREADGROUP 命令的结果产生影响,简单来说:

  • 如果新ID大于旧ID,那么消费者可能会漏掉一些原本应该读取的消息。

  • 如果新ID小于旧ID,那么消费者可能会重新读取到一些之前已经被确认过的消息。

鉴于此,用户应该谨慎地使用 XGROUP SETID 命令,并且只在不会引发错误的情况下使用它。

其他信息

  • 复杂度:O(1)。

  • 版本要求:XGROUP SETID 命令从 Redis 5.0.0 版本开始可用。

删除消费者

当用户不再需要某个消费者的时候,可以通过执行以下命令将其删除:

XGROUP DELCONSUMER stream group consumer
bash

命令在执行之后将返回一个数字作为结果,这个数字就是消费者被删除时,它仍在处理的消息数量。

举个例子,对于拥有 worker1 和 worker2 这两个消费者的消费者组 allmessage 来说:

redis> XINFO CONSUMERS cgs all-message
1) 1) name -- 消费者的名字
2) "worker1"
3) pending -- 消费者正在处理的消息数量
4) (integer) 2
5) idle -- 消费者闲置的时间
6) (integer) 44481
2) 1) name
2) "worker2"
3) pending
4) (integer) 3
5) idle
6) (integer) 24816
bash

我们可以通过执行以下命令将消费者 worker1 删除:

redis> XGROUP DELCONSUMER cgs all-message worker1
(integer) 2 -- 这个消费者还有两条消息未确认
bash

现在,worker1 将不再是 all-message 属下的消费者:

redis> XINFO CONSUMERS cgs all-message
1) 1) name
2) "worker2"
3) pending
4) (integer) 3
5) idle
6) (integer) 72596
bash

需要注意的是,当消费者被删除之后,它在被删除时处理的消息也会从消费者组的待处理消息队列中移除。换句话说,属于被删除消费者的待处理消息将不再处于“待处理”状态,这些消息可能已经被消费者处理掉了,但也可能尚未得到妥善的处理。

为了避免这个问题,用户在删除一个消费者之前应该确保递送给它的所有消息均已处理完毕,或者使用XCLAIM命令显式地转移待处理消息的归属权。换句话说,为了保证程序的正确性,用户应该保证每个 XGROUP DELCONSUMER 命令的返回值都为0。

其他信息

  • 复杂度:O(N),其中N为被删除消费者正在处理的消息数量。

  • 版本要求:XGROUP DELCONSUMER 命令从 Redis 5.0.0 版本开始可用。

删除消费者组

与上一个命令类似,当一个消费者组完成了它的任务之后,用户可以通过执行以下命令来删除它:

XGROUP DESTROY stream group
bash

命令在成功执行时返回1,因为组不存在等原因导致命令执行失败时返回0。

以下是一个XGROUP DESTROY命令的执行示例:

-- 删除cgs流的all-message消费者组
redis> XGROUP DESTROY cgs all-message
(integer) 1
-- cgs 流现在已经不再拥有任何消费者组了
redis> XINFO GROUPS cgs
(empty list or set)
bash

注意,与10.8.4节介绍XGROUP DELCONSUMER命令时提到的问题一样,为了保证程序的正确性,用户需要保证在删除消费者组的时候,组中已经没有任何待处理消息,否则这些待处理消息可能无法得到妥善的处理。

其他信息

  • 复杂度:O(N+M),其中N为消费者组被删除时,仍处于“待处理”状态的消息数量,而M则是该组属下消费者的数量。

  • 版本要求:XGROUP DESTROY命令从Redis 5.0.0版本开始可用。