aiohttp异步爬取实战

6.2 节我们介绍了 aiohttp 的基本用法,本节我们完成异步爬虫的实战演练。

案例介绍

本次我们要爬取一个数据量相对大一点的网站,链接为 https://spa5.scrape.center/ ,页面如图 6-1 所示。

image 2025 06 19 12 13 11 284
Figure 1. 图6-1 要爬取的网站页面

这是一个图书网站,整个网站包含数千本图书信息,网站数据是 JavaScript 渲染而得的,数据可以通过 Ajax 接口获取,并且接口没有设置任何反爬措施和加密参数。另外,由于这个网站之前的电影案例数据量多—些,所以更加适合做异步爬取。

本节我们要完成如下目标:

  • 使用 aiohttp 爬取全站的图书数据;

  • 将数据通过异步的方式保存到 MongoDB 中。

准备工作

开始本节的探索之前,请确保你已经做好了如下准备工作:

  • 安装好了 Python(最低为 Python 3.6 版本,最好为 3.7 版本或以上),并能成功运行 Python 程序;

  • 了解了 Ajax 爬取的一些基本原理和模拟方法;

  • 了解了异步爬虫的基本原理和 asyncio 库的基本用法;

  • 了解了 aiohttp 库的基本用法;

  • 安装并成功运行了 MongoDB 数据库,而且安装了异步爬虫库 motor

关于最后一条,要实现 MongoDB 异步存储,离不开异步实现的 MongoDB 存储库 motor,其安装命令为:

pip3 install motor

详细的安装方式可以参考:https://setup.scrape.center/motor。

做好如上准备工作之后,我们就可以开始数据的爬取了。

页面分析

第 5 章我们讲解了 Ajax 的基本分析方法,本节的案例站点和之前分析 Ajax 时用的案例站点结构类似,都是列表页加详情页的结构,加载方式也都是 Ajax,所以我们能轻松分析到如下信息。

  • 列表页的 Ajax 请求接口格式为 https://spa5.scrape.center/api/book/?limit=18&offset={offset} 。其中 limit 的值为每一页包含多少本书;offset 的值为每一页的偏移量,计算公式为 offset = limit *(page-1),如第 1 页的 offset 值为 0,第 2 页 offset 的值为 18,以此类推。

  • 在列表页 Ajax 接口返回的数据里,results 字段包含当前页里 18 本图书的信息,其中每本书的数据里都含有一个 id 字段,这个 id 就是图书本身的 ID,可以用来进一步请求详情页。

  • 详情页的 Ajax 请求接口格式为 https:/spa5.scrape.center/api/book/{id}。其中的 id 即为详情页对应图书的 ID,可以从列表页 Ajax 接口的返回结果中获取此内容。

如果你掌握了 5.3 节的内容,那么上面三点应该很容易分析出来。如果有难度,不妨先复习一下之前的知识。

实现思路

其实,一个完善的异步爬虫应该能够充分利用资源进行全速爬取,其实现思路是维护一个动态变化的爬取队列,每产生一个新的 task,就将其放入爬取队列中,有专门的爬虫消费者从此队列中获取 task 并执行,能做到在最大并发量的前提下充分利用等待时间进行额外的爬取处理。

但上面的实现思路整体较为烦琐,需要设计爬取队列、回调函数、消费者等机制,需要实现的功能较多。由于我们刚刚接触 aiohttp 的基本用法,本节也主要是了解 aiohttp 的实战应用,因此这里稍微将爬取案例网站的实现过程简化一下。

我们将爬取逻辑拆分成两部分,第一部分为爬取列表页,第二部分为爬取详情页。因为异步爬虫的关键点在于并发执行,所以可以将爬取拆分为如下两个阶段。

  • 第一阶段是异步爬取所有列表页,我们可以将所有列表页的爬取任务集合在一起,并将其声明为由 task 组成的列表,进行异步爬取。

  • 第二阶段则是拿到上一步列表页的所有内容并解析,将所有图书的 id 信息组合为所有详情页的爬取任务集合,并将其声明为 task 组成的列表,进行异步爬取,同时爬取结果也以异步方式存储到 MongoDB 里面。

因为两个阶段在拆分之后需要串行执行,所以可能无法达到协程的最佳调度方式和资源利用情况,但也差不了很多。这个实现思路比较简单清晰,代码实现起来也较为容易,能够帮我们快速了解 aiohttp 的基本用法。

基本配置

首先,先配置一些基本的变量并引入一些必需的库,代码如下:

import asyncio
import aiohttp
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s:%(message)s')

INDEX_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

这里我们导人了 asyncioaiohttplogging 这 3 个库,然后定义了 logging 的基本配置。接着定义了 URL、爬取页码数量 PAGE_NUMBER、并发量 CONCURRENCY 等信息。

爬取列表页

第一阶段来爬取列表页,还是和之前一样,先定义一个通用的爬取方法,代码如下:

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
    async with semaphore:
        try:
            logging.info('scraping%s', url)
            async with session.get(url) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error('error occurredwhilescraping%s', url, exc_info=True)

这里我们声明了一个信号量,用来控制最大并发数量。

接着,定义了 scrape_api 方法,接收一个参数 url。该方法首先使用 async with 语句引人信号量作为上下文,接着调用 sessionget 方法请求这个 url,然后返回响应的 JSON 格式的结果。另外,这里还进行了异常处理,捕获了 ClientError,如果出现错误,就会输出异常信息。

然后,爬取列表页,实现代码如下:

async def scrapeindex(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)

这里定义了 scrape_index 方法用于爬取列表页,它接收一个参数 page。随后构造了一个列表页的 URL,将其传给 scrape_api 方法即可。这里注意,方法同样需要用 async 修饰,调用的 scrape_api 方法前面需要加 await,因为 scrape_api 调用之后本身会返回一个协程对象。另外,由于 scrape_api 的返回结果就是 JSON 格式,因此这个结果已经是我们想要爬取的信息,不需要再额外解析了。

接下来我们定义 main 方法,将上面的方法串联起来调用,实现如下:

import json

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER+1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info('resuits %s', json.dumps(results,ensure_ascii=False, indent=2))

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

这里首先声明了 session 对象,即最初声明的全局变量。这样的话,就不需要在各个方法里面都传递 session 了,实现起来比较简单。

接着定义了 scrape_index_tasks,这就是用于爬取列表页的所有 task 组成的列表。然后调用 asynciogather 方法,并将 task 列表传入其参数,将结果赋值为 results,它是由所有 task 返回结果组成的列表。

最后,调用 main 方法,使用事件循环启动该 main 方法对应的协程即可。

运行结果如下:

2025-06-19 12:22:34,211-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=0
2025-06-19 12:22:34,211-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=18
2025-06-19 12:22:34,212-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=36
2025-06-19 12:22:34,212-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=54
2025-06-19 12:22:34,212-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=72
2025-06-19 12:22:34,904-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=90
2025-06-19 12:22:35,499-INFO:scrapinghttps://spa5.scrape.center/api/book/?limit=18&offset=108

可以看到,这里就开始异步爬取了,并发量是由我们控制的,目前为 5。当然,也可以进一步调高这个数字,在网站能承受的情况下,爬取速度会进一步加快。

最后,results 就是爬取所有列表页得到的结果,接着就可以用它进行第二阶段的爬取了。

爬取详情页

第二阶段是爬取详情页并保存数据。由于每个详情页分别对应一本书,每本书都需要一个 ID 作为唯一标识,而这个 ID 又正好存在 results 里面,所以下面我们需要将所有详情页的 ID 获取出来。

main 方法里增加 results 的解析代码,实现如下:

ids = []
for index_data in results:
    if not index_data: continue
    for item in index_data.get('results'):
        ids.append(item.get('id'))

这样 ids 就是所有书的 id 了,然后我们用所有的 id 构造所有详情页对应的 task,进行异步爬取即可。

这里再定义两个方法,用于爬取详情页和保存数据,实现如下:

from motor.motor_asyncio import AsyncIOMotorClient

MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'books'
MONGO_COLLECTION_NAME = 'books'

client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]

async def save_data(data):
    logging.info('saving data %s, data)
    if data:
        return await collection.update_one({
            'id':data.get('id')
        },{'$set': data }, upsert=True)

async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    await save_data(data)

这里定义了 scrape_detail 方法用于爬取详情页数据,并调用 save_data 方法保存数据。save_data 方法可以将数据保存到 MongoDB 里面。

这里我们用到了支持异步的 MongoDB 存储库 motormotor 的连接声明和 pymongo 是类似的,保存数据的调用方法也基本一致,不过整个都换成了异步方法。

接着在 main 方法里面增加对 scrape_detail 方法的调用即可爬取详情页,实现如下:

scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
await asyncio.wait(scrape_detail_tasks)
await session.close()

这里先声明了 scrape_detail_tasks,这是由所有爬取详情页的 task 组成的列表,接着调用了 asynciowait 方法,并将声明的列表传入其中,调用执行此方法即可爬取详情页。当然,这里也可以使用 gather 方法,效果是一样的,只不过返回结果略有差异。全部执行完毕后,调用 close 方法关闭 session

一些详情页的爬取过程如下:

最后,我们观察到,爬取的数据都保存到 MongoDB 数据库里面了,如图 6-2 所示。

图 6-2 爬取到的图书数据

至此,我们就使用 aiohttp 完成了对图书网站的异步爬取。

总结

本节我们通过一个实例讲解了 aiohttp 异步爬虫的具体实现。在学习过程中不难发现,相比普通的单线程爬虫来说,使用异步爬虫可以大大提高爬取效率,后面我们也会多多使用。