455 lines
19 KiB
Python
455 lines
19 KiB
Python
"""
|
||
配置管理器 - 从数据库读取配置,兼容原有config.py
|
||
支持Redis缓存,实现配置即时生效
|
||
"""
|
||
import sys
|
||
import os
|
||
import json
|
||
from pathlib import Path
|
||
from typing import Optional, Any
|
||
|
||
# 加载.env文件
|
||
try:
|
||
from dotenv import load_dotenv
|
||
backend_dir = Path(__file__).parent
|
||
project_root = backend_dir.parent
|
||
env_files = [
|
||
backend_dir / '.env',
|
||
project_root / '.env',
|
||
]
|
||
for env_file in env_files:
|
||
if env_file.exists():
|
||
load_dotenv(env_file, override=True)
|
||
break
|
||
else:
|
||
load_dotenv(project_root / '.env', override=False)
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# 添加项目根目录到路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
# 延迟导入,避免在trading_system中导入时因为缺少依赖而失败
|
||
try:
|
||
from database.models import TradingConfig
|
||
except ImportError as e:
|
||
TradingConfig = None
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
logger.warning(f"无法导入TradingConfig: {e},配置管理器将无法使用数据库")
|
||
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试导入同步Redis客户端(用于配置缓存)
|
||
try:
|
||
import redis
|
||
REDIS_SYNC_AVAILABLE = True
|
||
except ImportError:
|
||
REDIS_SYNC_AVAILABLE = False
|
||
redis = None
|
||
|
||
|
||
class ConfigManager:
|
||
"""配置管理器 - 优先从Redis缓存读取,其次从数据库读取,回退到环境变量和默认值"""
|
||
|
||
def __init__(self):
|
||
self._cache = {}
|
||
self._redis_client: Optional[redis.Redis] = None
|
||
self._redis_connected = False
|
||
self._init_redis()
|
||
self._load_from_db()
|
||
|
||
def _init_redis(self):
|
||
"""初始化Redis客户端(同步)"""
|
||
if not REDIS_SYNC_AVAILABLE:
|
||
logger.debug("redis-py未安装,配置缓存将不使用Redis")
|
||
return
|
||
|
||
try:
|
||
# 从环境变量或配置获取Redis连接信息
|
||
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
|
||
redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'
|
||
redis_username = os.getenv('REDIS_USERNAME', None)
|
||
redis_password = os.getenv('REDIS_PASSWORD', None)
|
||
|
||
# 验证URL格式
|
||
if not redis_url or not isinstance(redis_url, str):
|
||
logger.warning(f"Redis URL无效: {redis_url},使用默认值")
|
||
redis_url = 'redis://localhost:6379'
|
||
|
||
# 如果使用TLS但URL不是rediss://,自动转换
|
||
if redis_use_tls and not redis_url.startswith('rediss://'):
|
||
if redis_url.startswith('redis://'):
|
||
redis_url = redis_url.replace('redis://', 'rediss://', 1)
|
||
else:
|
||
# 如果URL格式不正确,构建新的URL
|
||
logger.warning(f"Redis URL格式不正确且需要TLS: {redis_url},尝试构建rediss://URL")
|
||
# 尝试从URL中提取主机和端口
|
||
if '://' in redis_url:
|
||
parts = redis_url.split('://', 1)
|
||
host_port = parts[1].split('/')[0] if '/' in parts[1] else parts[1]
|
||
redis_url = f"rediss://{host_port}"
|
||
else:
|
||
# 如果完全没有scheme,添加rediss://
|
||
redis_url = f"rediss://{redis_url}"
|
||
logger.info(f"已自动转换Redis URL为TLS格式: {redis_url}")
|
||
|
||
# 解析Redis URL
|
||
<<<<<<< Current (Your changes)
|
||
if redis_url.startswith('rediss://') or redis_use_tls:
|
||
# TLS连接
|
||
import ssl
|
||
ssl_context = ssl.create_default_context()
|
||
self._redis_client = redis.from_url(
|
||
redis_url,
|
||
username=redis_username,
|
||
password=redis_password,
|
||
ssl=ssl_context,
|
||
decode_responses=True
|
||
)
|
||
else:
|
||
# 普通连接
|
||
self._redis_client = redis.from_url(
|
||
redis_url,
|
||
username=redis_username,
|
||
password=redis_password,
|
||
decode_responses=True
|
||
)
|
||
=======
|
||
# redis-py的同步客户端也支持通过ssl_cert_reqs等参数配置TLS
|
||
# 当URL是rediss://时,会自动启用SSL
|
||
connection_kwargs = {
|
||
'username': redis_username,
|
||
'password': redis_password,
|
||
'decode_responses': True
|
||
}
|
||
|
||
if redis_url.startswith('rediss://') or redis_use_tls:
|
||
# TLS连接 - 使用redis-py支持的SSL参数
|
||
# 从环境变量获取SSL配置(如果未设置,使用默认值)
|
||
ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required')
|
||
ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None)
|
||
|
||
# 设置SSL参数
|
||
connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
|
||
if ssl_ca_certs:
|
||
connection_kwargs['ssl_ca_certs'] = ssl_ca_certs
|
||
|
||
# 根据ssl_cert_reqs设置主机名验证
|
||
if ssl_cert_reqs == 'none':
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
elif ssl_cert_reqs == 'required':
|
||
connection_kwargs['ssl_check_hostname'] = True
|
||
else: # optional
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
|
||
logger.info(f"使用 TLS 连接 Redis: {redis_url} (ssl_cert_reqs={ssl_cert_reqs})")
|
||
|
||
# 创建Redis客户端(同步)
|
||
self._redis_client = redis.from_url(
|
||
redis_url,
|
||
**connection_kwargs
|
||
)
|
||
>>>>>>> Incoming (Background Agent changes)
|
||
|
||
# 测试连接
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
logger.info("✓ Redis配置缓存连接成功")
|
||
except Exception as e:
|
||
logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存")
|
||
self._redis_client = None
|
||
self._redis_connected = False
|
||
|
||
def _get_from_redis(self, key: str) -> Optional[Any]:
|
||
"""从Redis获取配置值"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return None
|
||
|
||
try:
|
||
# 使用Hash存储所有配置,键为 trading_config:{key}
|
||
value = self._redis_client.hget('trading_config', key)
|
||
if value:
|
||
# 尝试解析JSON(如果是复杂类型)
|
||
try:
|
||
return json.loads(value)
|
||
except (json.JSONDecodeError, TypeError):
|
||
return value
|
||
except Exception as e:
|
||
logger.debug(f"从Redis获取配置失败 {key}: {e}")
|
||
# 连接失败时,尝试重新连接
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
return None
|
||
|
||
def _set_to_redis(self, key: str, value: Any):
|
||
"""设置配置到Redis"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return False
|
||
|
||
try:
|
||
# 使用Hash存储所有配置,键为 trading_config:{key}
|
||
# 将值序列化为JSON(如果是复杂类型)
|
||
if isinstance(value, (dict, list)):
|
||
value_str = json.dumps(value)
|
||
else:
|
||
value_str = str(value)
|
||
|
||
self._redis_client.hset('trading_config', key, value_str)
|
||
# 设置整个Hash的过期时间为7天(配置不会频繁变化,但需要定期刷新)
|
||
self._redis_client.expire('trading_config', 7 * 24 * 3600)
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"设置配置到Redis失败 {key}: {e}")
|
||
# 连接失败时,尝试重新连接
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
# 重试一次
|
||
if isinstance(value, (dict, list)):
|
||
value_str = json.dumps(value)
|
||
else:
|
||
value_str = str(value)
|
||
self._redis_client.hset('trading_config', key, value_str)
|
||
self._redis_client.expire('trading_config', 7 * 24 * 3600)
|
||
return True
|
||
except:
|
||
self._redis_connected = False
|
||
return False
|
||
|
||
def _load_all_to_redis(self):
|
||
"""将所有配置加载到Redis"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return
|
||
|
||
try:
|
||
# 批量设置所有配置到Redis
|
||
pipe = self._redis_client.pipeline()
|
||
for key, value in self._cache.items():
|
||
if isinstance(value, (dict, list)):
|
||
value_str = json.dumps(value)
|
||
else:
|
||
value_str = str(value)
|
||
pipe.hset('trading_config', key, value_str)
|
||
pipe.expire('trading_config', 7 * 24 * 3600)
|
||
pipe.execute()
|
||
logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis")
|
||
except Exception as e:
|
||
logger.debug(f"同步配置到Redis失败: {e}")
|
||
|
||
def _load_from_db(self):
|
||
"""从数据库加载配置(如果Redis中没有)"""
|
||
if TradingConfig is None:
|
||
logger.warning("TradingConfig未导入,无法从数据库加载配置")
|
||
self._cache = {}
|
||
return
|
||
|
||
try:
|
||
# 先尝试从Redis加载所有配置
|
||
if self._redis_connected and self._redis_client:
|
||
try:
|
||
# 测试连接是否真正可用
|
||
self._redis_client.ping()
|
||
redis_configs = self._redis_client.hgetall('trading_config')
|
||
if redis_configs and len(redis_configs) > 0:
|
||
# 解析Redis中的配置
|
||
for key, value_str in redis_configs.items():
|
||
try:
|
||
# 尝试解析JSON
|
||
value = json.loads(value_str)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果不是JSON,尝试转换类型
|
||
value = value_str
|
||
self._cache[key] = value
|
||
logger.info(f"从Redis加载了 {len(self._cache)} 个配置项")
|
||
return
|
||
else:
|
||
# Redis中没有配置,需要从数据库加载并同步到Redis
|
||
logger.debug("Redis中没有配置数据,从数据库加载并同步到Redis")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis加载配置失败: {e},回退到数据库")
|
||
# 连接失败时,标记为未连接
|
||
try:
|
||
self._redis_client.ping()
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
# 从数据库加载配置(仅在Redis不可用或Redis中没有数据时)
|
||
configs = TradingConfig.get_all()
|
||
for config in configs:
|
||
key = config['config_key']
|
||
value = TradingConfig._convert_value(
|
||
config['config_value'],
|
||
config['config_type']
|
||
)
|
||
self._cache[key] = value
|
||
# 同时写入Redis缓存
|
||
self._set_to_redis(key, value)
|
||
|
||
logger.info(f"从数据库加载了 {len(self._cache)} 个配置项,已同步到Redis")
|
||
except Exception as e:
|
||
logger.warning(f"从数据库加载配置失败,使用默认配置: {e}")
|
||
self._cache = {}
|
||
|
||
def get(self, key, default=None):
|
||
"""获取配置值"""
|
||
# 1. 优先从Redis缓存读取(最新)
|
||
# 注意:只在Redis连接正常时尝试读取,避免频繁连接失败
|
||
if self._redis_connected and self._redis_client:
|
||
redis_value = self._get_from_redis(key)
|
||
if redis_value is not None:
|
||
# 同时更新本地缓存
|
||
self._cache[key] = redis_value
|
||
return redis_value
|
||
|
||
# 2. 从本地缓存读取
|
||
if key in self._cache:
|
||
return self._cache[key]
|
||
|
||
# 3. 从环境变量读取
|
||
env_value = os.getenv(key)
|
||
if env_value is not None:
|
||
return env_value
|
||
|
||
# 4. 返回默认值
|
||
return default
|
||
|
||
def set(self, key, value, config_type='string', category='general', description=None):
|
||
"""设置配置(同时更新数据库、Redis缓存和本地缓存)"""
|
||
if TradingConfig is None:
|
||
logger.warning("TradingConfig未导入,无法更新数据库配置")
|
||
self._cache[key] = value
|
||
# 仍然尝试更新Redis
|
||
self._set_to_redis(key, value)
|
||
return
|
||
|
||
try:
|
||
# 1. 更新数据库
|
||
TradingConfig.set(key, value, config_type, category, description)
|
||
|
||
# 2. 更新本地缓存
|
||
self._cache[key] = value
|
||
|
||
# 3. 更新Redis缓存(确保trading_system能立即读取到最新配置)
|
||
self._set_to_redis(key, value)
|
||
|
||
logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)")
|
||
except Exception as e:
|
||
logger.error(f"更新配置失败: {e}")
|
||
raise
|
||
|
||
def reload(self):
|
||
"""重新加载配置(优先从Redis,其次从数据库)"""
|
||
self._cache = {}
|
||
self._load_from_db()
|
||
|
||
def reload_from_redis(self):
|
||
"""强制从Redis重新加载配置(用于trading_system即时获取最新配置)"""
|
||
# 如果Redis不可用,不重新加载,保持现有缓存
|
||
if not self._redis_connected or not self._redis_client:
|
||
logger.debug("Redis未连接,跳过从Redis重新加载,保持现有缓存")
|
||
return
|
||
|
||
try:
|
||
# 测试连接是否真正可用
|
||
self._redis_client.ping()
|
||
except Exception as e:
|
||
logger.debug(f"Redis连接不可用: {e},跳过从Redis重新加载")
|
||
self._redis_connected = False
|
||
return
|
||
|
||
try:
|
||
redis_configs = self._redis_client.hgetall('trading_config')
|
||
if redis_configs and len(redis_configs) > 0:
|
||
self._cache = {} # 清空缓存
|
||
for key, value_str in redis_configs.items():
|
||
try:
|
||
value = json.loads(value_str)
|
||
except (json.JSONDecodeError, TypeError):
|
||
value = value_str
|
||
self._cache[key] = value
|
||
logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项")
|
||
else:
|
||
# Redis中没有配置,但不回退到数据库(避免频繁从数据库加载)
|
||
logger.debug("Redis中没有配置数据,保持现有缓存")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载配置失败: {e},保持现有缓存")
|
||
# 连接失败时,标记为未连接
|
||
try:
|
||
self._redis_client.ping()
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
def get_trading_config(self):
|
||
"""获取交易配置字典(兼容原有config.py的TRADING_CONFIG)"""
|
||
return {
|
||
# 仓位控制
|
||
'MAX_POSITION_PERCENT': self.get('MAX_POSITION_PERCENT', 0.08), # 提高单笔仓位到8%
|
||
'MAX_TOTAL_POSITION_PERCENT': self.get('MAX_TOTAL_POSITION_PERCENT', 0.40), # 提高总仓位到40%
|
||
'MIN_POSITION_PERCENT': self.get('MIN_POSITION_PERCENT', 0.02), # 提高最小仓位到2%
|
||
'MIN_MARGIN_USDT': self.get('MIN_MARGIN_USDT', 5.0), # 提高最小保证金到5美元
|
||
|
||
# 涨跌幅阈值
|
||
'MIN_CHANGE_PERCENT': self.get('MIN_CHANGE_PERCENT', 2.0),
|
||
'TOP_N_SYMBOLS': self.get('TOP_N_SYMBOLS', 10),
|
||
|
||
# 风险控制
|
||
'STOP_LOSS_PERCENT': self.get('STOP_LOSS_PERCENT', 0.10), # 默认10%
|
||
'TAKE_PROFIT_PERCENT': self.get('TAKE_PROFIT_PERCENT', 0.30), # 默认30%(盈亏比3:1)
|
||
'MIN_STOP_LOSS_PRICE_PCT': self.get('MIN_STOP_LOSS_PRICE_PCT', 0.02), # 默认2%
|
||
'MIN_TAKE_PROFIT_PRICE_PCT': self.get('MIN_TAKE_PROFIT_PRICE_PCT', 0.03), # 默认3%
|
||
'USE_ATR_STOP_LOSS': self.get('USE_ATR_STOP_LOSS', True), # 是否使用ATR动态止损
|
||
'ATR_STOP_LOSS_MULTIPLIER': self.get('ATR_STOP_LOSS_MULTIPLIER', 1.8), # ATR止损倍数(1.5-2倍)
|
||
'ATR_TAKE_PROFIT_MULTIPLIER': self.get('ATR_TAKE_PROFIT_MULTIPLIER', 3.0), # ATR止盈倍数(3倍ATR)
|
||
'RISK_REWARD_RATIO': self.get('RISK_REWARD_RATIO', 3.0), # 盈亏比(止损距离的倍数)
|
||
'ATR_PERIOD': self.get('ATR_PERIOD', 14), # ATR计算周期
|
||
'USE_DYNAMIC_ATR_MULTIPLIER': self.get('USE_DYNAMIC_ATR_MULTIPLIER', False), # 是否根据波动率动态调整ATR倍数
|
||
'ATR_MULTIPLIER_MIN': self.get('ATR_MULTIPLIER_MIN', 1.5), # 动态ATR倍数最小值
|
||
'ATR_MULTIPLIER_MAX': self.get('ATR_MULTIPLIER_MAX', 2.5), # 动态ATR倍数最大值
|
||
|
||
# 市场扫描(1小时主周期)
|
||
'SCAN_INTERVAL': self.get('SCAN_INTERVAL', 3600), # 1小时
|
||
'TOP_N_SYMBOLS': self.get('TOP_N_SYMBOLS', 10), # 每次扫描后处理的交易对数量
|
||
'MAX_SCAN_SYMBOLS': self.get('MAX_SCAN_SYMBOLS', 500), # 扫描的最大交易对数量(0表示扫描所有)
|
||
'KLINE_INTERVAL': self.get('KLINE_INTERVAL', '1h'),
|
||
'PRIMARY_INTERVAL': self.get('PRIMARY_INTERVAL', '1h'),
|
||
'CONFIRM_INTERVAL': self.get('CONFIRM_INTERVAL', '4h'),
|
||
'ENTRY_INTERVAL': self.get('ENTRY_INTERVAL', '15m'),
|
||
|
||
# 过滤条件
|
||
'MIN_VOLUME_24H': self.get('MIN_VOLUME_24H', 10000000),
|
||
'MIN_VOLATILITY': self.get('MIN_VOLATILITY', 0.02),
|
||
|
||
# 高胜率策略参数
|
||
'MIN_SIGNAL_STRENGTH': self.get('MIN_SIGNAL_STRENGTH', 5),
|
||
'LEVERAGE': self.get('LEVERAGE', 10),
|
||
'USE_DYNAMIC_LEVERAGE': self.get('USE_DYNAMIC_LEVERAGE', True),
|
||
'MAX_LEVERAGE': self.get('MAX_LEVERAGE', 15), # 降低到15,更保守,配合更大的保证金
|
||
'USE_TRAILING_STOP': self.get('USE_TRAILING_STOP', True),
|
||
'TRAILING_STOP_ACTIVATION': self.get('TRAILING_STOP_ACTIVATION', 0.10), # 默认10%(给趋势更多空间)
|
||
'TRAILING_STOP_PROTECT': self.get('TRAILING_STOP_PROTECT', 0.05), # 默认5%(保护更多利润)
|
||
|
||
}
|
||
|
||
|
||
# 全局配置管理器实例
|
||
config_manager = ConfigManager()
|
||
|
||
# 兼容原有config.py的接口
|
||
def get_config(key, default=None):
|
||
"""获取配置(兼容函数)"""
|
||
return config_manager.get(key, default)
|
||
|
||
def get_trading_config():
|
||
"""获取交易配置(兼容函数)"""
|
||
return config_manager.get_trading_config()
|