使用Twisted专用客户端建立服务接口

到目前为止,我们看到了如何通过 treq 使用类 REST API。Scrapy 还可以和许多其他使用 Twisted 专用客户端的服务建立接口。比如,我们想要与 MongoDB 建立接口,当搜索 "MongoDB Python" 时,将会得到 PyMongo,该库是阻塞/同步的,不能和 Twisted 一起使用,除非使用后续小节中的方法,在管道中描述线程,处理阻塞操作。如果搜索 "MongoDB Twisted Python",将会得到 txmongo,该库可以在 Twisted 和 Scrapy 中完美运行。通常情况下,Twisted 客户端背后的社区都很小,但相比自行编写客户端,这仍然是一个更好的选择。我们将使用一个类似的 Twisted 专用客户端作为接口,处理 Redis 键值对存储。

用于读写Redis的管道

Google Geocoding API 是按照 IP 进行限制的。我们可以利用多个 IP(例如使用多台服务器)进行缓解,此时需要避免重复请求其他机器上已经完成地理编码的地址。这种情况也适用于之前运行中曾见到过的地址。我们不想浪费宝贵的限额。

请与 API 供应商沟通,确保在他们的策略下这种做法是可行的。比如,你可能必须每隔几分钟/小时就要丢弃掉缓存记录,或者根本不允许缓存。

我们可以使用 Redis 的键值对缓存,从本质上说,它是一个分布式的字典。我们已经在 vagrant 环境中运行了一个 Redis 实例,可以使用 redis-cli 命令,从开发机连接它并执行基本操作。

$ redis-cli -h redis
redis:6379> info keyspace
# Keyspace
redis:6379> set key value
OK
redis:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
redis:6379> FLUSHALL
OK
redis:6379> info keyspace
# Keyspace
redis:6379> exit

通过 Google 搜索 "Redis Twisted",我们找到了 txredisapi 库。其本质区别是它不再是同步 Python 库的包装,而是适用于 Twisted 的库,它使用 reactor.connectTCP() 连接 Redis、实现 Twisted 协议等。使用该库的方式与其他库类似,不过在 Twisted 应用中使用它时,其效率肯定会更高一些。我们在安装它时可以再附带一个工具库——dj_redis_url,该工具库用于解析 Redis 配置 URL,我们可以使用 pip 进行安装(sudo pip install txredisapi dj_redis_url),和往常一样,在我们的开发机中也已经预先安装好了这些库。

可以按如下代码初始化 RedisCache。

from txredisapi import lazyConnectionPool
class RedisCache(object):
...
    def __init__(self, crawler, redis_url, redis_nm):
        self.redis_url = redis_url
        self.redis_nm = redis_nm

        args = RedisCache.parse_redis_url(redis_url)
        self.connection = lazyConnectionPool(connectTimeout=5,
                                            replyTimeout=5,
                                            **args)

        crawler.signals.connect(
                self.item_scraped,signal=signals.item_scraped)

该管道非常简单。为了连接 Redis 服务器,我们需要主机地址、端口等参数,由于这些参数是以 URL 格式存储的,因此需要使用 parse_redis_url() 方法解析该格式(为简洁起见已经省略)。为键设置前缀作为命名空间的行为非常常见,在本例中,我们将其存储在 redis_nm 中。然后,使用 txredisapi 的 lazyConnectionPool(),打开到服务器的连接。

最后一行使用了一个很有意思的函数。我们的目的是将地理编码管道与该管道包装起来。如果在 Redis 中没有某个值,我们将不会设置该值,我们的地理编码管道将像之前那样使用 API 对地址进行地理编码。

在该操作完成之后,需要有一种方式在 Redis 中缓存这些键值对,在这里是通过连接到 signals.item_scraped 信号的方式实现的。我们定义的回调( item_scraped() 方法,将很快看到)在非常靠后的位置被调用,此时坐标位置将会被设置。

本示例的完整代码位于 ch09/properties/properties/pipelines/redis.py。

我们通过查找和记录每个 Item 的地址和位置,保持了缓存的简单性。这对 Redis 来说是很有意义的,因为它经常运行在同一个服务器当中,这使得它运行速度非常快。如果不是这种情况,那么可能需要添加一个基于字典的缓存,与我们在地理编码管道中的实现类似。下面是处理传入的 Item 的方法。

@defer.inlineCallbacks
def process_item(self, item, spider):
    address = item["address"][0]
    key = self.redis_nm + ":" + address
    value = yield self.connection.get(key)
    if value:
        item["location"] = json.loads(value)
    defer.returnValue(item)

和大家的期望相同。我们得到了地址,为其添加前缀,然后使用 txredisapi connection 的 get() 方法在 Redis 中查询。我们在 Redis 中存储的值是 JSON 编码的对象。如果值已经设定,则使用 JSON 对其进行解码,并将其设为坐标位置。

当一个 Item 到达所有管道的结尾时,我们重新捕获它,确保存储到 Redis 的位置值当中。下面是实现代码。

from txredisapi import ConnectionError

def item_scraped(self, item, spider):
    try:
        location = item["location"]
        value = json.dumps(location, ensure_ascii=False)
    except KeyError:
        return

    address = item["address"][0]
    key = self.redis_nm + ":" + address
    quiet = lambda failure: failure.trap(ConnectionError)
    return self.connection.set(key, value).addErrback(quiet)

这里同样没有什么惊喜。如果我们找到一个位置,就可以得到地址,为其添加前缀,并使用它们作为键值对,用于 txredisapi 连接的 set() 方法。你会发现该函数没有使用 @defer.inlineCallbacks,这是因为在处理 signals.item_scraped 时并不支持该装饰器。这就意味着无法再对 connection.set() 使用非常便捷的 yield 操作,不过我们可以做的工作是返回延迟操作,Scrapy 可以用它串联任何未来的信号进行监听。无论何种情况,如果到 Redis 的连接无法执行 connection.set(),就会抛出一个异常。可以通过添加自定义错误处理到 connection.set() 返回的延迟操作中, 静默忽略该异常。在该错误处理中,我们将失败作为参数传递,并告知它们对任何 ConnectionError 执行 trap() 操作。这是 Twisted 的延迟操作 API 的一个非常好用的功能。通过在预期的异常中使用 trap(),我们能够以紧凑的方式静默忽略它们。

为了启用该管道,我们所需做的就是将其添加到 ITEM_PIPELINES 设置中,并在 settings.py 文件中提供一个 REDIS_PIPELINE_URL。为该管道设置一个比地理编码管道更小的优先级值非常重要,否则其运行就会太迟,无法起到作用。

ITEM_PIPELINES = { ...
    'properties.pipelines.redis.RedisCache': 300,
    'properties.pipelines.geo.GeoPipeline': 400,
...
REDIS_PIPELINE_URL = 'redis://redis:6379'

我们可以像平时那样运行该爬虫。第一次运行将会和之前类似,不过接下来的每次运行都会像下面这样。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=100
...
INFO: Enabled item pipelines: TidyUp, RedisCache, GeoPipeline,
MysqlWriter, EsWriter
...
Scraped... 0.0 items/s, avg latency: 0.00 s, time in pipelines: 0.00 s
Scraped... 21.2 items/s, avg latency: 0.78 s, time in pipelines: 0.15 s
Scraped... 24.2 items/s, avg latency: 0.82 s, time in pipelines: 0.16 s
...
INFO: Dumping Scrapy stats: {...
    'geo_pipeline/already_set': 106,
    'item_scraped_count': 106,

可以看到 GeoPipeline 和 RedisCache 都已经启用,并且 RedisCache 会首先进行。另外,还可以注意到 geo_pipeline/already_set 统计值是 106。这些是 GeoPipeline 从 Redis 缓存中找到的预先填充好的 item,并且它们都不需要请求 Google API 调用。如果 Redis 缓存为空,你会看到一些键依然会使用 Google API 进行处理。在性能方面,我们注意到 GeoPipeline 引发的初始行为现在没有了。实际上,由于目前使用了缓存,因此绕过了每秒 5 个请求的 API 限制。当使用 Redis 时,还应当考虑使用过期键,使系统可以周期性地刷新缓存数据。