使用REST API

REST 是一套用于创建现代 Web 服务的技术,其主要优点是比 SOAP 或专有 Web 服务机制更加简单,更加轻量级。软件开发人员观察发现,Web 服务经常提供的 CRUD(创建、读取、更新、删除[Create、 Read、 Update、 Delete])功能与 HTTP 基本操作(GET、POST、 PUT、 DELETE)具有相似性。另外,他们还发现典型的 Web 服务调用其所需的大部分信息时,都可以将其压缩到资源 URL 上。例如,http://api.mysite.com/customer/john 是一个资源 URL,它可以让我们确定目标服务器(api.mysite.com),实际上我正在尝试在服务器上执行和 customers(表)相关的操作,更具体的说就是执行和 john(行——主键)相关的操作。当它与其他 Web 概念(如安全认证、无状态、缓存、使用 XML 或 JSON 作为载荷等)结合时,能够通过一种强大而又简单、熟悉且可以轻松跨平台的方式,提供和使用 Web 服务。难怪 REST 可以掀起软件行业的一场风暴。

使用treq

treq 是一个 Python 包,相当于基于 Twisted 应用编写的 Python requests 包。它可以让我们轻松执行 GET、 POST 以及其他 HTTP 请求。想要安装该包,可以使用 pip install treq,不过它已经在我们的开发机中预先安装好了。我们更倾向于选择 treq 而不是 Scrapy 的 Request/crawler.engine.download() 的原因是,虽然它们都很简单,但是在性能上 treq 更有优势,我们将会在第 10 章中看到更详细的介绍。

用于写入Elasticsearch的管道

首先,我们要编写一个将 Item 存储到 ES(Elasticsearch)服务器的爬虫。你可能会觉得从 ES 开始,甚至先于 MySQL,作为持久化机制进行讲解有些不太寻常,不过其实它是我们可以做的最简单的事情。ES 可以是无模式的,也就是说无需任何配置就能够使用它。对于我们这个(非常简单的)用例来说,treq 也已经足够使用。如果想要使用更高级的 ES 功能,则需要考虑使用 txes2 或其他 Python/Twisted ES 包。

在我们的开发机中,已经包含正在运行的 ES 服务器了。下面登录到开发机中,验证其是否正在正常运行。

$ curl http://es:9200
{
    "name" : "Living Brain",
    "cluster_name" : "elasticsearch",
    "version" : { ... },
    "tagline" : "You Know, for Search"
}

在宿主机浏览器中,访问 http://localhost:9200 ,也可以看到同样的结果。当访问 http://localhost:9200/properties/property/_search 时,可以看到返回的响应表示 ES 进行了全局性的尝试,但是没有找到任何与房产信息相关的索引。恭喜你,刚刚已经使用了 ES 的 REST API。

在本章,我们将在 properties 集合中插入房产信息。你可能需要重置 properties 集合,此时可以使用 curl 执行 DELETE 请求:

$ curl -XDELETE http://es:9200/properties

本章中管道实现的完整代码包含很多额外的细节,如更多的错误处理等,不过我将通过凸显关键点的方式,保持这里的代码简洁。

本章在 ch09 目录当中,其中本示例的代码为 ch09/properties/properties/pipelines/es.py。

从本质上说,爬虫代码只包含如下 4 行。

@defer.inlineCallbacks
def process_item(self, item, spider):
    data = json.dumps(dict(item), ensure_ascii=False).encode("utf-8")
    yield treq.post(self.es_url, data)

其中,前两行用于定义标准的 process_item() 方法,可以在其中 yield 延迟操作(参考第 8 章)。

第 3 行用于准备要插入的 data。首先,我们将 Item 转化为字典。然后使用json.dumps() 将其编码为 JSON 格式。ensure_ascii=False 的目的是通过不转义非 ASCII 字符,使得输出更加紧凑。然后,将这些 JSON 字符串编码为 UTF-8,即 JSON 标准中的默认编码。

最后一行使用 treq 的 post() 方法执行 POST 请求,将文档插入到 ElasticSearch 中。es_url 存储在 settings.py 文件当中 (ES_PIPELINE_URL 设置),如 http://es:9200/properties/property ,可以提供一些基本信息,如 ES 服务器的 IP 和端口(es:9200)、集合名称(properties)以及想要写入的对象类型(property)。

要想启用该管道,需要将其添加到 settings.py 文件的 ITEM_PIPELINES 设置当中,并且使用 ES_PIPELINE_URL 设置进行初始化。

ITEM_PIPELINES = {
'properties.pipelines.tidyup.TidyUp': 100,
'properties.pipelines.es.EsWriter': 800,
}
ES_PIPELINE_URL = 'http://es:9200/properties/property'

完成上述工作后,我们可以进入到适当的目录当中。

$ pwd
/root/book/ch09/properties
$ ls
properties scrapy.cfg

然后,开始运行爬虫。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90
...
INFO: Enabled item pipelines: EsWriter...
INFO: Closing spider (closespider_itemcount)...
'item_scraped_count': 106,

如果现在再次访问 http://localhost:9200/properties/property/_search ,可以在响应的 hits/total 字段中看到已经插入的条目数量,以及前 10 条结果。我们还可以通过添加 ?size=100 参数取得更多结果。在搜索 URL 中添加 q=参数 时,可以在全部或特定字段中搜索指定关键词。最相关的结果将会出现在最前面。例如, http://localhost:9200/properties/property/_search?q=title:london ,将会返回标题中包含 "London" 的房产信息。对于更加复杂的查询,可以查阅 ES 的官方文档,网址为: https://www.elastic.co/guide/en/elasticsearch/reference/currenquery-dsl-query-string-query.html

ES 不需要配置的原因是它可以根据我们提供的第一个属性自动检测模式(字段类型)。通过访问 http://localhost:9200/properties/ ,可以看到其自动检测的映射关系。

让我们快速查看一下性能,使用上一章结尾处给出的方式重新运行 scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000。平均延时从 0.78 秒增长到 0.81 秒,这是因为管道的平均时间从 0.12 秒增长到了 0.15 秒。吞吐量仍然保持在每秒大约 25 个 Item。

使用管道将 Item 插入到数据库当中是不是一个好主意呢?答案是否定的。通常情况下,数据库提供的批量插入条目的方式可以有几个数量级的效率提升,因此我们应当使用这种方式。也就是说,应当将 Item 打包批量插入,或在爬虫结束时以后置处理的步骤执行插入。我们将在最后一章中看到这些方法。不过,许多人仍然使用 Item 管道插入数据库,此时使用 Twisted API 而不是通用/阻塞的方法实现该方案才是正确的方式。

使用Google Geocoding API实现地理编码的管道

每个房产信息都有地区名称,因此我们想对其进行地理编码,也就是说找到它们对应的坐标(经度、纬度)。我们可以使用这些坐标将房产信息放到地图上,或是根据它们到某个位置的距离对搜索结果进行排序。开发这种功能需要复杂的数据库、文本匹配以及空间计算。而使用 Google 的 Geocoding API,可以避免上面提到的几个问题。可以通过浏览器或 curl 打开下述 URL 以获取数据。

$ curl "https://maps.googleapis.com/maps/api/geocode/json?sensor=false&address=london"
{
    "results" : [
        ...
        "formatted_address" : "London, UK",
        "geometry" : {
            ...
            "location" : {
                "lat" : 51.5073509,
                "lng" : -0.1277583
            },
            "location_type" : "APPROXIMATE",
            ...
    ],
    "status" : "OK"
}

我们可以看到一个 JSON 对象,当搜索 "location" 时,可以很快发现 Google 提供的是伦敦中心坐标。如果继续搜索,会发现同一文档中还包含其他位置。其中,第一个坐标位置是最相关的。因此,如果存在 results[0].geometry.location 的话,它就是我们所需要的信息。

Google 的 Geocoding API 可以使用之前用过的技术(treq)进行访问。只需几行,就可以找出一个地址的坐标位置(查看 pipeline 目录的 geo.py 文件),其代码如下。

@defer.inlineCallbacks
def geocode(self, address):
    endpoint = 'http://web:9312/maps/api/geocode/json'

    parms = [('address', address), ('sensor', 'false')]
    response = yield treq.get(endpoint, params=parms)
    content = yield response.json()

    geo = content['results'][0]["geometry"]["location"]
    defer.returnValue({"lat": geo["lat"], "lon": geo["lng"]})

该函数使用了一个和前面用过的 URL 相似的 URL,不过在这里将其指向到一个假的实现,以使其执行速度更快,侵入性更小,可离线使用并且更加可预测。可以使用 endpoint = 'https://maps.googleapis.com/maps/api/geocode/json' 来访问 Google 的服务器,不过需要记住的是 Google 对请求有严格的限制。address 和 sensor 的值都通过 treq 的 get() 方法的 params 参数进行了自动 URL 编码。treq.get() 方法返回了一个延迟操作,我们对其执行 yield 操作,以便在响应可用时恢复它。对 response.json() 的第二个 yield 操作,用于等待响应体加载完成并解析为 Python 对象。此时,我们可以得到第一个结果的位置信息,将其格式化为字典后,使用 defer.returnValue() 返回,该方法是从使用 inlineCallbacks 的方法返回值的最适当的方式。如果任何地方存在问题,该方法会抛出异常,并通过 Scrapy 报告给我们。

通过使用 geocode(),process_item() 可以变为一行代码,如下所示。

item["location"] = yield self.geocode(item["address"][0])

我们可以在 ITEM_PIPELINES 设置中添加并启用该管道,其优先级数值应当小于 ES 的优先级数值,以便 ES 获取坐标位置的值。

ITEM_PIPELINES = {
    ...
    'properties.pipelines.geo.GeoPipeline': 400,

我们启用调试数据,运行一个快速的爬虫。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90 -L DEBUG
...
{'address': [u'Greenwich, London'],
...
'image_urls': [u'http://web:9312/images/i06.jpg'],
'location': {'lat': 51.482577, 'lon': -0.007659},
'price': [1030.0],
...

现在,可以看到 Item 中包含了 location 字段。太好了!不过当使用真实的 Google API 的 URL 临时运行它时,很快就会得到类似下面的异常。

File "pipelines/geo.py" in geocode (content['status'], address))
Exception: Unexpected status="OVER_QUERY_LIMIT" for
address="*London"

这是我们在完整代码中放入的一个检查,用于确保 Geocoding API 的响应中 status 字段的值是 OK。如果该值非真,则说明我们得到的返回数据不是期望的格式,无法被安全使用。在本例中,我们得到了 OVER_QUERY_LIMIT 状态,可以清楚地说明在什么地方发生了错误。这可能是我们在许多案例中都会面临的一个重要问题。由于 Scrapy 的引擎具备较高的性能,缓存和资源请求的限流成为了必须考虑的问题。

可以访问 Geocoder API 的文档来了解其限制:“免费用户 API:每 24 小时允许 2500 个请求,每秒允许 5 个请求”。即使使用了 Google Geocoding API 的付费版本,仍然会有每秒 10 个请求的限流,这就意味着该讨论仍然是有意义的。

下面的实现看起来可能会比较复杂,但是它们必须在上下文中进行判断。而在典型的多线程环境中创建此类组件需要线程池和同步,这样就会产生更加复杂的代码。

下面是使用 Twisted 技术实现的一个简单而又足够好用的限流引擎。

class Throttler(object):
    def __init__(self, rate):
        self.queue = []
        self.looping_call = task.LoopingCall(self._allow_one)
        self.looping_call.start(1. / float(rate))

    def stop(self):
        self.looping_call.stop()

    def throttle(self):
        d = defer.Deferred()
        self.queue.append(d)
        return d

    def _allow_one(self):
        if self.queue:
        self.queue.pop(0).callback(None)

该代码中,延迟操作排队进入列表中,每次调用 _allow_one() 时依次触发它们; _allow_one() 检查队列是否为空,如果不是,则调用最旧的延迟操作的 callback()(先入先出,FIFO)。我们使用 Twisted 的 task.LoopingCall() API 周期性调用 _allow_one()。使用 Throttler 非常简单。我们可以在管道的 __init__ 中对其进行初始化,并在爬虫结束时对其进行清理。

class GeoPipeline(object):
    def __init__(self, stats):
        self.throttler = Throttler(5) # 5 Requests per second

    def close_spider(self, spider):
        self.throttler.stop()

在使用想要限流的资源之前(在本例中为在 process_item() 中调用 geocode()),需要对限流器的 throttle() 方法执行 yield 操作。

yield self.throttler.throttle()
item["location"] = yield self.geocode(item["address"][0])

在第一个 yield 时,代码将会暂停,等待足够的时间过去之后再恢复。比如,某个时刻共有 11 个延迟操作在队列中,我们的速率限制是每秒 5 个请求,我们的代码将会在队列清空时恢复,大约为 11/5=2.2 秒。

使用 Throttler 后,我们不再会发生错误,但是爬虫速度会变得非常慢。通过观察发现,示例的房产信息中只有有限的几个不同位置。这是使用缓存的一个非常好的机会。我们可以使用一个简单的 Python 字典来实现缓存,不过这种情况下将会产生竞态条件,导致不正确的 API 调用。下面是一个没有该问题的缓存,此外还演示了一些 Python 和 Twisted 的有趣特性。

class DeferredCache(object):
    def __init__(self, key_not_found_callback):
        self.records = {}
        self.deferreds_waiting = {}
        self.key_not_found_callback = key_not_found_callback

    @defer.inlineCallbacks
    def find(self, key):
        rv = defer.Deferred()

        if key in self.deferreds_waiting:
            self.deferreds_waiting[key].append(rv)
        else:
            self.deferreds_waiting[key] = [rv]

            if not key in self.records:
            try:
                value = yield self.key_not_found_callback(key)
                self.records[key] = lambda d: d.callback(value)
            except Exception as e:
                self.records[key] = lambda d: d.errback(e)

            action = self.records[key]
            for d in self.deferreds_waiting.pop(key):
                reactor.callFromThread(action, d)

        value = yield rv
        defer.returnValue(value)

该缓存看起来和人们通常期望的有些不同。它包含两个组成部分。

  • self.deferreds_waiting:这是一个延迟操作的队列,等待指定键的值。

  • self.records: 这是已经出现的键-操作对的字典。

如果查看 find() 实现的中间部分,就会发现如果没有在 self.records 中找到一个键,则会调用一个预定义的 callback 函数,取得缺失值(yield self.key_not_found_callback(key))。该回调函数可能会抛出一个异常。我们要如何在 Python 中以紧凑的方式存储这些值或异常呢?由于 Python 是一种函数式语言,我们可以根据是否出现异常,在 self.records 中存储调用延迟操作的 callback 或 errback 的小函数(lambda)。在定义时,该值或异常被附加到 lambda 函数中。函数中对变量的依赖被称为闭包,这是大多数函数式编程语言最显著和强大的特性之一。

缓存异常有些不太常见,不过这意味着如果在第一次查找某个键时,key_not_found_callback(key) 抛出了异常,那么接下来对相同键再次查询时仍然会抛出同样的异常,不需要再执行额外的调用。

find() 实现的剩余部分提供了避免竞态条件的机制。如果要查询的键已经在进程当中, 将会在 self.deferreds_waiting 字典中有记录。在这种情况下,我们不再额外调用 key_not_found_callback(),只是添加到延迟操作列表中,等待该键。

当 key_not_found_callback() 返回,并且该键的值变为可用时,触发每个等待该键的延迟操作。我们可以直接执行 action(d),而不是使用 reactor.callFromThread(),不过这样就必须处理所有抛出的异常,并且会创建一个不必要的长延迟链。

使用缓存非常简单。只需在 __init__() 中对其初始化,并在执行 API 调用时设置回调函数即可。在 process_item() 中,按照如下代码使用缓存。

def __init__(self, stats):
    self.cache = DeferredCache(self.cache_key_not_found_callback)

@defer.inlineCallbacks
def cache_key_not_found_callback(self, address):
    yield self.throttler.enqueue()
    value = yield self.geocode(address)
    defer.returnValue(value)

@defer.inlineCallbacks
def process_item(self, item, spider):
    item["location"] = yield self.cache.find(item["address"][0])
    defer.returnValue(item)

本例的完整代码包含了更多的错误处理代码,能够对限流导致的错误重试调用(一个简单的 while 循环),并且还包含了更新爬虫状态的代码。

本例的完整代码文件地址为:ch09/properties/properties/pipelines/geo2.py。

要想启用该管道,需要禁用(注释掉)之前的实现,并且在 settings.py 文件的 ITEM_PIPELINES 中添加如下代码。

ITEM_PIPELINES = {
    'properties.pipelines.tidyup.TidyUp': 100,
    'properties.pipelines.es.EsWriter': 800,
    # DISABLE 'properties.pipelines.geo.GeoPipeline': 400,
    'properties.pipelines.geo2.GeoPipeline': 400,
}

然后,可以按照如下代码运行该爬虫。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 15.8 items/s, avg latency: 1.74 s and avg time in pipelines:
0.94 s
Scraped... 32.2 items/s, avg latency: 1.76 s and avg time in pipelines:
0.97 s
Scraped... 25.6 items/s, avg latency: 0.76 s and avg time in pipelines:
0.14 s
...
: Dumping Scrapy stats:...
    'geo_pipeline/misses': 35,
    'item_scraped_count': 1019,

可以看到,爬取延时最初由于填充缓存的原因非常高,但是很快就回到了之前的值。统计显示总共有 35 次未命中,这正是我们所用的示例数据集内不同位置的数量。显然,在本例中总共有 1019 - 35 = 984 次命中缓存。如果使用真实的 Google API,并将每秒对 API 的请求数量稍微增加,比如通过将 Throttler(5) 改为 Throttler(10), 把每秒请求数从 5 增加到 10,就会在 geo_pipeline/retries 统计中得到重试的记录。如果发生任何错误,比如使用 API 无法找到一个位置,将会抛出异常,并且会在 geo_pipeline/errors 统计中被捕获到。如果某个位置的坐标已经被设置(后面的小节中看到),则会在 geo_pipeline/already_set 统计中显示。最后,当访问 http://localhost:9200/properties/property/_search ,查看房产信息的 ES 时,可以看到包含坐标位置值的条目,比如 {…​"location": {"lat": 51.5269736, "lon":-0.0667204}…​} ,这和我们所期望的一样(在运行之前清理集合,确保看到的不是旧值)。

在Elasticsearch中启用地理编码索引

既然已经拥有了坐标位置,现在就可以做一些事情了,比如根据距离对结果进行排序。下面是一个 HTTP POST 请求(使用 curl 执行),返回标题中包含 "Angel" 的房产信息,并按照它们与点 {51.54, -0.19} 的距离进行排序。

$ curl http://es:9200/properties/property/_search -d '{
"query" : {"term" : { "title" : "angel" } },
"sort": [{"_geo_distance": {
    "location": {"lat": 51.54, "lon": -0.19},
    "order": "asc",
    "unit": "km",
    "distance_type": "plane"
}}]}'

唯一的问题是当尝试运行它时,会发现运行失败,并得到了一个错误信息:"failed to find mapper for [location] for geo distance based sort"。这说明位置字段并不是执行空间操作的适当格式。要想设置为合适的类型,则需要手动重写其默认类型。首先,将其自动检测的映射关系保存到文件中。

$ curl 'http://es:9200/properties/_mapping/property' > property.txt

然后编辑 property.txt 的如下代码。

"location":{"properties":{"lat":{"type":"double"},"lon":{"type":"double"}}}

将该行的代码修改为如下代码。

"location": {"type": "geo_point"}

另外,我们还删除了文件尾部的 {"properties":{"mappings":and two }}。对该文件的修改到此为止。现在可以按如下代码删除旧类型,使用指定的模式创建新类型。

$ curl -XDELETE 'http://es:9200/properties'
$ curl -XPUT 'http://es:9200/properties'
$ curl -XPUT 'http://es:9200/properties/_mapping/property' --data @property.txt

现在可以再次运行该爬虫,并且可以重新运行本节前面的 curl 命令,此时将会得到按照距离排序的结果。我们的搜索返回了房产信息的 JSON,额外包含了一个 sort 字段,该字段的值是到搜索点的距离,单位为千米。