""" 配置管理器 - 从数据库读取配置,兼容原有config.py 支持Redis缓存,实现配置即时生效 """ import sys import os import json import re from pathlib import Path from typing import Optional, Any # 加载.env文件 try: from dotenv import load_dotenv backend_dir = Path(__file__).parent project_root = backend_dir.parent env_files = [ backend_dir / '.env', project_root / '.env', ] for env_file in env_files: if env_file.exists(): load_dotenv(env_file, override=True) break else: load_dotenv(project_root / '.env', override=False) except ImportError: pass except Exception: pass # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) # 延迟导入,避免在trading_system中导入时因为缺少依赖而失败 try: from database.models import TradingConfig, Account except ImportError as e: TradingConfig = None Account = None import logging logger = logging.getLogger(__name__) logger.warning(f"无法导入TradingConfig: {e},配置管理器将无法使用数据库") import logging logger = logging.getLogger(__name__) # 平台兜底:策略核心使用全局账号配置(默认 account_id=1),普通用户账号只允许调整“风险旋钮” # - 风险旋钮:每个账号独立(仓位/频次等) # - 其它策略参数:统一从全局账号读取,避免每个用户乱改导致策略不可控 try: GLOBAL_STRATEGY_ACCOUNT_ID = int(os.getenv("ATS_GLOBAL_STRATEGY_ACCOUNT_ID") or "1") except Exception: GLOBAL_STRATEGY_ACCOUNT_ID = 1 RISK_KNOBS_KEYS = { "MIN_MARGIN_USDT", "MIN_POSITION_PERCENT", "MAX_POSITION_PERCENT", "MAX_TOTAL_POSITION_PERCENT", "AUTO_TRADE_ENABLED", "MAX_OPEN_POSITIONS", "MAX_DAILY_ENTRIES", } # 尝试导入同步Redis客户端(用于配置缓存) try: import redis REDIS_SYNC_AVAILABLE = True except ImportError: REDIS_SYNC_AVAILABLE = False redis = None class ConfigManager: """配置管理器 - 优先从Redis缓存读取,其次从数据库读取,回退到环境变量和默认值""" _instances = {} def __init__(self, account_id: int = 1): self.account_id = int(account_id or 1) self._cache = {} self._redis_client: Optional[redis.Redis] = None self._redis_connected = False self._redis_hash_key = f"trading_config:{self.account_id}" self._legacy_hash_key = "trading_config" if self.account_id == 1 else None self._init_redis() self._load_from_db() @classmethod def for_account(cls, account_id: int): aid = int(account_id or 1) inst = cls._instances.get(aid) if inst: return inst inst = cls(account_id=aid) cls._instances[aid] = inst return inst def _init_redis(self): """初始化Redis客户端(同步)""" if not REDIS_SYNC_AVAILABLE: logger.debug("redis-py未安装,配置缓存将不使用Redis") return try: # 从环境变量或配置获取Redis连接信息 redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379') redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true' redis_username = os.getenv('REDIS_USERNAME', None) redis_password = os.getenv('REDIS_PASSWORD', None) # 验证URL格式 if not redis_url or not isinstance(redis_url, str): logger.warning(f"Redis URL无效: {redis_url},使用默认值") redis_url = 'redis://localhost:6379' # 如果使用TLS但URL不是rediss://,自动转换 if redis_use_tls and not redis_url.startswith('rediss://'): if redis_url.startswith('redis://'): redis_url = redis_url.replace('redis://', 'rediss://', 1) else: # 如果URL格式不正确,构建新的URL logger.warning(f"Redis URL格式不正确且需要TLS: {redis_url},尝试构建rediss://URL") # 尝试从URL中提取主机和端口 if '://' in redis_url: parts = redis_url.split('://', 1) host_port = parts[1].split('/')[0] if '/' in parts[1] else parts[1] redis_url = f"rediss://{host_port}" else: # 如果完全没有scheme,添加rediss:// redis_url = f"rediss://{redis_url}" logger.info(f"已自动转换Redis URL为TLS格式: {redis_url}") # 解析Redis URL # redis-py的同步客户端也支持通过ssl_cert_reqs等参数配置TLS # 当URL是rediss://时,会自动启用SSL connection_kwargs = { 'username': redis_username, 'password': redis_password, 'decode_responses': True } if redis_url.startswith('rediss://') or redis_use_tls: # TLS连接 - 使用redis-py支持的SSL参数 # 从环境变量获取SSL配置(如果未设置,使用默认值) ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required') ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None) connection_kwargs['select'] = os.getenv('REDIS_SELECT', 0) if connection_kwargs['select'] is not None: connection_kwargs['select'] = int(connection_kwargs['select']) else: connection_kwargs['select'] = 0 logger.info(f"使用 Redis 数据库: {connection_kwargs['select']}") # 设置SSL参数 connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs if ssl_ca_certs: connection_kwargs['ssl_ca_certs'] = ssl_ca_certs # 根据ssl_cert_reqs设置主机名验证 if ssl_cert_reqs == 'none': connection_kwargs['ssl_check_hostname'] = False elif ssl_cert_reqs == 'required': connection_kwargs['ssl_check_hostname'] = True else: # optional connection_kwargs['ssl_check_hostname'] = False logger.info(f"使用 TLS 连接 Redis: {redis_url} (ssl_cert_reqs={ssl_cert_reqs})") # 创建Redis客户端(同步) self._redis_client = redis.from_url( redis_url, **connection_kwargs ) # 测试连接 self._redis_client.ping() self._redis_connected = True logger.info("✓ Redis配置缓存连接成功") except Exception as e: logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存") self._redis_client = None self._redis_connected = False def _get_from_redis(self, key: str) -> Optional[Any]: """从Redis获取配置值""" if not self._redis_connected or not self._redis_client: return None try: # 使用账号维度 Hash 存储所有配置 value = self._redis_client.hget(self._redis_hash_key, key) if (value is None or value == '') and self._legacy_hash_key: value = self._redis_client.hget(self._legacy_hash_key, key) if value is not None and value != '': return self._coerce_redis_value(value) except Exception as e: logger.debug(f"从Redis获取配置失败 {key}: {e}") # 连接失败时,尝试重新连接 try: self._redis_client.ping() self._redis_connected = True except: self._redis_connected = False return None @staticmethod def _coerce_redis_value(value_str: Any) -> Any: """ 将 Redis Hash 中的字符串值尽量还原为正确类型。 兼容历史写法: - 之前 bool 会被写成 "True"/"False"(非 JSON),导致读取后变成字符串,进而在逻辑判断里永远为 True。 - 现在会优先把 bool/int/float/dict/list 用 JSON 序列化写入(见 _set_to_redis)。 """ if value_str is None: return None # redis-py decode_responses=True 时是 str;否则可能是 bytes if isinstance(value_str, bytes): try: value_str = value_str.decode('utf-8', errors='ignore') except Exception: value_str = str(value_str) s = str(value_str).strip() if s == '': return '' # 1) JSON 优先(dict/list/bool/number 都能覆盖) try: return json.loads(s) except Exception: pass sl = s.lower() # 2) bool 兼容(历史 "True"/"False") if sl in ('true', 'false', '1', '0', 'yes', 'no', 'on', 'off'): return sl in ('true', '1', 'yes', 'on') # 3) number 兼容(避免把 "1h" 之类误判) # int: -?\d+ if re.fullmatch(r'-?\d+', s): try: return int(s) except Exception: return s # float: -?\d+\.\d+ if re.fullmatch(r'-?\d+\.\d+', s): try: return float(s) except Exception: return s return s def _set_to_redis(self, key: str, value: Any): """设置配置到Redis(账号维度 + legacy兼容)""" if not self._redis_connected or not self._redis_client: return False try: # 将值序列化:复杂类型/基础类型使用 JSON,避免 bool 被写成 "False" 字符串后逻辑误判 if isinstance(value, (dict, list, bool, int, float)): value_str = json.dumps(value, ensure_ascii=False) else: value_str = str(value) self._redis_client.hset(self._redis_hash_key, key, value_str) self._redis_client.expire(self._redis_hash_key, 3600) if self._legacy_hash_key: self._redis_client.hset(self._legacy_hash_key, key, value_str) self._redis_client.expire(self._legacy_hash_key, 3600) return True except Exception as e: logger.debug(f"设置配置到Redis失败 {key}: {e}") # 连接失败时,尝试重新连接 try: self._redis_client.ping() self._redis_connected = True # 重试一次 if isinstance(value, (dict, list, bool, int, float)): value_str = json.dumps(value, ensure_ascii=False) else: value_str = str(value) self._redis_client.hset(self._redis_hash_key, key, value_str) self._redis_client.expire(self._redis_hash_key, 3600) if self._legacy_hash_key: self._redis_client.hset(self._legacy_hash_key, key, value_str) self._redis_client.expire(self._legacy_hash_key, 3600) return True except: self._redis_connected = False return False def _load_all_to_redis(self): """将所有配置加载到Redis""" if not self._redis_connected or not self._redis_client: return try: # 批量设置所有配置到Redis(账号维度) pipe = self._redis_client.pipeline() for key, value in self._cache.items(): if isinstance(value, (dict, list, bool, int, float)): value_str = json.dumps(value, ensure_ascii=False) else: value_str = str(value) pipe.hset(self._redis_hash_key, key, value_str) pipe.expire(self._redis_hash_key, 3600) if self._legacy_hash_key: for key, value in self._cache.items(): if isinstance(value, (dict, list, bool, int, float)): value_str = json.dumps(value, ensure_ascii=False) else: value_str = str(value) pipe.hset(self._legacy_hash_key, key, value_str) pipe.expire(self._legacy_hash_key, 3600) pipe.execute() logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis") except Exception as e: logger.debug(f"同步配置到Redis失败: {e}") def _load_from_db(self): """从数据库加载配置(如果Redis中没有)""" if TradingConfig is None: logger.warning("TradingConfig未导入,无法从数据库加载配置") self._cache = {} return try: # 先尝试从Redis加载所有配置 if self._redis_connected and self._redis_client: try: # 测试连接是否真正可用 self._redis_client.ping() redis_configs = self._redis_client.hgetall(self._redis_hash_key) if (not redis_configs) and self._legacy_hash_key: redis_configs = self._redis_client.hgetall(self._legacy_hash_key) if redis_configs and len(redis_configs) > 0: # 解析Redis中的配置 for key, value_str in redis_configs.items(): self._cache[key] = self._coerce_redis_value(value_str) logger.info(f"从Redis加载了 {len(self._cache)} 个配置项") return else: # Redis中没有配置,需要从数据库加载并同步到Redis logger.debug("Redis中没有配置数据,从数据库加载并同步到Redis") except Exception as e: logger.debug(f"从Redis加载配置失败: {e},回退到数据库") # 连接失败时,标记为未连接 try: self._redis_client.ping() except: self._redis_connected = False # 从数据库加载配置(仅在Redis不可用或Redis中没有数据时) configs = TradingConfig.get_all(account_id=self.account_id) for config in configs: key = config['config_key'] value = TradingConfig._convert_value( config['config_value'], config['config_type'] ) self._cache[key] = value # 同时写入Redis缓存 self._set_to_redis(key, value) logger.info(f"从数据库加载了 {len(self._cache)} 个配置项,已同步到Redis") except Exception as e: logger.warning(f"从数据库加载配置失败,使用默认配置: {e}") self._cache = {} def get(self, key, default=None): """获取配置值""" # 账号私有:API Key/Secret/Testnet 从 accounts 表读取(不走 trading_config) if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None: try: api_key, api_secret, use_testnet = Account.get_credentials(self.account_id) if key == "BINANCE_API_KEY": return api_key if api_key else default if key == "BINANCE_API_SECRET": return api_secret if api_secret else default return bool(use_testnet) except Exception: # 回退到后续逻辑(旧数据/无表) pass # 1. 优先从Redis缓存读取(最新) # 注意:只在Redis连接正常时尝试读取,避免频繁连接失败 if self._redis_connected and self._redis_client: redis_value = self._get_from_redis(key) if redis_value is not None: # 同时更新本地缓存 self._cache[key] = redis_value return redis_value # 2. 从本地缓存读取 if key in self._cache: return self._cache[key] # 3. 从环境变量读取 env_value = os.getenv(key) if env_value is not None: return env_value # 4. 返回默认值 return default def set(self, key, value, config_type='string', category='general', description=None): """设置配置(同时更新数据库、Redis缓存和本地缓存)""" # 账号私有:API Key/Secret/Testnet 写入 accounts 表 if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None: try: if key == "BINANCE_API_KEY": Account.update_credentials(self.account_id, api_key=str(value or "")) elif key == "BINANCE_API_SECRET": Account.update_credentials(self.account_id, api_secret=str(value or "")) else: Account.update_credentials(self.account_id, use_testnet=bool(value)) self._cache[key] = value return except Exception as e: logger.error(f"更新账号API配置失败: {e}") raise if TradingConfig is None: logger.warning("TradingConfig未导入,无法更新数据库配置") self._cache[key] = value # 仍然尝试更新Redis self._set_to_redis(key, value) return try: # 1. 更新数据库 TradingConfig.set(key, value, config_type, category, description, account_id=self.account_id) # 2. 更新本地缓存 self._cache[key] = value # 3. 更新Redis缓存(确保trading_system能立即读取到最新配置) self._set_to_redis(key, value) logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)") except Exception as e: logger.error(f"更新配置失败: {e}") raise def reload(self): """重新加载配置(优先从Redis,其次从数据库)""" self._cache = {} self._load_from_db() def reload_from_redis(self): """强制从Redis重新加载配置(用于trading_system即时获取最新配置)""" # 如果Redis不可用,不重新加载,保持现有缓存 if not self._redis_connected or not self._redis_client: logger.debug("Redis未连接,跳过从Redis重新加载,保持现有缓存") return try: # 测试连接是否真正可用 self._redis_client.ping() except Exception as e: logger.debug(f"Redis连接不可用: {e},跳过从Redis重新加载") self._redis_connected = False return try: redis_configs = self._redis_client.hgetall(self._redis_hash_key) if (not redis_configs) and self._legacy_hash_key: redis_configs = self._redis_client.hgetall(self._legacy_hash_key) if redis_configs and len(redis_configs) > 0: self._cache = {} # 清空缓存 for key, value_str in redis_configs.items(): self._cache[key] = self._coerce_redis_value(value_str) logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项") else: # Redis中没有配置,但不回退到数据库(避免频繁从数据库加载) logger.debug("Redis中没有配置数据,保持现有缓存") except Exception as e: logger.debug(f"从Redis重新加载配置失败: {e},保持现有缓存") # 连接失败时,标记为未连接 try: self._redis_client.ping() except: self._redis_connected = False def get_trading_config(self): """获取交易配置字典(兼容原有config.py的TRADING_CONFIG)""" # 全局策略配置管理器(避免递归:当 self 就是全局账号时,不做跨账号读取) global_mgr = None if self.account_id != int(GLOBAL_STRATEGY_ACCOUNT_ID or 1): try: global_mgr = ConfigManager.for_account(int(GLOBAL_STRATEGY_ACCOUNT_ID or 1)) except Exception: global_mgr = None # 预热全局 cache:避免每个 key 都 HGET 一次 if global_mgr is not None: try: global_mgr.reload_from_redis() except Exception: pass def eff_get(key: str, default: Any): """ 策略核心:默认从全局账号读取(GLOBAL_STRATEGY_ACCOUNT_ID)。 风险旋钮:从当前账号读取。 """ # API key/secret/testnet 永远按账号读取(在 get() 内部已处理) if key in RISK_KNOBS_KEYS or global_mgr is None: return self.get(key, default) try: if key in global_mgr._cache: # noqa: SLF001 return global_mgr._cache.get(key, default) # noqa: SLF001 return global_mgr.get(key, default) except Exception: return self.get(key, default) return { # 仓位控制 'MAX_POSITION_PERCENT': eff_get('MAX_POSITION_PERCENT', 0.08), # 单笔最大保证金占比 'MAX_TOTAL_POSITION_PERCENT': eff_get('MAX_TOTAL_POSITION_PERCENT', 0.40), # 总保证金占比上限 'MIN_POSITION_PERCENT': eff_get('MIN_POSITION_PERCENT', 0.02), # 最小保证金占比 'MIN_MARGIN_USDT': eff_get('MIN_MARGIN_USDT', 5.0), # 最小保证金(USDT) # 用户风险旋钮:自动交易开关/频次控制 'AUTO_TRADE_ENABLED': eff_get('AUTO_TRADE_ENABLED', True), 'MAX_OPEN_POSITIONS': eff_get('MAX_OPEN_POSITIONS', 3), 'MAX_DAILY_ENTRIES': eff_get('MAX_DAILY_ENTRIES', 8), # 涨跌幅阈值 'MIN_CHANGE_PERCENT': eff_get('MIN_CHANGE_PERCENT', 2.0), 'TOP_N_SYMBOLS': eff_get('TOP_N_SYMBOLS', 10), # 风险控制 'STOP_LOSS_PERCENT': eff_get('STOP_LOSS_PERCENT', 0.10), # 默认10% 'TAKE_PROFIT_PERCENT': eff_get('TAKE_PROFIT_PERCENT', 0.30), # 默认30%(盈亏比3:1) 'MIN_STOP_LOSS_PRICE_PCT': eff_get('MIN_STOP_LOSS_PRICE_PCT', 0.02), # 默认2% 'MIN_TAKE_PROFIT_PRICE_PCT': eff_get('MIN_TAKE_PROFIT_PRICE_PCT', 0.03), # 默认3% 'USE_ATR_STOP_LOSS': eff_get('USE_ATR_STOP_LOSS', True), # 是否使用ATR动态止损 'ATR_STOP_LOSS_MULTIPLIER': eff_get('ATR_STOP_LOSS_MULTIPLIER', 1.8), # ATR止损倍数(1.5-2倍) 'ATR_TAKE_PROFIT_MULTIPLIER': eff_get('ATR_TAKE_PROFIT_MULTIPLIER', 3.0), # ATR止盈倍数(3倍ATR) 'RISK_REWARD_RATIO': eff_get('RISK_REWARD_RATIO', 3.0), # 盈亏比(止损距离的倍数) 'ATR_PERIOD': eff_get('ATR_PERIOD', 14), # ATR计算周期 'USE_DYNAMIC_ATR_MULTIPLIER': eff_get('USE_DYNAMIC_ATR_MULTIPLIER', False), # 是否根据波动率动态调整ATR倍数 'ATR_MULTIPLIER_MIN': eff_get('ATR_MULTIPLIER_MIN', 1.5), # 动态ATR倍数最小值 'ATR_MULTIPLIER_MAX': eff_get('ATR_MULTIPLIER_MAX', 2.5), # 动态ATR倍数最大值 # 市场扫描(1小时主周期) 'SCAN_INTERVAL': eff_get('SCAN_INTERVAL', 3600), # 1小时 'TOP_N_SYMBOLS': eff_get('TOP_N_SYMBOLS', 10), # 每次扫描后处理的交易对数量 'MAX_SCAN_SYMBOLS': eff_get('MAX_SCAN_SYMBOLS', 500), # 扫描的最大交易对数量(0表示扫描所有) 'KLINE_INTERVAL': eff_get('KLINE_INTERVAL', '1h'), 'PRIMARY_INTERVAL': eff_get('PRIMARY_INTERVAL', '1h'), 'CONFIRM_INTERVAL': eff_get('CONFIRM_INTERVAL', '4h'), 'ENTRY_INTERVAL': eff_get('ENTRY_INTERVAL', '15m'), # 过滤条件 'MIN_VOLUME_24H': eff_get('MIN_VOLUME_24H', 10000000), 'MIN_VOLATILITY': eff_get('MIN_VOLATILITY', 0.02), # 高胜率策略参数 'MIN_SIGNAL_STRENGTH': eff_get('MIN_SIGNAL_STRENGTH', 5), 'LEVERAGE': eff_get('LEVERAGE', 10), 'USE_DYNAMIC_LEVERAGE': eff_get('USE_DYNAMIC_LEVERAGE', True), 'MAX_LEVERAGE': eff_get('MAX_LEVERAGE', 15), # 降低到15,更保守,配合更大的保证金 'USE_TRAILING_STOP': eff_get('USE_TRAILING_STOP', True), 'TRAILING_STOP_ACTIVATION': eff_get('TRAILING_STOP_ACTIVATION', 0.10), # 默认10%(给趋势更多空间) 'TRAILING_STOP_PROTECT': eff_get('TRAILING_STOP_PROTECT', 0.05), # 默认5%(保护更多利润) # 自动交易过滤(用于提升胜率/控频) # 说明:这两个 key 需要出现在 TRADING_CONFIG 中,否则 trading_system 在每次 reload_from_redis 后会丢失它们, # 导致始终按默认值拦截自动交易(用户在配置页怎么开都没用)。 'AUTO_TRADE_ONLY_TRENDING': eff_get('AUTO_TRADE_ONLY_TRENDING', True), 'AUTO_TRADE_ALLOW_4H_NEUTRAL': eff_get('AUTO_TRADE_ALLOW_4H_NEUTRAL', False), # 智能入场/限价偏移(部分逻辑会直接读取 TRADING_CONFIG) 'LIMIT_ORDER_OFFSET_PCT': eff_get('LIMIT_ORDER_OFFSET_PCT', 0.5), 'SMART_ENTRY_ENABLED': eff_get('SMART_ENTRY_ENABLED', False), 'SMART_ENTRY_STRONG_SIGNAL': eff_get('SMART_ENTRY_STRONG_SIGNAL', 8), 'ENTRY_SYMBOL_COOLDOWN_SEC': eff_get('ENTRY_SYMBOL_COOLDOWN_SEC', 120), 'ENTRY_TIMEOUT_SEC': eff_get('ENTRY_TIMEOUT_SEC', 180), 'ENTRY_STEP_WAIT_SEC': eff_get('ENTRY_STEP_WAIT_SEC', 15), 'ENTRY_CHASE_MAX_STEPS': eff_get('ENTRY_CHASE_MAX_STEPS', 4), 'ENTRY_MARKET_FALLBACK_AFTER_SEC': eff_get('ENTRY_MARKET_FALLBACK_AFTER_SEC', 45), 'ENTRY_CONFIRM_TIMEOUT_SEC': eff_get('ENTRY_CONFIRM_TIMEOUT_SEC', 30), 'ENTRY_MAX_DRIFT_PCT_TRENDING': eff_get('ENTRY_MAX_DRIFT_PCT_TRENDING', 0.6), 'ENTRY_MAX_DRIFT_PCT_RANGING': eff_get('ENTRY_MAX_DRIFT_PCT_RANGING', 0.3), } def _sync_to_redis(self): """将配置同步到Redis缓存(账号维度)""" if not self._redis_connected or not self._redis_client: return try: payload = {k: json.dumps(v) for k, v in self._cache.items()} self._redis_client.hset(self._redis_hash_key, mapping=payload) self._redis_client.expire(self._redis_hash_key, 3600) if self._legacy_hash_key: self._redis_client.hset(self._legacy_hash_key, mapping=payload) self._redis_client.expire(self._legacy_hash_key, 3600) except Exception as e: logger.debug(f"同步配置到Redis失败: {e}") # 全局配置管理器实例(默认账号;trading_system 进程可通过 ATS_ACCOUNT_ID 指定) try: _default_account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1) except Exception: _default_account_id = 1 config_manager = ConfigManager.for_account(_default_account_id) # 兼容原有config.py的接口 def get_config(key, default=None): """获取配置(兼容函数)""" return config_manager.get(key, default) def get_trading_config(): """获取交易配置(兼容函数)""" return config_manager.get_trading_config()