Item Pipeline 的使用

在前面的章节,我们初步介绍了 Item Pipeline 的作用,本节我们再详细了解一下它的用法。

Item Pipeline 即项目管道,它的调用发生在 Spider 产生 Item 之后。当 Spider 解析完 Response,Item 就会被 Engine 传递到 ItemPipeline,被定义的 ItemPipeline 组件会顺次被调用,完成一连串的处理过程,比如数据清洗、存储等。

Item Pipeline 的主要功能如下。

  • 清洗 HTML 数据。

  • 验证爬取数据,检查爬取字段。

  • 查重并丢弃重复内容。

  • 将爬取结果储存到数据库中。

核心方法

我们可以自定义 Item Pipeline,只需要实现指定的方法就好,其中必须实现的一个方法是:

  • process_item(item,spider)

另外还有几个比较实用的方法,它们分别是:

  • open_spider(spider)

  • close_spider(spider)

  • from_crawler(cls, crawler)

下面我们对这几个方法的用法进行详细介绍。

process_item(item, spider)

process_item 是必须实现的方法,被定义的 Item Pipeline 会默认调用这个方法对 Item 进行处理,比如进行数据处理或者将数据写入数据库等操作。process_item 方法必须返回 Item 类型的值或者抛出一个 DropItem 异常。

process_item 方法的参数有两个。

  • item:Item 对象,即被处理的 Item。

  • spider:Spider 对象,即生成该 Item 的 Spider。

该方法的返回类型如下。

  • 如果返回的是 Item 对象,那么此 Item 会接着被低优先级的 Item Pipeline 的 process_item 方法处理,直到所有的方法被调用完毕。

  • 如果抛出 DropItem 异常,那么此 Item 就会被丢弃,不再进行处理。

open_spider(self, spider)

open_spider 方法是在 Spider 开启的时候被自动调用的,在这里我们可以做一些初始化操作,如开启数据库连接等。其中参数 spider 就是被开启的 Spider 对象。

close_spider(spider)

close_spider 方法是在 Spider 关闭的时候自动调用的,在这里,我们可以做些收尾工作,如关闭数据库连接等,其中参数 spider 就是被关闭的 Spider 对象。

from_crawler(cls, crawler)

from_crawler 方法是一个类方法,用 @classmethod 标识,它接收一个参数 crawler。通过 crawler 对象,我们可以拿到 Scrapy 的所有核心组件,如全局配置的每个信息。然后可以在这个方法里面创建一个 Pipeline 实例参数 cls 就是 Class,最后返回一个 Class 实例。

下面我们用一个实例来加深对 Item Pipeline 用法的理解。

本节目标

本节我们要爬取的目标网站是 https://ssr1.scrape.center/ ,我们需要把每部电影的名称,类别,评分、简介、导演、演员的信息以及相关图片爬取下来,同时把每部电影的导演、演员的相关图片保存成一个文件夹:并将每部电影的完整数据保存到 MongoDB 和 Elasticsearch 里。

这里使用 Scrapy 来实现这个电影数据爬虫,主要是为了了解 Item Pipeline 的用法。我们会使用 Item Pipeline 分别实现 MongoDB 存储,Elasticsearch 存储、Image 图片存储这 3 个 Pipeline。

在开始之前,请确保已经安装好 MongoDB 和 Elasticsearch,另外安装好 Python 的 PyMongo、Elasticsearch、Scrapy 包,安装参考如下。

做好如上准备工作之后,我们就可以开始本节的实战练习了。

实战

我们之前已经分析过此站点的页面逻辑了,在此就不再逐一分析了,直接上手用 Scrapy 编写此站点的爬虫:同时实现几个 Item Pipeline。

首先新建一个项目:我们取名为 scrapyitempipelinedemo,命令如下:

scrapy startproject scrapyitempipelinedemo

接下来新建一个 Spider,命令如下:

scrapy genspider scrapy ssr1.scrape.center

这样我们就成功创建了一个 Spider,名字为 scrape,允许爬取的域名为 ssr1.scrape.center。

接下来我们来实现列表页的爬取。本站点一共有 10 页数据,所以我们可以新建 10 个初始请求,实现 start_requests 方法的代码如下:

from scrapy import Request, Spider

class ScrapeSpider(Spider):
    name = 'scrape'
    allowed_domains = ['ssr1.scrape.center']
    base_url = 'https://ssr1.scrape.center'
    max_page = 10

    def start_requests(self):
        for i in range(1, self.max_page + 1):
            url = f'{self.base_url}/page/{i}'
            yield Request(url, callback=self.parse_index)

    def parse_index(self, response):
        print(response)

在这里我们声明了 max_page 即最大翻页数量,然后实现了 start_requests 方法,构造了 10 个初始请求分别爬取每一个列表页,Request 对应的回调方法修改为了 parse_index,最后我们暂时在 parse_index 方法里面打印输出了 resposne 对象。

运行这个 Spider 的命令如下:

scrapy crawl scrape

运行结果类似如下:

2024-01-31 10:06:57 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/1> (referer: None)
<200 https://ssr1.scrape.center/page/1>
2024-01-31 10:06:57 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/7> (referer: None)
<200 https://ssr1.scrape.center/page/7>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/9> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/3> (referer: None)
<200 https://ssr1.scrape.center/page/9>
<200 https://ssr1.scrape.center/page/3>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/4> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/10> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/5> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/8> (referer: None)
<200 https://ssr1.scrape.center/page/4>
<200 https://ssr1.scrape.center/page/10>
<200 https://ssr1.scrape.center/page/5>
<200 https://ssr1.scrape.center/page/8>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/2> (referer: None)
<200 https://ssr1.scrape.center/page/2>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/6> (referer: None)
<200 https://ssr1.scrape.center/page/6>

可以看到对应的列表页的数据就被爬取下来了,Response 的状态码为 200。

接着我们可以在 parse_index 方法里对 response 的内容进行解析,提取每部电影的详情页链接,通过审查源代码可以发现,其标题对应的 CSS 选择器为 .item .name,如图 15-9 所示。

(图略)

所以这里我们可以借助 response 的 css 方法进行提取,提取链接之后生成详情页的 Request。可以把 parse_index 方法改写如下:

def parse_index(self, response):
    for item in response.css('.item'):
        href = item.css('.name::attr(href)').extract_first()
        url = response.urljoin(href)
        yield Request(url, callback=self.parse_detail)

def parse_detail(self, response):
    print(response)

在这单我们首先筛选了每部电影对应的节点,即 .item,然后遍历这些节点提取其中的 .name 选择器对应的详情页链接,接着通过 response 的 urljoin 方法拼接成完整的详情页 URL,最后构造新的详情页 Request,回调方法设置为 parse_detail,同时在 parse_detail 方法里面打印输出 response。

重新运行,我们可以看到详情页的内容就被爬取下来了,类似的输出如下:

2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/10> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/10>
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/3> (referer: https://ssr1.scrape.center/page/1)
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/9> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/3>
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/8> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/9>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/1> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/8>
<200 https://ssr1.scrape.center/detail/1>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/29> (referer: https://ssr1.scrape.center/page/3)
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/5> (referer: https://ssr1.scrape.center/page/1)
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/2> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/29>
<200 https://ssr1.scrape.center/detail/5>
<200 https://ssr1.scrape.center/detail/2>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/6> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/6>

其实现在 parse_detail 里面的 response 就是详情页的内容了,我们可以进一步对详情页的内容进行解析,提取每部电影的名称、类别、评分,简介、导演、演员等信息。

首先让我们新建一个 Item,叫作 MovieItem,定义如下:

from scrapy import Item, Field


class MovieItem(Item):
    name = Field()
    categories = Field()
    drama = Field()
    score = Field()
    directors = Field()
    actors = Field()

这里我们定义的几个字段 name、categories、score、drama、directors、actors 分别代表电影名称,类别、评分,简介,导演,演员。接下来我们就可以提取详情页了,修改 parse_detail 方法如下:

def parse_detail(self, response):
    item = MovieItem()
    item['name'] = response.xpath('//div[contains(@class, "item")]//h2/text()').extract_first()
    item['categories'] = response.xpath('//button[contains(@class, "category")]/span/text()').extract()
    item['score'] = response.css('.score::text').re_first('[\d\.]+')
    item['drama'] = response.css('.drama p::text').extract_first().strip()
    item['directors'] = []
    directors = response.xpath('//div[contains(@class, "directors")]//div[contains(@class, "director")]')
    for director in directors:
        director_image = director.xpath('.//img[@class="image"]/@src').extract_first()
        director_name = director.xpath('.//p[contains(@class, "name")]/text()').extract_first()
        item['directors'].append({
            'name': director_name,
            'image': director_image
        })
    item['actors'] = []
    actors = response.css('.actors .actor')
    for actor in actors:
        actor_image = actor.css('.actor .image::attr(src)').extract_first()
        actor_name = actor.css('.actor .name::text').extract_first()
        item['actors'].append({
            'name': actor_name,
            'image': actor_image
        })
    yield item

在这里我们首先创建了一个 MovieItem 对象,赋值为 item。然后我们使用 xpath 方法提取了 name、categories 两个字段。为了让大家不仅仅掌握 xpath 的提取方式,我们还使用 CSS 选择器提取了 score 和 drama 字段,同时 score 字段最后还调用了 re_first 方法传人正则表达式提取了分数的内容,对于导演 directors 和演员 actors,我们首先提取了单个 director 和 actor 节点,然后分别从中提取了姓名和照片,最后组合成一个列表赋值给 directors 和 actors 字段。

重新运行一下,可以发现提取结果类似如下:

2024-01-31 10:23:50 [scrapy.core.scraper] DEBUG: Scraped from <200 https://ssr1.scrape.center/detail/21>
{'actors': [{'image': 'https://p1.meituan.net/moviemachine/d1156c14dd899ada7c2b98bc373021c852875.jpg@128w_170h_1e_1c',
             'name': '克林特·伊斯特伍德'},
            {'image': 'https://p0.meituan.net/movie/665eab6fdb7755138e0c8092f35ba39327553.jpg@128w_170h_1e_1c',
             'name': '李·范·克里夫'},
            {'image': 'https://p1.meituan.net/movie/275042f2bbe012263b8deed1c96e611b42623.jpg@128w_170h_1e_1c',
             'name': '埃里·瓦拉赫'},
            {'image': 'https://p1.meituan.net/movie/3c91cd5186e89e927056adbc8a722f5014760.jpg@128w_170h_1e_1c',
             'name': '路易吉·皮斯蒂利'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Claudio Scarchilli'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'John Bartha'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Livio Lorenzon'},
            {'image': 'https://p0.meituan.net/movie/69698bc0960b07e3acddd412f3b88ee821953.jpg@128w_170h_1e_1c',
             'name': '贝尼托·斯特凡内利'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Angelo Novi'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': '安东尼奥·卡萨斯'},
            {'image': 'https://p0.meituan.net/movie/a608b42289f39ebd73e7550c38b3be0430595.jpg@128w_170h_1e_1c',
             'name': '阿尔多·桑布雷利'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Al Mulock'},
            {'image': 'https://p0.meituan.net/movie/59d0f1cb25db1e1ed6d57cf1c163da9e36698.jpg@128w_170h_1e_1c',
             'name': 'Sergio Mendizábal'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Antonio Molino Rojo'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Lorenzo Robledo'},
            {'image': 'https://p0.meituan.net/movie/5d53dde873a6c1570d3c934c8a146fe314431.jpg@128w_170h_1e_1c',
             'name': '马里奥·布雷加'},
            {'image': 'https://p0.meituan.net/movie/2f87d0b53695ba7fe2d1f0ed47f5998112368.jpg@128w_170h_1e_1c',
             'name': '弗兰克·布拉尼亚'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Luigi Ciavarro'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Jesús Guzmán'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Víctor Israel'},
            {'image': 'https://p1.meituan.net/movie/c0b9c3bbec6fdb7026c412ec85150e9e20784.jpg@128w_170h_1e_1c',
             'name': '纳扎雷诺·纳塔莱'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Ricardo Palacios'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Romano Puppo'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Antonio Ruiz'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Román Ariznavarreta'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'José Terrón'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Saturno Cerra'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Enrique Santiago'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Fortunato Arena'},
            {'image': 'https://p1.meituan.net/movie/9bdf1740721372b72f1fb6040ab34a3b7406.jpg@128w_170h_1e_1c',
             'name': '阿蒂利奥·罗戴德约'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Veriano Genesi'},
            {'image': 'https://p1.meituan.net/movie/24473db1b02483d777a08270d4faa99c30173.jpg@128w_170h_1e_1c',
             'name': '阿尔多·久弗瑞'},
            {'image': 'https://p0.meituan.net/movie/2aee47645d238cdbfa48e167e6fbecb88474.jpg@128w_170h_1e_1c',
             'name': 'Rada Rassimov'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Enzo Petito'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Antonio Casale'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Sandro Scarchilli'},
            {'image': 'https://p1.meituan.net/movie/41ae6b3e57bf9e4123089180d755507a36295.jpg@128w_170h_1e_1c',
             'name': '切诺·阿隆索'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'William Conroy'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Aysanoa Runachagua'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Franco Doria'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Amerigo Castrighella'},
            {'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
             'name': 'Tony Di Mitri'}],
 'categories': ['西部', '冒险'],
 'directors': [{'image': 'https://p0.meituan.net/movie/bc6d42116220a072db01371208b5157815631.jpg@128w_170h_1e_1c',
                'name': '赛尔乔·莱翁内'}],
 'drama': '故事发生在美国南北战争时期。图科(埃里·瓦拉赫 饰)是一个图财害命的江洋大盗,因此他被镇上悬赏通缉。布兰迪(克林特·伊斯特伍德  '
          '饰)是一个除暴安良的牛仔,他无意中抓住了图科,但嫌赏金不够又掳走了他。在荒漠中,布兰迪惩罚图科让其自生自灭。但是诡诈的图科居然逃过了一劫,并纠集一些帮凶在客栈捉住了布兰迪 。正当图科以牙还牙折磨布兰迪的时候,他劫持了一个名叫卡森的士兵。后者临死前留下了宝藏的秘密,图科和布兰迪分别获得了一半信息。与此同时,一个狡猾的杀手桑坦萨(李·范·克里夫 '
          '饰)也通过其他渠道发现了宝藏的秘密。于是,在寻宝的道路上,三个人使出浑身解数,上演了一场场对决的好戏……',
 'name': '黄金三镖客 - Il buono, il brutto, il cattivo.',
 'score': '9.1'}

可以看到这里我们已经成功提取了各个字段然后生成了 MovieItem 对象了。

下一步就是本节的重点内容了,我们需要把当前爬取到的内容存储到 MongoDB 和 Elasticsearch 中、然后将导演和演员的图片也下载下来。

要实现这个操作,我们需要创建 3 个 Item Pipeline,其中两个分别用来将数据存储到 MongoDB、Elasticseareh,另外下一个用来下载图片。

MongoDB

之前我们已经实现过 MongoDB 相关的 Pipeline 了,这里我们再简略说一下。

首先确保 MongoDB 已经安装并且正常运行,既可以运行在本地,也可以运行在远程,我们需要把它的连接字符串构造好:连接字符串的格式如下:

mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]

比如运行在本地 27017 端口的无密码的 MongoDB 可以直接写为:

mongodb://localhost:27017

如果是远程 MongoDB,可以根据用户名,密码、地址、端口等构造。

我们实现一个 MongoDBPipeline,将信息保存到 MongoDB,在 pipelines.py 里添加如下类的实现:

class MongoDBPipeline(object):

    @classmethod
    def from_crawler(cls, crawler):
        cls.connection_string = crawler.settings.get('MONGODB_CONNECTION_STRING')
        cls.database = crawler.settings.get('MONGODB_DATABASE')
        cls.collection = crawler.settings.get('MONGODB_COLLECTION')
        return cls()

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.connection_string)
        self.db = self.client[self.database]

    def process_item(self, item, spider):
        self.db[self.collection].update_one({
            'name': item['name']
        }, {
            '$set': dict(item)
        }, True)
        return item

    def close_spider(self, spider):
        self.client.close()

这里我们首先利用 from_crawler 获取了全局配置 MONGODB_CONNECTION_STRING、MONGODB_DATABASE 和 MONGODB_COLLECTION,即 MongoDB 连接字符串,数据库名称,集合名词,然后将三者赋值为类属性。

接着我们实现了 open_spider 方法,该方法就是利用 from_crawler 赋值的 connection_string 创建一个 MongoDB 连接对象,然后声明数据库操作对象,close_spider 则是在 Spider 运行结束时关闭 MongoDB 连接。

接着最重要的就是 process_item 方法了,这个方法接收的参数 item 就是从 Spider 生成的 Item 对象,该方法需要将此 Item 存储到 MongoDB 中。这里我们使用了 update_one 方法实现了存在即更新,不存在则插人的功能。

接下来我们需要在 settings.py 里添加 MONGODB_CONNECTION_STRING、MONGODB_DATABASE 和 MONGODB_COLLECTION 这 3 个变量,相关代码如下:

MONGODB_CONNECTION_STRING = os.getenv('MONGODB_CONNECTION_STRING')
MONGODB_DATABASE = 'movies'
MONGODB_COLLECTION = 'movies'

这里可以将 MONGODB_CONNECTION_STRING 设置为从环境变量中读取,而不用将明文将密码等信息写到代码里。

如果是本地无密码的 MongoDB,直接写为如下内容即可:

MONGODB_CONNECTION_STRING = 'mongodb://localhost:27017' # or just use 'localhost'

这样,一个保存到 MongoDB 的 Pipeline 就创建好了,利用 process_item 方法我们即可完成数据插人到 MongoDB 的操作,最后会返回 Item 对象。

Elasticsearch

存储到 Elasticsearch 也是一样,我们需要先创建一个 Pipeline,代码实现如下:

class ElasticsearchPipeline(object):

    @classmethod
    def from_crawler(cls, crawler):
        cls.connection_string = crawler.settings.get('ELASTICSEARCH_CONNECTION_STRING')
        cls.index = crawler.settings.get('ELASTICSEARCH_INDEX')
        return cls()

    def open_spider(self, spider):
        self.conn = Elasticsearch([self.connection_string])
        if not self.conn.indices.exists(self.index):
            self.conn.indices.create(index=self.index)

    def process_item(self, item, spider):
        self.conn.index(index=self.index, body=dict(item), id=hash(item['name']))
        return item

    def close_spider(self, spider):
        self.conn.transport.close()

这里同样定义了 ELASTICSEARCH_CONNECTION_STRING 代表 Elasticsearch 的连接字符串,ELASTICSEARCH_INDEX 代表索引名称,具体初始化的操作和 MongoDBPipeline 的原理是类似的。

在 process_item 方法中,我们调用了 index 方法对数据进行索引,我们指定了 3 个参数,第一个参数 index 代表索引名称,第二个参数 body 代表数据对象,在这里我们将 Item 转为了字典类型,第三个参数 id 则是索引数据的 id,这里我们直接使用电影名称的 hash 值作为 id,或者自行指定其他 id 也可以的。

同样地,我们需要在 settings.py 里面添加 ELASTICSEARCH_CONNECTION_STRING 和 ELASTICSEARCH_INDEX:

ELASTICSEARCH_CONNECTION_STRING = os.getenv('ELASTICSEARCH_CONNECTION_STRING')
ELASTICSEARCH_INDEX = 'movies'

这里的 ELASTICSEARCH_CONNECTION_STRING 同样是从环境变量中读取的,它的格式如下:

http[s]://[username:password@]host[:port]

比如我实际使用的 ELASTICSEARCH_CONNECTION_STRING 值就类似:

https://user:password@es.cuiqingcai.com:9200

这里你可以根据实际情况更换成你的连接字待量,这样 ElasticsearchPipeline 就完成了。

Image Pipeline

Scrapy 提供了专门处理下载的 Pipeline,包括文件下载和图片下载。下载文件和图片的原理与抓取页面的原理一样,因此下载过程支持异步和多线程,十分高效。下面我们来看看具体的实现过程。

首先定义存储文件的路径,需要定义一个 IMAGES_STORE 变量,在 settings.py 中添加如下代码:

IMAGES_STORE = './images'

在这里我们将路径定义为当前路径下的 images 子文件夹,即下载的图片都会保存到本项目的 images 文件夹中。

内置的 ImagesPipeline 会默认读取 Item 的 image_urls 字段,并认为它是列表形式,接着遍历该字段后取出每个 URL 进行图片下载。

但是现在生成的 Item 的图片链接字段并不是 image_urls 字段表示的,我们是想下载 directors 和 actors 的每张图片。所以为了实现下载,我们需要重新定义下载的部分逻辑,即自定义 ImagePipeline 继承内置的 ImagesPipeline,重写几个方法。

我们定义的 ImagePipeline 代码如下:

from scrapy import Request
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline


class ImagePipeline(ImagesPipeline):
    def file_path(self, request, response=None, info=None):
        movie = request.meta['movie']
        type = request.meta['type']
        name = request.meta['name']
        file_name = f'{movie}/{type}/{name}.jpg'
        return file_name

    def item_completed(self, results, item, info):
        image_paths = [x['path'] for ok, x in results if ok]
        if not image_paths:
            raise DropItem('Image Downloaded Failed')
        return item

    def get_media_requests(self, item, info):
        for director in item['directors']:
            director_name = director['name']
            director_image = director['image']
            yield Request(director_image, meta={
                'name': director_name,
                'type': 'director',
                'movie': item['name']
            })

        for actor in item['actors']:
            actor_name = actor['name']
            actor_image = actor['image']
            yield Request(actor_image, meta={
                'name': actor_name,
                'type': 'actor',
                'movie': item['name']
            })

在这里我们实现了 ImagePipeline,继承 Scrapy 内置的 ImagesPipeline,重写下面几个方法。

  • get_media_requests:第一个参数 item 是爬取生成的 Item 对象,我们要下载的图片链接保存在 Item 的 directors 和 actors 每个元素的 image 字段中。所以我们将 URL 逐个取出,然后构造 Request 发起下载请求。同时我们指定了 meta 信息,方便构造图片的存储路径,以便在下载完成时使用。

  • file_path:第一个参数 request 就是当前下载对应的 Request 对象。这个方法用来返回保存的文件名,在这里我们获取了刚才生成的 Request 的 meta 信息,包括 movie(电影名称)、type(电影类型)和 name(导演或演员姓名),最终三者拼合为 file_name 作为最终的图片路径。

  • item_completed:单个 Item 完成下载时的处理方法。因为并不是每张图片都会下载成功,所以我们需要分析下载结果并剔除下载失败的图片。如果某张图片下载失败,那么我们就不需将此 Item 保存到数据库。item_completed 方法的第一个参数 results 就是该 Item 对应的下载结果,它是一个列表,列表的每个元素是一个元组,其中包含了下载成功或失败的信息。这里我们遍历下载结果,找出所有成功的下载列表。如果列表为空,那么该 Item 对应的图片下载失败,随即抛出 DropItem 异常,忽略该 Item;否则返回该 Item,说明此 Item 有效。

现在为止,3 个 Item Pipeline 的定义就完成了。最后只需要启用就可以了,修改 settings.py,设置 ITEM_PIPELINES 的代码如下所示:

ITEM_PIPELINES = {
    'scrapyitempipelinedemo.pipelines.ImagePipeline': 300,
    'scrapyitempipelinedemo.pipelines.MongoDBPipeline': 301,
    'scrapyitempipelinedemo.pipelines.ElasticsearchPipeline': 302,
}

这里要注意调用的顺序。我们需要优先调用 ImagePipeline 对 Item 做下载后的筛选,下载失败的 Item 就直接忽略,它们不会保存到 MongoDB 和 MySQL 里。随后再调用其他两个存储的 Pipeline,这样就能确保存人数据库的图片都是下载成功的。

接下来运行程序,执行爬取,命令如下所示:

scrapy crawl images

爬虫一边爬取一边下载,速度非常快,对应的输出日志如图 15-10 所示。

(图略)

查看本地 images 文件夹,发现图片都已经成功下载,如图15-11所示。

(图略)

可以看到图片已经分路径存储了,一部电影一个文件夹,演员和导演分二级文件夹,图片名直接以演员和导演名命名。

然后我们用 Kibana 查看 Elasticsearch,相应的电影数据也成功存储,如图 15-12 所示。

(图略)

查看 MongoDB,下载成功的图片信息同样已成功保存,如图 15-13 所示,

(图略)

这样我们就可以成功实现图片的下载并把图片的信息存人数据库了。

总结

Item Pipeline 是 Scrapy 非常重要的组件,数据存储几乎都是通过此组件实现的,请认真掌握此内容。