创建自定义监控命令
如果想监控多台 Scrapyd 服务器的爬虫进程,则需要手动执行。这是一个很好的机会,能够让我们练习到目前为止所见到的一切知识,创建一个原始的 Scrapy 命令——scrappymonitor,用于监控一组 Scrapyd 服务器。我们将该文件命名为 monitor.py,并且在 settings.py 文件中添加 COMMANDS_MODULE = 'properties.monitor'。通过快速浏览 Scrapyd 的文档,我们发现 listjobs.json 这个 API 可以为我们提供任务相关的信息。如果想要找到给定目标的基础 URL,可以猜到它一定在 scrapyd-deploy 代码中的某个地方,从而可以让我们在单个文件中找到它。如果查看 https://github.com/scrapy/scrapyd-client/blob/master/scrapyd-client/scrapyd-deploy,很快就会发现 _get_target() 函数(由于其实现没有添加太多值,因此我会忽略它),在该函数中将会给我们提供目标名称及其基础 URL。太棒了!我们开始实现该命令的第一部分吧,其代码如下所示。
class Command(ScrapyCommand):
requires_project = True
def run(self, args, opts):
self._to_monitor = {}
for name, target in self._get_targets().items():
if name in args:
project = self.settings.get('BOT_NAME')
url = target['url'] + "listjobs.json?project=" + project
self._to_monitor[name] = url
l = task.LoopingCall(self._monitor)
l.start(5) # call every 5 seconds
reactor.run()
目前我们所看到的实现还是很简单的。它使用目标名称和我们想要监控的 API 地址填充 _to_monitor 字典。然后,我们使用 task.LoopingCall() 计划到 _monitor() 方法的定期调用。_monitor() 方法使用了 treq 和延迟操作,而我们使用了 @defer.inlineCallbacks 来简化其实现。下面是其代码(已忽略一些错误处理和装饰)。
@defer.inlineCallbacks
def _monitor(self):
all_deferreds = []
for name, url in self._to_monitor.items():
d = treq.get(url, timeout=5, persistent=False)
d.addBoth(lambda resp, name: (name, resp), name)
all_deferreds.append(d)
all_resp = yield defer.DeferredList(all_deferreds)
for (success, (name, resp)) in all_resp:
json_resp = yield resp.json()
print("%-20s running: %d, finished: %d, pending: %d" %
(name, len(json_resp['running']),
len(json_resp['finished']), len(json_resp['pending'])))
上面这些行已经包含了我们知道的几乎所有 Twisted 技术。我们使用 treq 调用 Scrapyd 的 API,并且使用 defer.DeferredList 立即处理所有响应。当我们的所有结果进入到 all_resp 之后,则开始迭代并获取其 JSON 对象。treq Response 的 json() 方法将会返回延迟操作,而不是真实值,我们对其执行了 yield 操作,并会在未来的某个时间点恢复其真实值。最后一步,我们打印出结果。JSON 响应包含待处理、运行中及已完成任务列表的信息,我们将打印出它们的长度。