Advertisement

python ORM框架sqlalchemy使用详解

阅读量:
复制代码
    from sqlalchemy import create_engine
    
    #创建引擎
    engine = create_engine(
    "postgresql://postgres:postgres@xxxxxx",
    max_overflow=2,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有可用线程在队列最多等待的时间,否则报错
    pool_recycle=-1,  # 多久之后对线程池中的线程进行一次连接的回收(重置)
       encoding='utf-8'  #sqlalchemy 默认使用latin-1进行编码,所以当出现中文时设置编码类型
    )
    #执行sql语句
    engine.execute("INSERT INTO user (name) VALUES ('dadadadad')")
    
    result = engine.execute('select * from user')
    res = result.fetchall()
    print(res)

ORM功能使用 使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling /Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

创建表

复制代码
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    
    engine = create_engine(
    "postgresql://postgres:postgres@192.168.91.13:5432/legaldata",
    max_overflow=2,
    pool_size=5,  
    pool_timeout=30,  
    pool_recycle=-1  
    )
    
    Base = declarative_base()
    
    # 创建单表
    class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))
    
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
       Index('ix_id_name', 'name', 'extra'),
    )
    
    # 一对多
    class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
    
    class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    
    # 多对多
    class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    
    class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    
    class Server(Base):
    __tablename__ = 'server'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    
    
    def init_createdb():
    """
    根据类创建所有数据库表
    :return:
    """
    Base.metadata.create_all(engine)
    
    def init_dropdb():
    """
    根据类删除所有数据库表
    :return:
    """
    Base.metadata.drop_all(engine) 
       
    Session = sessionmaker(engine)  #生成一个会话类,用来操作数据
    session = Session()   
    
    
    增
    '''创建一个对象'''
    obj = Users(name="liu", extra='lx')
    # 单条
    session.add(obj)
    # 多条
    session.add_all([
    Users(name="liux", extra='123'),
    Users(name="liux", extra='567'),
    ])
    session.commit()
    session.close()
    
    删
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    
     
    
    改
    session.query(Users).filter(Users.id > 1).update({"name" : "test"})
    
    '''
     synchronize_session用于query在进行delete or update操作时,对session的同步策略。
     False - 不对session进行同步,直接进行delete or update操作。
     'fetch' - 在delete or update操作之前,先发一条sql到数据库获取符合条件的记录。
     'evaluate' - 在delete or update操作之前,用query中的条件直接对session
                  的identity_map中的objects进行eval操作,将符合条件的记录下来。
    '''
    session.query(Users).filter(Users.id > 1).update
    ({Users.name: Users.name + "123"}, synchronize_session=False)
    
    session.commit()
    
    
     
    查
    res = session.query(Users).all() 
    res = session.query(Users.name, Users.extra).all()
    # 条件查询
    res = session.query(Users).filter_by(name='liu').all()
    res = session.query(Users).filter_by(name='liu').first()
    res = session.query(Users).filter(Users.id > 1, Users.name == 'liu').all()
    res = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'liu').all()
    
    '''
       like,in_, and_ ,or_,  != ,>, <, >=, <= 
       只有 filter 能用,filter_by 不能用
    '''
    # 查询某个字段的值属于某个列表(in_)
    res = session.query(Users).filter(Users.id.in_([1,3,4])).all()
    res = session.query(Users).filter(Users.id.in_
             (session.query(Users.id).filter_by(name='liu'))).all()
    
    from sqlalchemy import or_,and_
    # 模糊查询(like)
    res = session.query(Users).filter(Users.name.like('%li%')).all()
    # 多条件查询(and_,建议使用filter_by,写法简洁)
    res = session.query(Users).filter(and_(Users.name=='liu', Users.id==1)).all()
    # 符合任一条件即可(or_)
    res =session.query(Users).filter(or_(User.name.startswith('liu'))).all() 
    res = session.query(Users).filter(or_(Users.name=='liu', Users.id==2)).all()
      
    # 对 Users 表的 name 列进行排序
    res = session.query(Users).order_by(Users.name).all()
    res = session.query(Users).order_by(Users.name.desc()).all() # 降序排序
    
    # 限制数量
    res = session.query(Users).order_by(Users.name.desc()).limit(2).all()
    # 切片也可以
    res = session.query(Users).order_by(Users.name.desc())[1:]
    res = session.query(User.username).offset(2).all()  #偏移量
    res = session.query(User.username).slice(1,4).all()  #切片
    
    # 统计数量,注意这里没有 .all() 了
    res = session.query(Users).order_by(Users.name.desc()).count()  
     
    
    
    
       # 分组,函数
    from sqlalchemy.sql import func
    res = session.query(Users).group_by(Users.extra).all()
    res = session.query(
            func.max(Users.id),
            func.sum(Users.id),
            func.min(Users.id))
        .group_by(Users.name).all()
    
    res = session.query(
            func.max(Users.id),
            func.sum(Users.id),
            func.min(Users.id))
        .group_by(Users.name)
        .having(func.min(Users.id) >2).all()  
        
    '''
       连表  
       join:内连接  
       leftjoin:左连接
       outerjoin:外连接
    '''
    res = session.query(Users, Favor).filter(Users.id == Fav
    
    or.nid).all()
    res = session.query(Person).join(Favor).all()
    res = session.query(Person).join(Favor, isouter=True).all()

组合

复制代码
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    res = q1.union(q2).all()

flask_sqlalchemy (与sqlalchemy差不多) 第二种查询方式(flask_sqlalchemy 查询语句)

复制代码
     from flask_sqlalchemy import SQLAlchemy
     from flask import Flask
     app = Flask(__name__)
     app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:postgres@192.168.91.13:5432/legaldata'
      '''设置sqlalchemy自动跟踪数据库'''
      SQLALCHEMY_TRACK_MODIFICATIONS = True
      # 创建1个SQLAlichemy实例
     db = SQLAlchemy(app)
     
     
    """
    创建数据库模型类(继承 sqlalchemy 工具对象中的Model类),一个模型类对应一张模型表
    数据库表名的常见规范:
        (1) 数据库名缩写_表名   (2) tbl_表名
    """
    class Role(db.Model):
    """用户身份表"""
    __tablename__ = "tb_roles"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32), unique=True)
    '''
        relationship() :第一个参数表明这个关系的另一端是哪个模型(类)
           把两个表关联在一起,不添加也是可以的,根据自己的需求
        backref : 在关系的另一模型中添加反向引用
                   相当于给要关联的表添加一个role属性
                   不添加也是可以的,根据自己的需求 
    '''
    user = db.relationship("User", backref="rpp")  # 从模型类中
    
    def __repr__(self):
        """定义之后,可以让显示对象的时候更直观"""
        return "Role object : id=%s,name=%s" % (self.id,self.name)
    
    
    class User(db.Model):
    """用户表"""
    __tablename__ = "tb_users"  # 指明数据库的表名
    
    id = db.Column(db.Integer, primary_key=True)  # 整型的主键,会默认设置为自增主键
    name = db.Column(db.String(64), unique=True)
    email = db.Column(db.String(128), unique=True)
    password = db.Column(db.String(128))
    # db.ForeignKey  外键关联
    # User类中添加了一个role_id变量,数据类型db.Integer,第二个参数指定外键是哪个表中哪个id。
    role_id = db.Column(db.Integer, db.ForeignKey("tb_roles.id"))  # 从底层中
    
    def __repr__(self):
        """定义返回的数据,定义之后,可以让显示对象的时候更直观"""
        return "User object : name=%s" % self.name
    
    
    if __name__ == '__main__':
    '''清除数据库中的所有数据,没在Model的不清空'''
    # db.drop_all()
    '''创建所有表'''
    # db.create_all()
    Role.query.all()   
    """另一种查询方式"""
    db.session.query(Role).all()

PS:

复制代码
    #  Query 根据返回结果来看, rs是一个 Query 对象,打印出来可以看到转化的 SQL
    rs = session.query(User).filter(Users.username=='liu')
    
    # all 是返回所有符合条件的数据
    rs = session.query(Users).all() #查询所有数据,返回一个列表
    print(rs)
    print(rs[0].username) #属性访问
    
    # first 是返回所有符合条件数据的第一条数据
    rs = session.query(User).first()  #查询第一条数据
    
    过滤函数
    filter 是一个过滤函数,过滤条件都可以书写在此函数中,不同的条件之间用 逗号 分隔
    filter_by 也是一个过滤函数,但是功能要弱一些
    二者都是 过滤函数,但是使用有如下差别:
    1. filter 中需要添加 类对象,filter_by不需要
    2. filter_by 中只能添加等于的条件,不能添加 不等于、大于小于等条件,filter没有这个限制

全部评论 (0)

还没有任何评论哟~