为CPU密集型、 阻塞或遗留功能建立接口
本章最后一节讨论的是访问大多数非 Twisted 的工作。尽管有高效的异步代码所带来的巨大收益,但为 Twisted 和 Scrapy 重写每个库,既不现实也不可行。使用 Twisted 的线程池和 reactor.spawnProcess() 方法,我们可以使用任何 Python 库甚至其他语言编写的二进制包。
处理CPU密集型或阻塞操作的管道
第 8 章讲到,reactor 对于简短、非阻塞的任务非常理想。如果必须要执行一些更复杂或是涉及阻塞的事情,该怎么做呢? Twisted 提供了线程池,可以使用 reactor.callInThread() API 调用,在一些线程中执行慢操作,而不是在主线程中执行(Twisted 的 reactor)。这就意味着 reactor 会持续运行其处理过程,并在计算发生时响应事件。请注意,在线程池中的处理不是线程安全的。这就是说当你使用全局状态时,又会出现多线程编程中所有的传统同步问题。让我们从该管道的一个简单版本起步,逐渐编写出完整的代码。
class UsingBlocking(object):
@defer.inlineCallbacks
def process_item(self, item, spider):
price = item["price"][0]
out = defer.Deferred()
reactor.callInThread(self._do_calculation, price, out)
item["price"][0] = yield out
defer.returnValue(item)
def _do_calculation(self, price, out):
new_price = price + 1
time.sleep(0.10)
reactor.callFromThread(out.callback, new_price)
在前面的管道中,我们看到了实际运行的基本原语。对于每个 Item,我们抽取其价格,并希望使用 _do_calculation()
方法处理它。该方法使用了一个阻塞操作 time.sleep() 。我们将使用 reactor.callInThread() 调用把它放到另一个线程中运行。其中,被调用的函数以及传给该函数的任意数量的参数将会作为参数。显然,我们不只传递了 price,还创建并传递了一个名为 out 的延迟操作。当 _do_calculation()
完成计算时,我们将使用 out 回调返回值。在下一步中,我们对这个延迟操作执行了 yield 处理,并为价格设置了新值,最后返回 Item。
在 _do_calculation() 中,注意到有一个简单的计算——价格自增 1,然后是 100 毫秒的睡眠。这是非常多的时间,如果在 reactor 线程中调用,它将使我们每秒处理的页数无法超过 10 页。通过使其在其他线程中运行,就不再有这个问题了。任务将会在线程池中排队,等待出现可用的线程,一旦进入线程执行,该线程就将睡眠 100 毫秒。最后一步是触发 out 回调。正常情况下,可以使用 out.callback(new_price),不过由于现在处于另一个线程中,这种方法不再安全。如果这样做,会导致延迟操作的代码和 Scrapy 的功能会从另一个线程调用,迟早会出现错误的数据。替代方案是使用 reactor.callFromThread(),同样,也是将函数作为参数,并将任意数量的额外参数传到函数中。该函数将会排队,由 reactor 线程调用;而另一方面,会解除 process_item() 对象 yield 操作的阻塞,为该 Item 恢复 Scrapy 操作。
如果有全局状态(比如计数器、移动平均值等)的话,那么在 _do_calculation()
中使用它们会发生什么呢?例如,我们添加两个变量——beta 和 delta,如下所示。
class UsingBlocking(object):
def __init__(self):
self.beta, self.delta = 0, 0
...
def _do_calculation(self, price, out):
self.beta += 1
time.sleep(0.001)
self.delta += 1
new_price = price + self.beta - self.delta + 1
assert abs(new_price-price-1) < 0.01
time.sleep(0.10)...
上面的代码存在问题,我们会得到断言错误。这是因为如果一个线程在 self.beta 和 self.delta 之间切换,而另一个线程使用这些 beta/delta 的值恢复计算价格,那么会发现它们处于不一致的状态(beta 比 delta 大),因此,会计算出错误的结果。短暂的睡眠使该问题更容易产生,不过即便没有它,竞态条件也将很快出现。为了避免此类问题发生, 必须使用锁,比如使用 Python 的 threading.RLock() 递归锁。当使用锁时,我们可以确信不会存在两个线程同时执行其保护的临界区的情况。
class UsingBlocking(object):
def __init__(self):
...
self.lock = threading.RLock()
...
def _do_calculation(self, price, out):
with self.lock:
self.beta += 1
...
new_price = price + self.beta - self.delta + 1
assert abs(new_price-price-1) < 0.01 ...
前面的代码现在是正确的。请记住我们并不需要保护整段代码,只需覆盖全局状态的使用就够了。
本示例的完整代码位于 ch09/properties/properties/pipelines/computation.py 文件中。 |
要想使用该管道,只需在 settings.py 文件中将其添加到 ITEM_PIPELINES 设置即可,如下所示。
ITEM_PIPELINES = { ...
'properties.pipelines.computation.UsingBlocking': 500,
可以按照平时那样运行该爬虫。按照预期,管道延时显著增长了 100 毫秒,不过我们惊喜地发现吞吐量几乎保持不变,即每秒 25 个 item 左右。
使用二进制或脚本的管道
对于一个遗留功能来说,最不可知的接口就是独立的可执行程序或脚本。它可能需要几秒钟时间启动(比如从数据库中加载数据),不过在这之后,它可能会在一小段延时内处理许多值。即使对于这种情况,Twisted 仍然能够覆盖。我们可以使用 reactor.spawnProcess() API 以及相关的 protocol.ProcessProtocol 运行任何类型的可执行程序。来看一个例子,该示例的脚本如下所示。
#!/bin/bash
trap "" SIGINT
sleep 3
while read line
do
# 4 per second
sleep 0.25
awk "BEGIN {print 1.20 * $line}"
done
这是一个简单的 bash 脚本。当它启动后,会禁用 Ctrl + C。这是为了解决 Ctrl + C 派生到子进程后过早终止,导致 Scrapy 自身无法停止,无限等待子进程返回结果的系统特性。禁用 Ctrl + C 后,脚本将会睡眠 3 秒钟,以模拟启动时间。然后脚本会从输入中读取行,等待 250 毫秒,再返回结果价格,该计算使用 Linux 的 awk 命令将原值乘以 1.2 倍。该脚本的最大吞吐量是每秒 4 个 Item。可以使用一个简短的会话对其进行测试,如下所示。
$ properties/pipelines/legacy.sh
12 <- If you type this quickly you will wait ~3 seconds to get results
14.40
13 <- For further numbers you will notice just a slight delay
15.60
由于 Ctrl + C 被禁用,我们必须使用 Ctrl + D 终止会话。不错!那么,我们要如何在 Scrapy 中使用该脚本呢?仍然从一个简化的版本起步。
class CommandSlot(protocol.ProcessProtocol):
def __init__(self, args):
self._queue = []
reactor.spawnProcess(self, args[0], args)
def legacy_calculate(self, price):
d = defer.Deferred()
self._queue.append(d)
self.transport.write("%f\n" % price)
return d
# Overriding from protocol.ProcessProtocol
def outReceived(self, data):
"""Called when new output is received"""
self._queue.pop(0).callback(float(data))
class Pricing(object):
def __init__(self):
self.slot = CommandSlot(['properties/pipelines/legacy.sh'])
@defer.inlineCallbacks
def process_item(self, item, spider):
item["price"][0] = yield self.slot.legacy_calculate(item["price"][0])
defer.returnValue(item)
我们可以在这里找到名为 CommandSlot 的 ProcessProtocol 的定义,以及 Pricing 爬虫。在 __init__()
中,我们创建了新的 CommandSlot,其构造方法初始化了一个空队列,并使用 reactor.spawnProcess() 启动了一个新的进程。该调用将从进程中传输和接收数据的 ProcessProtocol 作为第一个参数。在本例中,该值为 self,因为 spawnProcess() 是在 protocol 类中进行调用的。第二个参数是可执行程序的名称。第三个参数 args 将该二进制程序的所有命令行参数作为字符串列表保留。
在管道的 process_item() 中,基本上将所有工作都委托给 CommandSlot 的 legacy_calculate() 方法,它将返回一个延迟操作,并执行 yield 操作。legacy_calculate() 创建了一个延迟操作,使其排队,然后使用 transport.write() 将价格写入到进程当中。transport 由 ProcessProtocol 提供,用于让我们和进程进行通信。无论我们何时从进程中接收到数据,都会调用 outReceived()。通过延迟操作排队,以及按顺序处理的 shell 脚本,我们可以从队列中只弹出最旧的延迟操作,使用接收到的值触发它。到此为止。我们可以通过在 ITEM_PIPELINES 中添加它的方式,启动该管道,并像平时那样运行。
ITEM_PIPELINES = {...
'properties.pipelines.legacy.Pricing': 600,
如果我们运行一次,就会发现其性能非常糟糕。如我们所料,我们的处理成为瓶颈,限制了吞吐量只能达到每秒 4 个 Item。要想增长吞吐量,我们所能做的就是对管道进行一些修改,允许该类并行运行多个,如下所示。
class Pricing(object):
def __init__(self):
self.concurrency = 16
args = ['properties/pipelines/legacy.sh']
self.slots = [CommandSlot(args) for i in xrange(self.concurrency)]
self.rr = 0
@defer.inlineCallbacks
def process_item(self, item, spider):
slot = self.slots[self.rr]
self.rr = (self.rr + 1) % self.concurrency
item["price"][0] = yield slot.legacy_calculate(item["price"][0])
defer.returnValue(item)
我们将其修改为启动 16 个实例,并以轮询的方式为每个实例发送价格。该管道现在提供了每秒 16×4 = 64 个 item 的吞吐量。我们可以通过一个快速爬取来确认,如下所示。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 0.0 items/s, avg latency: 0.00 s and avg time in pipelines:
0.00 s
Scraped... 21.0 items/s, avg latency: 2.20 s and avg time in pipelines:
1.48 s
Scraped... 24.2 items/s, avg latency: 1.16 s and avg time in pipelines:
0.52 s
延时和预期一样,增长到 250 毫秒,不过吞吐量仍然是每秒 25 个 item。
请注意,前面的方法中使用了 transport.write() 将 shell 脚本输入中的所有价格排入队列。对于你的应用而言,这种方式可能合适,也可能不合适,尤其是当它使用了更多的数据而不仅仅是几个数字时。本例完整代码会将所有值和回调排入队列,并且只有在前一次结果被接收后,才会向脚本发送新值。你会发现这种方式对你的遗留应用更加友好,不过也增添了一些复杂度。
本章小结
本章讲解了一些复杂的 Scrapy 管道。到目前为止,我们已经学习了 Twisted 编程方面所有可能需要的内容,并且知道了如何实现进程、使用 Item 进程管道等复杂功能。我们通过在延时和吞吐量方面添加更多管道阶段,看到了性能是如何变化的。通常情况下,延时和吞吐量被认为是成反比的,不过这是建立在常数并发的假设下的(例如线程的数例有限)。在我们的例子中,我们从 N = S · T = 25 · 0.77 ≌ 19 开始,在添加管道后,最终达到 N = 25·3.33 ≌ 83,并且没有任何性能问题。这就是 Twisted 编程的力量!现在我们可以进入第 10 章,使 Scrapy 的性能更加完美。