Scrapy-Redis 原理和源码解析
Scrapy-Redis 库已经为我们提供了 Scrapy 分布式的队列、调度器、去重等功能,其 GitHub 地址为: https://github.com/rmax/scrapy-redis 。
本节我们深人了解一下,如何利用 Redis 实现 Scrapy 分布式。
获取源码
可以把源码克隆下来,执行如下命令:
git clone https://github.com/imax/scrapy-redis.git
核心源码在 scrapy-redis/src/scrapy_redis 目录下。
爬取队列
从爬取队列人手,看看它的具体实现。源码文件为 queue.py,它有 3 个队列的实现,首先它实现了一个父类 Base、提供一些基本方法和属性,代码如下所示:
首先看一下 _encode_request 和 _decode_request 方法,因为我们需要把一个 Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列化转成字符串再存储,而这两个方法分别是序列化和反序列化的操作,这个过程可以利用 pickle 库来实现。一般在调用 push 方法将 Request 存入数据库时,会调用 _encode_request 方法进行序列化,在调用 pop 取出 Request 的时候,会调用 _decode_request 进行反序列化。
在父类中 __len__
、push 和 pop 方法都是未实现的,会直接抛出 NotImplementedError,因此这个类是不能直接被使用的,必须实现一个子类来重写这 3 个方法,而不同的子类就会有不同的实现,也就有着不同的功能。
接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有 3 个子类的实现:它们分别是 FifoQueue,PriorityQueue,LifoQueue,我们分别来看一下它们的实现原理。
首先是 FifoQueue:
可以看到这个类继承了 Base 类,并重写了 __len__
、push、pop。在这 3 个方法中,都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作。可以看到这里的操作方法有 llen,lpush,rpop 等,这就代表此爬取队列使用了 Redis 的列表。序列化后的 Request 会被存人列表,就是列表的其中一个元素;__len__
方法是获取列表的长度;push 方法中调用了 lpush 操作,这代表从列表左侧存入数据;pop 方法中调用了 rpop 操作,这代表从列表侧取出数据。
Request 在列表中的存取顺序是左侧进,右侧出,这是有序的进出,即先进先出(first input first output-FIFO),此类的名称就叫作 FifoQueue。
还有一个与之相反的实现类,叫作 LifoQueue,代码实现如下:
与 FifoQueue 不同的是,它的 pop 方法在这里使用的是 lpop 操作,也就是从左侧出,而 push 方法依然是使用的 lpush 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出(last in first out-LIFO),此类名称就叫作LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。
在源码中还有一个子类叫作 PriorityQueue,顾名思义,它是优先级队列,代码实现如下:
在这里我们可以看到 __len__
、push、pop 方法中使用了 server 对象的 zcard、zadd、zrange 操作,可以知道这里使用的存储结果是有序集合,在这个集合中,每个元素都可以设置一个分数,这个分数就代表优先级。
__len__
方法调用了 zcard 操作,返回的就是有序集合的大小,也就是爬取队列的长度。在 push 方法中调用了 zadd 操作,就是向集合中添加元素,这里的分数指定成 Request 优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。pop 方法首先调用了 zrange 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank 操作将这个元素删除,这样就完成了取出并删除的操作。
此队列是默认使用的队列,也就是爬取队列默认使用有序集合来存储。
去重过滤
前面说过,Scrapy 的去重是利用集合来实现的,而 Scrapy 分布式中的去重需要利用共享的集合,这里使用的是 Redis 中的集合数据结构。我们来看一看去重类是怎样实现的。
源码文件是 dupefilter.py,其内实现了一个 RFPDupeFilter 类,代码如下所示:
这里同样实现了一个 request_seen 方法,与 Scrapy 中的 request_seen 方法实现极其类似。不过这里集合使用的是 server 对象的 sadd 操作,也就是集合不再是一个简单数据结构了,而是直接换成了数据库的存储方式。
鉴别重复的方式还是使用指纹,指纹同样是依靠 request_fingerprint 方法来获取的。获取指纹之后直接向集合添加指纹,如果添加成功,说明这个指纹原本不存在于集合中,返回值为 1。代码最后的返回结果是判定添加结果是否为 0,如果刚才的返回值为 1,那么这个判定结果就是 False,也就是不重复,否则判定为重复。
这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。
调度器
Scrapy-Redis 还帮我们实现了配合 Queue、DupeFilter 使用的调度器 Scheduler,源文件名称是 Scheduler.py。我们可以指定一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除。我们可以在 settings.py 里自由配置,而此调度器很好地实现了对接。
接下来我们看两个核心的存取方法,代码如下所示:
enqueue_request 可以向队列中添加 Request,核心操作就是调用 Queue 的 push 操作,还有一些统计和日志操作。next_request 就是从队列中取 Request,核心操作就是调用 Queue 的 pop 操作,此时如果队列中还有 Request,则 Request 会直接取出来,爬取继续;如果队列为空,则爬取会重新开始。
总结
目前,我们把之前说的 3 个分布式的问题解决了,总结如下。
-
爬取队列的实现:这里提供了 3 种队列,使用 Redis 的列表或有序集合来维护。
-
去重的实现:这里使用 Redis 的集合来保存 Request 指纹,以提供重复过滤。
-
中断后重新爬取的实现:中断后 Redis 的队列没有清空,再次启动时调度器的 next_request 会从队列中取到下一个 Request,继续爬取。
以上内容便是 Scrapy-Redis 的核心源码解析。Scrapy-Redis 中还提供了 Spider,Item Pipeline 的实现,不过它们并不是必须使用的。
在下一节,我们会将 Scrapy-Redis 集成到之前所实现的 Scrapy 项目中,实现多台主机协同爬取。