""" 数据库连接管理 """ import pymysql from contextlib import contextmanager import os import logging logger = logging.getLogger(__name__) class Database: """数据库连接类""" def __init__(self): self.host = os.getenv('DB_HOST', 'localhost') self.port = int(os.getenv('DB_PORT', 3306)) self.user = os.getenv('DB_USER', 'root') self.password = os.getenv('DB_PASSWORD', '') self.database = os.getenv('DB_NAME', 'auto_trade_sys') @contextmanager def get_connection(self): """获取数据库连接(上下文管理器)""" conn = None try: conn = pymysql.connect( host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, autocommit=False ) yield conn except Exception as e: if conn: conn.rollback() logger.error(f"数据库连接错误: {e}") raise finally: if conn: conn.close() def execute_query(self, query, params=None): """执行查询,返回所有结果""" with self.get_connection() as conn: with conn.cursor() as cursor: cursor.execute(query, params) conn.commit() return cursor.fetchall() def execute_one(self, query, params=None): """执行查询,返回单条结果""" with self.get_connection() as conn: with conn.cursor() as cursor: cursor.execute(query, params) conn.commit() return cursor.fetchone() def execute_update(self, query, params=None): """执行更新,返回影响行数""" with self.get_connection() as conn: with conn.cursor() as cursor: affected = cursor.execute(query, params) conn.commit() return affected def execute_many(self, query, params_list): """批量执行""" with self.get_connection() as conn: with conn.cursor() as cursor: affected = cursor.executemany(query, params_list) conn.commit() return affected # 全局数据库实例 db = Database()