Flask-SQLAlchemy的基本使用

连接MySQL

在使用 Flask-SQLAlchemy 操作数据库之前,要先创建一个由 Flask-SQLAlchemy 提供的 SQLAlchemy 类的对象。在创建这个类时,要传入当前的 app,然后还需要在 app.config 中设置 SQLALCHEMY_DATABASE_URI,来配置数据库的连接,示例代码如下。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# MySQL所在的主机名
HOSTNAME = "127.0.0.1"
# MySQL监听的端口号,默认3306
PORT = 3306
# 连接MySQL的用户名,读者用自己设置的
USERNAME = "root"
# 连接MySQL的密码,读者用自己的
PASSWORD = "root"
# MySQL上创建的数据库名称
DATABASE = "database_learn"

app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8"

db = SQLAlchemy(app)

# 测试是否连接成功
with db.engine.connect() as conn:
    rs = conn.execute("select 1")
    print(rs.fetchone())

Flask-SQLAlchemy 在连接数据库时,会从 app.config 中读取 SQLALCHEMY_DATABASE_URI 参数,以上代码分别设置了 MySQL 主机名、端口号、用户名、密码及数据库名称,数据库应该提前在 MySQL 中创建好。SQLALCHEMY_DATABASE_URI 根据不同的数据库有不同的连接方式,MySQL 的连接方式如下。

mysql+[driver]://[username]:[password]@[host]:[port]/[database]?charset=utf8

其中 [] 中是变量,需要配置时填充进去即可。如果单击运行后,在 PyCharm 的控制台中打印了 (1,),则说明已经连接成功。

ORM模型

对象关系映射(object relationship mapping,简称 ORM)是一种可以用 Python 面向对象的方式来操作关系型数据库的技术,具有可以映射到数据库表能力的 Python 类我们称之为 ORM 模型。一个 ORM 模型与数据库中的一个表相对应,ORM 模型中的每个类属性分别对应表的每个字段;ORM 模型的每个实例对象对应表中的每条记录。ORM 技术提供了面向对象与 SQL 交互的桥梁,让开发者用面向对象的方式操作数据库,使用 ORM 模型具有以下优势。

  • 开发效率高:几乎不需要写原生 SQL 语句,使用纯 Python 的方式操作数据库,大大地提高了开发效率。

  • 安全性高:ORM 模型底层代码对一些常见的安全问题,如 SQL 注入做了防护,比直接使用 SQL 语句更加安全。

  • 灵活性强:Flask-SQLAlchemy 底层支持 SQLite、MySQL、Oracle、PostgreSQL 等关系型数据库,但针对不同的数据库,ORM 模型的代码几乎一模一样,只需修改少量代码,即可完成底层数据库的更换。

下面用 Flask-SQLAlchemy 来创建一个 User 模型,示例代码如下。

class User(db.Model):
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(100))
    password = db.Column(db.String(100))

db.create_all()

以上代码中,首先创建了一个 User 类,并使它继承自 db.Model 类,所有 ORM 模型必须是 db.Model 的直接或者间接子类。然后通过 __tablename__ 属性,指定 User 模型映射到数据库中表的名称。接着定义了 3 个 db.Column 类型的类属性,分别是 id、username、password,只有使用 db.Column 定义的类属性,才会被映射到数据库表中成为字段。在这个 User 模型中,id 是 db.Integer 类型,在数据库中将表现为整型,并且传递 primary_key=True 参数指定 id 作为主键,传递 autoincrement=True 参数设置 id 为自增长。username 和 password 属性分别指定其类型为 db.String 类型,在数据库中将表现为 varchar 类型,并且指定其最大长度为 100。

最后通过 db.create_all() 把 User 模型映射成数据库中的表。我们可以通过 navicat 软件来查看数据库中的表,如图 5-1 所示。

image 2025 01 21 14 06 37 604
Figure 1. 图5-1 navicat中查看user表

创建数据库的字段类型,除了以上的 db.Integer 和 db.String,还有以下字段类型,如表 5-1 所示。

image 2025 01 21 14 07 15 817
Figure 2. 表5-1 Flask-SQLAlchemy字段类型

字段在数据库中的表现,都是通过 db.Column 上的参数实现的。db.Column 常用参数如表 5-2 所示。

image 2025 01 21 14 08 49 658
Figure 3. 表5-2 db.Column常用参数

CRUD操作

使用 ORM 进行 CRUD(create、read、update、delete)操作,需要先把操作添加到会话中,通过 db.session 可以获取到会话对象。会话对象存在内存中,如果要把会话中的操作提取到数据库中,需要调用 db.session.commit() 操作;如果要把会话中的操作回滚,则需要调用 db.session.rollback() 实现。下面分别对 CRUD 操作进行讲解。

Create操作

使用 ORM 创建一条数据非常简单,先使用 ORM 模型创建一个对象,然后添加到会话中,再进行 commit 操作即可,示例代码如下。

@app.route('/user/add')
def user_add():
    user1 = User(username="张三", password="111111")
    user2 = User(username="李四", password="222222")
    user3 = User(username="王五", password="333333")

    db.session.add(user1)
    db.session.add(user2)
    db.session.add(user3)
    db.session.commit()

    return "用户添加成功!"

在以上代码中,首先用 User 类创建了 3 个对象,在创建对象时,必须通过关键字参数给字段赋值,否则 SQLAlchemy 将不知道是给哪个字段赋值,从而报错。由于 id 是作为一个自增长的主键,因此可以不需要赋值。然后再把 3 个对象添加到 session 中,最后再统一进行 commit 操作,即可把数据添加到数据库中。

Read操作

Read 就是查询操作。ORM 模型都是继承自 db.Model,db.Model 内置的 query 属性上有许多方法,可以实现对 ORM 模型的查询操作。query 上的方法可以分为两大类,分别是提取方法和过滤方法。首先来看提取方法,示例代码如下。

@app.route('/user/fetch')
def user_fetch():
    # 1. 获取User中所有数据
    users = User.query.all()

    # 2. 获取主键为1的User对象
    user = User.query.get(1)

    # 3. 获取第一条数据
    user = User.query.first()

    return "数据提取成功!"

在以上代码中,通过 all() 方法获取所有 User 对象,通过 get 方法获取指定主键的 User 对象,通过 first() 方法获取第一个 User 对象。提取数据的常用方法如表 5-3 所示。

image 2025 01 21 14 19 30 493
Figure 4. 表5-3 提取数据的常用方法

在查询数据时,经常需要做过滤操作。过滤最常用的两个方法是 filter 和 filter_by,filter 方法传递查询条件,filter_by 方法传递关键字参数,示例代码如下。

@app.route('/user/filter')
def user_filter():
    # 1. filter方法:
    users = User.query.filter(User.username == "张三").all()

    # 2. filter_by方法:
    users = User.query.filter_by(username="张三").all()

    return "数据过滤成功!"

除了 filter 和 filter_by 方法以外,Flask-SQLAlchemy 还提供了以下过滤方法,如表 5-4 所示。

image 2025 01 21 14 24 35 755
Figure 5. 表5-4 常用过滤方法

在表 5-4 中,query.slice(start,stop)、query.limit(limit)、query.offset(offset) 方法的使用比较简单,这里不做过多讲解。下面讲解 query.order_by() 和 query.group_by() 的用法。

(1)query.order_by() 的用法如以下代码所示。

from sqlalchemy import desc

@app.route('/user/filter')
def user_filter():
    # 正序排序
    users = User.query.order_by("id")
    users = User.query.order_by(User.id)

    # 倒序排序
    users = User.query.order_by(db.text("-id"))
    users = User.query.order_by(User.id.desc())

    # 使用desc函数进行倒序排序
    users = User.query.order_by(desc("id"))

    for user in users:
        print(user.id)

    return "数据过滤成功!"

以上代码中,以 User 的 id 字段为例(可以换成任何其他的字段),详细地罗列了正序和倒序排序的方法,读者在开发过程中可自行选择合适的方法实现排序。

(2)query.group_by() 方法是根据某个字段进行分组,分组的主要目的是获取分组后的数量、最大值、最小值、平均值、总和等。因为提取的数据不再是某个模型,所以不能通过 <模型>.query 的方式获取,而是通过 db.session.query 来提取,如要获取所有用户名在表中存在的个数,那么可以通过以下代码实现。

from sqlalchemy import func

@app.route('/user/filter')
def user_filter():
    # query.group_by方法
    users = db.session.query(User.username, func.count(User.id)) \
        .group_by("username").all()

    return "数据过滤成功!"

过滤是数据提取的一个很重要的功能,除了直接使用 == 和 != 关系运算符外,还可以使用以下常用的过滤条件进行过滤,但这些过滤条件只能通过 filter 方法实现。常用的过滤条件如下。

  • like:模糊查询,使用方式与 SQL 语句中的 like 类似,可以在搜索字符左右两边添加 % 来匹配任意字符。contains 方法相当于 like 在搜索字符左右两边都添加 %,例如,contains("张") 与 like("%张%") 是一样的效果。示例代码如下。

    users = User.query.filter(User.username.contains("张"))
    users = User.query.filter(User.username.like("%张%"))
  • in:判断值是否在指定数据集中,如果是就提取,否则就不提取。为了不与 Python 中的 in 关键字混淆,在 in 后加下画线,实际的方法名为 in_,示例代码如下。

    users = User.query.filter(User.username.in_(["张三","李四","王五"]))
  • not in:作用与 in 相反。其使用方式是在 in_ 方法所在代码表达式前添加(~),示例代码如下。

    users = User.query.filter(~User.username.in_(['张三']))
  • is null:判断值是否为空,如果为空就提取,否则就不提取。可以通过判断值是否为 none,或通过 is_ 方法实现,为了不与 Python 中的 in 关键字混淆,同样需要在 is 后加下画线,示例代码如下。

    users = User.query.filter(User.username==None)
    users = User.query.filter(User.username.is_(None))
  • is not null:作用与 is null 相反,实际的方法名为 isnot,示例代码如下。

    users = User.query.filter(User.username != None)
    users = User.query.filter(User.username.isnot(None))
  • and:用于同时满足多条件的查询,实际的方法名为 and_,示例代码如下。

    from sqlalchemy import and_
    users = User.query.filter(and_(User.username=="张三",User.id < 10))
  • or:用于满足一个或多个条件的查询,实际的方法名为 or_,示例代码如下。

    from sqlalchemy import or_
    users = User.query.filter(or_(User.username=="张三",User.username=="李四"))

update操作

更新操作分为两种,第一种针对一条数据,第二种针对多条数据。针对一条数据,可以直接修改对象的属性,然后执行 commit 操作即可,示例代码如下。

user = User.query.get(1)
user.username = "张三_重新修改的"
db.session.commit()

针对修改多条数据的情况,则是通过调用 filter 或者 filter_by 方法获取 BaseQuery 对象,然后再调用 update 方法,实现批量修改的,示例代码如下。

User.query.filter(User.username.like("%张三%")).update({"password":User.password+"_被修改的"},synchronize_session=False)
db.session.commit()

以上代码先通过 filter 方法过滤数据,然后再调用 update 方法,在所有的 password 后面都添加 "_被修改的" 字符串,并且因为使用了 like 方法作为过滤条件,所以需要指定 synchronize_session 参数为 False,最后再调用 commit() 方法即可批量完成数据的修改。

delete操作

删除操作也是分成两种。第一种是删除一条数据,第二种是删除多条数据。删除单条数据的操作方式非常简单,直接调用 db.session.delete 方法即可,示例代码如下。

user = User.query.get(1)
db.session.delete(user)
db.session.commit()

删除多条数据的操作方式类似更新多条数据,通过 BaseQuery 的 delete 方法即可实现,示例代码如下。

User.query.filter(User.username.contains("张三")).delete(synchronize_session=False)
db.session.commit()