Scrapy 分布式爬虫的数据统计方案

在上一节中,我们已经学习了如何利用Kubernetes进行Scrapy爬虫的部署,并将其对接了分布式的实现,对接了代理池、账号池,以顺利地实现数据抓取。

但这时候我们又遇到了一个难题,怎样监控各个Scrapy.爬虫的爬取情况呢?比如,这时候我部署了10个Pod来运行Scrapy分布式爬虫,它们基于Scrapy-Redis进行协同爬取,但我们并无法知晓它们一分钟爬取了多少条数据,成功、失败、重试了多少次,难道要通过分析日志得出来吗?

另外,假如我们已经能成功获取了这些数据,又想进一步把这些数据可视化出来,做一个实时大屏图表,用什么方式实现比较好?需要自已额外写代码实现吗?还是说已经有了非常成熟的解决方案?

围绕这两个问题,我们来探索Scrapy爬虫监控方案。

准备

在本节开始之前,请确保已经完成了上一节的所有内容并能透彻地理解其原理,另外还需要你能较为熟练地完成利用 Kubernetes 部署 Scrapy 爬虫和其他服务的操作。

数据统计

我们在 Scrapy 运行结果中会注意到它时不时输出类似这样的结果: 2021-03-15 21:52:06 [scrapy.extensions.logstats] INFO: Crawled 33 pages (at 33 pages/min), scraped 172 items (at 172 items/min)

这里显示了 Scrapy 爬虫的统计结果,里面包含当前 Spider 的页面(page)爬取速度和结果(item)的提取速度,本例中一分钟爬取了 33 个页面,提取了 172 个结果。

然而,当前我们基于 Scrapy-Redis 通过前面几节的方案实现了分布式爬虫之后,仔细观察会发现每个爬虫输出的结果都是各自的统计结果,比如其中一个 Spider 机器性能和网络比较好,爬取速度快,那么它的统计结果就更高,表现不太好的 Spider 的统计结果就差一些。这些 Spider 的统计信息都是独立的、互不影响的、数据也各不相同。

这是为什么呢?

回想一下,之前 Scrapy-Redis 实现分布式爬虫时,我们有两项关键配置: SCHEDULER = "scrapy_redis.scheduler.Scheduler" DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

这里我们分别配置了 Scheduler 和 RFPDupeFilter,其中 Scheduler 可以实现所有请求通过 Redis 队列共享,RFPDupeFilter 可以实现去重指纹通过 Redis 集合共享。然而统计信息呢?哪里配置共享吗?并没有,因此每个 Spider 都是各统计各的,数据各不相干。

这就遇到了一个问题,统计信息不同步而且很分散,这么多 Scrapy 爬虫究竟爬取了多少数据也无从得知。如果通过日志来行数据收集和统计,这个难度和工作量也不小,而且不精确。

所以,有没有什么简单方法呢?

当然有,按照 Scheduler 和 RFPDupeFilter 的思路,将统计信息也通过 Redis 共享不就可以了吗?

实现原理

要实现这个功能,我们需要用到 Scrapy 的一个组件,叫作 Stats Collection,翻译过来可以叫统计信息收集器,它是一种 Scrapy 的 Extension,即扩展组件。

Scrapy 通过 Stats Collection 来收集键值对类型的统计信息,其中值一般是计数器,这么多键值对构成了一个统计表,可以理解为 Python 中的集合。比如上述例子中的爬取了多少页面,提取了多少结果,这两个信息是可以通过 Stats Collection 来保存的。

如果说得更直观一点,在 Scrapy 爬虫运行完成时,想必我们还注意到过类似如下的统计信息:

{'downloader/request_bytes': 2925,
'downloader/request_count': 11,
'downloader/request_method_count/GET': 11,
'downloader/response_bytes': 23406,
'downloader/response_count': 11,
'downloader/response_status_count/200': 10,
'downloader/response_status_count/404': 1,
'elapsed_time_seconds': 3.917599,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2021, 3, 15, 14, 1, 36, 275427),
'item_scraped_count': 100,
'log_count/DEBUG': 111,
'log_count/INFO': 10,
'memusage/max': 55242752,
'memusage/startup': 55242752,
'request_depth_max': 9,
'response_received_count': 11,
'robotstxt/request_count': 1,
'robotstxt/response_count': 1,
'robotstxt/response_status_count/404': 1,
'scheduler/dequeued': 10,
'scheduler/dequeued/memory': 10,
'scheduler/enqueued': 10,
'scheduler/enqueued/memory': 10,
'start_time': datetime.datetime(2021, 3, 15, 14, 1, 32, 357828)}

这个结果就是 Stats Collection 里面存储的常用键值对,比如 item_scraped_count 就代表爬取了多少结果,downloader/response_status_count/200 就代表成功的响应次数有多少。

看起来挺清晰的,对不对?我们可以通过这些指标清楚地得知当前状态下每个 Scrapy 爬虫的运行状态。

这是怎么实现的呢?其实在 Scrapy 中,它是通过一个默认配置好的 Stats Collector 实现的,叫作 MemoryStatsCollector,这是 Scrapy 中内置的 Stats Collector,我们可以通过配置 STATS_CLASS 来更改。

看下 Stats Collector 的源码,内容如下:

import pprint
import logging

logger = logging.getLogger(__name__)

class StatsCollector:
    def __init__(self, crawler):
        self._dump = crawler.settings.getbool('STATS_DUMP')
        self._stats = {}

    def get_value(self, key, default=None, spider=None):
        return self._stats.get(key, default)

    def get_stats(self, spider=None):
        return self._stats

    def set_value(self, key, value, spider=None):
        self._stats[key] = value

    def set_stats(self, stats, spider=None):
        self._stats = stats

    def inc_value(self, key, count=1, start=0, spider=None):
        d = self._stats
        d[key] = d.setdefault(key, start) + count

    def max_value(self, key, value, spider=None):
        self._stats[key] = max(self._stats.setdefault(key, value), value)

    def min_value(self, key, value, spider=None):
        self._stats[key] = min(self._stats.setdefault(key, value), value)

    def clear_stats(self, spider=None):
        self._stats.clear()

    def open_spider(self, spider):
        pass

    def close_spider(self, spider, reason):
        if self._dump:
            logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats),
                        extra={'spider': spider})
        self.persist_stats(self._stats, spider)

    def _persist_stats(self, stats, spider):
            pass


class MemoryStatsCollector(StatsCollector):

    def __init__(self, crawler):
        super().__init__(crawler)
        self.spider_stats = {}

    def _persist_stats(self, stats, spider):
        self.spider_stats[spider.name] = stats

这里可以很明显看到 MemoryStatsCollector 继承自 StatsCollector 这个类,而 StatsCollector 里面又提供了一系列数据获取和设置相关的方法,比如 set_value 接收 key 和 value 参数,将 value 存储到 stats 这个全局变量里面,get_value 接收 key 这个参数,然后将 value 从 stats 这个全局变量里面取出来并返回。

这个 stats 变量就相当于一个大的表,爬虫开始运行时将 stats 进行初始化,然后整个爬虫在有任何事件发生的时候可以调用一下数据修改的 set_value 方法,将数据的修改记录下来就好了。而 MemoryStatsCollector 的实现也非常的简单,就是将 stats 初始化为一个 Python 字典,所以所有的数据统计结果都是在内存中存储的。如果爬虫运行停止了而且这些数据没有保存下来的话,数据就丢失了,而且这个数据也没有任何共享机制,所以每个 Scrapy 爬虫的统计信息都是一个独立的 Python 字典,自然也就无法做到统计信息的共享了。

到了这里,我们就知道如果要实现 Scrapy 分布式爬虫的统计信息的共享,应该怎么做了吧?那就是将 stats 全局变量通过 Redis 共享!

仿照 Scrapy-Redis 的其他模块的实现,我们可以将其实现,代码类似如下:

from scrapy.statscollectors import StatsCollector
from .connection import from_settings as redis_from_settings
from .defaults import STATS_KEY, SCHEDULER_PERSIST
from datetime import datetime

class RedisStatsCollector(StatsCollector):

    def __init__(self, crawler, spider=None):
        super().__init__(crawler)
        self.server = redis_from_settings(crawler.settings)
        self.spider = spider
        self.spider_name = spider.name if spider else crawler.spidercls.name
        self.stats_key = crawler.settings.get('STATS_KEY', STATS_KEY)
        self.persist = crawler.settings.get(
            'SCHEDULER_PERSIST', SCHEDULER_PERSIST)

    def _get_key(self, spider=None):
        if spider:
            self.stats_key % {'spider': spider.name}
        if self.spider:
            return self.stats_key % {'spider': self.spider.name}
        return self.stats_key % {'spider': self.spider_name or 'scrapy'}

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def get_value(self, key, default=None, spider=None):
        if self.server.hexists(self._get_key(spider), key):
            return int(self.server.hget(self._get_key(spider), key))
        else:
            return default

    def get_stats(self, spider=None):
        return self.server.hgetall(self._get_key(spider))

    def set_value(self, key, value, spider=None):
        self.server.hset(self._get_key(spider), key, value)

    def set_stats(self, stats, spider=None):
        self.server.hmset(self._get_key(spider), stats)

    def inc_value(self, key, count=1, start=0, spider=None):
        if not self.server.hexists(self._get_key(spider), key):
            self.set_value(key, start)
        self.server.hincrby(self._get_key(spider), key, count)

    def max_value(self, key, value, spider=None):
        self.set_value(key, max(self.get_value(key, value), value))

    def min_value(self, key, value, spider=None):
        self.set_value(key, min(self.get_value(key, value), value))

    def clear_stats(self, spider=None):
        self.server.delete(self._get_key(spider))

    def open_spider(self, spider):
        if spider:
            self.spider = spider

    def close_spider(self, spider, reason):
        self.spider = None
        if not self.persist:
            self.clear_stats(spider)

这部分改动需要放在 Scrapy-Redis 源码里面,这里主要的改动就是将 stats 修改为 Redis HSET 实现,因为 HSET 就是 Redis 中的一个用于键值对存储的数据结构。比如 set_value 就可以修改为 hset 方法实现,get_value 就可以修改为 hget 方法实现。

大家看到这里可能眉头一紧,心想这个功能还需要自己去修改源码实现吗?这样会增加不少工作量。在 Scrapy-Redis 0.6.8 及以前的版本中,确实需要这么做。不过幸运的是,我已经把这部分功能实现并合并到 Scrapy-Redis 源代码中了,自 Scrapy-Redis 0.7.1 版本开始,大家就可以直接使用了。另外,默认情况下,大家会安装最新版的 Scrapy-Redis,所以大多数情况下是可以直接使用的。

怎么使用呢?很简单,只需要在 Scrapy 爬虫项目里面的 settings.py 中添加如下的一行配置即可:

STATS_CLASS = "scrapy_redis.stats.RedisStatsCollector"

仅仅通过这一行代码的配置,我们就完成了如上的所有工作,不需要手动实现 RedisStatsCollector 这个类了。

接下来,我们重新运行下 Scrapy 爬虫,通过 Redis Desktop Manager 连接 Redis 看下,这时候就会发现运行过程中多了一个 Redis 的 key,如图 17-24 所示。

图17-24 Redis key 列表

这里多了一个 book:stats 的 key,打开看下结果,如图 17-25 所示。

图17-25 运行结果

这时候我们可以看到所有的Scrapy统计信息都存储到这里了,而且多个Scrapy爬虫通过此统计信息实现了数据共享,任何一个爬虫的数据修改都会直接反映到这个Redis的HSET里面,这样我们就实现了Scrapy分布式爬虫的统计信息共享。

总结

在本节中,我们首先介绍了 Scrapy 爬虫的统计信息是怎么实现的,然后介绍了如何实现 Scrapy 分布式爬虫的统计信息共享。

统计信息共享是一个非常有用的功能,为后面数据可视化打下了基础,后文我们会继续学习如何基于这些信息进行数据可视化。