""" 配置管理器 - 从数据库读取配置,兼容原有config.py 支持Redis缓存,实现配置即时生效 """ import sys import os import json from pathlib import Path from typing import Optional, Any # 加载.env文件 try: from dotenv import load_dotenv backend_dir = Path(__file__).parent project_root = backend_dir.parent env_files = [ backend_dir / '.env', project_root / '.env', ] for env_file in env_files: if env_file.exists(): load_dotenv(env_file, override=True) break else: load_dotenv(project_root / '.env', override=False) except ImportError: pass except Exception: pass # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) # 延迟导入,避免在trading_system中导入时因为缺少依赖而失败 try: from database.models import TradingConfig except ImportError as e: TradingConfig = None import logging logger = logging.getLogger(__name__) logger.warning(f"无法导入TradingConfig: {e},配置管理器将无法使用数据库") import logging logger = logging.getLogger(__name__) # 尝试导入同步Redis客户端(用于配置缓存) try: import redis REDIS_SYNC_AVAILABLE = True except ImportError: REDIS_SYNC_AVAILABLE = False redis = None class ConfigManager: """配置管理器 - 优先从Redis缓存读取,其次从数据库读取,回退到环境变量和默认值""" def __init__(self): self._cache = {} self._redis_client: Optional[redis.Redis] = None self._redis_connected = False self._init_redis() self._load_from_db() def _init_redis(self): """初始化Redis客户端(同步)""" if not REDIS_SYNC_AVAILABLE: logger.debug("redis-py未安装,配置缓存将不使用Redis") return try: # 从环境变量或配置获取Redis连接信息 redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379') redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true' redis_username = os.getenv('REDIS_USERNAME', None) redis_password = os.getenv('REDIS_PASSWORD', None) # 解析Redis URL if redis_url.startswith('rediss://') or redis_use_tls: # TLS连接 import ssl ssl_context = ssl.create_default_context() self._redis_client = redis.from_url( redis_url, username=redis_username, password=redis_password, ssl=ssl_context, decode_responses=True ) else: # 普通连接 self._redis_client = redis.from_url( redis_url, username=redis_username, password=redis_password, decode_responses=True ) # 测试连接 self._redis_client.ping() self._redis_connected = True logger.info("✓ Redis配置缓存连接成功") except Exception as e: logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存") self._redis_client = None self._redis_connected = False def _get_from_redis(self, key: str) -> Optional[Any]: """从Redis获取配置值""" if not self._redis_connected or not self._redis_client: return None try: # 使用Hash存储所有配置,键为 trading_config:{key} value = self._redis_client.hget('trading_config', key) if value: # 尝试解析JSON(如果是复杂类型) try: return json.loads(value) except (json.JSONDecodeError, TypeError): return value except Exception as e: logger.debug(f"从Redis获取配置失败 {key}: {e}") # 连接失败时,尝试重新连接 try: self._redis_client.ping() self._redis_connected = True except: self._redis_connected = False return None def _set_to_redis(self, key: str, value: Any): """设置配置到Redis""" if not self._redis_connected or not self._redis_client: return False try: # 使用Hash存储所有配置,键为 trading_config:{key} # 将值序列化为JSON(如果是复杂类型) if isinstance(value, (dict, list)): value_str = json.dumps(value) else: value_str = str(value) self._redis_client.hset('trading_config', key, value_str) # 设置整个Hash的过期时间为7天(配置不会频繁变化,但需要定期刷新) self._redis_client.expire('trading_config', 7 * 24 * 3600) return True except Exception as e: logger.debug(f"设置配置到Redis失败 {key}: {e}") # 连接失败时,尝试重新连接 try: self._redis_client.ping() self._redis_connected = True # 重试一次 if isinstance(value, (dict, list)): value_str = json.dumps(value) else: value_str = str(value) self._redis_client.hset('trading_config', key, value_str) self._redis_client.expire('trading_config', 7 * 24 * 3600) return True except: self._redis_connected = False return False def _load_all_to_redis(self): """将所有配置加载到Redis""" if not self._redis_connected or not self._redis_client: return try: # 批量设置所有配置到Redis pipe = self._redis_client.pipeline() for key, value in self._cache.items(): if isinstance(value, (dict, list)): value_str = json.dumps(value) else: value_str = str(value) pipe.hset('trading_config', key, value_str) pipe.expire('trading_config', 7 * 24 * 3600) pipe.execute() logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis") except Exception as e: logger.debug(f"同步配置到Redis失败: {e}") def _load_from_db(self): """从数据库加载配置(如果Redis中没有)""" if TradingConfig is None: logger.warning("TradingConfig未导入,无法从数据库加载配置") self._cache = {} return try: # 先尝试从Redis加载所有配置 if self._redis_connected and self._redis_client: try: # 测试连接是否真正可用 self._redis_client.ping() redis_configs = self._redis_client.hgetall('trading_config') if redis_configs and len(redis_configs) > 0: # 解析Redis中的配置 for key, value_str in redis_configs.items(): try: # 尝试解析JSON value = json.loads(value_str) except (json.JSONDecodeError, TypeError): # 如果不是JSON,尝试转换类型 value = value_str self._cache[key] = value logger.info(f"从Redis加载了 {len(self._cache)} 个配置项") return else: # Redis中没有配置,需要从数据库加载并同步到Redis logger.debug("Redis中没有配置数据,从数据库加载并同步到Redis") except Exception as e: logger.debug(f"从Redis加载配置失败: {e},回退到数据库") # 连接失败时,标记为未连接 try: self._redis_client.ping() except: self._redis_connected = False return # 从数据库加载配置(仅在Redis不可用或Redis中没有数据时) configs = TradingConfig.get_all() for config in configs: key = config['config_key'] value = TradingConfig._convert_value( config['config_value'], config['config_type'] ) self._cache[key] = value # 同时写入Redis缓存 self._set_to_redis(key, value) logger.info(f"从数据库加载了 {len(self._cache)} 个配置项,已同步到Redis") except Exception as e: logger.warning(f"从数据库加载配置失败,使用默认配置: {e}") self._cache = {} def get(self, key, default=None): """获取配置值""" # 1. 优先从Redis缓存读取(最新) redis_value = self._get_from_redis(key) if redis_value is not None: # 同时更新本地缓存 self._cache[key] = redis_value return redis_value # 2. 从本地缓存读取 if key in self._cache: return self._cache[key] # 3. 从环境变量读取 env_value = os.getenv(key) if env_value is not None: return env_value # 4. 返回默认值 return default def set(self, key, value, config_type='string', category='general', description=None): """设置配置(同时更新数据库、Redis缓存和本地缓存)""" if TradingConfig is None: logger.warning("TradingConfig未导入,无法更新数据库配置") self._cache[key] = value # 仍然尝试更新Redis self._set_to_redis(key, value) return try: # 1. 更新数据库 TradingConfig.set(key, value, config_type, category, description) # 2. 更新本地缓存 self._cache[key] = value # 3. 更新Redis缓存(确保trading_system能立即读取到最新配置) self._set_to_redis(key, value) logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)") except Exception as e: logger.error(f"更新配置失败: {e}") raise def reload(self): """重新加载配置(优先从Redis,其次从数据库)""" self._cache = {} self._load_from_db() def reload_from_redis(self): """强制从Redis重新加载配置(用于trading_system即时获取最新配置)""" # 如果Redis不可用,不重新加载,保持现有缓存 if not self._redis_connected or not self._redis_client: logger.debug("Redis未连接,跳过从Redis重新加载,保持现有缓存") return try: # 测试连接是否真正可用 self._redis_client.ping() except Exception as e: logger.debug(f"Redis连接不可用: {e},跳过从Redis重新加载") self._redis_connected = False return try: redis_configs = self._redis_client.hgetall('trading_config') if redis_configs and len(redis_configs) > 0: self._cache = {} # 清空缓存 for key, value_str in redis_configs.items(): try: value = json.loads(value_str) except (json.JSONDecodeError, TypeError): value = value_str self._cache[key] = value logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项") else: # Redis中没有配置,但不回退到数据库(避免频繁从数据库加载) logger.debug("Redis中没有配置数据,保持现有缓存") except Exception as e: logger.debug(f"从Redis重新加载配置失败: {e},保持现有缓存") # 连接失败时,标记为未连接 try: self._redis_client.ping() except: self._redis_connected = False def get_trading_config(self): """获取交易配置字典(兼容原有config.py的TRADING_CONFIG)""" return { # 仓位控制 'MAX_POSITION_PERCENT': self.get('MAX_POSITION_PERCENT', 0.08), # 提高单笔仓位到8% 'MAX_TOTAL_POSITION_PERCENT': self.get('MAX_TOTAL_POSITION_PERCENT', 0.40), # 提高总仓位到40% 'MIN_POSITION_PERCENT': self.get('MIN_POSITION_PERCENT', 0.02), # 提高最小仓位到2% 'MIN_MARGIN_USDT': self.get('MIN_MARGIN_USDT', 5.0), # 提高最小保证金到5美元 # 涨跌幅阈值 'MIN_CHANGE_PERCENT': self.get('MIN_CHANGE_PERCENT', 2.0), 'TOP_N_SYMBOLS': self.get('TOP_N_SYMBOLS', 10), # 风险控制 'STOP_LOSS_PERCENT': self.get('STOP_LOSS_PERCENT', 0.10), # 默认10% 'TAKE_PROFIT_PERCENT': self.get('TAKE_PROFIT_PERCENT', 0.30), # 默认30%(盈亏比3:1) 'MIN_STOP_LOSS_PRICE_PCT': self.get('MIN_STOP_LOSS_PRICE_PCT', 0.02), # 默认2% 'MIN_TAKE_PROFIT_PRICE_PCT': self.get('MIN_TAKE_PROFIT_PRICE_PCT', 0.03), # 默认3% 'USE_ATR_STOP_LOSS': self.get('USE_ATR_STOP_LOSS', True), # 是否使用ATR动态止损 'ATR_STOP_LOSS_MULTIPLIER': self.get('ATR_STOP_LOSS_MULTIPLIER', 1.8), # ATR止损倍数(1.5-2倍) 'ATR_TAKE_PROFIT_MULTIPLIER': self.get('ATR_TAKE_PROFIT_MULTIPLIER', 3.0), # ATR止盈倍数(3倍ATR) 'RISK_REWARD_RATIO': self.get('RISK_REWARD_RATIO', 3.0), # 盈亏比(止损距离的倍数) 'ATR_PERIOD': self.get('ATR_PERIOD', 14), # ATR计算周期 'USE_DYNAMIC_ATR_MULTIPLIER': self.get('USE_DYNAMIC_ATR_MULTIPLIER', False), # 是否根据波动率动态调整ATR倍数 'ATR_MULTIPLIER_MIN': self.get('ATR_MULTIPLIER_MIN', 1.5), # 动态ATR倍数最小值 'ATR_MULTIPLIER_MAX': self.get('ATR_MULTIPLIER_MAX', 2.5), # 动态ATR倍数最大值 # 市场扫描(1小时主周期) 'SCAN_INTERVAL': self.get('SCAN_INTERVAL', 3600), # 1小时 'TOP_N_SYMBOLS': self.get('TOP_N_SYMBOLS', 10), # 每次扫描后处理的交易对数量 'MAX_SCAN_SYMBOLS': self.get('MAX_SCAN_SYMBOLS', 500), # 扫描的最大交易对数量(0表示扫描所有) 'KLINE_INTERVAL': self.get('KLINE_INTERVAL', '1h'), 'PRIMARY_INTERVAL': self.get('PRIMARY_INTERVAL', '1h'), 'CONFIRM_INTERVAL': self.get('CONFIRM_INTERVAL', '4h'), 'ENTRY_INTERVAL': self.get('ENTRY_INTERVAL', '15m'), # 过滤条件 'MIN_VOLUME_24H': self.get('MIN_VOLUME_24H', 10000000), 'MIN_VOLATILITY': self.get('MIN_VOLATILITY', 0.02), # 高胜率策略参数 'MIN_SIGNAL_STRENGTH': self.get('MIN_SIGNAL_STRENGTH', 5), 'LEVERAGE': self.get('LEVERAGE', 10), 'USE_DYNAMIC_LEVERAGE': self.get('USE_DYNAMIC_LEVERAGE', True), 'MAX_LEVERAGE': self.get('MAX_LEVERAGE', 15), # 降低到15,更保守,配合更大的保证金 'USE_TRAILING_STOP': self.get('USE_TRAILING_STOP', True), 'TRAILING_STOP_ACTIVATION': self.get('TRAILING_STOP_ACTIVATION', 0.10), # 默认10%(给趋势更多空间) 'TRAILING_STOP_PROTECT': self.get('TRAILING_STOP_PROTECT', 0.05), # 默认5%(保护更多利润) } # 全局配置管理器实例 config_manager = ConfigManager() # 兼容原有config.py的接口 def get_config(key, default=None): """获取配置(兼容函数)""" return config_manager.get(key, default) def get_trading_config(): """获取交易配置(兼容函数)""" return config_manager.get_trading_config()