基于 RabbitMQ 的分布式爬虫
前面我们了解了 Scrapy 如何利用 Redis 实现分布式爬虫,可以注意到,当爬取数量过大时,Redis 占用的内存非常大,因此对于数据去重,我们使用了 Bloom Filter 来进行优化,大幅减少了 Redis 的内存占用。
不过,现在我们似乎依然面临一个问题,爬取队列仍旧是基于 Redis 实现的,那它同样会占据非常大的内存呀!其实在一般情况下,Redis 作为分布式爬取队列是完全够用的。但在数据量比较大。比如爬取上亿级别数据时,Redis 消耗的内存也是比较大的,这时候我们可以考将爬取队列进行迁移。
迁移到哪里呢?仔细想想,爬取队列类似一个消息队列,可以先进先出,先进后出、按优先级进出等,只要能满足类似的需求就可以。现如今,消息队列中间件也有很多,如 RabbitMQ、RocketMQ 等,它们都可以用来做爬取队列的实现。
本节我们就选取目前比较流行的 RabbitMQ 来实现一下 Scrapy 分布式爬虫吧!
准备工作
在本书 4.8 节中,我们已经初步了解了 RabbitMQ 的基本原理和使用方法,如果你还不了解 RabbitMQ 是什么,建议先回看一下前面的基础内容。
在本节开始之前,请确保已经正确安装好了 RabbitMQ 和 Python 的 pika 库,具体的安装说明可以参考本书 4.8 节。
对接 Scrapy
RabbitMQ 就是一个消息队列,那它怎么对接 Scrapy 实现分布式爬取呢?通过 Scrapy-Redis 的源码,我们可以知道 Scrapy-Redis 利用 Redis 实现了一个爬取队列,所以同样的原理,我们可以仿照 Scrapy-Redis 的实现,将 Redis 换成 RabbitMQ。
仿照 Scrapy-Redis 的源码,我们先来解决 RabbitMQ 的连接问题,首先定义一个 connection 对象:
import pika
def from_settings(settings):
connection_parameters = settings.get('RABBITMQ_CONNECTION_PARAMETERS', RABBITMQ_CONNECTION_PARAMETERS)
connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters))
channel = connection.channel()
return channel
这里定义了 from_settings 方法, 可以根据全局的 RABBITMQ_CONNECTION_PARAMETERS 来创建一个 RabbitMQ 连接对象, 返回 channel 信息。另外在 Scrapy-Redis 中, 优先级队列是使用有序集合来实现 的, 每个元素都有一个分数值, Redis 可以根据分数来排序, 这样分数越小的就排到前面, 下次就会被优先获取。
那 RabbitMQ 怎么实现优先级队列的功能呢?4.8节我们也学习了,RabbitMQ 已经提供了对优先级队列的支持,需要在声明队列的时候设置 x-max-priority 参数来设定最大的优先级数量,同时在发布消息的时候添加优先级参数。
在这里我们仿照 Scrapy-Redis 的 PriorityQueue 来进行改写,写法如下:
class PriorityQueue(Base):
def __init__(self, self, server, spider, key,
max_priority=SCHEDULER_QUEUE_MAX_PRIORITY,
durable=SCHEDULER_QUEUE_DURABLE_,
force_flush=SCHEDULER_QUEUE_FORCE_FLUSH,
priority_offset=SCHEDULER_QUEUE_PRIORITY_OFFSET):
self.inited = False
self.durable = durable
super(PriorityQueue, self).__init__(server, spider, key)
try:
self.queue_operator = self.server.queue_declare(queue=self.key, arguments={
'x-max-priority': max_priority
}, durable=durable)
logger.debug('Queue operator %s', self.queue_operator)
self.inited = True
except ChannelClosedByBroker as e:
logger.error("You have changed queue configuration, you "
"must delete queue manually or set SCHEDULER_QUEUE_FORCE_FLUSH "
"to True, error detail %s" % str(e.args), exc_info=True)
self.inited = False
self.priority_offset = priority_offset
def __len__(self):
if not hasattr(self, 'queue_operator'):
return 0
return self.queue_operator.method.message_count
def push(self, request):
priority = request.priority + self.priority_offset
if priority < 0:
priority = 0
delivery_mode = 2 if self.durable else None
self.server.basic_publish(
exchange='',
properties=pika.BasicProperties(
priority=priority,
delivery_mode=delivery_mode
),
routing_key=self.key,
body=self._encode_request(request)
)
def pop(self):
method_frame, header, body = self.server.basic_get(queue=self.key, auto_ack=True)
if body:
return self._decode_request(body)
首先对于 __init__ 方法,这里自定义了一些参数,如 durable 代表是否持久化,默认读取了配置 SCHEDULER_QUEUE_DURABLE,其值为 True。另外优先级的最大值 max_priority 默认读取了配置 SCHEDULER_QUEUE_MAX_PRIORITY,其值为 10O。在 __init__ 方法中,最关键的就是 queue_declare 方法,它用来声明一个消息队列,指定了参数 x-max-priority 为 max_priority,代表这是一个支持优先级的队列,最大优先级的数值为 max_priority。
接着对于 push 方法,和前文的样例一样,调用了 basic_publish 方法,不过由于这里支持优先级所以额外传入了 peoperties 对象并指定了 priority。
对于 pop 方法, 则是使用了 basic_get 方法并设置了 auto_ack 参数为 True, 这样便可以从队列中 取出一个当前优先级最高的消息并返回了。另外对于 _decode_request 和 _encode_request 方法, 其原理和 Scrapy-Redis 一样, 这里就不再赘述了。
对于 Scheduler, 基本原理就是将 Queue 对象更换为刚才声明的 PriorityQueue 对象, 同时一些初 始化参数通过 settings 获取即可。
这样我们就成功将爬取队列迁移到 RabbitMQ 里面了。
以上的内容我已经整理发布了一个 Python 包, 叫作 GerapyRabbitMQ, 其 GitHub 链接为: https://github.com/Gerapy/GerapyRabbitMQ , 安装方式也非常的简单, 只需要 pip3 安装即可:
pip3 install gerapy-rabbitmq
接下来我们款基于 GerapyRabbitMQ, 把上一节基于 Redis 的爬取队列迁移到 RabbitMQ 上。
迁移
安装好 GerapyRabbitMQ 包后, 我们需要更改如下配置:
SCHEDULER = "gerapy_rabbitmq.scheduler.Scheduler"
SCHEDULER_QUEUE_KEY = '%(spider)s_requests'
RABBITMQ_CONNECTION_PARAMETERS = {
'host': '192.168.2.3'
}
这里首先需要更改 SCHEDULER, 切换到 GerapyRabbitMQ 里面定义的调度器类, 然后调度队列的 名称格式也可以定义, 这里定义为 SCHEDULER_QUEUE_KEY, 意思是 Spider 名称和 Requests 的组合, 然后 RABBITMQ_CONNECTION_PARAMETERS 就是 RabbitMQ 的连接对象, 其参数可以参考 https://pika.readthedocs.io/en/stable/modules/parameters.html 里面的说明。
|
如果出现连接失败的问题, 是因为默认情况下 RabbitMQ 只允许 Guest 用户使用 localhost 访 问, 要解决这个问题, 请参考 https://rabbitmq.docs.pivotal.io/37/rabbit-web-docs/access-control.html 里面的解决方案。 |
同样地, A、B、C 三台主机都需要修改为同一个 RabbitMQ 地址, 重新运行就可以实现用三台主 机协同爬取了, 分布式爬取就完成了。
具体的运行方式和 16.3 节是一样的, 这里不再赘述。
总结
本节中我们介绍了利用 RabbitMQ 实现分布式爬取的过程,成功将爬取队列由 Redis 更换到了 RabbitMQ 上,解决了 Redis 的内存占用问题。
本节代码参见: https://github.com/Python3WebSpider/ScrapyCompositeDemo/tree/gerapy-rabbitmq ,注意是 gerapy-rabbitmq 分支。
本章的内容到此就结束了。在这一章,我们了解了分布式爬虫的原理,并介绍了 Scrapy 分布式爬虫基于 Redis 的实现以及一些优化方案。有了分布式爬虫的加持,一些超大规模数据量的爬取就可以得到有效解决了。