aiohttp的使用
在 6.1节,我们介绍了异步爬虫的基本原理和 asyncio 的基本用法,并且在最后简单提及了使用 aiohttp 实现网页爬取的过程。本节我们介绍一下 aiohttp 的常见用法。
基本介绍
前面介绍的 asyncio 模块,其内部实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求来说,就需要用 aiohttp 实现了。
aiohttp 是一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,文提供了客户端。其中,我们用服务端可以搭建一个支持异步处理的服务器,这个服务器就是用来处理请求并返回响应的,类似于 Django、Flask、Tornado 等一些 Web 服务器。而客户端可以用来发起请求,类似于使用 requests 发起一个 HTTP 请求然后获得响应,但 requests 发起的是同步的网络请求,aiohttp 则是异步的。
本节我们主要了解一下 aiohttp 客户端部分的用法。
基本实例
我们来看一个基本的 aiohttp 请求案例,代码如下:
import aiohttp
import asyncio
async def fetch(session,url):
async with session.get(url) as response:
return await response.text(), response.status
async def main():
async with aiohttp.ClientSession() as session:
html,status = await fetch(session, 'https://cuiqingcai.com')
print(f'html:{html[:100]}...')
print(f'status:{status}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这里使用 aiohttp 爬取了我的个人博客,获得了源码和响应状态码,并打印出来,运行结果如下:
html:<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content...
status:200
由于网页源码过长,这里只截取了输出的一部分。可以看到,我们成功获取了网页的源代码及响应状态码 200,也就是完成了一次基本的 HTTP 请求,即我们成功使用 aiohttp 通过异步的方式完成了网页爬取。当然,这个操作用之前讲的 requests 也可以做到。
能够发现,aiohttp 的请求方法的定义和之前有明显的区别,主要包括如下几点。
-
首先在导人库的时候,除了必须引人
aiohttp这个库,还必须引人asyncio库。因为要实现异步爬取,需要启动协程,而协程则需要借助于asyncio里面的事件循环才能执行。除了事件循环,asyncio里面也提供了很多基础的异步操作。 -
异步爬取方法的定义和之前有所不同,每个异步方法的前面都要统一加
async来修饰。 -
with as语句前面同样需要加async来修饰。在 Python 中,with as语句用于声明一个上下文管理器,能够帮我们自动分配和释放资源。而在异步方法中,with as前面加上async代表声明一个支持异步的上下文管理器。 -
对于一些返回协程对象的操作,前面需要加
await来修饰。例如response调用text方法,查询 API 可以发现,其返回的是协程对象,那么前面就要加await;而对于状态码来说,其返回值就是一个数值,因此前面不需要加await。所以,这里可以按照实际情况做处理,参考官方文档说明,看看其对应的返回值是怎样的类型,然后决定加不加await就可以了。 -
最后,定义完爬取方法之后,实际上是
main方法调用了fetch方法。要运行的话,必须启用事件循环,而事件循环需要使用asyncio库,然后调用run_until_complete方法来运行。
|
在 Python 3.7 及以后的版本中,我们可以使用 |
URL 参数设置
对于 URL 参数的设置,我们可以借助 params 参数,传入一个字典即可,实例如下:
import aiohttp
import asyncio
async def main():
params = {'name': 'germey','age':25}
async with aiohttp.ClientSession() as session:
async with session.get('https://www.httpbin.org/get',params=params) as response:
print(await response.text())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
运行结果如下:
{
"args": {
"age": "25",
"name": "germey"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.10 aiohttp/3.12.13",
"X-Amzn-Trace-Id": "Root=1-68538c39-4cc3276b484bcbdf637faa47"
},
"origin": "171.34.210.142",
"url": "https://www.httpbin.org/get?name=germey&age=25"
}
这里可以看到,实际请求的 URL 为 https://www.httpbin.org/get?name=germey&age=25 ,其中的参数对应于 params 的内容。
其它请求类型
aiohttp 还支持其他请求类型,如 POST、PUT、DELETE 等,这些和 requests 的使用方式有点类似,实例如下:
session.post('http://www.httpbin.org/post', data=b'data')
session.put('http://www.httpbin.org/put', data=b'data')
session.delete('http://www.httpbin.org/delete')
session.head('http://www.httpbin.org/get')
session.options('http://www.httpbin.org/get')
session.patch('http://www.httpbin.org/patch', data=b'data')
要使用这些方法,只需要把对应的方法和参数替换一下。
POST 请求
对于 POST 表单提交,其对应的请求头中的 Content-Type 为 application/x-www-form-urlencoded,我们可以用如下方式来实现:
import aiohttp
import asyncio
async def main():
data = {'name':'germey','age':25}
async with aiohttp.ClientSession() as session:
async with session.post('https://www.httpbin.org/post',data=data) as response:
print(await response.text())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
运行结果如下:
{
"args": {},
"data": "",
"files": {},
"form": {
"age": "25",
"name": "germey"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "18",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.10 aiohttp/3.12.13",
"X-Amzn-Trace-Id": "Root=1-68538c79-72eef5bf73b714a6648d7aa8"
},
"json": null,
"origin": "171.34.210.142",
"url": "https://www.httpbin.org/post"
}
对于 POST JSON 数据提交,其对应的请求头中的 Content-Type 为 application/json,我们只需要将 post 方法里的 data 参数改成 json 即可,实例代码如下:
import aiohttp
import asyncio
async def main():
data = {'name':'germey','age':25}
async with aiohttp.ClientSession() as session:
async with session.post('https://www.httpbin.org/post', json=data) as response:
print(await response.text())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
运行结果如下:
{
"args": {},
"data": "{\"name\": \"germey\", \"age\": 25}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "29",
"Content-Type": "application/json",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.10 aiohttp/3.12.13",
"X-Amzn-Trace-Id": "Root=1-68538d28-15f621b947926c230c0ab7cf"
},
"json": {
"age": 25,
"name": "germey"
},
"origin": "171.34.210.142",
"url": "https://www.httpbin.org/post"
}
可以发现,其实现也和 requests 非常像,不同的参数支持不同类型的请求内容。
响应
对于响应来说,我们可以用如下方法分别获取其中的状态码、响应头、响应体、响应体二进制内容、响应体 JSON 结果,实例代码如下:
import aiohttp
import asyncio
async def main():
data = {'name':'germey','age':25}
async with aiohttp.ClientSession() as session:
async with session.post('https://www.httpbin.org/post',data=data) as response:
print('status:', response.status)
print('headers:', response.headers)
print('body:', await response.text())
print('bytes:', await response.read())
print('json:', await response.json())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
运行结果如下:
status: 200
headers: <CIMultiDictProxy('Date': 'Thu, 19 Jun 2025 04:09:17 GMT', 'Content-Type': 'application/json', 'Content-Length': '514', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
"args": {},
"data": "",
"files": {},
"form": {
"age": "25",
"name": "germey"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "18",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.10 aiohttp/3.12.13",
"X-Amzn-Trace-Id": "Root=1-68538d6d-4c315c7e6b837cd20b15d4f9"
},
"json": null,
"origin": "171.34.210.142",
"url": "https://www.httpbin.org/post"
}
bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "25", \n "name": "germey"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "18", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.10 aiohttp/3.12.13", \n "X-Amzn-Trace-Id": "Root=1-68538d6d-4c315c7e6b837cd20b15d4f9"\n }, \n "json": null, \n "origin": "171.34.210.142", \n "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'germey'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.10 aiohttp/3.12.13', 'X-Amzn-Trace-Id': 'Root=1-68538d6d-4c315c7e6b837cd20b15d4f9'}, 'json': None, 'origin': '171.34.210.142', 'url': 'https://www.httpbin.org/post'}
可以看到,这里有些字段前面需要加 await,有些则不需要。其原则是,如果返回的是一个协程对象(如 async 修饰的方法),那么前面就要加 await,具体可以看 aiohttp 的 API,其链接为: https://docs.aiohttp.org/en/stable/client_reference.html 。
超时设置
我们可以借助 ClientTimeout 对象设置超时,例如要设置 1 秒的超时时间,可以这么实现:
import aiohttp
import asyncio
async def main():
timeout = aiohttp.ClientTimeout(total=1)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get('https://www.httpbin.org/get') as response:
print('status:', response.status)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
如果在 1 秒之内成功获取响应,那么运行结果如下:
200
如果超时,则会抛出 TimeoutError 异常,其类型为 asyncio.TimeoutError,我们进行异常捕获即可。
另外,声明 ClientTimeout 对象时还有其他参数,如 connect、socket_connect 等,详细可以参考官方文档: https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts 。
并发限制
由于 aiohttp 可以支持非常高的并发量,如几万、十万、百万都是能做到的,但面对如此高的并发量,目标网站很可能无法在短时间内响应,而且有瞬间将目标网站爬挂掉的危险,这提示我们需要控制一下爬取的并发量。
一般情况下,可以借助 asyncio 的 Semaphore 来控制并发量,实例代码如下:
import asyncio
import aiohttp
CONCURRENCY = 5
URL = "https://www.baidu.com"
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():
async with semaphore:
print('scraping', URL)
async with session.get(URL) as response:
await asyncio.sleep(1)
return await response.text()
async def main():
global session
session = aiohttp.ClientSession()
scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
这里我们声明 CONCURRENCY(代表爬取的最大并发量)为 5,同时声明爬取的目标 URL 为百度。接着,借助 Semaphore 创建了一个信号量对象,将其赋值为 semaphore,这样就可以用它来控制最大并发量了。怎么使用呢?这里我们把 semaphore 直接放置在了对应的爬取方法里,使用 async with 语句将 semaphore 作为上下文对象即可。这样一来,信号量便可以控制进入爬取的最大协程数量,即我们声明的 CONCURRENCY 的值。
在 main 方法里,我们声明了 10000 个 task,将其传递给 gather 方法运行。若不加以限制,那这 10000 个 task 会被同时执行,并发数量相当大。但有了信号量的控制之后,同时运行的 task 数量最大会被控制在 5 个,这样就能给 aiohttp 限制速度了。
总结
本节我们了解了 aiohttp 的基本使用方法,更详细的内容还是推荐大家查阅官方文档,详见 https://docs.aiohttp.org/ 。