PFCOUNT:返回集合的近似基数

在使用 PFADD 命令对元素进行计数之后,用户可以通过执行 PFCOUNT 命令来获取 HyperLogLog 为集合计算出的近似基数:

PFCOUNT hyperloglog [hyperloglog ...]

比如,通过执行以下命令,我们可以获取到 alphabets 这个 HyperLogLog 计算出的近似基数:

redis> PFCOUNT alphabets
(integer) 3

PFCOUNT 命令的返回值为 3,这表示 HyperLogLog 算法认为 alphabets 目前已经计数过 3 个不同的元素。

另外,当用户给定的 HyperLogLog 不存在时,PFCOUNT 命令将返回 0 作为结果:

redis> PFCOUNT not-exists-hyperloglog
(integer) 0

返回并集的近似基数

当用户向 PFCOUNT 传入多个 HyperLogLog 时,PFCOUNT 命令将对所有给定的 Hyper-LogLog 执行并集计算,然后返回并集 HyperLogLog 计算出的近似基数。

比如,我们可以创建两个 HyperLogLog,并分别使用这两个 HyperLogLog 去对两组字母进行计数:

redis> PFADD alphabets1 "a" "b" "c"
(integer) 1
redis> PFADD alphabets2 "c" "d" "e"
(integer) 1

然后使用以下 PFCOUNT 命令获取这两个 HyperLogLog 进行并集计算之后得出的近似基数:

redis> PFCOUNT alphabets1 alphabets2
(integer) 5

对多个 HyperLogLog 执行并集计算的效果与多个集合首先执行并集计算,然后再使用 HyperLogLog 去计算并集集合的近似基数的效果类似。

比如,上面的 PFCOUNT 命令就类似于以下这两条命令:

redis> PFADD temp-hyperloglog "a" "b" "c" "c" "d" "e"
(integer) 1
redis> PFCOUNT temp-hyperloglog
(integer) 5

其它信息

  • 复杂度:O(N),其中 N 为用户给定的 HyperLogLog 数量。

  • 版本要求:PFCOUNT 命令从 Redis 2.8.9 版本开始可用。

示例:优化唯一计数器

为了解决本章开头提到的唯一计数器的内存占用问题,我们可以使用 HyperLogLog 重新实现唯一计数器:无论被计数的元素有多少个,使用 HyperLogLog 实现的唯一计数器的内存开销总是固定的,并且因为每个 HyperLogLog 只占用 12KB 内存,所以即使程序使用多个 HyperLogLog,也不会带来明显的内存开销。

代码清单7-1展示了使用 HyperLogLog 重新实现的唯一计数器。

代码清单7-1 使用 HyperLogLog 实现的唯一计数器:/hyperloglog/unique_counter.py

class UniqueCounter:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def count_in(self, item):
        """
        对给定元素进行计数。
        """
        self.client.pfadd(self.key, item)

    def get_result(self):
        """
        返回计数器的值。
        """
        return self.client.pfcount(self.key)

以下代码展示了这个唯一计数器的使用方法:

>>> from redis import Redis
>>> from unique_counter import UniqueCounter
>>> client = Redis(decode_responses=True)
>>> counter = UniqueCounter(client, 'unique-ip-counter') # 创建一个唯一IP计数器
>>> counter.count_in('1.1.1.1') # 对3个IP进行计数
>>> counter.count_in('2.2.2.2')
>>> counter.count_in('3.3.3.3')
>>> counter.get_result() # 获取计数结果
3
>>> counter.count_in('3.3.3.3') # 尝试输入一个已经计数过的IP
>>> counter.get_result() # 计数器的结果没有发生变化
3

通过使用 HyperLogLog 实现的唯一计数器去取代使用集合实现的唯一计数器,可以大幅降低存储唯一访客 IP 所需的内存数量,表7-2展示了这一点。

image 2025 01 03 21 31 40 116
Figure 1. 表7-2 使用HyperLogLog实现的唯一计数器在记录唯一

与集合实现的唯一计数器相比,使用 HyperLogLog 实现的唯一计数器并不会因为被计数元素的增多而变大,因此它无论是对 10 万个、100 万个还是 1000 万个唯一 IP 进行计数,计数器消耗的内存数量都不会发生变化。与此同时,这个新计数器即使在每天唯一 IP 数量达到 1000 万个的情况下,记录一年的唯一 IP 数量也只需要 4.32MB 内存,这比同等情况下使用集合去实现唯一计数器所需的内存要少得多。

示例:检测重复信息

在构建应用程序的过程中,我们经常需要与广告等垃圾信息做斗争。因为垃圾信息的发送者通常会使用不同的账号、在不同的地方发送相同的垃圾信息,所以寻找垃圾信息的一种比较简单、有效的方法就是找出那些重复的信息:如果两个不同的用户发送了完全相同的信息,或者同一个用户重复地发送了多次完全相同的信息,那么这些信息很有可能就是垃圾信息。

判断两段信息是否相同并不是一件容易的事情,如果使用一般的字符串比对函数(比如 strcmp)来完成这一操作,那么每当有用户尝试执行信息发布操作时,程序就需要执行复杂度为 O(N*M) 的比对操作,其中 N 为信息的长度,M 为系统目前已有的信息数量。

不难看出,随着系统存储的信息越来越多,这种比对操作将变得越来越慢,最终成为系统的瓶颈。

为了降低鉴别重复信息所需的复杂度,我们可以使用 HyperLogLog 来记录所有已发送的信息——每当用户发送一条信息时,程序就使用 PFADD 命令将这条信息添加到 HyperLogLog 中:

  • 如果命令返回 1,那么这条信息就是未出现过的新信息。

  • 如果命令返回 0,那么这条信息就是已经出现过的重复信息。

因为 HyperLogLog 使用的是概率算法,所以即使信息的长度非常长,HyperLogLog 判断信息是否重复所需的时间也非常短。另外,因为 HyperLogLog 并不会随着被计数信息的增多而变大,所以程序可以把所有需要检测的信息都记录到同一个 HyperLogLog 中,这使得实现重复信息检测程序所需的空间极大地减少。代码清单7-2展示了这个使用 HyperLogLog 实现的重复信息检测程序。

代码清单7-2 使用 HyperLogLog 实现的重复信息检测程序:/hyperloglog/duplicate_checker.py
class DuplicateChecker:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def is_duplicated(self, content):
        """
        在信息重复时返回 True ,未重复时返回 False 。
        """
        return self.client.pfadd(self.key, content) == 0

    def unique_count(self):
        """
        返回检查器已经检查过的非重复信息数量。
        """
        return self.client.pfcount(self.key)

以下代码展示了如何使用 DuplicateChecker 程序检测并发现重复的信息:

>>> from redis import Redis
>>> from duplicate_checker import DuplicateChecker
>>> client = Redis(decode_responses=True)
>>> checker = DuplicateChecker(client, 'duplicate-message-checker')
>>> checker.is_duplicated("hello world!") # 输入一些非重复信息
False
>>> checker.is_duplicated("good morning!")
False
>>> checker.is_duplicated("bye bye")
False
>>> checker.unique_count() # 查看目前非重复信息的数量
3
>>> checker.is_duplicated("hello world!") # 发现重复信息
True

检测重复信息这个问题实际上就是算法中的 “去重问题”,因此其他去重问题也可以使用 DuplicateChecker 程序中展示的方法来解决。