异步版Flask安装与异步编程性能
Flask 在 2.0 版本实现了异步功能,接下来讲解异步版 Flask 的安装、Flask 异步编程性能,以及异步编程在 Flask 项目中的实战应用场景。
安装异步版Flask
要在 Flask 中执行异步编程,首先必须使用 Python 3.7 以上的版本,其次要在安装 Flask 时选择异步拓展。通过以下命令即可安装异步版 Flask。
$ pip install flask[async]
以上命令除了安装异步版 Flask,还会安装 asgiref,从而可以方便地将同步代码和异步代码相互转换。安装了 asgiref 后,在 Flask 中,视图函数、钩子函数(如 error_handlers、before_request 等)都可以定义成异步的,示例代码如下。
@app.route("/get-data")
async def get_data():
data = await async_db_query(...)
return jsonify(data)
Flask异步编程性能
Flask 是一个符合 WSGI 接口的框架,WSGI 本身是同步的。Flask 中的异步运行过程如图 11-1 所示。
可以看到,在请求到达 WSGI 服务器后,WSGI 服务器会启动一个新的线程,然后在线程中创建一个事件循环来执行异步视图函数,每来一个请求就创建一个新线程,因此异步与同步的并发量其实是一致的。异步代码不一定比同步代码执行效率更高,只有并发执行 I/O 操作才能体现并发的优势,当满足以下两个条件的 I/O 操作时可以考虑使用异步代码。

-
有一些 I/O 操作。
-
每个 I/O 操作不需要花费太长时间。
例如以下情形。
-
向站外发送一些 HTTP 请求。
-
和数据库有一些交互操作。
-
有一些文件操作。
对于一些长时间任务和 CPU 密集型任务,使用异步是不合适的,如以下情形。
-
运行机器学习模型。
-
处理图片、转码视频或者生成PDF文件等。
-
执行备份。
以上这类长时间或者 CPU 密集型任务,建议使用 Celery 异步框架。
实战——异步发送HTTP请求
发送一些 HTTP 请求是异步最适合的场景之一。按照同步的方式,必须要等第一个请求响应后才能执行第二个请求,以此类推,如果每个请求响应时间是 1s,有 10 个请求则必须要 10s 才能完成。我们使用 requests 库发送同步请求,首先通过 pip install requests 命令安装 requests 库。然后发送 5 个网络请求,并计算时间,代码如下。
@app.route("/website/sync")
def website_sync():
start_time = time.time()
urls = [
"https://www.python.org/",
"https://www.php.net/",
"https://www.java.com/",
"https://dotnet.microsoft.com/",
"https://www.javascript.com/"
]
sites = []
for url in urls:
response = requests.get(url)
sites.append({'url': response.url, 'status': response.status_code})
response = '<h1>URLs: </h1>'
for site in sites:
response += f"<p>URL: {site['url']}, Status Code: {site['status']}</p>"
end_time = time.time()
print("time: %.2f" % (end_time - start_time))
return response
上述代码向 5 个不同的 URL 发送请求,执行所消耗的时间为 10.28s(不同速率的网络和配置的计算机会有所区别)。
异步则不同,它可以发送完一个请求后,不需要等待即可发送第二个请求,以此类推,如果每个请求响应时间是 1s,则 10 个请求可以在 1s 多一点的时间即可完成。使用异步完成以上 5 个请求的示例代码如下。
async def fetch_url(session, url):
response = await session.get(url)
return {'url': response.url, 'status': response.status}
@app.route("/website/async")
async def website_async():
start_time = time.time()
urls = [
"https://www.python.org/",
"https://www.php.net/",
"https://www.java.com/",
"https://dotnet.microsoft.com/",
"https://www.javascript.com/"
]
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(fetch_url(session, url))
sites = await asyncio.gather(*tasks)
response = '<h1>URLs: </h1>'
for site in sites:
response += f"<p>URL: {site['url']}, Status Code: {site['status']}</p>"
end_time = time.time()
print("time: %.2f" % (end_time - start_time))
return response
上述代码中,使用了 asyncio 将 5 个发送 HTTP 请求的协程并行运行。执行上述代码所消耗的时间为 2.57s(不同速率的网络和配置的计算机会有所区别),执行效率是同步的 4 倍左右。如果上述代码不使用 asyncio.gather 并行运行协程,则执行过程依然为同步。
使用异步SQLAlchemy
SQLAlchemy 在 1.4 版本就添加了异步 API,可以方便地被集成到 Flask 和其他 Web 框架中,如 FastAPI。在 Flask 中使用异步 SQLAlchemy 不能选择 Flask-SQLAlchemy, Flask-SQLAlchemy 没有提供用来替换 AsyncEngine 和 AsyncSession 的接口,因此需要手动创建连接对象。这里为了简单,我们使用 sqlite 作为数据库。首先安装异步连接 sqlite 数据库的驱动包,命令如下。
$ pip install aiosqlite
如果读者想要异步操作 MySQL 数据库,则需要安装 pymysql 和 aiomysql 两个驱动程序,并在数据库连接 URL 上将 mysql+pymysql 修改为 mysql+aiomysql。 |
使用 SQLAlchemy 创建异步连接 sqlite 对象的代码如下。
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String, Float
DATABASE_URL = "sqlite+aiosqlite:///./book.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()
上述代码中,首先创建了连接数据库配置的 URL,然后使用 create_async_engine 方法创建了一个异步引擎,接着使用 sessionmaker 方法创建了一个异步 session 对象,并且指定创建的父类是 AsyncSession。最后使用 declarative_base 方法创建了一个 ORM 模型的基类 Base,这样以后所有ORM模型都需要继承自 Base。下面创建一个 Book 模型,代码如下。
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
author = Column(String(200), nullable=False)
price = Column(Float, default=0)
在项目第一次请求之前,我们把 ORM 模型创建到数据库中。添加以下钩子函数。
@app.before_first_request
async def before_first_request():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
上述代码中,首先将 before_first_request 钩子函数定义成了协程,然后使用 async with 异步上下文创建了一个事务,以同步的方式将原来的表删除,并创建新的表。这里之所以用同步的方式,是因为 Base.metadata.drop_all 和 Base.metadata.create_all 两个方法都没有被定义成异步。
如果想要使用类似 flask-migrate 的方式同步 ORM 模型到数据库中,可以使用 alembic 来实现,官网为 https://alembic.sqlalchemy.org/en/latest/ 。 |
接着添加一个创建图书的异步视图,代码如下。
@app.post('/books/add')
async def add_books():
name = request.form.get('name')
author = request.form.get('author')
price = request.form.get('price')
async with async_session() as session:
async with session.begin():
book = Book(name=name, author=author, price=price)
session.add(book)
await session.flush()
return "success"
上述代码中,首先使用异步上下文创建了一个 session 对象,然后使用 session.begin() 创建了一个事务,又在事务中添加图书到数据库中。本例中没有涉及并行 I/O 的操作,与同步的方式效率相同。这也从侧面说明了,同步编程方式仍然是大部分场景的首选方案。
如果读者需要了解更多有关异步 SQLAlchemy 的知识,请参考其官方文档,网址为 https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html 。
Jinja2开启异步支持
Jinja2 从 2.9 版本开始增加了异步支持,在创建 Environment 时,通过设置 enable_async 参数即可开启异步模式。但是 Flask 对象默认已经创建了 Environment 对象,我们只需要通过设置 app.jinja_env.is_async=True 即可开启异步模式。Jinja2 开启异步模式后,就可以在模板中使用协程。下面通过 app.jinja_env.globals 添加协程,代码如下。
app = Flask(__name__)
app.jinja_env.is_async = True
async def get_all_books():
async with async_session() as session:
stmt = select(Book)
result = await session.execute(stmt)
books = result.scalars().all()
return books
app.jinja_env.globals["books"] = get_all_books
上述代码中,我们首先通过 app.jinja_env.is_async 开启了异步模式,然后定义了一个 get_all_books 的协程,取名为 books,再将这个协程添加到了模板全局环境中。这样在模板中,就可以直接执行 books 协程了。
在 templates 下创建 index.html 模板,并在首页视图函数中通过 flask.render_template 渲染模板,代码如下。
@app.route('/')
def index():
return render_template("index.html")
以上代码中的 index.html 模板的代码如下。
<table>
<thead>
<tr>
<th>书名</th>
<th>作者</th>
<th>价格</th>
</tr>
</thead>
<tbody>
{% for book in books() %}
<tr>
<td>{{ book.name }}</td>
<td>{{ book.author }}</td>
<td>{{ book.price }}</td>
</tr>
{% endfor %}
</tbody>
</table>
上述代码中,books 协程被当作普通函数执行,即可得到 books 协程返回的结果。