python ORM框架sqlalchemy使用详解
发布时间
阅读量:
阅读量
from sqlalchemy import create_engine
#创建引擎
engine = create_engine(
"postgresql://postgres:postgres@xxxxxx",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有可用线程在队列最多等待的时间,否则报错
pool_recycle=-1, # 多久之后对线程池中的线程进行一次连接的回收(重置)
encoding='utf-8' #sqlalchemy 默认使用latin-1进行编码,所以当出现中文时设置编码类型
)
#执行sql语句
engine.execute("INSERT INTO user (name) VALUES ('dadadadad')")
result = engine.execute('select * from user')
res = result.fetchall()
print(res)
ORM功能使用 使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling /Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
创建表
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://postgres:postgres@192.168.91.13:5432/legaldata",
max_overflow=2,
pool_size=5,
pool_timeout=30,
pool_recycle=-1
)
Base = declarative_base()
# 创建单表
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32))
extra = Column(String(16))
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_id_name', 'name', 'extra'),
)
# 一对多
class Favor(Base):
__tablename__ = 'favor'
nid = Column(Integer, primary_key=True)
caption = Column(String(50), default='red', unique=True)
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
favor_id = Column(Integer, ForeignKey("favor.nid"))
# 多对多
class ServerToGroup(Base):
__tablename__ = 'servertogroup'
nid = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Server(Base):
__tablename__ = 'server'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
port = Column(Integer, default=22)
def init_createdb():
"""
根据类创建所有数据库表
:return:
"""
Base.metadata.create_all(engine)
def init_dropdb():
"""
根据类删除所有数据库表
:return:
"""
Base.metadata.drop_all(engine)
Session = sessionmaker(engine) #生成一个会话类,用来操作数据
session = Session()
增
'''创建一个对象'''
obj = Users(name="liu", extra='lx')
# 单条
session.add(obj)
# 多条
session.add_all([
Users(name="liux", extra='123'),
Users(name="liux", extra='567'),
])
session.commit()
session.close()
删
session.query(Users).filter(Users.id > 2).delete()
session.commit()
改
session.query(Users).filter(Users.id > 1).update({"name" : "test"})
'''
synchronize_session用于query在进行delete or update操作时,对session的同步策略。
False - 不对session进行同步,直接进行delete or update操作。
'fetch' - 在delete or update操作之前,先发一条sql到数据库获取符合条件的记录。
'evaluate' - 在delete or update操作之前,用query中的条件直接对session
的identity_map中的objects进行eval操作,将符合条件的记录下来。
'''
session.query(Users).filter(Users.id > 1).update
({Users.name: Users.name + "123"}, synchronize_session=False)
session.commit()
查
res = session.query(Users).all()
res = session.query(Users.name, Users.extra).all()
# 条件查询
res = session.query(Users).filter_by(name='liu').all()
res = session.query(Users).filter_by(name='liu').first()
res = session.query(Users).filter(Users.id > 1, Users.name == 'liu').all()
res = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'liu').all()
'''
like,in_, and_ ,or_, != ,>, <, >=, <=
只有 filter 能用,filter_by 不能用
'''
# 查询某个字段的值属于某个列表(in_)
res = session.query(Users).filter(Users.id.in_([1,3,4])).all()
res = session.query(Users).filter(Users.id.in_
(session.query(Users.id).filter_by(name='liu'))).all()
from sqlalchemy import or_,and_
# 模糊查询(like)
res = session.query(Users).filter(Users.name.like('%li%')).all()
# 多条件查询(and_,建议使用filter_by,写法简洁)
res = session.query(Users).filter(and_(Users.name=='liu', Users.id==1)).all()
# 符合任一条件即可(or_)
res =session.query(Users).filter(or_(User.name.startswith('liu'))).all()
res = session.query(Users).filter(or_(Users.name=='liu', Users.id==2)).all()
# 对 Users 表的 name 列进行排序
res = session.query(Users).order_by(Users.name).all()
res = session.query(Users).order_by(Users.name.desc()).all() # 降序排序
# 限制数量
res = session.query(Users).order_by(Users.name.desc()).limit(2).all()
# 切片也可以
res = session.query(Users).order_by(Users.name.desc())[1:]
res = session.query(User.username).offset(2).all() #偏移量
res = session.query(User.username).slice(1,4).all() #切片
# 统计数量,注意这里没有 .all() 了
res = session.query(Users).order_by(Users.name.desc()).count()
# 分组,函数
from sqlalchemy.sql import func
res = session.query(Users).group_by(Users.extra).all()
res = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id))
.group_by(Users.name).all()
res = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id))
.group_by(Users.name)
.having(func.min(Users.id) >2).all()
'''
连表
join:内连接
leftjoin:左连接
outerjoin:外连接
'''
res = session.query(Users, Favor).filter(Users.id == Fav
or.nid).all()
res = session.query(Person).join(Favor).all()
res = session.query(Person).join(Favor, isouter=True).all()
组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
res = q1.union(q2).all()
flask_sqlalchemy (与sqlalchemy差不多) 第二种查询方式(flask_sqlalchemy 查询语句)
from flask_sqlalchemy import SQLAlchemy
from flask import Flask
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:postgres@192.168.91.13:5432/legaldata'
'''设置sqlalchemy自动跟踪数据库'''
SQLALCHEMY_TRACK_MODIFICATIONS = True
# 创建1个SQLAlichemy实例
db = SQLAlchemy(app)
"""
创建数据库模型类(继承 sqlalchemy 工具对象中的Model类),一个模型类对应一张模型表
数据库表名的常见规范:
(1) 数据库名缩写_表名 (2) tbl_表名
"""
class Role(db.Model):
"""用户身份表"""
__tablename__ = "tb_roles"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
'''
relationship() :第一个参数表明这个关系的另一端是哪个模型(类)
把两个表关联在一起,不添加也是可以的,根据自己的需求
backref : 在关系的另一模型中添加反向引用
相当于给要关联的表添加一个role属性
不添加也是可以的,根据自己的需求
'''
user = db.relationship("User", backref="rpp") # 从模型类中
def __repr__(self):
"""定义之后,可以让显示对象的时候更直观"""
return "Role object : id=%s,name=%s" % (self.id,self.name)
class User(db.Model):
"""用户表"""
__tablename__ = "tb_users" # 指明数据库的表名
id = db.Column(db.Integer, primary_key=True) # 整型的主键,会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128), unique=True)
password = db.Column(db.String(128))
# db.ForeignKey 外键关联
# User类中添加了一个role_id变量,数据类型db.Integer,第二个参数指定外键是哪个表中哪个id。
role_id = db.Column(db.Integer, db.ForeignKey("tb_roles.id")) # 从底层中
def __repr__(self):
"""定义返回的数据,定义之后,可以让显示对象的时候更直观"""
return "User object : name=%s" % self.name
if __name__ == '__main__':
'''清除数据库中的所有数据,没在Model的不清空'''
# db.drop_all()
'''创建所有表'''
# db.create_all()
Role.query.all()
"""另一种查询方式"""
db.session.query(Role).all()
PS:
# Query 根据返回结果来看, rs是一个 Query 对象,打印出来可以看到转化的 SQL
rs = session.query(User).filter(Users.username=='liu')
# all 是返回所有符合条件的数据
rs = session.query(Users).all() #查询所有数据,返回一个列表
print(rs)
print(rs[0].username) #属性访问
# first 是返回所有符合条件数据的第一条数据
rs = session.query(User).first() #查询第一条数据
过滤函数
filter 是一个过滤函数,过滤条件都可以书写在此函数中,不同的条件之间用 逗号 分隔
filter_by 也是一个过滤函数,但是功能要弱一些
二者都是 过滤函数,但是使用有如下差别:
1. filter 中需要添加 类对象,filter_by不需要
2. filter_by 中只能添加等于的条件,不能添加 不等于、大于小于等条件,filter没有这个限制
全部评论 (0)
还没有任何评论哟~
