带有乐观锁的事务
本书在第2章实现了具有基本获取和释放功能的锁程序,并在第12章为该程序加上了自动释放功能,但是这两个锁程序都有一个问题,那就是它们的释放操作都是不安全的:
-
无论某个客户端是否是锁的持有者,只要它调用release()方法,锁就会被释放。
-
在锁被占用期间,如果某个不是持有者的客户端错误地调用了 release()方法,那么锁将在持有者不知情的情况下释放,并导致系统中同时存在多个锁。
为了解决这个问题,我们需要修改锁实现,给它加上身份验证功能:
-
客户端在尝试获取锁的时候,除了需要输入锁的最大使用时限之外,还需要输入一个代表身份的标识符,当客户端成功取得锁时,程序将把这个标识符存储在代表锁的字符串键中。
-
当客户端调用release()方法时,它需要将自己的标识符传给 release()方法,而release()方法则需要验证客户端传入的标识符与锁键存储的标识符是否相同,以此来判断调用release()方法的客户端是否就是锁的持有者,从而决定是否释放锁。
根据以上描述,我们可能会写出代码清单13-5所示的代码。
class IdentityLock:
def __init__(self, client, key):
self.client = client
self.key = key
def acquire(self, identity, timeout):
"""
尝试获取一个带有身份标识符和最大使用时限的锁,
成功时返回 True ,失败时返回 False 。
"""
result = self.client.set(self.key, identity, ex=timeout, nx=True)
return result is not None
def release(self, input_identity):
"""
根据给定的标识符,尝试释放锁。
返回 True 表示释放成功;
返回 False 则表示给定的标识符与锁持有者的标识符并不相同,释放请求被拒绝。
"""
# 获取锁键储存的标识符
lock_identity = self.client.get(self.key)
if lock_identity is None:
# 如果锁键的标识符为空,那么说明锁已经被释放
return True
elif input_identity == lock_identity:
# 如果给定的标识符与锁键的标识符相同,那么释放这个锁
self.client.delete(self.key)
return True
else:
# 如果给定的标识符与锁键的标识符并不相同
# 那么说明当前客户端不是锁的持有者
# 拒绝本次释放请求
return False
这个锁实现在绝大部分情况下都能够正常运行,但它的 release()方法包含了一个非常隐蔽的错误:在程序使用GET命令获取锁键的值以后,直到程序调用DEL命令删除锁键的这段时间里面,锁键的值有可能已经发生了变化,因此程序执行的DEL命令有可能会导致当前持有者的锁被错误地释放。
举个例子,表13-1就展示了一个锁被错误释放的例子:客户端A是锁原来的持有者,它调用release()方法尝试释放自己的锁,但是当客户端A执行完GET命令并确认自己就是锁的持有者之后,锁键却因为过期而自动被移除了,紧接着客户端B又通过执行acquire()方法成功取得了锁,然而客户端A并未察觉这一变化,它以为自己还是锁的持有者,并调用DEL命令把属于客户端B的锁给释放了。

为了正确地实现release()方法,我们需要一种机制,它可以保证如果锁键的值在GET命令执行之后发生了变化,那么DEL命令将不会被执行。在Redis中,这种机制被称为乐观锁。
本节接下来的内容将对Redis的乐观锁机制进行介绍,并在之后给出一个使用乐观锁实现的正确的、具有身份验证功能的锁。
WATCH:对键进行监视
客户端可以通过执行WATCH命令,要求服务器对一个或多个数据库键进行监视,如果在客户端尝试执行事务之前,这些键的值发生了变化,那么服务器将拒绝执行客户端发送的事务,并向它返回一个空值:
WATCH key [key ...]
与此相反,如果所有被监视的键都没有发生任何变化,那么服务器将会如常地执行客户端发送的事务。
通过同时使用WATCH命令和Redis事务,我们可以构建出一种针对被监视键的乐观锁机制,确保事务只会在被监视键没有发生任何变化的情况下执行,从而保证事务对被监视键的所有修改都是安全、正确和有效的。
以下代码展示了一个因为乐观锁机制而导致事务执行失败的例子:
redis> WATCH user_id_counter
OK
redis> GET user_id_counter -- 获取当前最新的用户ID
"256"
redis> MULTI
OK
redis> SET user::256::email "peter@spamer.com" -- 尝试使用这个ID来存储用户信息
QUEUED
redis> SET user::256::password "topsecret"
QUEUED
redis> INCR user_id_counter -- 创建新的用户ID
QUEUED
redis> EXEC
(nil) -- user_id_counter键已被修改
表13-2展示了这个事务执行失败的具体原因:因为客户端A监视了 user_id_counter 键,而客户端B却在客户端A执行事务之前对该键进行了修改,所以服务器最终拒绝了客户端A的事务执行请求。

其他信息
-
时间复杂度:O(N),其中N为被监视键的数量。
-
版本要求:WATCH命令从Redis 2.2.0版本开始可用。
UNWATCH:取消对键的监视
客户端可以通过执行UNWATCH命令,取消对所有键的监视:
UNWATCH
服务器在接收到客户端发送的UNWATCH命令之后,将不会再对之前 WATCH命令指定的键实施监视,这些键也不会再对客户端发送的事务造成任何影响。
以下代码展示了一个UNWATCH命令的执行示例:
redis> WATCH "lock_key" "user_id_counter" "msg"
OK
redis> UNWATCH -- 取消对以上3个键的监视
OK
除了显式地执行UNWATCH命令之外,使用EXEC命令执行事务和使用 DISCARD命令取消事务,同样会导致客户端撤销对所有键的监视,这是因为这两个命令在执行之后都会隐式地调用UNWATCH命令。
其他信息
-
复杂度:O(N),其中N为被取消监视的键数量。
-
版本要求:UNWATCH命令从Redis 2.2.0版本开始可用。
示例:带有身份验证功能的锁
在了解了乐观锁机制的使用方法之后,现在是时候使用它来实现一个正确的带身份验证功能的锁了。
之前展示的锁实现的问题在于,在GET命令执行之后,直到DEL命令执行之前的这段时间里,锁键的值有可能会发生变化,并出现误删锁键的情况。为了解决这个问题,我们需要使用乐观锁去保证DEL命令只会在锁键的值没有发生任何变化的情况下执行,代码清单13-6展示了修改之后的锁实现。
from redis import WatchError
class IdentityLock:
def __init__(self, client, key):
self.client = client
self.key = key
def acquire(self, identity, timeout):
"""
尝试获取一个带有身份标识符和最大使用时限的锁,
成功时返回 True ,失败时返回 False 。
"""
result = self.client.set(self.key, identity, ex=timeout, nx=True)
return result is not None
def release(self, input_identity):
"""
根据给定的标识符,尝试释放锁。
返回 True 表示释放成功;
返回 False 则表示给定的标识符与锁持有者的标识符并不相同,释放请求被拒绝。
"""
# 开启流水线
pipe = self.client.pipeline()
try:
# 监视锁键
pipe.watch(self.key)
# 获取锁键储存的标识符
lock_identity = pipe.get(self.key)
if lock_identity is None:
# 如果锁键的标识符为空,那么说明锁已经被释放
return True
elif input_identity == lock_identity:
# 如果给定的标识符与锁键储存的标识符相同,那么释放这个锁
# 为了确保 DEL 命令在执行时的安全性,我们需要使用事务去包裹它
pipe.multi()
pipe.delete(self.key)
pipe.execute()
return True
else:
# 如果给定的标识符与锁键储存的标识符并不相同
# 那么说明当前客户端不是锁的持有者
# 拒绝本次释放请求
return False
except WatchError:
# 抛出异常说明在 DEL 命令执行之前,已经有其他客户端修改了锁键
return False
finally:
# 取消对键的监视
pipe.unwatch()
# 因为 redis-py 在执行 WATCH 命令期间,会将流水线与单个连接进行绑定
# 所以在执行完 WATCH 命令之后,必须调用 reset() 方法将连接归还给连接池
pipe.reset()
注意,因为乐观锁的效果只会在同时使用WATCH命令以及事务的情况下产生,所以程序除了需要使用WATCH命令对锁键实施监视之外,还需要将DEL命令包裹在事务中,这样才能确保DEL命令只会在锁键的值没有发生任何变化的情况下执行。
以下代码展示了这个锁实现的使用方法:
>>> from redis import Redis
>>> from identity_lock import IdentityLock
>>> client = Redis(decode_responses=True)
>>> lock = IdentityLock(client, "test-lock")
>>> lock.acquire("peter", 3600) # 使用"peter"作为标识符,获取一个使用时限为3600s的锁
True
>>> lock.release("tom") # 尝试使用错误的标识符去释放锁,失败
False
>>> lock.release("peter") # 使用正确的标识符去释放锁,成功
True
示例:带有身份验证功能的计数信号量
本书前面介绍了如何使用锁去获得一项资源的独占使用权,并给出了几个不同的锁实现,但是除了独占一项资源之外,有时候我们也会想让多个用户共享一项资源,只要共享者的数量不超过我们限制的数量即可。
举个例子,假设我们的系统有一项需要进行大量计算的操作,如果很多用户同时执行这项操作,那么系统的计算资源将会被耗尽。为了保证系统的正常运作,我们可以使用计数信号量来限制在同一时间内能够执行该操作的最大用户数量。
计数信号量(counter semaphore)与锁非常相似,它们都可以限制资源的使用权,但是与锁只允许单个客户端使用资源的做法不同,计数信号量允许多个客户端同时使用资源,只要这些客户端的数量不超过指定的限制即可。
代码清单13-7展示了一个带有身份验证功能的计数信号量实现:
-
这个程序会把所有成功取得信号量的客户端的标识符存储在格式为 semaphore::<name>::holders的集合键中,至于信号量的最大可获取数量则存储在格式为semaphore::<name>::max_size的字符串键中。
-
在使用计数信号量之前,用户需要先通过set_max_size()方法设置计数信号量的最大可获取数量。
-
get_max_size()方法和get_current_size()方法可以分别获取计数信号量的最大可获取数量以及当前已获取数量。
-
获取信号量的acquire()方法是程序的核心:在获取信号量之前,程序会先使用两个GET命令分别获取信号量的当前已获取数量以及最大可获取数量,如果信号量的当前已获取数量并未超过最大可获取数量,那么程序将执行SADD命令,将客户端给定的标识符添加到holders 集合中。
-
由于GET命令执行之后直到SADD命令执行之前的这段时间里,可能会有其他客户端抢先取得了信号量,并导致可用信号量数量发生变化,因此程序需要使用WATCH命令监视holders键,并使用事务包裹SADD命令,以此通过乐观锁机制确保信号量获取操作的安全性。
-
因为max_size键的值也会影响信号量获取操作的执行结果,并且这个键的值在SADD命令执行之前也可能会被其他客户端修改,所以程序在监视holders键的同时,也需要监视max_size键。
-
当客户端想要释放自己持有的信号量时,只需要把自己的标识符传给release()方法即可,release()方法将调用SREM命令,从holders集合中查找并移除客户端给定的标识符。
from redis import WatchError
class Semaphore:
def __init__(self, client, name):
self.client = client
self.name = name
# 用于储存信号量持有者标识符的集合
self.holder_key = "semaphore::{0}::holders".format(name)
# 用于记录信号量最大可获取数量的字符串
self.size_key = "semaphore::{0}::max_size".format(name)
def set_max_size(self, size):
"""
设置信号量的最大可获取数量。
"""
self.client.set(self.size_key, size)
def get_max_size(self):
"""
返回信号量的最大可获取数量。
"""
result = self.client.get(self.size_key)
if result is None:
return 0
else:
return int(result)
def get_current_size(self):
"""
返回目前已被获取的信号量数量。
"""
return self.client.scard(self.holder_key)
def acquire(self, identity):
"""
尝试获取一个信号量,成功时返回 True ,失败时返回 False 。
传入的 identity 参数将被用于标识客户端的身份。
如果调用该方法时信号量的最大可获取数量尚未被设置,那么引发一个 TypeError 。
"""
# 开启流水线
pipe = self.client.pipeline()
try:
# 监视与信号量有关的两个键
pipe.watch(self.size_key, self.holder_key)
# 取得当前已被获取的信号量数量,以及最大可获取的信号量数量
current_size = pipe.scard(self.holder_key)
max_size_in_str = pipe.get(self.size_key)
if max_size_in_str is None:
raise TypeError("Semaphore max size not set")
else:
max_size = int(max_size_in_str)
if current_size < max_size:
# 如果还有剩余的信号量可用
# 那么将给定的标识符放入到持有者集合中
pipe.multi()
pipe.sadd(self.holder_key, identity)
pipe.execute()
return True
else:
# 没有信号量可用,获取失败
return False
except WatchError:
# 获取过程中有其他客户端修改了 size_key 或者 holder_key ,获取失败
return False
finally:
# 取消监视
pipe.unwatch()
# 将连接归还给连接池
pipe.reset()
def release(self, identity):
"""
根据给定的标识符,尝试释放当前客户端持有的信号量。
返回 True 表示释放成功,返回 False 表示由于标识符不匹配而导致释放失败。
"""
# 尝试从持有者集合中移除给定的标识符
result = self.client.srem(self.holder_key, identity)
# 移除成功则说明信号量释放成功
return result == 1
以下代码简单地展示了这个计数信号量的使用方法:
>>> from redis import Redis
>>> from semaphore import Semaphore
>>> client = Redis(decode_responses=True)
>>> semaphore = Semaphore(client, "test-semaphore") # 创建计数信号量
>>> semaphore.set_max_size(3) # 设置信号量的最大可获取数量
>>> semaphore.acquire("peter") # 获取信号量
True
>>> semaphore.acquire("jack")
True
>>> semaphore.acquire("tom")
True
>>> semaphore.acquire("mary") # 可用的3个信号量都已被获取,无法取得更多信号量
False
>>> semaphore.release("jack") # 释放一个信号量
True
>>> semaphore.get_current_size() # 目前有两个信号量已被获取
2
>>> semaphore.get_max_size() # 信号量的最大可获取数量为3个
3