BRPOPLPUSH:阻塞式弹出并推入操作

BRPOPLPUSH 命令是 RPOPLPUSH 命令的阻塞版本,BRPOPLPUSH 命令接受一个源列表、一个目标列表以及一个秒级精度的超时时限作为参数:

BRPOPLPUSH source target timeout

根据源列表是否为空,BRPOPLPUSH 命令会产生以下两种行为:

  • 如果源列表非空,那么 BRPOPLPUSH 命令的行为就和 RPOPLPUSH 命令的行为一样,BRPOPLPUSH 命令会弹出位于源列表最右端的元素,并将该元素推入目标列表的左端,最后向客户端返回被推入的元素。

  • 如果源列表为空,那么 BRPOPLPUSH 命令将阻塞执行该命令的客户端,然后在给定的时限内等待可弹出的元素出现,或者等待时间超过给定时限为止。

举个例子,假设现在有 list3、list4 两个列表:

client-1> LRANGE list3 0 -1
1) "hello"
client-1> LRANGE list4 0 -1
1) "a"
2) "b"
3) "c"

如果我们以这两个列表作为输入执行 BRPOPLPUSH 命令,由于源列表 list3 非空,所以 BRPOPLPUSH 命令将不阻塞直接执行,就像 RPOPLPUSH 命令一样:

client-1> BRPOPLPUSH list3 list4 10
"hello"
client-1> LRANGE list3 0 -1
(empty list or set)
client-1> LRANGE list4 0 -1
1) "hello"
2) "a"
3) "b"
4) "c"

现在,由于 list3 为空,如果我们再次执行相同的 BRPOPLPUSH 命令,那么客户端 client-1 将被阻塞,直到我们从另一个客户端 client-2 向 list3 推入新元素为止:

client-1> BRPOPLPUSH list3 list4 10
"world"
(1.42s) -- 被阻塞了1.42s
client-1> LRANGE list3 0 -1
(empty list or set)
client-1> LRANGE list4 0 -1
1) "world"
2) "hello"
3) "a"
4) "b"
5) "c"
client-2> RPUSH list3 "world"
(integer) 1

表 4-3 展示了客户端从被阻塞到解除阻塞的整个过程。

image 2025 01 03 17 20 17 141
Figure 1. 表4-3 阻塞 BRPOPLPUSH 命令的执行过程

处理源列表为空的情况

如果源列表在用户给定的时限内一直没有元素可供弹出,那么 BRPOPLPUSH 命令将向客户端返回一个空值,以此来表示此次操作没有弹出和推入任何元素:

redis> BRPOPLPUSH empty-list another-list 5
(nil)
(5.05s) -- 客户端被阻塞了5.05s

与 BLPOP 命令和 BRPOP 命令一样,redis-cli 客户端也会显示 BRPOPLPUSH 命令的阻塞时长。

其它信息

  • 复杂度:O(1)。

  • 版本要求:BRPOPLPUSH 命令从 Redis 2.2.0 版本开始可用。

示例:带有阻塞功能的消息队列

在构建应用程序的时候,有时会遇到一些非常耗时的操作,比如发送邮件,将一条新微博同步给上百万个用户,对硬盘进行大量读写,执行庞大的计算等。因为这些操作非常耗时,所以如果我们直接在响应用户请求的过程中执行它们,那么用户就需要等待非常长时间。

例如,为了验证用户身份的有效性,有些网站在注册新用户的时候,会向用户给定的邮件地址发送一封激活邮件,用户只有在点击了验证邮件里面的激活链接之后,新注册的账号才能够正常使用。

下面这段伪代码展示了一个带有邮件验证功能的账号注册函数,这个函数不仅会为用户输入的用户名和密码创建新账号,还会向用户给定的邮件地址发送一封激活邮件:

def register(username, password, email):
    # 创建新账号
    create_new_account(username, password)
    # 发送激活邮件
    send_validate_email(email)
    # 向用户返回注册结果
    ui_print("账号注册成功,请访问你的邮箱并激活账号。")

因为邮件发送操作需要进行复杂的网络信息交换,所以它并不是一个快速的操作,如果我们直接在 send_validate_email() 函数中执行邮件发送操作,那么用户可能就需要等待较长一段时间才能看到 ui_print() 函数打印出的反馈信息。

为了解决这个问题,在执行 send_validate_email() 函数的时候,我们可以不立即执行邮件发送操作,而是将邮件发送任务放入一个队列中,然后由后台的线程负责实际执行。这样,程序只需要执行一个入队操作,就可以直接向用户反馈注册结果了,这比实际地发送邮件之后再向用户反馈结果要快得多。

代码清单4-4 展示了一个使用 Redis 实现的消息队列,它使用 RPUSH 命令将消息推入队列,并使用 BLPOP 命令从队列中取出待处理的消息。

代码清单4-4 使用列表实现的消息队列:/list/message_queue.py
class MessageQueue:

    def __init__(self, client, queue_name):
        self.client = client
        self.queue_name = queue_name

    def add_message(self, message):
        """
        将一条消息放入到队列里面。
        """
        self.client.rpush(self.queue_name, message)

    def get_message(self, timeout=0):
        """
        从队列里面获取一条消息,
        如果暂时没有消息可用,那么就在 timeout 参数指定的时限内阻塞并等待可用消息出现。

        timeout 参数的默认值为 0 ,表示一直等待直到消息出现为止。
        """
        # blpop 的结果可以是 None ,也可以是一个包含两个元素的元组
        # 元组的第一个元素是弹出元素的来源队列,而第二个元素则是被弹出的元素
        result = self.client.blpop(self.queue_name, timeout)
        if result is not None:
            source_queue, poped_item = result
            return poped_item

    def len(self):
        """
        返回队列目前包含的消息数量。
        """
        return self.client.llen(self.queue_name)

为了使用这个消息队列,我们通常需要用到两个客户端:

  • 一个客户端作为消息的发送者(sender),负责将待处理的消息推入队列中。

  • 而另一个客户端作为消息的接收者(receiver)和消费者 (consumer),负责从队列中取出消息,并根据消息内容进行相应的处理工作。

下面的这段代码展示了一个简单的消息接收者,在没有消息的时候,这个程序将阻塞在 mq.get_message() 调用上面;当有消息(邮件地址)出现时,程序就会打印出该消息并发送邮件:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, 'validate user email queue')
>>> while True:
... email_address = mq.get_message() # 阻塞直到消息出现
... send_email(email_address) # 打印出邮件地址并发送邮件
...
peter@exampl.com
jack@spam.com
tom@blahblah.com

以下代码展示了消息发送者是如何将消息推入队列中的:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, 'validate user email queue')
>>> mq.add_message("peter@exampl.com")
>>> mq.add_message("jack@spam.com")
>>> mq.add_message("tom@blahblah.com")
  1. 阻塞弹出操作的应用

    下面展示的消息队列之所以使用 BLPOP 命令而不是 LPOP 命令来实现出队操作,是因为阻塞弹出操作可以让消息接收者在队列为空的时候自动阻塞,而不必手动进行休眠,从而使得消息处理程序的编写变得更为简单直接,并且还可以有效地节约系统资源。

    作为对比,以下代码展示了在使用 LPOP 命令实现出队操作的情况下,如何实现类似上面展示的消息处理程序:

    while True:
        # 尝试获取消息,如果没有消息,那么返回None
        email_address = mq.get_message()
        if email_address is not None:
            # 有消息,发送邮件
            send_email(email_address)
        else:
            # 没有消息可用,休眠100ms之后再试
            sleep(0.1)
  2. 使用消息队列实现实时提醒

    消息队列除了可以在应用程序的内部使用,还可以用于实现面向用户的实时提醒系统。

    比如,如果我们在构建一个社交网站,那么可以使用 JavaScript 脚本,让客户端以异步的方式调用 MessageQueue 类的 get_message() 方法,然后程序就可以在用户被关注的时候、收到了新回复的时候或者收到新私信的时候,通过调用add_message() 方法来向用户发送提醒信息。