示例2:测量吞吐量和延时的扩展
当我们在第 9 章中添加管道后,测量吞吐量(每秒的 item 数)和延时(计划后和下载后的时间)的变化是一件很有意思的事情。
Scrapy 扩展中已经包含了一个测量吞吐量的扩展,即日志统计扩展(scrapy/extensions/logstats.py),我们将会以此为起点。要想测量延时,需要 hook 一些信号,包括 request_scheduled、response_received 和 item_scraped。我们对每个信号记录时间戳,并通过累计多次取平均值的方式减去适当的计算延时。通过观察这些信号提供的回调参数,会发现一些讨厌的东西。item_scraped 只在 Response 中获得,request_scheduled 只在 Request 中获得,而 response_received 则是两者中都有。幸运的是,我们不需要任何特殊的技巧来传递这些值。每个 Response 都有一个 Request 成员,回指其 Request,更好的是它拥有我们在第 5 章中看到的 meta 字典,它和原始 Request 中的一样,而不管是否存在重定向。非常好,我们可以在这里存储时间戳了!
实际上,这并不是我的主意。同样的机制已经在 AutoThrottle 扩展(scrapy/extensions/throttle.py)中使用了。在该扩展中,使用了 request.meta.get('download_latency'),其中 download_latency 是在 scrapy/core/downloader/webclient.py 下载器中进行计算的。在编写中间件时, 最快的改善方式就是让自己熟悉 Scrapy 默认的中间件代码。 |
下面是扩展的代码。
class Latencies(object):
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def __init__(self, crawler):
self.crawler = crawler
self.interval = crawler.settings.getfloat('LATENCIES_INTERVAL')
if not self.interval:
raise NotConfigured
cs = crawler.signals
cs.connect(self._spider_opened, signal=signals.spider_opened)
cs.connect(self._spider_closed, signal=signals.spider_closed)
cs.connect(self._request_scheduled, signal=signals.request_scheduled)
cs.connect(self._response_received, signal=signals.response_received)
cs.connect(self._item_scraped, signal=signals.item_scraped)
self.latency, self.proc_latency, self.items = 0, 0, 0
def _spider_opened(self, spider):
self.task = task.LoopingCall(self._log, spider)
self.task.start(self.interval)
def _spider_closed(self, spider, reason):
if self.task.running:
self.task.stop()
def _request_scheduled(self, request, spider):
request.meta['schedule_time'] = time()
def _response_received(self, response, request, spider):
request.meta['received_time'] = time()
def _item_scraped(self, item, response, spider):
self.latency += time() - response.meta['schedule_time']
self.proc_latency += time() - response.meta['received_time']
self.items += 1
def _log(self, spider):
irate = float(self.items) / self.interval
latency = self.latency / self.items if self.items else 0
proc_latency = self.proc_latency / self.items if self.items else 0
spider.logger.info(("Scraped %d items at %.1f items/s, avg latency: "
"%.2f s and avg time in pipelines: %.2f s") %
(self.items, irate, latency, proc_latency))
self.latency, self.proc_latency, self.items = 0, 0, 0
前两个方法非常重要,因为它们很通用。它们使用 Crawler 对象初始化中间件。你会发现这些代码几乎出现在每个重要的中间件当中。from_crawler(cls, crawler) 是获取 Crawler 对象的方式。然后,可以注意到在 __init__()
方法中,访问了 crawler.settings,并且会在其未设置时抛出 NotConfigured 异常。你会看到很多 FooBar 扩展,用于检查相应的 FOOBAR_ENABLED 设置,如果没有设置或者设置为 False 时,将会抛出异常。这是一种非常常见的模式,是为了方便将中间件包含在对应的 settings.py 设置中(比如 ITEM_PIPELINES),但是默认情况下是禁用的,除非通过其对应的设置显式启用。许多默认的 Scrapy 中间件(比如 AutoThrottle 或 HttpCache)都使用了这种模式。在本例中,我们的扩展会保持 LATENCIES_INTERVAL 的禁用状态,除非已经对其进行了设置。
在 __init__()
方法的后面一部分代码中,我们使用 crawler.signals.connect(),为所有感兴趣的信号都注册了回调,并初始化了一些成员变量。这个类的剩余部分实现了信号处理器。在 _spider_opened() 中,我们初始化了一个计时器,会每隔 LATENCIES_INTERVAL 秒调用 _log() 方法;在 _spider_closed() 中,我们停止了该计时器。在 _request_scheduled() 和 _response_received() 中,我们在 request.meta 中存储了时间戳;而在 _item_scraped() 中,我们累计两次延时(从计划/接收开始直到当前时间),并递增抓取到的 Item 的数量。在 _log() 方法中,我们计算了平均值,格式化并打印出消息,重置累加器以开始另一个采样周期。
任何在多线程上下文中编写类似代码的人,都会意识到上述代码中没有使用互斥锁。本例可能还不是特别复杂,不过编写单线程代码仍然要更加简单,并且在更加复杂的场景下可以很好地扩展。 |
我们可以将该扩展的代码添加到 latencies.py 模块中,放到和 settings.py 同级的目录下。如果想要启用该扩展,只需在 settings.py 文件中添加如下两行。
EXTENSIONS = { 'properties.latencies.Latencies': 500, }
LATENCIES_INTERVAL = 5
我们可以像平时那样运行它。
$ pwd
/root/book/ch08/properties
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000 -s LOG_LEVEL=INFO
...
INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
INFO: Scraped 0 items at 0.0 items/sec, average latency: 0.00 sec and
average time in pipelines: 0.00 sec
INFO: Scraped 115 items at 23.0 items/s, avg latency: 0.84 s and avg time
in pipelines: 0.12 s
INFO: Scraped 125 items at 25.0 items/s, avg latency: 0.78 s and avg time
in pipelines: 0.12 s
日志的第一行来自日志统计扩展,而接下来的各行来自我们的扩展。可以看到吞吐量是每秒 25 个 item,平均时延是 0.78 秒,我们在下载后几乎没有花费时间处理。通过利特尔法则,我们得到系统中 item 的数量为 N = S · T = 43 · 0.45 ≌ 19。无论设置的 CONCURRENT_REQUESTS 和 CONCURRENT_REQUESTS_PER_DOMAIN 是多 少,即便没有触及 100% 的 CPU,出于某些原因,也不应该使其超过 30。我们可以在第 10 章中了解更多相关内容。