基于 Bloom Filter 进行大规模去重

首先回顾一下 Scrapy-Redis 的去重机制。Scrapy-Redis 将 Request 的指纹存储到了 Redis 集合中,每个指纹的长度为 40,例如 27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61 就是一个指纹,是一个字符串。

我们计算一下用这种方式耗费的存储空间。每个字符占用 1 字节,即 1B,1 个指纹占用空间为 40B,1 万个指纹占用空间约 400KB,1 亿个指纹占用空间约 4GB。当爬取数量达到上亿级别时,Redis 的占用的内存就会变得很大,而且这仅仅是指纹的存储。Redis 还存储了爬取队列,内存占用会进一步提高,更别说多个 Scrapy 项目同时爬取的情况了。当爬取达到亿级规模时,Scrapy-Redis 提供的集合去重已经不能满足我们的要求了。所以我们需要使用一个更加节省内存的去重算法——BloomFilter。

了解 Bloom Filter

BloomFilter,中文名称是布降隆过滤器,它在 1970 年由 Bloom 提出,可以被用来检测一个元素是否在一个集合中,Bloom Filter 的空间利用率很高,使用它可以大大节省存储空间。Bloom Filter 使用位数组表示一个待检测集合,并可以快速通过概率算法判断一个元素是否在这个集合中。利用这个算法我们可以实现去重效果。

本节我们来了解 Bloom Filter 的基本算法,以及 Scrapy-Redis 中对接 Bloom Filter 的方法。

Bloom Filter 的算法

在 Bloom Filter 中使用位数组来辅助实现检测判断。在初始状态下,我们声明一个包含 m 位的位数组,它的所有位都是 0,如图 16-7 所示。

(图略)

现在我们有了一个待检测集合,表示为 S={x1,x2,…​,xn},接下来需要做的就是检测一个 x 是否已经存在于集合 S 中。在 BloomFilter 算法中,首先使用 k 个相互独立的,随机的散列函数来将这个集合 S 中的每个元素映射到长度为 m 的位数组上,散列函数得到的结果记作位置索引,然后将位数组该位置的索引设置为 1。例如这里我们取 k 为 3,即有 3 个散列函数,x1 经过 3 个散列函数映射得到的结果分别为 1、4、8,x2 经过 3 个散列函数映射得到的结果分别为 4、6、10,那么就会将位数组的 1、4、6、8、10 这 5 个位置设置为 1,如图 16-8 所示。

(图略)

这时如果再有一个新的元素 x,我们要判断它是否属于 S 集合,便会将仍然用 k 个散列函数对 x 求映射结果,如果所有结果对应的位数组位置均为 1,那么就认为 x 属于 S 集合,否则 x 不属于 S 集合。

例如一个新元素 x 经过 3 个散列函数映射的结果为 4,6,8,对应的位置均为 1,则判断 x 属于 S 集合。如果结果为 4、6、7,其中 7 对应的位置为 0,则判定 x 不属于 S 集合。

注意这里 m、n、k 的关系满足 m>kn,也就是说位数组的长度 m 要比集合元素个数 n 和散列函数 k 的乘积还要大。

这样的判定方法很高效,但是也是有代价的,它可能把不属于这个集合的元素误认为属于这个集合,我们来估计一下它的错误率。当集合 S={x1,x2,…​,xn} 的所有元素都被 k 个散列函数映射到 m 位的位数组中时,这个位数组中某一位还是 0 的概率是:

(图略)

因为散列函数是随机的,所以任意一个散列函数选中这一位的概率为 1/m,那么 1-1/m就代表散列函数一次没有选中这一位的概率,要把 S 完全映射到 m 位的位数组中,需要做 kn 次散列运算,所以最后的概率就是 1-1/m 的 kn 次方。

一个不属于 S 的元素 x 如果要被误判定为在 S 中,那么这个概率就是 k 次散列运算得到的结果对应的位数组位置都为 1:所以误判概率为:

(图略)

根据:

(图略)

可以将误判概率转化为:

(图略)

在给定 m、n 时,可以求出使得 f 最小的 k 值为:

(图略)

也就是说,当 k 约等于 m 与比值的 0.7 倍时,使得误判概率最小,这单将误判概率归纳为表 16-1 。

(表略)

可以看到,当 k 值确定日时,随着 m/n 的增大,误判概率遂渐变小。当 m/n 的值确定时,k 越靠近最优 k 值,误判概率越小。另外误判概率总体来看都是极小的,在容忍此误判概率的情况下,大幅减小存储空间和判定速度是完全值得的。

接下来我们就将 Bloom Filter 算法应用到 Scrapy-Redis 分布式爬虫的去重过程中,以解决 Redis 内存不足的问题。

对接 Scrapy-Redis

实现 Bloom Filter 时,我们首先要保证不能破坏 Scrapy-Redis 分布式爬取的运行架构,所以我们需要修改 Scrapy-Redis 的源码,替换它的去重类,同时 Bloom Filter 的实现需要借助一个位数组,既然当前架构还是依赖于 Redis 的,位数组的维护直接使用 Redis 就好了。

首先我们实现一个基本的散列算法,可以将一个值经过散列运算后映射到一个 m 位位数组的某一位上,代码实现如下:

class HashMap(object):
    def __init__(self,m,seed):
        self.m = m
        self.seed = seed

    def hash(self, value):
        ret = 0
        for i in range(len(value)):
            ret += self.seed + ret + ord(value[i])
        return (self.m - 1) & ret

在这里新建了一个 HashMap 类,构造函数传人两个值,一个是 m 位数组的位数,另一个是种子值 seed,不同的散列函数需要有不同的 seed,这样可以保证不同散列函数的结果不会碰撞。

在 hash 方法的实现中,value 是要被处理的内容,在这里我们遍历了该字符的每一位并利用 ord 方法取到了它的 ASCII 码,然后混淆 seed 进行迭代求和运算,最终会得到一个数值。这个数值的结果由 value 和 seed 唯一确定,然后我们再将它和 m 进行按位与运算,即可获取 m 位位数组的映射结果,这样我们就实现了一个由字符串和 seed 来确定的散列函数。当 m 固定时,只要 seed 值相同,就代表是同一个散列函数,相同的 value 必然会映射到相同的位置。所以如果我们想要构造几个不同的散列函数,只需要改变其 seed 就好了,以上便是一个简易的散列函数的实现。

接下来我们再实现 Bloom Filter,Bloom Filter 里面需要用到 k 个散列函数,所以在这里我们需要对这几个散列函数指定相同的 m 值和不同的 seed 值,在这单构造如下:

BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

class BloomFilter(object):
    def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER):
        self.m = 1 << bit
        self.seeds = range(hash_number)
        self.maps = [HashMap(self.m, seed) for seed in self.seeds]
        self.server = server
        self.key = key

由于我们需要完成亿级别数据的去重,即前文介绍的算法中 n 为 1 亿以:散列函数的个数 k 大约取 10 左右的量级,而 m>kn,所以这里 m 保底在 10 亿左右。由于这个数值比较大,所以这里用移位操作来实现:传人位数 bit,将其定义为 30,然后做一个移位操作 1<<30,相当于 2 的 30 次方,等于 1073741824,量级恰好在 10 亿左右。由于是位数组,所以这个位数组占用的大小就是 230bit=128MB,而本文开头我们计算过,Scrapy-Redis 集合去重的占用空间大约在 4G 左右,可见 Bloom Filter 的空间利用效率之高。

随后我们再传入散列函数的个数,用它来生成几个不同的 seed,用不同的 seed 来定义不同的散列函数,这样我们就可以构造一个散列函数列表,遍历 seed,构造带有不同 seed 值的 HashMap 对象,保存成变量 maps 供后续使用。

另外,server 就是 Redis 连接对象,key 就是这个 m 位位数组的名称。

接下来我们就要实现比较关键的两个方法了,一个是判定元素是否重复的方法 exists,另一个是添加元素到集合中的方法 insert,代码实现如下:

首先我们先来介绍 insert 方法,Bloom Filter 算法会逐个调用散列函数,对放入集合中的元素进行运算,得到在 m 位位数组中的映射位置,然后将位数组对应的位置置 1,所以这里在代码中我们遍历了初始化好的散列函数,然后调用其 hash 方法算出映射位置 offset,再利用 Redis 的 setbit 方法将该位置置 1。

在 exists 方法中,我们就需要实现判定是否重复的逻辑了。方法参数 value 为待判断的元素,在这里我们首先定义了一个变量 exist,然后遍历了所有散列函数对 value 进行散列运算,得到映射位置,接着我们用 getbit 方法取得该映射位置的结果,依次进行与运算。这样,只有 getbit 得到的结果都为 1 时,最后的 exist 才为 True,表示 value 属于这个集合。其中只要有一次 getbit 得到的结果为 0。即 m 位位数组中有对应的 0 位,最终的结果 exist 就为 False,代表 value 不属于这个集合。这样,此方法最后的返回结果就是判定重复与否的结果了。

到现在为止 Bloom Filter 的实现已经完成:我们可以用一个实例来测试一下,代码如下:

在这里我们首先定义了一个 Redis 连接对象,然后传递给 BloomFilter,为了避免内存占用过大,这里传的位数比较小,设置为 5,散列函数的个数设置为 6。

首先我们调用 insert 方法插入了 Hello 和 World 两个字符串,随后判断了一下 Hello 和 Python 这两个字符串是否存在,最后输出它的结果,运行结果如下:

True
False

很明显,结果完全没有问题,这样我们就借助于 Redis 成功实现了 Bloom Filter 算法。

下面我们需要继续修改 Scrapy-Redis 的源码,将它的去重逻辑替换为 Bloom Filter 的逻辑,在这里主要是修改 RFPDupeFilter 类的 request_seen 方法,实现如下:

首先还是利用 request_fingerprint 方法获取 Request 的指纹,然后调用 Bloom Filter 的 exists 方法判定该指纹是否存在。如果存在,证明该 Request 是重复的,返回 True;否则调用 Bloom Filter 的 insert 方法将该指纹添加并返回 False。这样就成功利用 Bloom Filter 替换了 Scrapy-Redis 的集合去重。

对于 Bloom Filter 的初始化定义,我们可以将 __init__ 方法修改为如下内容:

其中 bit 和 hash_number 需要使用 from_settings 方法传递,修改如下:

其中常量 DUPEFILTER_DEBUG 和 BLOOMFILTER_BIT 统一定义在 defaults.py 中,默认如下

BLOOMFILTER_HASH_NUMBER = 6
BLOOMEILTER_BIT = 30

到此为止,我们就成功实现了 Bloom Filter 和 Scrapy-Redis 的对接。

使用

测试

案例集成

总结

以上便是 Bloom Filter 的原理及对接实现,使用 Bloom Filter 可以大大节省 Redis 内存,在数据量大的情况下推荐使用此方案。

本节代码参见: https://github.com/Python3WebSpider/ScrapyCompositeDemo/tree/scrapy-redis-bloomfilter ,注意是 scrapy-redis-bloomfilter 分支。