XINFO:查看流和消费者组的相关信息
Redis向用户提供了XINFO命令用于查看流及其消费者组的相关信息,该命令提供了多个具备不同功能的子命令,接下来将分别对这些子命令进行介绍。
打印消费者信息
XINFO CONSUMERS命令用于打印指定消费者组的所有消费者,以及这些消费者的相关信息:
XINFO CONSUMERS stream group-name
命令打印的信息包括消费者的名字、它们正在处理的消息数量以及消费者的闲置时长。
以下是一个XINFO CONSUMERS命令的使用示例:
redis> XINFO CONSUMERS cgs all-message
1) 1) name -- 消费者的名字
2) "worker1"
3) pending -- 正在处理的消息数量
4) (integer) 1
5) idle -- 毫秒格式的闲置时长
6) (integer) 50899
2) 1) name
2) "worker2"
3) pending
4) (integer) 0
5) idle
6) (integer) 7371
这个命令调用返回了两个消费者,分别是worker1和worker2,其中 worker1正在处理一条消息,并且它的闲置时长为50899ms。
打印消费者组信息
XINFO GROUPS命令用于打印与给定流相关联的所有消费者组,以及这些消费者组的相关信息:
XINFO GROUPS stream
命令打印的信息包括消费者组的名字、它拥有的消费者数量、组中正在处理消息的数量以及该组最后递送消息的ID。
以下是一个XINFO GROUPS命令的使用示例:
redis> XINFO GROUPS cgs
1) 1) name -- 组名
2) "all-message"
3) consumers -- 消费者数量
4) (integer) 2
5) pending -- 组中正在处理的消息数量
6) (integer) 1
7) last-delivered-id -- 最后递送消息的ID
8) 1532339991221-0
这个命令调用返回了流cgs目前唯一的一个消费者组all-message的相关信息,从这些信息可知,该组目前拥有两个消费者,组中正在处理的消息数量为1个,而该组最后递送的消息的ID为1532339991221-0。
打印流消息
XINFO STREAM命令用于打印给定流的相关信息:
XINFO STREAM stream
命令打印的信息包括流的长度(包含的消息数量)、流在底层的基数树表示的相关信息、流相关的消费者组数量、流最后生成的消息的ID以及流的第一个节点和最后一个节点。
以下是对cgs流执行XINFO STREAM命令的结果:
redis> XINFO STREAM cgs
1) length -- 长度
2) (integer) 1
3) radix-tree-keys -- 基数树的键数量
4) (integer) 1
5) radix-tree-nodes -- 基数树的节点数量
6) (integer) 2
7) groups -- 与之相关联的消费者组数量
8) (integer) 1
9) last-generated-id -- 最后生成的消息的ID
10) 1532339991221-0
11) first-entry -- 流的第一个节点
12) 1) 1532339991221-0
2) 1) "msg"
2) "initial message"
13) last-entry -- 流的第二个节点
14) 1) 1532339991221-0
2) 1) "msg"
2) "initial message"
从命令打印出的信息可以看到,cgs流目前的长度为1,它的底层表示基数树包含一个键和两个节点,它有一个相关联的消费者组,它最后生成的消息的ID为1532339991221-0,并且这个消息也是这个流的第一个和最后一个消息。
其他信息
-
复杂度:XINFO CONSUMERS命令的复杂度为O(N),其中N为给定消费者组的消费者数量;XINFO GROUPS命令的复杂度为O(M),其中M为给定流属下的消费者组数量;XINFO STREAM命令的复杂度为O(1)。
-
版本要求:XINFO CONSUMERS、XINFO GROUPS和XINFO STREAM这3条命令从Redis5.0.0版本开始可用。
示例:为消息队列提供消费者组功能
在稍早之前,我们使用Redis流的XADD、XRANGE等命令实现了一个具有基本功能的消息队列,在学习了流的消费者组相关功能之后,是时候使用这些功能对前面的消息队列程序进行扩展了。
代码清单10-2展示了一个为消息队列提供消费者组功能的类实现,这个类可以在消息队列类MessageQueue的基础上,为其提供基于消费者组的消息读取功能,用户只需要使用MessageQueue类向队列中添加消息,然后使用Group类创建出消费者组,就可以通过消费者组方式读取组的消息了。
from message_queue import reconstruct_message_list, get_message_from_nested_list
class Group:
"""
为消息队列提供消费者组功能。
"""
def __init__(self, client, stream, group):
self.client = client
self.stream = stream
self.group = group
def create(self, start_id):
"""
创建消费者组。
"""
self.client.xgroup_create(self.stream, self.group, start_id)
def destroy(self):
"""
删除消费者组。
"""
self.client.xgroup_destroy(self.stream, self.group)
def read_message(self, consumer, id, count=10):
"""
从消费者组中读取消息。
"""
reply = self.client.xreadgroup(self.group, consumer, {self.stream: id}, count)
if len(reply) == 0:
return list()
else:
messages = get_message_from_nested_list(reply)
return reconstruct_message_list(messages)
def ack_message(self, id):
"""
确认已处理完毕的消息。
"""
self.client.xack(self.stream, self.group, id)
def info(self):
"""
返回消费者组的相关信息。
"""
# 因为一个流可以拥有多个消费者组
# 所以我们需要从命令返回的多个组信息中找到正确的信息
for group_info in self.client.xinfo_groups(self.stream):
if group_info['name'] == self.group:
return group_info
else:
return dict()
def consumer_info(self):
"""
返回消费者组属下消费者的相关信息。
"""
return self.client.xinfo_consumers(self.stream, self.group)
def delete_consumer(self, consumer):
"""
删除指定消费者。
"""
self.client.xgroup_delconsumer(self.stream, self.group, consumer)
为了使用消费者组功能,我们需要同时导入消息队列类 MessageQueue 和消费者组类Group,并创建出相应的实例:
>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> from group import Group
>>> client = Redis(decode_responses=True)
>>> queue = MessageQueue(client, "test_stream")
>>> group = Group(client, "test_stream", "test_group")
之后,我们可以向消息队列中添加消息,并通过消费者组读取消息:
>>> queue.add_message({"k1":"v1"})
'1554181394926-0'
>>> group.create(0)
>>> group.read_message("worker1", ">")
[{'1554181394926-0': {'k1': 'v1'}}]
当处理完消息之后,可以通过 ack_message() 方法对其进行确认:
>>> group.ack_message('1554181394926-0')
或者使用 info() 方法和 consumer_info() 方法查看消费者组和消费者的相关信息:
>>> group.info()
{'name': 'test_group', 'consumers': 1, 'pending': 0, 'last-delivered-id': '1554181394926-0'}
>>> group.consumer_info()
[{'name': 'worker1', 'pending': 0, 'idle': 37209}]