Sqlalchemy Python
ORM(Object Relational Mapping)使用介绍
介绍
Sqlalchemy API 分新旧两种:
1.3及之前的老 API1.4及之后的2.0新 API
| 概念 | 对应数据库 | 说明 |
|---|---|---|
| Engine | 连接 | 驱动引擎 |
| Session | 连接池,事务 | 由此开始查询 |
| Model | 表 | 类定义 |
| Column | 列 | |
| Query | 若干行 | 可以链式添加多个条件 |
| 数据类型 | 数据库数据类型 | python 数据类型 | 说明 |
|---|---|---|---|
| Integer | int | int | 整形,32 位 |
| String | varchar | string | 字符串 |
| Text | text | string | 长字符串 |
| Float | float | float | 浮点型 |
| Boolean | tinyint | bool | True / False |
| Date | date | datetime.date | 存储时间年月日 |
| DateTime | datetime | datetime.datetime | 存储年月日时分秒毫秒等 |
| Time | time | datetime.datetime | 存储时分秒 |
使用步骤
- 导入 SQLAlchemy
- 创建类,定义表名和字段名
- 初始化数据库链接:
create_engine('数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名') - 创建 session:
sessionmaker(bind=engine) session.add(对象)session.commit()session.close()
安装
pip install sqlalchemycreate_engine
from sqlalchemy import create_engine
engine = create_engine(
DATABASE_URI,
# connect_args={"ssl": {"key": SQLALCHEMY_DATABASE_PEM}},
echo=True,
pool_size=8,
pool_recycle=60*30,)说明:
echo为True时打印sql语句,一般与debug配合使用echo=False关闭日志
pool_size连接池的大小,默认为 5 个,设置为 0 时表示连接无限制pool_recycle设置数据库多久没连接自动断开
模型
class User(Base):
"""User account."""
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement="auto")
username = Column(String(255), unique=True, nullable=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
def __repr__(self):
return "<User %r>" % self.username__repr__ 的作用
__repr__ 的主要目的是为开发人员提供调试和日志记录的便利。它的输出应该尽可能清晰、无歧义,并且理想情况下,是一个有效的 Python 表达式,你可以直接将其复制粘贴到代码中来重新创建该对象。
当你在交互式会话中打印一个对象,或者在列表、字典等容器中查看对象时,Python 默认会调用对象的 __repr__ 方法来获取其字符串表示。
与 __str__ 的区别
__repr__ 常常与另一个特殊方法 __str__ 进行比较。
__repr__:用于开发人员。它返回一个精确、明确的字符串,其目标是无歧义。__str__:用于普通用户。它返回一个可读性高、友好的字符串,其目标是可读性。
举个例子:
import datetime
now = datetime.datetime.now()
# __repr__ 的输出:适合开发人员,精确到微秒,格式严谨
print(repr(now))
# 例如: datetime.datetime(2016, 05, 07, 11, 42, 6, 888777)
# __str__ 的输出:适合普通用户,更易读
print(str(now))
# 例如: 2016-05-07 11:42:06.888777如果一个类没有定义 __str__ 方法,但定义了 __repr__ 方法,那么 str() 函数和 print() 函数会退而求其次地使用 __repr__ 的输出来作为字符串表示。
生成数据表
Base.metadata.create_all(engine)操作数据
from sqlalchemy.orm import sessionmaker
# Create database session
Session = sessionmaker(bind=engine)
session = Session()session 的常见操作:
-
flush预提交,未写入数据库中 -
commit提交事务 -
rollback回滚事务 -
close关闭事务 -
增加
add_user = Users("test")
session.add(add_user)
session.commit()- 查
| filter | filter_by |
|---|---|
| 比较运算符,相等比较用== | 只能使用"=","!=“和”><" |
| 类名.属性名 | 属性名 |
| 类名.属性名 | 参数是**kwargs,支持组合查询 |
| 支持 and,or 和 in 等 |
existing_user = (
session.query(User).filter(User.username == user.username).first()
)- 改
session.query(Users).filter_by(id=1).update({'username': "xianbin"})
# 或
users = session.query(Users).filter_by(username="test").first()
users.username = "xianbin"
session.add(users)- 删
session.delete(user) # Delete the user
session.commit() # Commit the change
# 或
session.query(Users).filter(Users.username == "test").delete()
session.commit()原生 SQL
from sqlalchemy import create_engine
from sqlalchemy import text
engine = create_engine('sqlite:///:memory:')
result = engine.execute(
text("SELECT * FROM users WHERE age >= :age"), {'age': 21})
for row in result:
print(row)from sqlalchemy.orm import Session
session = Session(bind=engine)
result = session.execute(text("SELECT * FROM users WHERE age >= :age"), {'age': 21})
for row in result:
print(row)
with session.begin():
session.execute(
text("UPDATE users SET age = :new_age WHERE age = :old_age"),
{'new_age': 30, 'old_age': 20}
)F&Q
ModuleNotFoundError: No module named ‘MySQLdb’
在 python2.x 中用 mysqldb,python3.x 中使用 pymysql 代替
import pymysql
pymysql.install_as_MySQLdb()TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30
修改 pool_size 和 max_overflow 的大小
from sqlalchemy import create_engine
engine = create_engine(
"mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"),
echo=False,
pool_size=20,
pool_recycle=60*30,
max_overflow=50)Can’t reconnect until invalid transaction is rolled back. (Background on this error at: http://sqlalche.me/e/14/8s2b)
- 原因:连接断开后,事务没有回滚,残留的锁导致后续的查询报错
- 解决:在所有的数据库操作的时候捕捉异常进行事务的回滚
from models import OrderInfo
from sqlalchemy.exc import InvalidRequestError
try:
order = OrderInfo.query.filter_by(task_id=user_dict.get('task_id')).first()
order.status = 'COMPLETE'
db.session.commit()
except InvalidRequestError:
db.session.rollback()
except Exception as e:
print(e)最近更新
最新评论