Advertisement

Python3配置MySQL数据库连接池

阅读量:

一、首先第一步,我们先来编写数据库配置文件,在test001keshanchu下创建目录和文件test_db\config.ini,内容如下

复制代码
 [DATABASE]

    
 host = 10.182.27.158
    
 port = 3306
    
 user = root
    
 passwd = 123456wt
    
 database = xttest_new
    
 dbchar = utf8
    
 table = interface_test

二、第二步,我们写一个从config.ini配置文件中读取我们想要信息的readConfig.py文件,在test_db下创建readConfig.py文件

复制代码
 import os, configparser

    
  
    
 path = os.path.split(os.path.realpath(__file__))[0]#得到readConfig.py文件的上级目录C:\Users\songlihui\PycharmProjects\test001keshanchu\test_db
    
 config_path = os.path.join(path, 'config.ini')#得到配置文件目录,配置文件目录为path下的\config.ini
    
 config = configparser.ConfigParser()#调用配置文件读取
    
 config.read(config_path, encoding='utf-8')
    
  
    
 class ReadConfig():
    
  
    
     def get_mysql(self, name):
    
     value = config.get('DATABASE', name)#通过config.get拿到配置文件中DATABASE的name的对应值
    
     return value
    
  
    
  
    
 if __name__ == '__main__':
    
     print('path值为:', path)#测试path内容
    
     print('config_path', config_path)#打印输出config_path测试内容是否正确
    
     print('通过config.get拿到配置文件中DATABASE的host的对应值:', ReadConfig().get_mysql('host'))#通过上面的ReadConfig().get_mysql方法获取配置文件中DATABASE的'host'的对应值为10.182.27.158
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/w1lB3U7tFnWGpsAERLvgf0PDdkZQ.png)

此文件,输入内容为

三、编写我们的数据库连接池文件,在test_db创建configDB.py

复制代码
 # -*- coding: UTF-8 -*-

    
 """
    
 1、执行带参数的SQL时,请先用sql语句指定需要输入的条件列表,然后再用tuple/list进行条件批配
    
 2、在格式SQL中不需要使用引号指定数据类型,系统会根据输入参数自动识别
    
 3、在输入的值中不需要使用转意函数,系统会自动处理
    
 """
    
 import MySQLdb
    
 from DBUtils.PooledDB import PooledDB
    
 from test_db import readConfig
    
  
    
 config = readConfig.ReadConfig()#实例化
    
  
    
 """
    
 Config是一些数据库的配置文件,通过调用我们写的readConfig来获取配置文件中对应值
    
 """
    
 host = config.get_mysql('host')
    
 port = int(config.get_mysql('port'))
    
 user = config.get_mysql('user')
    
 passwd  = config.get_mysql('passwd')
    
 database = config.get_mysql('database')
    
 dbchar = config.get_mysql('dbchar')
    
  
    
  
    
 class Mysql(object):
    
     """
    
     MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
    
         释放连接对象;conn.close()或del conn
    
     """
    
     # 连接池对象
    
     __pool = None
    
  
    
     def __init__(self):
    
     # 数据库构造函数,从连接池中取出连接,并生成操作游标
    
     self._conn = Mysql.__getConn()
    
     self._cursor = self._conn.cursor()
    
  
    
     @staticmethod
    
     def __getConn():
    
     """
    
     @summary: 静态方法,从连接池中取出连接
    
     @return MySQLdb.connection
    
     """
    
     if Mysql.__pool is None:
    
         __pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host=host, port=port, user=user, passwd=passwd, db=database)
    
     return __pool.connection()
    
  
    
     def getAll(self, sql, param=None):
    
     """
    
     @summary: 执行查询,并取出所有结果集
    
     @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
    
     @param param: 可选参数,条件列表值(元组/列表)
    
     @return: result list(字典对象)/boolean 查询到的结果集
    
     """
    
     if param is None:
    
         count = self._cursor.execute(sql)
    
     else:
    
         count = self._cursor.execute(sql, param)
    
     if count > 0:
    
         result = self._cursor.fetchall()
    
     else:
    
         result = False
    
     return result
    
  
    
     def getOne(self, sql, param=None):
    
     """
    
     @summary: 执行查询,并取出第一条
    
     @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
    
     @param param: 可选参数,条件列表值(元组/列表)
    
     @return: result list/boolean 查询到的结果集
    
     """
    
     if param is None:
    
         count = self._cursor.execute(sql)
    
     else:
    
         count = self._cursor.execute(sql, param)
    
     if count > 0:
    
         result = self._cursor.fetchone()
    
     else:
    
         result = False
    
     return result
    
  
    
     def getMany(self, sql, num, param=None):
    
     """
    
     @summary: 执行查询,并取出num条结果
    
     @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
    
     @param num:取得的结果条数
    
     @param param: 可选参数,条件列表值(元组/列表)
    
     @return: result list/boolean 查询到的结果集
    
     """
    
     if param is None:
    
         count = self._cursor.execute(sql)
    
     else:
    
         count = self._cursor.execute(sql, param)
    
     if count > 0:
    
         result = self._cursor.fetchmany(num)
    
     else:
    
         result = False
    
     return result
    
  
    
     def insertOne(self, sql, value):
    
     """
    
     @summary: 向数据表插入一条记录
    
     @param sql:要插入的SQL格式
    
     @param value:要插入的记录数据tuple/list
    
     @return: insertId 受影响的行数
    
     """
    
     self._cursor.execute(sql, value)
    
     return self.__getInsertId()
    
  
    
     def insertMany(self, sql, values):
    
     """
    
     @summary: 向数据表插入多条记录
    
     @param sql:要插入的SQL格式
    
     @param values:要插入的记录数据tuple(tuple)/list[list]
    
     @return: count 受影响的行数
    
     """
    
     count = self._cursor.executemany(sql, values)
    
     return count
    
  
    
     def __getInsertId(self):
    
     """
    
     获取当前连接最后一次插入操作生成的id,如果没有则为0
    
     """
    
     self._cursor.execute("SELECT @@IDENTITY AS id")
    
     result = self._cursor.fetchall()
    
     return result[0]['id']
    
  
    
     def __query(self, sql, param=None):
    
     if param is None:
    
         count = self._cursor.execute(sql)
    
     else:
    
         count = self._cursor.execute(sql, param)
    
     return count
    
  
    
     def update(self, sql, param=None):
    
     """
    
     @summary: 更新数据表记录
    
     @param sql: SQL格式及条件,使用(%s,%s)
    
     @param param: 要更新的  值 tuple/list
    
     @return: count 受影响的行数
    
     """
    
     return self.__query(sql, param)
    
  
    
     def delete(self, sql, param=None):
    
     """
    
     @summary: 删除数据表记录
    
     @param sql: SQL格式及条件,使用(%s,%s)
    
     @param param: 要删除的条件 值 tuple/list
    
     @return: count 受影响的行数
    
     """
    
     return self.__query(sql, param)
    
  
    
     def begin(self):
    
     """
    
     @summary: 开启事务
    
     """
    
     self._conn.autocommit(0)
    
  
    
     def end(self, option='commit'):
    
     """
    
     @summary: 结束事务
    
     """
    
     if option == 'commit':
    
         self._conn.commit()
    
     else:
    
         self._conn.rollback()
    
  
    
     def dispose(self, isEnd=1):
    
     """
    
     @summary: 释放连接池资源
    
     """
    
     if isEnd == 1:
    
         self.end('commit')
    
     else:
    
         self.end('rollback');
    
     self._cursor.close()
    
     self._conn.close()
    
  
    
 if __name__ == '__main__':
    
     print(host, port, user, passwd, database)
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/ebOIY4Wqdn1thoT7zfMkXg0Q2pyJ.png)

四、测试验证在test_db下创建test_sql.py

复制代码
 # coding:utf-8

    
 import test_db.configDB
    
  
    
 mysql = test_db.configDB.Mysql()
    
 sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息
    
 result = mysql.getAll(sqlAll, None)
    
 if result:
    
     print("get all")
    
     for row in result:
    
     print(row[0], row[1])
    
 mysql.dispose()#释放连接池资源
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/il2rp8VZnQfWPKGODusBz3YhHegL.png)

五、结果

注意事项:

config.ini的配置项根据自己的环境填写对应的正确数据,sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息

全部评论 (0)

还没有任何评论哟~