Item Pipeline 的使用
在前面的章节,我们初步介绍了 Item Pipeline 的作用,本节我们再详细了解一下它的用法。
Item Pipeline 即项目管道,它的调用发生在 Spider 产生 Item 之后。当 Spider 解析完 Response,Item 就会被 Engine 传递到 ItemPipeline,被定义的 ItemPipeline 组件会顺次被调用,完成一连串的处理过程,比如数据清洗、存储等。
Item Pipeline 的主要功能如下。
-
清洗 HTML 数据。
-
验证爬取数据,检查爬取字段。
-
查重并丢弃重复内容。
-
将爬取结果储存到数据库中。
核心方法
我们可以自定义 Item Pipeline,只需要实现指定的方法就好,其中必须实现的一个方法是:
-
process_item(item,spider)
另外还有几个比较实用的方法,它们分别是:
-
open_spider(spider)
-
close_spider(spider)
-
from_crawler(cls, crawler)
下面我们对这几个方法的用法进行详细介绍。
process_item(item, spider)
process_item 是必须实现的方法,被定义的 Item Pipeline 会默认调用这个方法对 Item 进行处理,比如进行数据处理或者将数据写入数据库等操作。process_item 方法必须返回 Item 类型的值或者抛出一个 DropItem 异常。
process_item 方法的参数有两个。
-
item:Item 对象,即被处理的 Item。
-
spider:Spider 对象,即生成该 Item 的 Spider。
该方法的返回类型如下。
-
如果返回的是 Item 对象,那么此 Item 会接着被低优先级的 Item Pipeline 的 process_item 方法处理,直到所有的方法被调用完毕。
-
如果抛出 DropItem 异常,那么此 Item 就会被丢弃,不再进行处理。
open_spider(self, spider)
open_spider 方法是在 Spider 开启的时候被自动调用的,在这里我们可以做一些初始化操作,如开启数据库连接等。其中参数 spider 就是被开启的 Spider 对象。
本节目标
本节我们要爬取的目标网站是 https://ssr1.scrape.center/ ,我们需要把每部电影的名称,类别,评分、简介、导演、演员的信息以及相关图片爬取下来,同时把每部电影的导演、演员的相关图片保存成一个文件夹:并将每部电影的完整数据保存到 MongoDB 和 Elasticsearch 里。
这里使用 Scrapy 来实现这个电影数据爬虫,主要是为了了解 Item Pipeline 的用法。我们会使用 Item Pipeline 分别实现 MongoDB 存储,Elasticsearch 存储、Image 图片存储这 3 个 Pipeline。
在开始之前,请确保已经安装好 MongoDB 和 Elasticsearch,另外安装好 Python 的 PyMongo、Elasticsearch、Scrapy 包,安装参考如下。
-
MongoDB: https://setup.scrape.center/mongodb
-
PyMongo: https://setup.scrape.center/pymongo
-
Elasticsearch: https://setup.scrape.center/elasticsearch
-
ElasticsearchPython 包: https://setup.scrape.center/elasticsearch-py
做好如上准备工作之后,我们就可以开始本节的实战练习了。
实战
我们之前已经分析过此站点的页面逻辑了,在此就不再逐一分析了,直接上手用 Scrapy 编写此站点的爬虫:同时实现几个 Item Pipeline。
首先新建一个项目:我们取名为 scrapyitempipelinedemo,命令如下:
scrapy startproject scrapyitempipelinedemo
接下来新建一个 Spider,命令如下:
scrapy genspider scrapy ssr1.scrape.center
这样我们就成功创建了一个 Spider,名字为 scrape,允许爬取的域名为 ssr1.scrape.center。
接下来我们来实现列表页的爬取。本站点一共有 10 页数据,所以我们可以新建 10 个初始请求,实现 start_requests 方法的代码如下:
from scrapy import Request, Spider
class ScrapeSpider(Spider):
name = 'scrape'
allowed_domains = ['ssr1.scrape.center']
base_url = 'https://ssr1.scrape.center'
max_page = 10
def start_requests(self):
for i in range(1, self.max_page + 1):
url = f'{self.base_url}/page/{i}'
yield Request(url, callback=self.parse_index)
def parse_index(self, response):
print(response)
在这里我们声明了 max_page 即最大翻页数量,然后实现了 start_requests 方法,构造了 10 个初始请求分别爬取每一个列表页,Request 对应的回调方法修改为了 parse_index,最后我们暂时在 parse_index 方法里面打印输出了 resposne 对象。
运行这个 Spider 的命令如下:
scrapy crawl scrape
运行结果类似如下:
2024-01-31 10:06:57 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/1> (referer: None)
<200 https://ssr1.scrape.center/page/1>
2024-01-31 10:06:57 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/7> (referer: None)
<200 https://ssr1.scrape.center/page/7>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/9> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/3> (referer: None)
<200 https://ssr1.scrape.center/page/9>
<200 https://ssr1.scrape.center/page/3>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/4> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/10> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/5> (referer: None)
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/8> (referer: None)
<200 https://ssr1.scrape.center/page/4>
<200 https://ssr1.scrape.center/page/10>
<200 https://ssr1.scrape.center/page/5>
<200 https://ssr1.scrape.center/page/8>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/2> (referer: None)
<200 https://ssr1.scrape.center/page/2>
2024-01-31 10:06:58 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/page/6> (referer: None)
<200 https://ssr1.scrape.center/page/6>
可以看到对应的列表页的数据就被爬取下来了,Response 的状态码为 200。
接着我们可以在 parse_index 方法里对 response 的内容进行解析,提取每部电影的详情页链接,通过审查源代码可以发现,其标题对应的 CSS 选择器为 .item .name
,如图 15-9 所示。
(图略)
所以这里我们可以借助 response 的 css 方法进行提取,提取链接之后生成详情页的 Request。可以把 parse_index 方法改写如下:
def parse_index(self, response):
for item in response.css('.item'):
href = item.css('.name::attr(href)').extract_first()
url = response.urljoin(href)
yield Request(url, callback=self.parse_detail)
def parse_detail(self, response):
print(response)
在这单我们首先筛选了每部电影对应的节点,即 .item
,然后遍历这些节点提取其中的 .name
选择器对应的详情页链接,接着通过 response 的 urljoin 方法拼接成完整的详情页 URL,最后构造新的详情页 Request,回调方法设置为 parse_detail,同时在 parse_detail 方法里面打印输出 response。
重新运行,我们可以看到详情页的内容就被爬取下来了,类似的输出如下:
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/10> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/10>
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/3> (referer: https://ssr1.scrape.center/page/1)
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/9> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/3>
2024-01-31 10:09:06 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/8> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/9>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/1> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/8>
<200 https://ssr1.scrape.center/detail/1>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/29> (referer: https://ssr1.scrape.center/page/3)
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/5> (referer: https://ssr1.scrape.center/page/1)
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/2> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/29>
<200 https://ssr1.scrape.center/detail/5>
<200 https://ssr1.scrape.center/detail/2>
2024-01-31 10:09:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://ssr1.scrape.center/detail/6> (referer: https://ssr1.scrape.center/page/1)
<200 https://ssr1.scrape.center/detail/6>
其实现在 parse_detail 里面的 response 就是详情页的内容了,我们可以进一步对详情页的内容进行解析,提取每部电影的名称、类别、评分,简介、导演、演员等信息。
首先让我们新建一个 Item,叫作 MovieItem,定义如下:
from scrapy import Item, Field
class MovieItem(Item):
name = Field()
categories = Field()
drama = Field()
score = Field()
directors = Field()
actors = Field()
这里我们定义的几个字段 name、categories、score、drama、directors、actors 分别代表电影名称,类别、评分,简介,导演,演员。接下来我们就可以提取详情页了,修改 parse_detail 方法如下:
def parse_detail(self, response):
item = MovieItem()
item['name'] = response.xpath('//div[contains(@class, "item")]//h2/text()').extract_first()
item['categories'] = response.xpath('//button[contains(@class, "category")]/span/text()').extract()
item['score'] = response.css('.score::text').re_first('[\d\.]+')
item['drama'] = response.css('.drama p::text').extract_first().strip()
item['directors'] = []
directors = response.xpath('//div[contains(@class, "directors")]//div[contains(@class, "director")]')
for director in directors:
director_image = director.xpath('.//img[@class="image"]/@src').extract_first()
director_name = director.xpath('.//p[contains(@class, "name")]/text()').extract_first()
item['directors'].append({
'name': director_name,
'image': director_image
})
item['actors'] = []
actors = response.css('.actors .actor')
for actor in actors:
actor_image = actor.css('.actor .image::attr(src)').extract_first()
actor_name = actor.css('.actor .name::text').extract_first()
item['actors'].append({
'name': actor_name,
'image': actor_image
})
yield item
在这里我们首先创建了一个 MovieItem 对象,赋值为 item。然后我们使用 xpath 方法提取了 name、categories 两个字段。为了让大家不仅仅掌握 xpath 的提取方式,我们还使用 CSS 选择器提取了 score 和 drama 字段,同时 score 字段最后还调用了 re_first 方法传人正则表达式提取了分数的内容,对于导演 directors 和演员 actors,我们首先提取了单个 director 和 actor 节点,然后分别从中提取了姓名和照片,最后组合成一个列表赋值给 directors 和 actors 字段。
重新运行一下,可以发现提取结果类似如下:
2024-01-31 10:23:50 [scrapy.core.scraper] DEBUG: Scraped from <200 https://ssr1.scrape.center/detail/21>
{'actors': [{'image': 'https://p1.meituan.net/moviemachine/d1156c14dd899ada7c2b98bc373021c852875.jpg@128w_170h_1e_1c',
'name': '克林特·伊斯特伍德'},
{'image': 'https://p0.meituan.net/movie/665eab6fdb7755138e0c8092f35ba39327553.jpg@128w_170h_1e_1c',
'name': '李·范·克里夫'},
{'image': 'https://p1.meituan.net/movie/275042f2bbe012263b8deed1c96e611b42623.jpg@128w_170h_1e_1c',
'name': '埃里·瓦拉赫'},
{'image': 'https://p1.meituan.net/movie/3c91cd5186e89e927056adbc8a722f5014760.jpg@128w_170h_1e_1c',
'name': '路易吉·皮斯蒂利'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Claudio Scarchilli'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'John Bartha'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Livio Lorenzon'},
{'image': 'https://p0.meituan.net/movie/69698bc0960b07e3acddd412f3b88ee821953.jpg@128w_170h_1e_1c',
'name': '贝尼托·斯特凡内利'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Angelo Novi'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': '安东尼奥·卡萨斯'},
{'image': 'https://p0.meituan.net/movie/a608b42289f39ebd73e7550c38b3be0430595.jpg@128w_170h_1e_1c',
'name': '阿尔多·桑布雷利'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Al Mulock'},
{'image': 'https://p0.meituan.net/movie/59d0f1cb25db1e1ed6d57cf1c163da9e36698.jpg@128w_170h_1e_1c',
'name': 'Sergio Mendizábal'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Antonio Molino Rojo'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Lorenzo Robledo'},
{'image': 'https://p0.meituan.net/movie/5d53dde873a6c1570d3c934c8a146fe314431.jpg@128w_170h_1e_1c',
'name': '马里奥·布雷加'},
{'image': 'https://p0.meituan.net/movie/2f87d0b53695ba7fe2d1f0ed47f5998112368.jpg@128w_170h_1e_1c',
'name': '弗兰克·布拉尼亚'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Luigi Ciavarro'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Jesús Guzmán'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Víctor Israel'},
{'image': 'https://p1.meituan.net/movie/c0b9c3bbec6fdb7026c412ec85150e9e20784.jpg@128w_170h_1e_1c',
'name': '纳扎雷诺·纳塔莱'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Ricardo Palacios'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Romano Puppo'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Antonio Ruiz'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Román Ariznavarreta'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'José Terrón'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Saturno Cerra'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Enrique Santiago'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Fortunato Arena'},
{'image': 'https://p1.meituan.net/movie/9bdf1740721372b72f1fb6040ab34a3b7406.jpg@128w_170h_1e_1c',
'name': '阿蒂利奥·罗戴德约'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Veriano Genesi'},
{'image': 'https://p1.meituan.net/movie/24473db1b02483d777a08270d4faa99c30173.jpg@128w_170h_1e_1c',
'name': '阿尔多·久弗瑞'},
{'image': 'https://p0.meituan.net/movie/2aee47645d238cdbfa48e167e6fbecb88474.jpg@128w_170h_1e_1c',
'name': 'Rada Rassimov'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Enzo Petito'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Antonio Casale'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Sandro Scarchilli'},
{'image': 'https://p1.meituan.net/movie/41ae6b3e57bf9e4123089180d755507a36295.jpg@128w_170h_1e_1c',
'name': '切诺·阿隆索'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'William Conroy'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Aysanoa Runachagua'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Franco Doria'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Amerigo Castrighella'},
{'image': 'https://p1.meituan.net/mmdb/3a2061d771d98566d3e5fa5c08c5e0b33685.png@128w_170h_1e_1c',
'name': 'Tony Di Mitri'}],
'categories': ['西部', '冒险'],
'directors': [{'image': 'https://p0.meituan.net/movie/bc6d42116220a072db01371208b5157815631.jpg@128w_170h_1e_1c',
'name': '赛尔乔·莱翁内'}],
'drama': '故事发生在美国南北战争时期。图科(埃里·瓦拉赫 饰)是一个图财害命的江洋大盗,因此他被镇上悬赏通缉。布兰迪(克林特·伊斯特伍德 '
'饰)是一个除暴安良的牛仔,他无意中抓住了图科,但嫌赏金不够又掳走了他。在荒漠中,布兰迪惩罚图科让其自生自灭。但是诡诈的图科居然逃过了一劫,并纠集一些帮凶在客栈捉住了布兰迪 。正当图科以牙还牙折磨布兰迪的时候,他劫持了一个名叫卡森的士兵。后者临死前留下了宝藏的秘密,图科和布兰迪分别获得了一半信息。与此同时,一个狡猾的杀手桑坦萨(李·范·克里夫 '
'饰)也通过其他渠道发现了宝藏的秘密。于是,在寻宝的道路上,三个人使出浑身解数,上演了一场场对决的好戏……',
'name': '黄金三镖客 - Il buono, il brutto, il cattivo.',
'score': '9.1'}
可以看到这里我们已经成功提取了各个字段然后生成了 MovieItem 对象了。
下一步就是本节的重点内容了,我们需要把当前爬取到的内容存储到 MongoDB 和 Elasticsearch 中、然后将导演和演员的图片也下载下来。
要实现这个操作,我们需要创建 3 个 Item Pipeline,其中两个分别用来将数据存储到 MongoDB、Elasticseareh,另外下一个用来下载图片。
MongoDB
之前我们已经实现过 MongoDB 相关的 Pipeline 了,这里我们再简略说一下。
首先确保 MongoDB 已经安装并且正常运行,既可以运行在本地,也可以运行在远程,我们需要把它的连接字符串构造好:连接字符串的格式如下:
mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
比如运行在本地 27017 端口的无密码的 MongoDB 可以直接写为:
mongodb://localhost:27017
如果是远程 MongoDB,可以根据用户名,密码、地址、端口等构造。
我们实现一个 MongoDBPipeline,将信息保存到 MongoDB,在 pipelines.py 里添加如下类的实现:
class MongoDBPipeline(object):
@classmethod
def from_crawler(cls, crawler):
cls.connection_string = crawler.settings.get('MONGODB_CONNECTION_STRING')
cls.database = crawler.settings.get('MONGODB_DATABASE')
cls.collection = crawler.settings.get('MONGODB_COLLECTION')
return cls()
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.connection_string)
self.db = self.client[self.database]
def process_item(self, item, spider):
self.db[self.collection].update_one({
'name': item['name']
}, {
'$set': dict(item)
}, True)
return item
def close_spider(self, spider):
self.client.close()
这里我们首先利用 from_crawler 获取了全局配置 MONGODB_CONNECTION_STRING、MONGODB_DATABASE 和 MONGODB_COLLECTION,即 MongoDB 连接字符串,数据库名称,集合名词,然后将三者赋值为类属性。
接着我们实现了 open_spider 方法,该方法就是利用 from_crawler 赋值的 connection_string 创建一个 MongoDB 连接对象,然后声明数据库操作对象,close_spider 则是在 Spider 运行结束时关闭 MongoDB 连接。
接着最重要的就是 process_item 方法了,这个方法接收的参数 item 就是从 Spider 生成的 Item 对象,该方法需要将此 Item 存储到 MongoDB 中。这里我们使用了 update_one 方法实现了存在即更新,不存在则插人的功能。
接下来我们需要在 settings.py 里添加 MONGODB_CONNECTION_STRING、MONGODB_DATABASE 和 MONGODB_COLLECTION 这 3 个变量,相关代码如下:
MONGODB_CONNECTION_STRING = os.getenv('MONGODB_CONNECTION_STRING')
MONGODB_DATABASE = 'movies'
MONGODB_COLLECTION = 'movies'
这里可以将 MONGODB_CONNECTION_STRING 设置为从环境变量中读取,而不用将明文将密码等信息写到代码里。
如果是本地无密码的 MongoDB,直接写为如下内容即可:
MONGODB_CONNECTION_STRING = 'mongodb://localhost:27017' # or just use 'localhost'
这样,一个保存到 MongoDB 的 Pipeline 就创建好了,利用 process_item 方法我们即可完成数据插人到 MongoDB 的操作,最后会返回 Item 对象。
Elasticsearch
存储到 Elasticsearch 也是一样,我们需要先创建一个 Pipeline,代码实现如下:
class ElasticsearchPipeline(object):
@classmethod
def from_crawler(cls, crawler):
cls.connection_string = crawler.settings.get('ELASTICSEARCH_CONNECTION_STRING')
cls.index = crawler.settings.get('ELASTICSEARCH_INDEX')
return cls()
def open_spider(self, spider):
self.conn = Elasticsearch([self.connection_string])
if not self.conn.indices.exists(self.index):
self.conn.indices.create(index=self.index)
def process_item(self, item, spider):
self.conn.index(index=self.index, body=dict(item), id=hash(item['name']))
return item
def close_spider(self, spider):
self.conn.transport.close()
这里同样定义了 ELASTICSEARCH_CONNECTION_STRING 代表 Elasticsearch 的连接字符串,ELASTICSEARCH_INDEX 代表索引名称,具体初始化的操作和 MongoDBPipeline 的原理是类似的。
在 process_item 方法中,我们调用了 index 方法对数据进行索引,我们指定了 3 个参数,第一个参数 index 代表索引名称,第二个参数 body 代表数据对象,在这里我们将 Item 转为了字典类型,第三个参数 id 则是索引数据的 id,这里我们直接使用电影名称的 hash 值作为 id,或者自行指定其他 id 也可以的。
同样地,我们需要在 settings.py 里面添加 ELASTICSEARCH_CONNECTION_STRING 和 ELASTICSEARCH_INDEX:
ELASTICSEARCH_CONNECTION_STRING = os.getenv('ELASTICSEARCH_CONNECTION_STRING')
ELASTICSEARCH_INDEX = 'movies'
这里的 ELASTICSEARCH_CONNECTION_STRING 同样是从环境变量中读取的,它的格式如下:
http[s]://[username:password@]host[:port]
比如我实际使用的 ELASTICSEARCH_CONNECTION_STRING 值就类似:
https://user:password@es.cuiqingcai.com:9200
这里你可以根据实际情况更换成你的连接字待量,这样 ElasticsearchPipeline 就完成了。
Image Pipeline
Scrapy 提供了专门处理下载的 Pipeline,包括文件下载和图片下载。下载文件和图片的原理与抓取页面的原理一样,因此下载过程支持异步和多线程,十分高效。下面我们来看看具体的实现过程。
首先定义存储文件的路径,需要定义一个 IMAGES_STORE 变量,在 settings.py 中添加如下代码:
IMAGES_STORE = './images'
在这里我们将路径定义为当前路径下的 images 子文件夹,即下载的图片都会保存到本项目的 images 文件夹中。
内置的 ImagesPipeline 会默认读取 Item 的 image_urls 字段,并认为它是列表形式,接着遍历该字段后取出每个 URL 进行图片下载。
但是现在生成的 Item 的图片链接字段并不是 image_urls 字段表示的,我们是想下载 directors 和 actors 的每张图片。所以为了实现下载,我们需要重新定义下载的部分逻辑,即自定义 ImagePipeline 继承内置的 ImagesPipeline,重写几个方法。
我们定义的 ImagePipeline 代码如下:
from scrapy import Request
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline
class ImagePipeline(ImagesPipeline):
def file_path(self, request, response=None, info=None):
movie = request.meta['movie']
type = request.meta['type']
name = request.meta['name']
file_name = f'{movie}/{type}/{name}.jpg'
return file_name
def item_completed(self, results, item, info):
image_paths = [x['path'] for ok, x in results if ok]
if not image_paths:
raise DropItem('Image Downloaded Failed')
return item
def get_media_requests(self, item, info):
for director in item['directors']:
director_name = director['name']
director_image = director['image']
yield Request(director_image, meta={
'name': director_name,
'type': 'director',
'movie': item['name']
})
for actor in item['actors']:
actor_name = actor['name']
actor_image = actor['image']
yield Request(actor_image, meta={
'name': actor_name,
'type': 'actor',
'movie': item['name']
})
在这里我们实现了 ImagePipeline,继承 Scrapy 内置的 ImagesPipeline,重写下面几个方法。
-
get_media_requests:第一个参数 item 是爬取生成的 Item 对象,我们要下载的图片链接保存在 Item 的 directors 和 actors 每个元素的 image 字段中。所以我们将 URL 逐个取出,然后构造 Request 发起下载请求。同时我们指定了 meta 信息,方便构造图片的存储路径,以便在下载完成时使用。
-
file_path:第一个参数 request 就是当前下载对应的 Request 对象。这个方法用来返回保存的文件名,在这里我们获取了刚才生成的 Request 的 meta 信息,包括 movie(电影名称)、type(电影类型)和 name(导演或演员姓名),最终三者拼合为 file_name 作为最终的图片路径。
-
item_completed:单个 Item 完成下载时的处理方法。因为并不是每张图片都会下载成功,所以我们需要分析下载结果并剔除下载失败的图片。如果某张图片下载失败,那么我们就不需将此 Item 保存到数据库。item_completed 方法的第一个参数 results 就是该 Item 对应的下载结果,它是一个列表,列表的每个元素是一个元组,其中包含了下载成功或失败的信息。这里我们遍历下载结果,找出所有成功的下载列表。如果列表为空,那么该 Item 对应的图片下载失败,随即抛出 DropItem 异常,忽略该 Item;否则返回该 Item,说明此 Item 有效。
现在为止,3 个 Item Pipeline 的定义就完成了。最后只需要启用就可以了,修改 settings.py,设置 ITEM_PIPELINES 的代码如下所示:
ITEM_PIPELINES = {
'scrapyitempipelinedemo.pipelines.ImagePipeline': 300,
'scrapyitempipelinedemo.pipelines.MongoDBPipeline': 301,
'scrapyitempipelinedemo.pipelines.ElasticsearchPipeline': 302,
}
这里要注意调用的顺序。我们需要优先调用 ImagePipeline 对 Item 做下载后的筛选,下载失败的 Item 就直接忽略,它们不会保存到 MongoDB 和 MySQL 里。随后再调用其他两个存储的 Pipeline,这样就能确保存人数据库的图片都是下载成功的。
接下来运行程序,执行爬取,命令如下所示:
scrapy crawl images
爬虫一边爬取一边下载,速度非常快,对应的输出日志如图 15-10 所示。
(图略)
查看本地 images 文件夹,发现图片都已经成功下载,如图15-11所示。
(图略)
可以看到图片已经分路径存储了,一部电影一个文件夹,演员和导演分二级文件夹,图片名直接以演员和导演名命名。
然后我们用 Kibana 查看 Elasticsearch,相应的电影数据也成功存储,如图 15-12 所示。
(图略)
查看 MongoDB,下载成功的图片信息同样已成功保存,如图 15-13 所示,
(图略)
这样我们就可以成功实现图片的下载并把图片的信息存人数据库了。