爬虫和中间件的变化

为了构建该系统,我们需要稍微对 Scrapy 爬虫进行修改,并且需要开发爬虫中间件。更具体地说,我们必须执行如下操作:

  • 调整索引页爬取,以最大速率执行;

  • 编写中间件,分批发送URL到Scrapyd服务器;

  • 使用相同中间件,允许在启动时使用批量URL。

我们将尝试使用尽可能小的改动来实现这些变化。理想情况下,整个操作应该清晰、易理解并且对其依赖的爬虫代码透明。这应该是一个基础架构层级的需求,如果想对爬虫(可能数百个)进行修改来实现它则是一个坏主意。

索引页分片爬取

我们的第一步是优化索引页爬取,使其尽可能更快。在开始之前,先来设置一些期望。假设爬虫爬取并发量是 16,并且我们测量得到其与源网站服务器的延迟大约为 0.25 秒。此时得到的吞吐量最多为 16 / 0.25 = 64 页/秒。索引页数量为 50000个详情页 / 每个索引页 30 个详情页链接 =1667索引页。因此,我们期望索引页下载花费的时间大约为 1667 / 64 = 26 秒多一点。

让我们以第 3 章中名为 easy 的爬虫开始。先把执行垂直抓取的 Rule 注释掉(callback='parse_item' 的那个),因为现在只需要爬取索引页。

你可以在 GitHub 中获取到本书的全部代码。下载该代码,可以访问:git clonehttps://github.com/scalingexcellence/scrapybook。

本章中的完整代码位于 ch11 目录当中。

如果我们在进行任何优化之前对 scrapy crawl 只爬取 10 个页面的情况进行计时,可以得到如下结果。

$ ls
properties scrapy.cfg
$ pwd
/root/book/ch11/properties
$ time scrapy crawl easy -s CLOSESPIDER_PAGECOUNT=10
...
DEBUG: Crawled (200) <GET ...index_00000.html> (referer: None)
DEBUG: Crawled (200) <GET ...index_00001.html> (referer: ...index_00000.html)
...
real 0m4.099s

如果 10 个页面就花费了 4 秒时间,那么就不可能在 26 秒时间内完成 1,700 个页面。通过查看日志,我们发现每个页面都来自于前一个页面的下一页链接,也就是说在任意时刻都只有至多一个页面正在执行爬取。我们的有效并发为 1。我们希望并行处理,得到想要的并发数量(16 个并发请求)。我们将对索引分片,并允许一些额外的分片,以确保爬虫中的 URL 不会不足。我们将会把索引分为 20 个段。实际上,任何超过 16 的数值都能够增加速度,不过在超过 20 之后所得到的回报呈递减趋势。我们将通过如下表达式计算每个分片的起始索引 ID。

>>> map(lambda x: 1667 * x / 20, range(20))
[0, 83, 166, 250, 333, 416, 500, ... 1166, 1250, 1333, 1416, 1500, 1583]

因此,我们使用如下代码设置 start_urls 。

start_urls = ['http://web:9312/properties/index_%05d.html' % id
       for id in map(lambda x: 1667 * x / 20, range(20))]

这可能和你的索引有很大的不同,因此我们没必要在此处实现得更漂亮。如果还设定了并发设置( CONCURRENT_REQUESTS、CONCURRENT_REQUESTS_PER_DOMAIN )为 16,那么当运行爬虫时,将会得到如下结果。

$ time scrapy crawl easy -s CONCURRENT_REQUESTS=16 -s CONCURRENT_REQUESTS_PER_DOMAIN=16
...
real 0m32.344s

该结果已经与期望值非常接近了。我们的下载速度为 1667 个页面 / 32秒 = 52个索引页/秒,这就意味着每秒可以生成 52×30 = 1560 个详情页 URL。现在,可以将垂直抓取的 Rule 的注释取消掉,保存文件作为新爬虫分发。我们不需要对爬虫代码进行更多修改,这显示出我们即将开发的中间件的强大以及非侵入性。如果只使用开发服务器运行 scrapy crawl,假设处理详情页的速度和索引页处理时一样快,那么它将花费不少于 50000 / 52 = 16 分钟时间完成爬取。

本节有两个关键内容。在学习完第 10 章之后,我们已经可以实现真正的工程。我们能够精确计算出系统期望得到的性能,并且确保在达到该性能之前不会停止(在合理范围内)。第二个要记住的重要事情是,由于索引页爬取提供了详情页,爬取的总吞吐量将会是其吞吐量的最小值。如果我们生成的 URL 比 Scrapyd 能够消费得更快,那么 URL 将会堆积在其队列当中。反过来,如果生成的 URL 太慢,Scrapyd 将会拥有过剩的无法利用的能力。

分批爬取URL

现在,我们准备开发处理详情页 URL 的基础架构,目的是对其进行垂直爬取、分批并分发到多台 Scrapyd 节点中,而不是在本地爬取。

如果查看第 8 章中的 Scrapy 架构,就可以很容易地得出结论,这是爬虫中间件的任务,因为它实现了 process_spider_output(),在到达下载器之前,在此处处理请求,并能够中止它们。我们在实现中限制只支持基于 CrawlSpider 的爬虫,另外还只支持简单的 GET 请求。如果需要更加复杂,比如 POST 或有权限验证的请求,那么需要开发更复杂的功能来扩展参数、请求头,甚至可能在每次批量运行后重新登录。

在开始之前,先来快速浏览一下 Scrapy 的 GitHub。我们将回顾 SPIDER_MIDDLEWARES_BASE 设置,以查看 Scrapy 提供的参考实现,以便尽最大可能复用它。Scrapy 1.0 包含如下爬虫中间件:HttpErrorMiddleware、OffsiteMiddleware、RefererMiddleware、UrlLengthMiddleware 以及 DepthMiddleware。在快速了解它们的实现之后,我们发现 OffsiteMiddleware(只有 60 行代码)与想要实现的功能很相似。它根据爬虫的 allowed_domains 属性,把 URL 限制在某些特定域名中。我们可以使用相似的模式吗?和 OffsiteMiddleware 实现中丢弃 URL 不同,我们将对这些 URL 进行分批并发送到 Scrapyd 节点中。事实证明这是可以的。下面是实现的部分代码。

def __init__(self, crawler):
    settings = crawler.settings
    self._target = settings.getint('DISTRIBUTED_TARGET_RULE', -1)
    self._seen = set()
    self._urls = []
    self._batch_size = settings.getint('DISTRIBUTED_BATCH_SIZE', 1000)

def process_spider_output(self, response, result, spider):
    for x in result:
        if not isinstance(x, Request):
            yield x
        else:
            rule = x.meta.get('rule')
            if rule == self._target:
                self._add_to_batch(spider, x)
            else:
                yield x

def _add_to_batch(self, spider, request):
    url = request.url
    if not url in self._seen:
        self._seen.add(url)
        self._urls.append(url)
        if len(self._urls) >= self._batch_size:
            self._flush_urls(spider)

process_spider_output() 既处理 Item 也处理 Request。我们只想处理 Request,因此我们对其他所有内容执行 yield 操作。如果查看 CrawlSpider 的源代码,就会注意到将 Request / Response 映射到 Rule 的方式是通过其 meta 字典的名为 'rule' 的整型字段。我们检查该数值,如果它指向目标的 Rule(DISTRIBUTED_TARGET_RULE 设置),则会调用 _add_to_batch() 添加 URL 到当前批次。然后,丢弃该 Request。对其他所有 Request 执行 yield 操作,比如下一页链接、无变化的链接。_add_to_batch() 方法实现了一个去重机制。不过很遗憾的是,由于前一节中描述的分片流程,我们可能对少数 URL 抽取两次。我们使用 _seen 集合检测并丢弃重复值。然后,把这些 URL 添加到 _urls 列表中,如果其大小超过 _batch_size(DISTRIBUTED_BATCH_SIZE 设置),就会触发调用 _flush_urls()。该方法提供了如下的关键功能。

def __init__(self, crawler):
    settings = crawler.settings
    self._targets = settings.get("DISTRIBUTED_TARGET_HOSTS")
    self._batch = 1
    self._project = settings.get('BOT_NAME')
    self._feed_uri = settings.get('DISTRIBUTED_TARGET_FEED_URL', None)
    self._scrapyd_submits_to_wait = []

def _flush_urls(self, spider):
    if not self._urls:
        return
    target = self._targets[(self._batch - 1) % len(self._targets)]
    data = [
        ("project", self._project),
        ("spider", spider.name),
        ("setting", "FEED_URI=%s" % self._feed_uri),
        ("batch", str(self._batch)),
    ]
    json_urls = json.dumps(self._urls)
    data.append(("setting", "DISTRIBUTED_START_URLS=%s" % json_urls))
    d = treq.post("http://%s/schedule.json" % target, data=data, timeout=5, persistent=False)
    self._scrapyd_submits_to_wait.append(d)
    self._urls = []
    self._batch += 1

首先,它使用一个批次计数器(_batch)来决定要将该批次发送到哪个 Scrapyd 服务器中。我们在 _targets(DISTRIBUTED_TARGET_HOSTS 设置)中保持更新可用的服务器。然后,构造 POST 请求到 Scrapyd 的 schedule.json。这比之前通过 curl 执行的更加高级,因为它传递了一些精心挑选的参数。基于这些参数,Scrapyd 可以有效地计划运行任务,类似如下所示。

scrapy crawl distr \
-s DISTRIBUTED_START_URLS='[".../property_000000.html", ... ]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' \
-a batch=1

除了项目和爬虫名外,我们还向爬虫传递了一个 FEED_URI 设置。我们可以从 DISTRIBUTED_TARGET_FEED_URL 设置中获取该值。

由于 Scrapy 支持 FTP,我们可以让 Scrapyd 通过匿名 FTP 的方式将爬取到的 Item 文件上传到 Spark 服务器中。格式包含爬虫名(%(name)s)和时间(%(time)s)。如果只使用这些,那么当两个文件的创建时间相同时,最终会产生冲突。为了避免意外覆盖,我们还添加了一个 %(batch)s 参数。默认情况下,Scrapy 不知道任何关于批次的事情,因此我们需要找到一种方式来设置该值。Scrapyd 中 schedule.json 这个 API 的一个有趣特性是,如果参数不是设置或少数几个已知参数的话,它将会被作为参数传给爬虫。默认情况下,爬虫参数将会成为爬虫属性,未知的 FEED_URI 参数将会去查阅爬虫的属性。因此,通过传递 batch 参数给 schedule.json,我们可以在 FEED_URI 中使用它以避免冲突。

最后一步是使用编码为 JSON 的该批次详情页 URL 编译为 DISTRIBUTED_START_URLS 设置。除了熟悉和简单之外,使用该格式并没有什么特殊的理由。任何文本格式都可以做到。

通过命令行向 Scrapy 传输大量数据丝毫也不优雅。在一些时候,你想要将参数存储到数据存储中(比如 Redis),并且只向 Scrapy 传输 ID。如果想要这样做,则需要在 _flush_urls() 和 process_start_requests() 中做一些小的改变。

我们使用 treq.post() 处理 POST 请求。Scrapyd 对持久化连接处理得不是很好,因此使用 persistent=False 禁用该功能。为了安全起见,我们还设置了一个 5 秒的超时时间。有趣的是,我们为该请求在 _scrapyd_submits_to_wait 列表中存储了延迟函数,后续内容中将会进行讲解。关闭该函数时,我们将重置 _urls 列表,并增加当前的 _batch 值。

出人意料的是,我们在关闭操作处理器中发现了如下所示的诸多功能。

def __init__(self, crawler):
    ...
    crawler.signals.connect(self._closed, signal=signals.spider_closed)

@defer.inlineCallbacks
def _closed(self, spider, reason, signal, sender):
    # Submit any remaining URLs
    self._flush_urls(spider)
    yield defer.DeferredList(self._scrapyd_submits_to_wait)

_close() 将会在我们按下 Ctrl + C 或爬取完成时被调用。无论哪种情况,我们都不希望丢失属于最后一个批次的任何 URL,因为它们还没有被发送出去。这就是为什么我们在 _close() 方法中首先要做的是调用 _flush_urls(spider) 清空最后的批次的原因。第二个问题是,作为非阻塞代码,任何 treq.post() 在停止爬取时都可能完成或没有完成。为了避免丢失任何批次,我们将使用之前提及的 scrapyd_submits_to_wait 列表,来包含所有的 treq.post() 的延迟函数。我们使用 defer.DeferredList() 进行等待,直到全部完成。由于 _close() 使用了 @defer.inlineCallbacks,我们只需对其执行 yield 操作,并在所有请求完成之后进行恢复即可。

总结来说,在 DISTRIBUTED_START_URLS 设置中包含批量 URL 的任务将被送往 Scrapyd 服务器,并在这些 Scrapyd 服务器中运行相同的爬虫。很明显,我们需要某种方式以使用该设置初始化 start_urls。

从设置中获取初始URL

当你注意到爬虫中间件提供的用于处理爬虫给我们的 start_requests 的 process_start_requests() 方法时,就会感受到爬虫中间件是怎样满足我们的需求的。我们检测 DISTRIBUTED_START_URLS 设置是否已被设定,如果是的话,则解码 JSON 并使用其中的 URL 对相关的 Request 进行 yield 操作。对于这些请求,我们设置 CrawlSpider 的 _response_download() 方法作为回调,并设置 meta['rule'] 参数,以便其 Response 能够被适当的 Rule 处理。坦白来说,我们查阅了 Scrapy 的源代码,发现 CrawlSpider 创建 Request 的方式使用了相同的方法。在本例中,代码如下所示。

def __init__(self, crawler):

self._start_urls = settings.get('DISTRIBUTED_START_URLS', None)
self.is_worker = self._start_urls is not None

def process_start_requests(self, start_requests, spider):
    if not self.is_worker:
        for x in start_requests:
            yield x
    else:
        for url in json.loads(self._start_urls):
            yield Request(url, spider._response_downloaded, meta={'rule': self._target})

我们的中间件已经准备好了。可以在 settings.py 中启用它并进行设置。

SPIDER_MIDDLEWARES = {
    'properties.middlewares.Distributed': 100,
}

DISTRIBUTED_TARGET_RULE = 1
DISTRIBUTED_BATCH_SIZE = 2000
DISTRIBUTED_TARGET_FEED_URL = ("ftp://anonymous@spark/"
                               "%(batch)s_%(name)s_%(time)s.jl")
DISTRIBUTED_TARGET_HOSTS = [
    "scrapyd1:6800",
    "scrapyd2:6800",
    "scrapyd3:6800",
]

一些人可能会认为 DISTRIBUTED_TARGET_RULE 不应该作为设置,因为不同爬虫之间可能是不一样的。你可以将其认为是默认值,并且可以在爬虫中使用 custom_settings 属性进行覆写,比如:

custom_settings = {
    'DISTRIBUTED_TARGET_RULE': 3
}

不过在我们的例子中并不需要这么做。我们可以做一个测试运行,爬取作为设置提供的单个页面。

$ scrapy crawl distr -s \
DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000.html"]'

当爬取成功后,可以尝试更进一步,爬取页面后使用 FTP 传输给 Spark 服务器。

scrapy crawl distr -s \
DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000.  html"]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' -a batch=12

如果你通过 ssh 登录到 Spark 服务器中(稍后会有更多介绍),将会看到一个文件位于 /root/items 目录中,比如 12_distr_date_time.jl。

上述是使用 Scrapyd 实现分布式爬取的中间件的示例实现。你可以使用它作为起点,实现满足自己特殊需求的版本。你可能需要适配的事情包括如下内容。

支持的爬虫类型。比如,一个不局限于 CrawlSpider 的替代方案可能需要你的爬虫通过适当的 meta 以及采用回调命名约定的方式来标记分布式请求。

向 Scrapyd 传输 URL 的方式。你可能希望使用特定域名信息来减少传输的信息量。比如,在本例中,我们只传输了房产的 ID。

你可以使用更优雅的分布式队列解决方案,使爬虫能够从失败中恢复,并允许 Scrapyd 将更多的 URL 提交到批处理。

你可以动态填充目标服务器列表,以支持按需扩展。

在Scrapyd服务器中部署项目

为了能够在我们的 3 台 Scrapyd 服务器中部署爬虫,我们需要将这 3 台服务器添加到 scrapy.cfg 文件中。该文件中的每个 [deploy:target-name] 区域都定义了一个新的部署目标。

$ pwd
/root/book/ch11/properties
$ cat scrapy.cfg
...
[deploy:scrapyd1]
url = http://scrapyd1:6800/
[deploy:scrapyd2]
url = http://scrapyd2:6800/
[deploy:scrapyd3]
url = http://scrapyd3:6800/

可以通过 scrapyd-deploy -l 查询当前可用的目标。

$ scrapyd-deploy -l
scrapyd1       http://scrapyd1:6800/
scrapyd2       http://scrapyd2:6800/
scrapyd3       http://scrapyd3:6800/

通过 scrapyd-deploy <target-name>,可以很容易地部署任意服务器。

$ scrapyd-deploy scrapyd1
Packing version 1449991257
Deploying to project "properties" in http://scrapyd1:6800/addversion.json

Server response (200):
{"status": "ok", "project": "properties", "version": "1449991257",
 "spiders": 2, "node_name": "scrapyd1"}

该过程会留给我们一些额外的目录和文件(build、project.egg-info、setup.py),我们可以安全地删除它们。本质上,scrapyd-deploy 所做的事情就是打包你的项目,并使用 addversion.json 上传到目标 Scrapyd 服务器当中。

之后,当我们使用 scrapyd-deploy -L 查询单台服务器时,可以确认项目是否已经被成功部署,如下所示。

$ scrapyd-deploy -L scrapyd1
properties

我还在项目目录中使用 touch 创建了 3 个空文件(scrapyd1-3)。使用 scrapyd* 扩展为文件名称,同样也是目标服务器的名称。之后,你可以使用一个 bash 循环部署所有服务器:for i in scrapyd*; do scrapyd-deploy $i; done。