diff --git a/backend/api/routes/config.py b/backend/api/routes/config.py index 51e5efa..9f70e6d 100644 --- a/backend/api/routes/config.py +++ b/backend/api/routes/config.py @@ -381,24 +381,24 @@ async def update_config(key: str, item: ConfigUpdate): detail=f"{key} must be between 0 and 1 (0% to 100%)" ) - # 更新配置 + # 更新配置(会同时更新数据库和Redis缓存) TradingConfig.set(key, item.value, config_type, category, description) - # 清除配置缓存,确保立即生效 + # 更新config_manager的缓存(包括Redis) try: - from config_manager import ConfigManager - # 如果存在全局配置管理器实例,清除其缓存 import config_manager - if hasattr(config_manager, '_config_manager') and config_manager._config_manager: - config_manager._config_manager.reload() + if hasattr(config_manager, 'config_manager') and config_manager.config_manager: + # 调用set方法会同时更新数据库、Redis和本地缓存 + config_manager.config_manager.set(key, item.value, config_type, category, description) + logger.info(f"配置已更新到Redis缓存: {key} = {item.value}") except Exception as e: - logger.debug(f"清除配置缓存失败: {e}") + logger.warning(f"更新配置缓存失败: {e}") return { "message": "配置已更新", "key": key, "value": item.value, - "note": "交易系统将在下次扫描时自动使用新配置" + "note": "配置已同步到Redis,交易系统将立即使用新配置" } except HTTPException: raise diff --git a/backend/config_manager.py b/backend/config_manager.py index e0dc609..f643ee4 100644 --- a/backend/config_manager.py +++ b/backend/config_manager.py @@ -1,9 +1,12 @@ """ 配置管理器 - 从数据库读取配置,兼容原有config.py +支持Redis缓存,实现配置即时生效 """ import sys import os +import json from pathlib import Path +from typing import Optional, Any # 加载.env文件 try: @@ -42,22 +45,176 @@ import logging logger = logging.getLogger(__name__) +# 尝试导入同步Redis客户端(用于配置缓存) +try: + import redis + REDIS_SYNC_AVAILABLE = True +except ImportError: + REDIS_SYNC_AVAILABLE = False + redis = None + class ConfigManager: - """配置管理器 - 优先从数据库读取,回退到环境变量和默认值""" + """配置管理器 - 优先从Redis缓存读取,其次从数据库读取,回退到环境变量和默认值""" def __init__(self): self._cache = {} + self._redis_client: Optional[redis.Redis] = None + self._redis_connected = False + self._init_redis() self._load_from_db() + def _init_redis(self): + """初始化Redis客户端(同步)""" + if not REDIS_SYNC_AVAILABLE: + logger.debug("redis-py未安装,配置缓存将不使用Redis") + return + + try: + # 从环境变量或配置获取Redis连接信息 + redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379') + redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true' + redis_username = os.getenv('REDIS_USERNAME', None) + redis_password = os.getenv('REDIS_PASSWORD', None) + + # 解析Redis URL + if redis_url.startswith('rediss://') or redis_use_tls: + # TLS连接 + import ssl + ssl_context = ssl.create_default_context() + self._redis_client = redis.from_url( + redis_url, + username=redis_username, + password=redis_password, + ssl=ssl_context, + decode_responses=True + ) + else: + # 普通连接 + self._redis_client = redis.from_url( + redis_url, + username=redis_username, + password=redis_password, + decode_responses=True + ) + + # 测试连接 + self._redis_client.ping() + self._redis_connected = True + logger.info("✓ Redis配置缓存连接成功") + except Exception as e: + logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存") + self._redis_client = None + self._redis_connected = False + + def _get_from_redis(self, key: str) -> Optional[Any]: + """从Redis获取配置值""" + if not self._redis_connected or not self._redis_client: + return None + + try: + # 使用Hash存储所有配置,键为 trading_config:{key} + value = self._redis_client.hget('trading_config', key) + if value: + # 尝试解析JSON(如果是复杂类型) + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return value + except Exception as e: + logger.debug(f"从Redis获取配置失败 {key}: {e}") + # 连接失败时,尝试重新连接 + try: + self._redis_client.ping() + self._redis_connected = True + except: + self._redis_connected = False + + return None + + def _set_to_redis(self, key: str, value: Any): + """设置配置到Redis""" + if not self._redis_connected or not self._redis_client: + return False + + try: + # 使用Hash存储所有配置,键为 trading_config:{key} + # 将值序列化为JSON(如果是复杂类型) + if isinstance(value, (dict, list)): + value_str = json.dumps(value) + else: + value_str = str(value) + + self._redis_client.hset('trading_config', key, value_str) + # 设置整个Hash的过期时间为7天(配置不会频繁变化,但需要定期刷新) + self._redis_client.expire('trading_config', 7 * 24 * 3600) + return True + except Exception as e: + logger.debug(f"设置配置到Redis失败 {key}: {e}") + # 连接失败时,尝试重新连接 + try: + self._redis_client.ping() + self._redis_connected = True + # 重试一次 + if isinstance(value, (dict, list)): + value_str = json.dumps(value) + else: + value_str = str(value) + self._redis_client.hset('trading_config', key, value_str) + self._redis_client.expire('trading_config', 7 * 24 * 3600) + return True + except: + self._redis_connected = False + return False + + def _load_all_to_redis(self): + """将所有配置加载到Redis""" + if not self._redis_connected or not self._redis_client: + return + + try: + # 批量设置所有配置到Redis + pipe = self._redis_client.pipeline() + for key, value in self._cache.items(): + if isinstance(value, (dict, list)): + value_str = json.dumps(value) + else: + value_str = str(value) + pipe.hset('trading_config', key, value_str) + pipe.expire('trading_config', 7 * 24 * 3600) + pipe.execute() + logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis") + except Exception as e: + logger.debug(f"同步配置到Redis失败: {e}") + def _load_from_db(self): - """从数据库加载配置""" + """从数据库加载配置(如果Redis中没有)""" if TradingConfig is None: logger.warning("TradingConfig未导入,无法从数据库加载配置") self._cache = {} return try: + # 先尝试从Redis加载所有配置 + if self._redis_connected and self._redis_client: + try: + redis_configs = self._redis_client.hgetall('trading_config') + if redis_configs: + # 解析Redis中的配置 + for key, value_str in redis_configs.items(): + try: + # 尝试解析JSON + value = json.loads(value_str) + except (json.JSONDecodeError, TypeError): + # 如果不是JSON,尝试转换类型 + value = value_str + self._cache[key] = value + logger.info(f"从Redis加载了 {len(self._cache)} 个配置项") + return + except Exception as e: + logger.debug(f"从Redis加载配置失败: {e},回退到数据库") + + # 从数据库加载配置 configs = TradingConfig.get_all() for config in configs: key = config['config_key'] @@ -66,45 +223,85 @@ class ConfigManager: config['config_type'] ) self._cache[key] = value - logger.info(f"从数据库加载了 {len(self._cache)} 个配置项") + # 同时写入Redis缓存 + self._set_to_redis(key, value) + + logger.info(f"从数据库加载了 {len(self._cache)} 个配置项,已同步到Redis") except Exception as e: logger.warning(f"从数据库加载配置失败,使用默认配置: {e}") self._cache = {} def get(self, key, default=None): """获取配置值""" - # 1. 优先从数据库缓存读取 + # 1. 优先从Redis缓存读取(最新) + redis_value = self._get_from_redis(key) + if redis_value is not None: + # 同时更新本地缓存 + self._cache[key] = redis_value + return redis_value + + # 2. 从本地缓存读取 if key in self._cache: return self._cache[key] - # 2. 从环境变量读取 + # 3. 从环境变量读取 env_value = os.getenv(key) if env_value is not None: return env_value - # 3. 返回默认值 + # 4. 返回默认值 return default def set(self, key, value, config_type='string', category='general', description=None): - """设置配置(同时更新数据库和缓存)""" + """设置配置(同时更新数据库、Redis缓存和本地缓存)""" if TradingConfig is None: logger.warning("TradingConfig未导入,无法更新数据库配置") self._cache[key] = value + # 仍然尝试更新Redis + self._set_to_redis(key, value) return try: + # 1. 更新数据库 TradingConfig.set(key, value, config_type, category, description) + + # 2. 更新本地缓存 self._cache[key] = value - logger.info(f"配置已更新: {key} = {value}") + + # 3. 更新Redis缓存(确保trading_system能立即读取到最新配置) + self._set_to_redis(key, value) + + logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)") except Exception as e: logger.error(f"更新配置失败: {e}") raise def reload(self): - """重新加载配置""" + """重新加载配置(优先从Redis,其次从数据库)""" self._cache = {} self._load_from_db() + def reload_from_redis(self): + """强制从Redis重新加载配置(用于trading_system即时获取最新配置)""" + self._cache = {} + if self._redis_connected and self._redis_client: + try: + redis_configs = self._redis_client.hgetall('trading_config') + if redis_configs: + for key, value_str in redis_configs.items(): + try: + value = json.loads(value_str) + except (json.JSONDecodeError, TypeError): + value = value_str + self._cache[key] = value + logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项") + return + except Exception as e: + logger.debug(f"从Redis重新加载配置失败: {e}") + + # 如果Redis失败,回退到数据库 + self._load_from_db() + def get_trading_config(self): """获取交易配置字典(兼容原有config.py的TRADING_CONFIG)""" return { diff --git a/trading_system/config.py b/trading_system/config.py index 6270bf9..c9805d6 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -165,9 +165,16 @@ def _get_config_value(key, default=None): return default def _get_trading_config(): +<<<<<<< Current (Your changes) """获取交易配置(支持动态重载)""" if _config_manager: _config_manager.reload() # 每次获取配置时重新加载 +======= + """获取交易配置(支持动态重载,优先从Redis读取最新值)""" + if _config_manager: + # 从Redis重新加载配置(轻量级,只从Redis读取,不查数据库) + _config_manager.reload_from_redis() +>>>>>>> Incoming (Background Agent changes) return _config_manager.get_trading_config() # 回退到默认配置 return { diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 6188eec..f2dc6df 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -155,6 +155,16 @@ class PositionManager: logger.debug(f"获取K线数据失败,使用固定止损: {e}") klines = None + # 在开仓前从Redis重新加载配置,确保使用最新配置(包括ATR参数) + # 从Redis读取最新配置(轻量级,即时生效) + try: + if config._config_manager: + config._config_manager.reload_from_redis() + config.TRADING_CONFIG = config._get_trading_config() + logger.debug(f"{symbol} 开仓前已从Redis重新加载配置") + except Exception as e: + logger.debug(f"从Redis重新加载配置失败: {e}") + # 使用基于保证金的止损止盈(结合技术分析) # 计算保证金和仓位价值 position_value = entry_price * quantity @@ -769,6 +779,14 @@ class PositionManager: position_info['maxProfit'] = pnl_percent_margin # 移动止损逻辑(盈利后保护利润,基于保证金) + # 每次检查时从Redis重新加载配置,确保配置修改能即时生效 + try: + if config._config_manager: + config._config_manager.reload_from_redis() + config.TRADING_CONFIG = config._get_trading_config() + except Exception as e: + logger.debug(f"从Redis重新加载配置失败: {e}") + use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True) if use_trailing: trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金 @@ -1781,6 +1799,14 @@ class PositionManager: position_info['maxProfit'] = pnl_percent_margin # 移动止损逻辑(盈利后保护利润,基于保证金) + # 每次检查时从Redis重新加载配置,确保配置修改能即时生效 + try: + if config._config_manager: + config._config_manager.reload_from_redis() + config.TRADING_CONFIG = config._get_trading_config() + except Exception as e: + logger.debug(f"从Redis重新加载配置失败: {e}") + use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True) if use_trailing: trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金 diff --git a/trading_system/strategy.py b/trading_system/strategy.py index 67bbc31..766646e 100644 --- a/trading_system/strategy.py +++ b/trading_system/strategy.py @@ -71,6 +71,16 @@ class TradingStrategy: try: while self.running: + # 0. 定期从Redis重新加载配置(确保配置修改能即时生效) + # 每次循环开始时从Redis重新加载,确保使用最新配置 + try: + if config._config_manager: + config._config_manager.reload_from_redis() + config.TRADING_CONFIG = config._get_trading_config() + logger.debug("配置已从Redis重新加载") + except Exception as e: + logger.warning(f"从Redis重新加载配置失败: {e}") + # 1. 扫描市场,找出涨跌幅最大的前N个货币对 top_symbols = await self.scanner.scan_market()