XINFO:查看流和消费者组的相关信息

Redis向用户提供了XINFO命令用于查看流及其消费者组的相关信息,该命令提供了多个具备不同功能的子命令,接下来将分别对这些子命令进行介绍。

打印消费者信息

XINFO CONSUMERS命令用于打印指定消费者组的所有消费者,以及这些消费者的相关信息:

XINFO CONSUMERS stream group-name

命令打印的信息包括消费者的名字、它们正在处理的消息数量以及消费者的闲置时长。

以下是一个XINFO CONSUMERS命令的使用示例:

redis> XINFO CONSUMERS cgs all-message
1) 1) name -- 消费者的名字
2) "worker1"
3) pending -- 正在处理的消息数量
4) (integer) 1
5) idle -- 毫秒格式的闲置时长
6) (integer) 50899
2) 1) name
2) "worker2"
3) pending
4) (integer) 0
5) idle
6) (integer) 7371

这个命令调用返回了两个消费者,分别是worker1和worker2,其中 worker1正在处理一条消息,并且它的闲置时长为50899ms。

打印消费者组信息

XINFO GROUPS命令用于打印与给定流相关联的所有消费者组,以及这些消费者组的相关信息:

XINFO GROUPS stream

命令打印的信息包括消费者组的名字、它拥有的消费者数量、组中正在处理消息的数量以及该组最后递送消息的ID。

以下是一个XINFO GROUPS命令的使用示例:

redis> XINFO GROUPS cgs
1) 1) name -- 组名
2) "all-message"
3) consumers -- 消费者数量
4) (integer) 2
5) pending -- 组中正在处理的消息数量
6) (integer) 1
7) last-delivered-id -- 最后递送消息的ID
8) 1532339991221-0

这个命令调用返回了流cgs目前唯一的一个消费者组all-message的相关信息,从这些信息可知,该组目前拥有两个消费者,组中正在处理的消息数量为1个,而该组最后递送的消息的ID为1532339991221-0。

打印流消息

XINFO STREAM命令用于打印给定流的相关信息:

XINFO STREAM stream

命令打印的信息包括流的长度(包含的消息数量)、流在底层的基数树表示的相关信息、流相关的消费者组数量、流最后生成的消息的ID以及流的第一个节点和最后一个节点。

以下是对cgs流执行XINFO STREAM命令的结果:

redis> XINFO STREAM cgs
1) length -- 长度
2) (integer) 1
3) radix-tree-keys -- 基数树的键数量
4) (integer) 1
5) radix-tree-nodes -- 基数树的节点数量
6) (integer) 2
7) groups -- 与之相关联的消费者组数量
8) (integer) 1
9) last-generated-id -- 最后生成的消息的ID
10) 1532339991221-0
11) first-entry -- 流的第一个节点
12) 1) 1532339991221-0
2) 1) "msg"
2) "initial message"
13) last-entry -- 流的第二个节点
14) 1) 1532339991221-0
2) 1) "msg"
2) "initial message"

从命令打印出的信息可以看到,cgs流目前的长度为1,它的底层表示基数树包含一个键和两个节点,它有一个相关联的消费者组,它最后生成的消息的ID为1532339991221-0,并且这个消息也是这个流的第一个和最后一个消息。

其他信息

  • 复杂度:XINFO CONSUMERS命令的复杂度为O(N),其中N为给定消费者组的消费者数量;XINFO GROUPS命令的复杂度为O(M),其中M为给定流属下的消费者组数量;XINFO STREAM命令的复杂度为O(1)。

  • 版本要求:XINFO CONSUMERS、XINFO GROUPS和XINFO STREAM这3条命令从Redis5.0.0版本开始可用。

示例:为消息队列提供消费者组功能

在稍早之前,我们使用Redis流的XADD、XRANGE等命令实现了一个具有基本功能的消息队列,在学习了流的消费者组相关功能之后,是时候使用这些功能对前面的消息队列程序进行扩展了。

代码清单10-2展示了一个为消息队列提供消费者组功能的类实现,这个类可以在消息队列类MessageQueue的基础上,为其提供基于消费者组的消息读取功能,用户只需要使用MessageQueue类向队列中添加消息,然后使用Group类创建出消费者组,就可以通过消费者组方式读取组的消息了。

代码清单10-2 为消息队列提供消费者组功能的Group 类:/stream/group.py
from message_queue import reconstruct_message_list, get_message_from_nested_list

class Group:
    """
    为消息队列提供消费者组功能。
    """

    def __init__(self, client, stream, group):
        self.client = client
        self.stream = stream
        self.group = group

    def create(self, start_id):
        """
        创建消费者组。
        """
        self.client.xgroup_create(self.stream, self.group, start_id)

    def destroy(self):
        """
        删除消费者组。
        """
        self.client.xgroup_destroy(self.stream, self.group)

    def read_message(self, consumer, id, count=10):
        """
        从消费者组中读取消息。
        """
        reply = self.client.xreadgroup(self.group, consumer, {self.stream: id}, count)
        if len(reply) == 0:
            return list()
        else:
            messages = get_message_from_nested_list(reply)
            return reconstruct_message_list(messages)

    def ack_message(self, id):
        """
        确认已处理完毕的消息。
        """
        self.client.xack(self.stream, self.group, id)

    def info(self):
        """
        返回消费者组的相关信息。
        """
        # 因为一个流可以拥有多个消费者组
        # 所以我们需要从命令返回的多个组信息中找到正确的信息
        for group_info in self.client.xinfo_groups(self.stream):
            if group_info['name'] == self.group:
                return group_info
        else:
            return dict()

    def consumer_info(self):
        """
        返回消费者组属下消费者的相关信息。
        """
        return self.client.xinfo_consumers(self.stream, self.group)

    def delete_consumer(self, consumer):
        """
        删除指定消费者。
        """
        self.client.xgroup_delconsumer(self.stream, self.group, consumer)

为了使用消费者组功能,我们需要同时导入消息队列类 MessageQueue 和消费者组类Group,并创建出相应的实例:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> from group import Group
>>> client = Redis(decode_responses=True)
>>> queue = MessageQueue(client, "test_stream")
>>> group = Group(client, "test_stream", "test_group")

之后,我们可以向消息队列中添加消息,并通过消费者组读取消息:

>>> queue.add_message({"k1":"v1"})
'1554181394926-0'
>>> group.create(0)
>>> group.read_message("worker1", ">")
[{'1554181394926-0': {'k1': 'v1'}}]

当处理完消息之后,可以通过 ack_message() 方法对其进行确认:

>>> group.ack_message('1554181394926-0')

或者使用 info() 方法和 consumer_info() 方法查看消费者组和消费者的相关信息:

>>> group.info()
{'name': 'test_group', 'consumers': 1, 'pending': 0, 'last-delivered-id': '1554181394926-0'}
>>> group.consumer_info()
[{'name': 'worker1', 'pending': 0, 'idle': 37209}]