基于 RabbitMQ 的分布式爬虫

前面我们了解了 Scrapy 如何利用 Redis 实现分布式爬虫,可以注意到,当爬取数量过大时,Redis 占用的内存非常大,因此对于数据去重,我们使用了 Bloom Filter 来进行优化,大幅减少了 Redis 的内存占用。

不过,现在我们似乎依然面临一个问题,爬取队列仍旧是基于 Redis 实现的,那它同样会占据非常大的内存呀!其实在一般情况下,Redis 作为分布式爬取队列是完全够用的。但在数据量比较大。比如爬取上亿级别数据时,Redis 消耗的内存也是比较大的,这时候我们可以考将爬取队列进行迁移。

迁移到哪里呢?仔细想想,爬取队列类似一个消息队列,可以先进先出,先进后出、按优先级进出等,只要能满足类似的需求就可以。现如今,消息队列中间件也有很多,如 RabbitMQ、RocketMQ 等,它们都可以用来做爬取队列的实现。

本节我们就选取目前比较流行的 RabbitMQ 来实现一下 Scrapy 分布式爬虫吧!

准备工作

在本书 4.8 节中,我们已经初步了解了 RabbitMQ 的基本原理和使用方法,如果你还不了解 RabbitMQ 是什么,建议先回看一下前面的基础内容。

在本节开始之前,请确保已经正确安装好了 RabbitMQ 和 Python 的 pika 库,具体的安装说明可以参考本书 4.8 节。

对接 Scrapy

RabbitMQ 就是一个消息队列,那它怎么对接 Scrapy 实现分布式爬取呢?通过 Scrapy-Redis 的源码,我们可以知道 Scrapy-Redis 利用 Redis 实现了一个爬取队列,所以同样的原理,我们可以仿照 Scrapy-Redis 的实现,将 Redis 换成 RabbitMQ。

仿照 Scrapy-Redis 的源码,我们先来解决 RabbitMQ 的连接问题,首先定义一个 connection 对象:

import pika

def from_settings(settings):
    connection_parameters = settings.get('RABBITMQ_CONNECTION_PARAMETERS', RABBITMQ_CONNECTION_PARAMETERS)
    connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters))
    channel = connection.channel()
    return channel

这里定义了 from_settings 方法, 可以根据全局的 RABBITMQ_CONNECTION_PARAMETERS 来创建一个 RabbitMQ 连接对象, 返回 channel 信息。另外在 Scrapy-Redis 中, 优先级队列是使用有序集合来实现 的, 每个元素都有一个分数值, Redis 可以根据分数来排序, 这样分数越小的就排到前面, 下次就会被优先获取。

那 RabbitMQ 怎么实现优先级队列的功能呢?4.8节我们也学习了,RabbitMQ 已经提供了对优先级队列的支持,需要在声明队列的时候设置 x-max-priority 参数来设定最大的优先级数量,同时在发布消息的时候添加优先级参数。

在这里我们仿照 Scrapy-Redis 的 PriorityQueue 来进行改写,写法如下:

class PriorityQueue(Base):

    def __init__(self, self, server, spider, key,
            max_priority=SCHEDULER_QUEUE_MAX_PRIORITY,
            durable=SCHEDULER_QUEUE_DURABLE_,
            force_flush=SCHEDULER_QUEUE_FORCE_FLUSH,
            priority_offset=SCHEDULER_QUEUE_PRIORITY_OFFSET):
        self.inited = False
        self.durable = durable
        super(PriorityQueue, self).__init__(server, spider, key)
        try:
            self.queue_operator = self.server.queue_declare(queue=self.key, arguments={
                'x-max-priority': max_priority
            }, durable=durable)
            logger.debug('Queue operator %s', self.queue_operator)
            self.inited = True
        except ChannelClosedByBroker as e:
            logger.error("You have changed queue configuration, you "
                         "must delete queue manually or set SCHEDULER_QUEUE_FORCE_FLUSH "
                         "to True, error detail %s" % str(e.args), exc_info=True)
            self.inited = False
        self.priority_offset = priority_offset

    def __len__(self):
        if not hasattr(self, 'queue_operator'):
            return 0
        return self.queue_operator.method.message_count

    def push(self, request):
        priority = request.priority + self.priority_offset
        if priority < 0:
            priority = 0
        delivery_mode = 2 if self.durable else None
        self.server.basic_publish(
            exchange='',
            properties=pika.BasicProperties(
                priority=priority,
                delivery_mode=delivery_mode
            ),
            routing_key=self.key,
            body=self._encode_request(request)
        )

    def pop(self):
        method_frame, header, body = self.server.basic_get(queue=self.key, auto_ack=True)
        if body:
            return self._decode_request(body)

首先对于 __init__ 方法,这里自定义了一些参数,如 durable 代表是否持久化,默认读取了配置 SCHEDULER_QUEUE_DURABLE,其值为 True。另外优先级的最大值 max_priority 默认读取了配置 SCHEDULER_QUEUE_MAX_PRIORITY,其值为 10O。在 __init__ 方法中,最关键的就是 queue_declare 方法,它用来声明一个消息队列,指定了参数 x-max-priority 为 max_priority,代表这是一个支持优先级的队列,最大优先级的数值为 max_priority。

接着对于 push 方法,和前文的样例一样,调用了 basic_publish 方法,不过由于这里支持优先级所以额外传入了 peoperties 对象并指定了 priority。

对于 pop 方法, 则是使用了 basic_get 方法并设置了 auto_ack 参数为 True, 这样便可以从队列中 取出一个当前优先级最高的消息并返回了。另外对于 _decode_request 和 _encode_request 方法, 其原理和 Scrapy-Redis 一样, 这里就不再赘述了。

对于 Scheduler, 基本原理就是将 Queue 对象更换为刚才声明的 PriorityQueue 对象, 同时一些初 始化参数通过 settings 获取即可。

这样我们就成功将爬取队列迁移到 RabbitMQ 里面了。

以上的内容我已经整理发布了一个 Python 包, 叫作 GerapyRabbitMQ, 其 GitHub 链接为: https://github.com/Gerapy/GerapyRabbitMQ , 安装方式也非常的简单, 只需要 pip3 安装即可:

pip3 install gerapy-rabbitmq

接下来我们款基于 GerapyRabbitMQ, 把上一节基于 Redis 的爬取队列迁移到 RabbitMQ 上。

迁移

安装好 GerapyRabbitMQ 包后, 我们需要更改如下配置:

SCHEDULER = "gerapy_rabbitmq.scheduler.Scheduler"
SCHEDULER_QUEUE_KEY = '%(spider)s_requests'

RABBITMQ_CONNECTION_PARAMETERS = {
'host': '192.168.2.3'
}

这里首先需要更改 SCHEDULER, 切换到 GerapyRabbitMQ 里面定义的调度器类, 然后调度队列的 名称格式也可以定义, 这里定义为 SCHEDULER_QUEUE_KEY, 意思是 Spider 名称和 Requests 的组合, 然后 RABBITMQ_CONNECTION_PARAMETERS 就是 RabbitMQ 的连接对象, 其参数可以参考 https://pika.readthedocs.io/en/stable/modules/parameters.html 里面的说明。

如果出现连接失败的问题, 是因为默认情况下 RabbitMQ 只允许 Guest 用户使用 localhost 访 问, 要解决这个问题, 请参考 https://rabbitmq.docs.pivotal.io/37/rabbit-web-docs/access-control.html 里面的解决方案。

同样地, A、B、C 三台主机都需要修改为同一个 RabbitMQ 地址, 重新运行就可以实现用三台主 机协同爬取了, 分布式爬取就完成了。

具体的运行方式和 16.3 节是一样的, 这里不再赘述。

总结

本节中我们介绍了利用 RabbitMQ 实现分布式爬取的过程,成功将爬取队列由 Redis 更换到了 RabbitMQ 上,解决了 Redis 的内存占用问题。

本节代码参见: https://github.com/Python3WebSpider/ScrapyCompositeDemo/tree/gerapy-rabbitmq ,注意是 gerapy-rabbitmq 分支。

本章的内容到此就结束了。在这一章,我们了解了分布式爬虫的原理,并介绍了 Scrapy 分布式爬虫基于 Redis 的实现以及一些优化方案。有了分布式爬虫的加持,一些超大规模数据量的爬取就可以得到有效解决了。