923 lines
43 KiB
Python
923 lines
43 KiB
Python
"""
|
||
配置管理器 - 从数据库读取配置,兼容原有config.py
|
||
支持Redis缓存,实现配置即时生效
|
||
"""
|
||
import sys
|
||
import os
|
||
import json
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Optional, Any
|
||
|
||
# 加载.env文件
|
||
try:
|
||
from dotenv import load_dotenv
|
||
backend_dir = Path(__file__).parent
|
||
project_root = backend_dir.parent
|
||
env_files = [
|
||
backend_dir / '.env',
|
||
project_root / '.env',
|
||
]
|
||
for env_file in env_files:
|
||
if env_file.exists():
|
||
load_dotenv(env_file, override=True)
|
||
break
|
||
else:
|
||
load_dotenv(project_root / '.env', override=False)
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# 添加项目根目录到路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
# 延迟导入,避免在trading_system中导入时因为缺少依赖而失败
|
||
try:
|
||
from database.models import TradingConfig, Account
|
||
except ImportError as e:
|
||
TradingConfig = None
|
||
Account = None
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
logger.warning(f"无法导入TradingConfig: {e},配置管理器将无法使用数据库")
|
||
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 平台兜底:策略核心使用全局配置表(global_strategy_config),普通用户账号只允许调整“风险旋钮”
|
||
# - 风险旋钮:每个账号独立(仓位/频次等)
|
||
# - 其它策略参数:统一从全局配置表读取,避免每个用户乱改导致策略不可控
|
||
# 注意:不再依赖account_id=1,全局配置存储在独立的global_strategy_config表中
|
||
|
||
RISK_KNOBS_KEYS = {
|
||
"MIN_MARGIN_USDT",
|
||
"MIN_POSITION_PERCENT",
|
||
"MAX_POSITION_PERCENT",
|
||
"MAX_TOTAL_POSITION_PERCENT",
|
||
"AUTO_TRADE_ENABLED",
|
||
"MAX_OPEN_POSITIONS",
|
||
"MAX_DAILY_ENTRIES",
|
||
}
|
||
|
||
# 尝试导入同步Redis客户端(用于配置缓存)
|
||
try:
|
||
import redis
|
||
REDIS_SYNC_AVAILABLE = True
|
||
except ImportError:
|
||
REDIS_SYNC_AVAILABLE = False
|
||
redis = None
|
||
|
||
|
||
class GlobalStrategyConfigManager:
|
||
"""全局策略配置管理器(独立于账户,管理员专用)"""
|
||
|
||
_instance = None
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if hasattr(self, '_initialized'):
|
||
return
|
||
self._initialized = True
|
||
self._cache = {}
|
||
self._redis_client: Optional[redis.Redis] = None
|
||
self._redis_connected = False
|
||
self._redis_hash_key = "global_strategy_config" # 独立的Redis键
|
||
self._init_redis()
|
||
self._load_from_db()
|
||
|
||
def _init_redis(self):
|
||
"""初始化Redis客户端(同步)"""
|
||
if not REDIS_SYNC_AVAILABLE:
|
||
logger.debug("redis-py未安装,全局配置缓存将不使用Redis")
|
||
return
|
||
|
||
try:
|
||
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
|
||
redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'
|
||
redis_username = os.getenv('REDIS_USERNAME', None)
|
||
redis_password = os.getenv('REDIS_PASSWORD', None)
|
||
|
||
if not redis_url or not isinstance(redis_url, str):
|
||
redis_url = 'redis://localhost:6379'
|
||
|
||
if redis_use_tls and not redis_url.startswith('rediss://'):
|
||
if redis_url.startswith('redis://'):
|
||
redis_url = redis_url.replace('redis://', 'rediss://', 1)
|
||
|
||
connection_kwargs = {
|
||
'username': redis_username,
|
||
'password': redis_password,
|
||
'decode_responses': True
|
||
}
|
||
|
||
if redis_url.startswith('rediss://') or redis_use_tls:
|
||
ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required')
|
||
ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None)
|
||
connection_kwargs['select'] = int(os.getenv('REDIS_SELECT', 0))
|
||
connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
|
||
if ssl_ca_certs:
|
||
connection_kwargs['ssl_ca_certs'] = ssl_ca_certs
|
||
if ssl_cert_reqs == 'none':
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
elif ssl_cert_reqs == 'required':
|
||
connection_kwargs['ssl_check_hostname'] = True
|
||
else:
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
|
||
self._redis_client = redis.from_url(redis_url, **connection_kwargs)
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
logger.info("✓ 全局策略配置Redis缓存连接成功")
|
||
except Exception as e:
|
||
logger.debug(f"全局策略配置Redis缓存连接失败: {e},将使用数据库缓存")
|
||
self._redis_client = None
|
||
self._redis_connected = False
|
||
|
||
def _get_from_redis(self, key: str) -> Optional[Any]:
|
||
"""从Redis获取全局配置值"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return None
|
||
|
||
try:
|
||
value = self._redis_client.hget(self._redis_hash_key, key)
|
||
if value is not None and value != '':
|
||
return ConfigManager._coerce_redis_value(value)
|
||
except Exception as e:
|
||
logger.debug(f"从Redis获取全局配置失败 {key}: {e}")
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
return None
|
||
|
||
def _set_to_redis(self, key: str, value: Any):
|
||
"""设置全局配置到Redis"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return False
|
||
|
||
try:
|
||
if isinstance(value, (dict, list, bool, int, float)):
|
||
value_str = json.dumps(value, ensure_ascii=False)
|
||
else:
|
||
value_str = str(value)
|
||
|
||
self._redis_client.hset(self._redis_hash_key, key, value_str)
|
||
self._redis_client.expire(self._redis_hash_key, 3600)
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"设置全局配置到Redis失败 {key}: {e}")
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
except:
|
||
self._redis_connected = False
|
||
return False
|
||
|
||
def _load_from_db(self):
|
||
"""从数据库加载全局配置"""
|
||
try:
|
||
from database.models import GlobalStrategyConfig
|
||
except ImportError:
|
||
logger.warning("GlobalStrategyConfig未导入,无法从数据库加载全局配置")
|
||
self._cache = {}
|
||
return
|
||
|
||
try:
|
||
# 先尝试从Redis加载
|
||
if self._redis_connected and self._redis_client:
|
||
try:
|
||
self._redis_client.ping()
|
||
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
|
||
if redis_configs and len(redis_configs) > 0:
|
||
for key, value_str in redis_configs.items():
|
||
self._cache[key] = ConfigManager._coerce_redis_value(value_str)
|
||
logger.info(f"从Redis加载了 {len(self._cache)} 个全局配置项")
|
||
return
|
||
except Exception as e:
|
||
logger.debug(f"从Redis加载全局配置失败: {e},回退到数据库")
|
||
try:
|
||
self._redis_client.ping()
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
# 从数据库加载
|
||
configs = GlobalStrategyConfig.get_all()
|
||
for config in configs:
|
||
key = config['config_key']
|
||
# 使用TradingConfig的转换方法(GlobalStrategyConfig复用)
|
||
from database.models import TradingConfig
|
||
value = TradingConfig._convert_value(
|
||
config['config_value'],
|
||
config['config_type']
|
||
)
|
||
self._cache[key] = value
|
||
self._set_to_redis(key, value)
|
||
|
||
logger.info(f"从数据库加载了 {len(self._cache)} 个全局配置项,已同步到Redis")
|
||
except Exception as e:
|
||
logger.warning(f"从数据库加载全局配置失败,使用默认配置: {e}")
|
||
self._cache = {}
|
||
|
||
def get(self, key: str, default: Any = None) -> Any:
|
||
"""获取全局配置值"""
|
||
# 1. 优先从Redis缓存读取
|
||
if self._redis_connected and self._redis_client:
|
||
redis_value = self._get_from_redis(key)
|
||
if redis_value is not None:
|
||
self._cache[key] = redis_value
|
||
return redis_value
|
||
|
||
# 2. 从本地缓存读取
|
||
if key in self._cache:
|
||
return self._cache[key]
|
||
|
||
# 3. 从数据库读取
|
||
try:
|
||
from database.models import GlobalStrategyConfig
|
||
db_value = GlobalStrategyConfig.get_value(key)
|
||
if db_value is not None:
|
||
self._cache[key] = db_value
|
||
self._set_to_redis(key, db_value)
|
||
return db_value
|
||
except Exception:
|
||
pass
|
||
|
||
# 4. 从环境变量读取
|
||
env_value = os.getenv(key)
|
||
if env_value is not None:
|
||
return env_value
|
||
|
||
# 5. 返回默认值
|
||
return default
|
||
|
||
def reload_from_redis(self):
|
||
"""强制从Redis重新加载全局配置"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return
|
||
|
||
try:
|
||
self._redis_client.ping()
|
||
except Exception as e:
|
||
logger.debug(f"Redis连接不可用: {e},跳过从Redis重新加载")
|
||
self._redis_connected = False
|
||
return
|
||
|
||
try:
|
||
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
|
||
if redis_configs and len(redis_configs) > 0:
|
||
self._cache = {}
|
||
for key, value_str in redis_configs.items():
|
||
self._cache[key] = ConfigManager._coerce_redis_value(value_str)
|
||
logger.debug(f"从Redis重新加载了 {len(self._cache)} 个全局配置项")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载全局配置失败: {e},保持现有缓存")
|
||
|
||
|
||
class ConfigManager:
|
||
"""配置管理器 - 优先从Redis缓存读取,其次从数据库读取,回退到环境变量和默认值"""
|
||
|
||
_instances = {}
|
||
|
||
def __init__(self, account_id: int = 1):
|
||
self.account_id = int(account_id or 1)
|
||
self._cache = {}
|
||
self._redis_client: Optional[redis.Redis] = None
|
||
self._redis_connected = False
|
||
self._redis_hash_key = f"trading_config:{self.account_id}"
|
||
self._legacy_hash_key = "trading_config" if self.account_id == 1 else None
|
||
self._init_redis()
|
||
self._load_from_db()
|
||
|
||
@classmethod
|
||
def for_account(cls, account_id: int):
|
||
aid = int(account_id or 1)
|
||
inst = cls._instances.get(aid)
|
||
if inst:
|
||
return inst
|
||
inst = cls(account_id=aid)
|
||
cls._instances[aid] = inst
|
||
return inst
|
||
|
||
def _init_redis(self):
|
||
"""初始化Redis客户端(同步)"""
|
||
if not REDIS_SYNC_AVAILABLE:
|
||
logger.debug("redis-py未安装,配置缓存将不使用Redis")
|
||
return
|
||
|
||
try:
|
||
# 从环境变量或配置获取Redis连接信息
|
||
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
|
||
redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'
|
||
redis_username = os.getenv('REDIS_USERNAME', None)
|
||
redis_password = os.getenv('REDIS_PASSWORD', None)
|
||
|
||
# 验证URL格式
|
||
if not redis_url or not isinstance(redis_url, str):
|
||
logger.warning(f"Redis URL无效: {redis_url},使用默认值")
|
||
redis_url = 'redis://localhost:6379'
|
||
|
||
# 如果使用TLS但URL不是rediss://,自动转换
|
||
if redis_use_tls and not redis_url.startswith('rediss://'):
|
||
if redis_url.startswith('redis://'):
|
||
redis_url = redis_url.replace('redis://', 'rediss://', 1)
|
||
else:
|
||
# 如果URL格式不正确,构建新的URL
|
||
logger.warning(f"Redis URL格式不正确且需要TLS: {redis_url},尝试构建rediss://URL")
|
||
# 尝试从URL中提取主机和端口
|
||
if '://' in redis_url:
|
||
parts = redis_url.split('://', 1)
|
||
host_port = parts[1].split('/')[0] if '/' in parts[1] else parts[1]
|
||
redis_url = f"rediss://{host_port}"
|
||
else:
|
||
# 如果完全没有scheme,添加rediss://
|
||
redis_url = f"rediss://{redis_url}"
|
||
logger.info(f"已自动转换Redis URL为TLS格式: {redis_url}")
|
||
|
||
# 解析Redis URL
|
||
# redis-py的同步客户端也支持通过ssl_cert_reqs等参数配置TLS
|
||
# 当URL是rediss://时,会自动启用SSL
|
||
connection_kwargs = {
|
||
'username': redis_username,
|
||
'password': redis_password,
|
||
'decode_responses': True
|
||
}
|
||
|
||
if redis_url.startswith('rediss://') or redis_use_tls:
|
||
# TLS连接 - 使用redis-py支持的SSL参数
|
||
# 从环境变量获取SSL配置(如果未设置,使用默认值)
|
||
ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required')
|
||
ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None)
|
||
|
||
connection_kwargs['select'] = os.getenv('REDIS_SELECT', 0)
|
||
if connection_kwargs['select'] is not None:
|
||
connection_kwargs['select'] = int(connection_kwargs['select'])
|
||
else:
|
||
connection_kwargs['select'] = 0
|
||
logger.info(f"使用 Redis 数据库: {connection_kwargs['select']}")
|
||
# 设置SSL参数
|
||
connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
|
||
if ssl_ca_certs:
|
||
connection_kwargs['ssl_ca_certs'] = ssl_ca_certs
|
||
|
||
# 根据ssl_cert_reqs设置主机名验证
|
||
if ssl_cert_reqs == 'none':
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
elif ssl_cert_reqs == 'required':
|
||
connection_kwargs['ssl_check_hostname'] = True
|
||
else: # optional
|
||
connection_kwargs['ssl_check_hostname'] = False
|
||
|
||
logger.info(f"使用 TLS 连接 Redis: {redis_url} (ssl_cert_reqs={ssl_cert_reqs})")
|
||
|
||
# 创建Redis客户端(同步)
|
||
self._redis_client = redis.from_url(
|
||
redis_url,
|
||
**connection_kwargs
|
||
)
|
||
|
||
# 测试连接
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
logger.info("✓ Redis配置缓存连接成功")
|
||
except Exception as e:
|
||
logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存")
|
||
self._redis_client = None
|
||
self._redis_connected = False
|
||
|
||
def _get_from_redis(self, key: str) -> Optional[Any]:
|
||
"""从Redis获取配置值"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return None
|
||
|
||
try:
|
||
# 使用账号维度 Hash 存储所有配置
|
||
value = self._redis_client.hget(self._redis_hash_key, key)
|
||
if (value is None or value == '') and self._legacy_hash_key:
|
||
value = self._redis_client.hget(self._legacy_hash_key, key)
|
||
if value is not None and value != '':
|
||
return self._coerce_redis_value(value)
|
||
except Exception as e:
|
||
logger.debug(f"从Redis获取配置失败 {key}: {e}")
|
||
# 连接失败时,尝试重新连接
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
return None
|
||
|
||
@staticmethod
|
||
def _coerce_redis_value(value_str: Any) -> Any:
|
||
"""
|
||
将 Redis Hash 中的字符串值尽量还原为正确类型。
|
||
|
||
兼容历史写法:
|
||
- 之前 bool 会被写成 "True"/"False"(非 JSON),导致读取后变成字符串,进而在逻辑判断里永远为 True。
|
||
- 现在会优先把 bool/int/float/dict/list 用 JSON 序列化写入(见 _set_to_redis)。
|
||
"""
|
||
if value_str is None:
|
||
return None
|
||
|
||
# redis-py decode_responses=True 时是 str;否则可能是 bytes
|
||
if isinstance(value_str, bytes):
|
||
try:
|
||
value_str = value_str.decode('utf-8', errors='ignore')
|
||
except Exception:
|
||
value_str = str(value_str)
|
||
|
||
s = str(value_str).strip()
|
||
if s == '':
|
||
return ''
|
||
|
||
# 1) JSON 优先(dict/list/bool/number 都能覆盖)
|
||
try:
|
||
return json.loads(s)
|
||
except Exception:
|
||
pass
|
||
|
||
sl = s.lower()
|
||
# 2) bool 兼容(历史 "True"/"False")
|
||
if sl in ('true', 'false', '1', '0', 'yes', 'no', 'on', 'off'):
|
||
return sl in ('true', '1', 'yes', 'on')
|
||
|
||
# 3) number 兼容(避免把 "1h" 之类误判)
|
||
# int: -?\d+
|
||
if re.fullmatch(r'-?\d+', s):
|
||
try:
|
||
return int(s)
|
||
except Exception:
|
||
return s
|
||
# float: -?\d+\.\d+
|
||
if re.fullmatch(r'-?\d+\.\d+', s):
|
||
try:
|
||
return float(s)
|
||
except Exception:
|
||
return s
|
||
|
||
return s
|
||
|
||
def _set_to_redis(self, key: str, value: Any):
|
||
"""设置配置到Redis(账号维度 + legacy兼容)"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return False
|
||
|
||
try:
|
||
# 将值序列化:复杂类型/基础类型使用 JSON,避免 bool 被写成 "False" 字符串后逻辑误判
|
||
if isinstance(value, (dict, list, bool, int, float)):
|
||
value_str = json.dumps(value, ensure_ascii=False)
|
||
else:
|
||
value_str = str(value)
|
||
|
||
self._redis_client.hset(self._redis_hash_key, key, value_str)
|
||
self._redis_client.expire(self._redis_hash_key, 3600)
|
||
if self._legacy_hash_key:
|
||
self._redis_client.hset(self._legacy_hash_key, key, value_str)
|
||
self._redis_client.expire(self._legacy_hash_key, 3600)
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"设置配置到Redis失败 {key}: {e}")
|
||
# 连接失败时,尝试重新连接
|
||
try:
|
||
self._redis_client.ping()
|
||
self._redis_connected = True
|
||
# 重试一次
|
||
if isinstance(value, (dict, list, bool, int, float)):
|
||
value_str = json.dumps(value, ensure_ascii=False)
|
||
else:
|
||
value_str = str(value)
|
||
self._redis_client.hset(self._redis_hash_key, key, value_str)
|
||
self._redis_client.expire(self._redis_hash_key, 3600)
|
||
if self._legacy_hash_key:
|
||
self._redis_client.hset(self._legacy_hash_key, key, value_str)
|
||
self._redis_client.expire(self._legacy_hash_key, 3600)
|
||
return True
|
||
except:
|
||
self._redis_connected = False
|
||
return False
|
||
|
||
def _load_all_to_redis(self):
|
||
"""将所有配置加载到Redis"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return
|
||
|
||
try:
|
||
# 批量设置所有配置到Redis(账号维度)
|
||
pipe = self._redis_client.pipeline()
|
||
for key, value in self._cache.items():
|
||
if isinstance(value, (dict, list, bool, int, float)):
|
||
value_str = json.dumps(value, ensure_ascii=False)
|
||
else:
|
||
value_str = str(value)
|
||
pipe.hset(self._redis_hash_key, key, value_str)
|
||
pipe.expire(self._redis_hash_key, 3600)
|
||
if self._legacy_hash_key:
|
||
for key, value in self._cache.items():
|
||
if isinstance(value, (dict, list, bool, int, float)):
|
||
value_str = json.dumps(value, ensure_ascii=False)
|
||
else:
|
||
value_str = str(value)
|
||
pipe.hset(self._legacy_hash_key, key, value_str)
|
||
pipe.expire(self._legacy_hash_key, 3600)
|
||
pipe.execute()
|
||
logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis")
|
||
except Exception as e:
|
||
logger.debug(f"同步配置到Redis失败: {e}")
|
||
|
||
def _load_from_db(self):
|
||
"""从数据库加载配置(如果Redis中没有)"""
|
||
if TradingConfig is None:
|
||
logger.warning("TradingConfig未导入,无法从数据库加载配置")
|
||
self._cache = {}
|
||
return
|
||
|
||
try:
|
||
# 先尝试从Redis加载所有配置
|
||
if self._redis_connected and self._redis_client:
|
||
try:
|
||
# 测试连接是否真正可用
|
||
self._redis_client.ping()
|
||
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
|
||
if (not redis_configs) and self._legacy_hash_key:
|
||
redis_configs = self._redis_client.hgetall(self._legacy_hash_key)
|
||
if redis_configs and len(redis_configs) > 0:
|
||
# 解析Redis中的配置
|
||
for key, value_str in redis_configs.items():
|
||
self._cache[key] = self._coerce_redis_value(value_str)
|
||
logger.info(f"从Redis加载了 {len(self._cache)} 个配置项")
|
||
return
|
||
else:
|
||
# Redis中没有配置,需要从数据库加载并同步到Redis
|
||
logger.debug("Redis中没有配置数据,从数据库加载并同步到Redis")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis加载配置失败: {e},回退到数据库")
|
||
# 连接失败时,标记为未连接
|
||
try:
|
||
self._redis_client.ping()
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
# 从数据库加载配置(仅在Redis不可用或Redis中没有数据时)
|
||
configs = TradingConfig.get_all(account_id=self.account_id)
|
||
for config in configs:
|
||
key = config['config_key']
|
||
value = TradingConfig._convert_value(
|
||
config['config_value'],
|
||
config['config_type']
|
||
)
|
||
self._cache[key] = value
|
||
# 同时写入Redis缓存
|
||
self._set_to_redis(key, value)
|
||
|
||
logger.info(f"从数据库加载了 {len(self._cache)} 个配置项,已同步到Redis")
|
||
except Exception as e:
|
||
logger.warning(f"从数据库加载配置失败,使用默认配置: {e}")
|
||
self._cache = {}
|
||
|
||
def get(self, key, default=None):
|
||
"""获取配置值"""
|
||
# 账号私有:API Key/Secret/Testnet 从 accounts 表读取(不走 trading_config)
|
||
if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None:
|
||
try:
|
||
api_key, api_secret, use_testnet, status = Account.get_credentials(self.account_id)
|
||
logger.debug(f"ConfigManager.get({key}, account_id={self.account_id}): api_key存在={bool(api_key)}, api_secret存在={bool(api_secret)}, status={status}")
|
||
if key == "BINANCE_API_KEY":
|
||
# 如果 api_key 为空字符串,返回 None 而不是 default(避免返回 'your_api_key_here')
|
||
if not api_key or api_key.strip() == "":
|
||
logger.warning(f"ConfigManager.get(BINANCE_API_KEY, account_id={self.account_id}): API密钥为空字符串")
|
||
return None # 返回 None,让调用方知道密钥未配置
|
||
return api_key
|
||
if key == "BINANCE_API_SECRET":
|
||
# 如果 api_secret 为空字符串,返回 None 而不是 default(避免返回 'your_api_secret_here')
|
||
if not api_secret or api_secret.strip() == "":
|
||
logger.warning(f"ConfigManager.get(BINANCE_API_SECRET, account_id={self.account_id}): API密钥Secret为空字符串")
|
||
return None # 返回 None,让调用方知道密钥未配置
|
||
return api_secret
|
||
return bool(use_testnet)
|
||
except Exception as e:
|
||
# 回退到后续逻辑(旧数据/无表)
|
||
logger.warning(f"ConfigManager.get({key}, account_id={self.account_id}): Account.get_credentials 失败: {e}")
|
||
pass
|
||
|
||
# 1. 优先从Redis缓存读取(最新)
|
||
# 注意:只在Redis连接正常时尝试读取,避免频繁连接失败
|
||
if self._redis_connected and self._redis_client:
|
||
redis_value = self._get_from_redis(key)
|
||
if redis_value is not None:
|
||
# 同时更新本地缓存
|
||
self._cache[key] = redis_value
|
||
return redis_value
|
||
|
||
# 2. 从本地缓存读取
|
||
if key in self._cache:
|
||
return self._cache[key]
|
||
|
||
# 3. 从环境变量读取
|
||
env_value = os.getenv(key)
|
||
if env_value is not None:
|
||
return env_value
|
||
|
||
# 4. 返回默认值
|
||
return default
|
||
|
||
def set(self, key, value, config_type='string', category='general', description=None):
|
||
"""设置配置(同时更新数据库、Redis缓存和本地缓存)"""
|
||
# 账号私有:API Key/Secret/Testnet 写入 accounts 表
|
||
if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None:
|
||
try:
|
||
if key == "BINANCE_API_KEY":
|
||
Account.update_credentials(self.account_id, api_key=str(value or ""))
|
||
elif key == "BINANCE_API_SECRET":
|
||
Account.update_credentials(self.account_id, api_secret=str(value or ""))
|
||
else:
|
||
Account.update_credentials(self.account_id, use_testnet=bool(value))
|
||
self._cache[key] = value
|
||
return
|
||
except Exception as e:
|
||
logger.error(f"更新账号API配置失败: {e}")
|
||
raise
|
||
|
||
if TradingConfig is None:
|
||
logger.warning("TradingConfig未导入,无法更新数据库配置")
|
||
self._cache[key] = value
|
||
# 仍然尝试更新Redis
|
||
self._set_to_redis(key, value)
|
||
return
|
||
|
||
try:
|
||
# 1. 更新数据库
|
||
TradingConfig.set(key, value, config_type, category, description, account_id=self.account_id)
|
||
|
||
# 2. 更新本地缓存
|
||
self._cache[key] = value
|
||
|
||
# 3. 更新Redis缓存(确保trading_system能立即读取到最新配置)
|
||
self._set_to_redis(key, value)
|
||
|
||
logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)")
|
||
except Exception as e:
|
||
logger.error(f"更新配置失败: {e}")
|
||
raise
|
||
|
||
def reload(self):
|
||
"""重新加载配置(优先从Redis,其次从数据库)"""
|
||
self._cache = {}
|
||
self._load_from_db()
|
||
|
||
def reload_from_redis(self):
|
||
"""强制从Redis重新加载配置(用于trading_system即时获取最新配置)"""
|
||
# 如果Redis不可用,不重新加载,保持现有缓存
|
||
if not self._redis_connected or not self._redis_client:
|
||
logger.debug("Redis未连接,跳过从Redis重新加载,保持现有缓存")
|
||
return
|
||
|
||
try:
|
||
# 测试连接是否真正可用
|
||
self._redis_client.ping()
|
||
except Exception as e:
|
||
logger.debug(f"Redis连接不可用: {e},跳过从Redis重新加载")
|
||
self._redis_connected = False
|
||
return
|
||
|
||
try:
|
||
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
|
||
if (not redis_configs) and self._legacy_hash_key:
|
||
redis_configs = self._redis_client.hgetall(self._legacy_hash_key)
|
||
if redis_configs and len(redis_configs) > 0:
|
||
self._cache = {} # 清空缓存
|
||
for key, value_str in redis_configs.items():
|
||
self._cache[key] = self._coerce_redis_value(value_str)
|
||
logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项")
|
||
else:
|
||
# Redis中没有配置,但不回退到数据库(避免频繁从数据库加载)
|
||
logger.debug("Redis中没有配置数据,保持现有缓存")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载配置失败: {e},保持现有缓存")
|
||
# 连接失败时,标记为未连接
|
||
try:
|
||
self._redis_client.ping()
|
||
except:
|
||
self._redis_connected = False
|
||
|
||
def get_trading_config(self):
|
||
"""获取交易配置字典(兼容原有config.py的TRADING_CONFIG)"""
|
||
# 全局策略配置管理器(从独立的global_strategy_config表读取)
|
||
global_config_mgr = GlobalStrategyConfigManager()
|
||
try:
|
||
global_config_mgr.reload_from_redis()
|
||
except Exception:
|
||
pass
|
||
|
||
def eff_get(key: str, default: Any):
|
||
"""
|
||
策略核心:从全局配置表读取(global_strategy_config)。
|
||
风险旋钮:从当前账号读取。
|
||
"""
|
||
# API key/secret/testnet 永远按账号读取(在 get() 内部已处理)
|
||
if key in RISK_KNOBS_KEYS:
|
||
value = self.get(key, default)
|
||
else:
|
||
# 从全局配置表读取
|
||
try:
|
||
value = global_config_mgr.get(key, default)
|
||
except Exception:
|
||
value = self.get(key, default)
|
||
|
||
# ⚠️ 临时兼容性处理:百分比配置值格式转换
|
||
# 如果配置值是百分比形式(>1),转换为比例形式(除以100)
|
||
# 兼容数据库中可能存在的旧数据(百分比形式,如30表示30%)
|
||
# 数据迁移完成后,可以移除此逻辑
|
||
# 统一格式:数据库、前端、后端都使用比例形式(0.30表示30%)
|
||
if isinstance(value, (int, float)) and value is not None:
|
||
# 需要转换的百分比配置项
|
||
percent_keys = [
|
||
'TRAILING_STOP_ACTIVATION',
|
||
'TRAILING_STOP_PROTECT',
|
||
'MIN_VOLATILITY',
|
||
'TAKE_PROFIT_PERCENT',
|
||
'STOP_LOSS_PERCENT',
|
||
'MIN_STOP_LOSS_PRICE_PCT',
|
||
'MIN_TAKE_PROFIT_PRICE_PCT',
|
||
'FIXED_RISK_PERCENT',
|
||
'MAX_POSITION_PERCENT',
|
||
'MAX_TOTAL_POSITION_PERCENT',
|
||
'MIN_POSITION_PERCENT',
|
||
]
|
||
|
||
if key in percent_keys:
|
||
# 如果值>1,认为是百分比形式(旧数据),转换为比例形式
|
||
# 静默转换,不输出警告(用户已确认数据库应存储小数形式)
|
||
if value > 1:
|
||
value = value / 100.0
|
||
# 静默更新Redis缓存,避免下次读取时再次触发转换
|
||
try:
|
||
if key in RISK_KNOBS_KEYS:
|
||
# 风险旋钮:更新当前账号的Redis缓存
|
||
self._set_to_redis(key, value)
|
||
# 同时更新本地缓存
|
||
self._cache[key] = value
|
||
else:
|
||
# 全局配置:更新全局配置的Redis缓存
|
||
global_config_mgr._set_to_redis(key, value)
|
||
# 同时更新本地缓存
|
||
global_config_mgr._cache[key] = value
|
||
except Exception as e:
|
||
logger.debug(f"更新Redis缓存失败(不影响使用): {key} = {e}")
|
||
|
||
return value
|
||
|
||
# 交易预设:控制一组参数的“默认性格”
|
||
profile = str(eff_get('TRADING_PROFILE', 'conservative') or 'conservative').lower()
|
||
is_fast = profile in ('fast', 'fast_test', 'aggressive')
|
||
|
||
max_daily_default = 30 if is_fast else 8
|
||
scan_interval_default = 900 if is_fast else 1800
|
||
min_signal_default = 7 if is_fast else 8 # 2026-01-29优化:稳健模式从9降到8(平衡胜率和交易频率)
|
||
cooldown_default = 900 if is_fast else 1800
|
||
allow_neutral_default = True if is_fast else False
|
||
short_filter_default = False if is_fast else True
|
||
max_trend_move_default = 0.08 if is_fast else 0.05
|
||
|
||
return {
|
||
# 仓位控制
|
||
'MAX_POSITION_PERCENT': eff_get('MAX_POSITION_PERCENT', 0.08), # 单笔最大保证金占比
|
||
'MAX_TOTAL_POSITION_PERCENT': eff_get('MAX_TOTAL_POSITION_PERCENT', 0.40), # 总保证金占比上限
|
||
'MIN_POSITION_PERCENT': eff_get('MIN_POSITION_PERCENT', 0.02), # 最小保证金占比
|
||
'MIN_MARGIN_USDT': eff_get('MIN_MARGIN_USDT', 5.0), # 最小保证金(USDT)
|
||
|
||
# 用户风险旋钮:自动交易开关/频次控制
|
||
'AUTO_TRADE_ENABLED': eff_get('AUTO_TRADE_ENABLED', True),
|
||
'MAX_OPEN_POSITIONS': eff_get('MAX_OPEN_POSITIONS', 3),
|
||
'MAX_DAILY_ENTRIES': eff_get('MAX_DAILY_ENTRIES', max_daily_default),
|
||
|
||
# 涨跌幅阈值
|
||
'MIN_CHANGE_PERCENT': eff_get('MIN_CHANGE_PERCENT', 2.0),
|
||
|
||
# 风险控制
|
||
# ⚠️ 2026-01-29优化:放宽止损,减少被正常波动扫出
|
||
# - 提高ATR倍数(从1.5到2.0),给市场波动更多空间
|
||
# - 提高最小价格变动百分比(从2%到2.5%),避免止损过紧
|
||
'STOP_LOSS_PERCENT': eff_get('STOP_LOSS_PERCENT', 0.12), # 默认12%(保证金百分比)
|
||
'TAKE_PROFIT_PERCENT': eff_get('TAKE_PROFIT_PERCENT', 0.10), # 默认10%(2026-01-27优化:进一步降低止盈目标,更容易触发,提升止盈单比例)
|
||
'MIN_STOP_LOSS_PRICE_PCT': eff_get('MIN_STOP_LOSS_PRICE_PCT', 0.025), # 默认2.5%(2026-01-29优化:从2%提高到2.5%,给波动更多空间)
|
||
'MIN_TAKE_PROFIT_PRICE_PCT': eff_get('MIN_TAKE_PROFIT_PRICE_PCT', 0.02), # 默认2%(防止ATR过小时计算出不切实际的微小止盈距离)
|
||
'USE_ATR_STOP_LOSS': eff_get('USE_ATR_STOP_LOSS', True), # 是否使用ATR动态止损
|
||
'ATR_STOP_LOSS_MULTIPLIER': eff_get('ATR_STOP_LOSS_MULTIPLIER', 2.0), # ATR止损倍数2.0(2026-01-29优化:从1.5提高到2.0,减少被正常波动扫出)
|
||
'ATR_TAKE_PROFIT_MULTIPLIER': eff_get('ATR_TAKE_PROFIT_MULTIPLIER', 2.0), # ATR止盈倍数2.0(2026-01-27优化:降低止盈目标,更容易触发)
|
||
'RISK_REWARD_RATIO': eff_get('RISK_REWARD_RATIO', 3.0), # 盈亏比3:1(2026-01-27优化:降低,更容易触发,保证胜率)
|
||
'ATR_PERIOD': eff_get('ATR_PERIOD', 14), # ATR计算周期
|
||
'USE_DYNAMIC_ATR_MULTIPLIER': eff_get('USE_DYNAMIC_ATR_MULTIPLIER', False), # 是否根据波动率动态调整ATR倍数
|
||
'ATR_MULTIPLIER_MIN': eff_get('ATR_MULTIPLIER_MIN', 1.5), # 动态ATR倍数最小值
|
||
'ATR_MULTIPLIER_MAX': eff_get('ATR_MULTIPLIER_MAX', 2.5), # 动态ATR倍数最大值
|
||
|
||
# 固定风险百分比仓位计算(凯利公式)
|
||
'USE_FIXED_RISK_SIZING': eff_get('USE_FIXED_RISK_SIZING', True), # 使用固定风险百分比计算仓位
|
||
'FIXED_RISK_PERCENT': eff_get('FIXED_RISK_PERCENT', 0.02), # 每笔单子承受的风险(2%)
|
||
|
||
# 市场扫描(30分钟主周期)
|
||
'SCAN_INTERVAL': eff_get('SCAN_INTERVAL', scan_interval_default), # 30分钟(增加交易机会)
|
||
'TOP_N_SYMBOLS': eff_get('TOP_N_SYMBOLS', 8), # 每次扫描后处理的交易对数量(增加到8,给更多选择余地)
|
||
'MAX_SCAN_SYMBOLS': eff_get('MAX_SCAN_SYMBOLS', 250), # 扫描的最大交易对数量(增加到250,提升覆盖率到46%)
|
||
'EXCLUDE_MAJOR_COINS': eff_get('EXCLUDE_MAJOR_COINS', True), # 是否排除主流币(BTC、ETH、BNB等),专注于山寨币
|
||
'KLINE_INTERVAL': eff_get('KLINE_INTERVAL', '1h'),
|
||
'PRIMARY_INTERVAL': eff_get('PRIMARY_INTERVAL', '1h'),
|
||
'CONFIRM_INTERVAL': eff_get('CONFIRM_INTERVAL', '4h'),
|
||
'ENTRY_INTERVAL': eff_get('ENTRY_INTERVAL', '15m'),
|
||
|
||
# 过滤条件
|
||
'MIN_VOLUME_24H': eff_get('MIN_VOLUME_24H', 10000000),
|
||
'MIN_VOLATILITY': eff_get('MIN_VOLATILITY', 0.02),
|
||
|
||
# 高胜率策略参数
|
||
# ⚠️ 2026-01-29优化:提高信号强度门槛(稳健模式从9到8),减少低质量信号,提升胜率
|
||
'MIN_SIGNAL_STRENGTH': eff_get('MIN_SIGNAL_STRENGTH', min_signal_default), # 默认值随 profile 调整(快速模式7,稳健模式8)
|
||
'LEVERAGE': eff_get('LEVERAGE', 10),
|
||
'USE_DYNAMIC_LEVERAGE': eff_get('USE_DYNAMIC_LEVERAGE', True),
|
||
'MAX_LEVERAGE': eff_get('MAX_LEVERAGE', 15), # 降低到15,更保守,配合更大的保证金
|
||
# 移动止损:默认关闭(避免过早截断利润,让利润奔跑)
|
||
'USE_TRAILING_STOP': eff_get('USE_TRAILING_STOP', True), # 默认启用(2026-01-27优化:启用移动止损,保护利润)
|
||
'TRAILING_STOP_ACTIVATION': eff_get('TRAILING_STOP_ACTIVATION', 0.05), # 默认5%(2026-01-27优化:更早保护利润,避免回吐)
|
||
'TRAILING_STOP_PROTECT': eff_get('TRAILING_STOP_PROTECT', 0.025), # 默认2.5%(2026-01-27优化:给回撤足够空间,避免被震荡扫出)
|
||
|
||
# 最小持仓时间锁(强制波段持仓纪律,避免分钟级平仓)
|
||
'MIN_HOLD_TIME_SEC': eff_get('MIN_HOLD_TIME_SEC', 1800), # 默认30分钟(1800秒)
|
||
|
||
# 自动交易过滤(用于提升胜率/控频)
|
||
# 说明:这两个 key 需要出现在 TRADING_CONFIG 中,否则 trading_system 在每次 reload_from_redis 后会丢失它们,
|
||
# 导致始终按默认值拦截自动交易(用户在配置页怎么开都没用)。
|
||
'AUTO_TRADE_ONLY_TRENDING': eff_get('AUTO_TRADE_ONLY_TRENDING', True),
|
||
'AUTO_TRADE_ALLOW_4H_NEUTRAL': eff_get('AUTO_TRADE_ALLOW_4H_NEUTRAL', allow_neutral_default),
|
||
|
||
# 智能入场/限价偏移(部分逻辑会直接读取 TRADING_CONFIG)
|
||
'LIMIT_ORDER_OFFSET_PCT': eff_get('LIMIT_ORDER_OFFSET_PCT', 0.5),
|
||
'SMART_ENTRY_ENABLED': eff_get('SMART_ENTRY_ENABLED', False),
|
||
'SMART_ENTRY_STRONG_SIGNAL': eff_get('SMART_ENTRY_STRONG_SIGNAL', min_signal_default),
|
||
'ENTRY_SYMBOL_COOLDOWN_SEC': eff_get('ENTRY_SYMBOL_COOLDOWN_SEC', cooldown_default),
|
||
'ENTRY_TIMEOUT_SEC': eff_get('ENTRY_TIMEOUT_SEC', 180),
|
||
'ENTRY_STEP_WAIT_SEC': eff_get('ENTRY_STEP_WAIT_SEC', 15),
|
||
'ENTRY_CHASE_MAX_STEPS': eff_get('ENTRY_CHASE_MAX_STEPS', 4),
|
||
'ENTRY_MARKET_FALLBACK_AFTER_SEC': eff_get('ENTRY_MARKET_FALLBACK_AFTER_SEC', 45),
|
||
'ENTRY_CONFIRM_TIMEOUT_SEC': eff_get('ENTRY_CONFIRM_TIMEOUT_SEC', 30),
|
||
'ENTRY_MAX_DRIFT_PCT_TRENDING': eff_get('ENTRY_MAX_DRIFT_PCT_TRENDING', 0.006),
|
||
'ENTRY_MAX_DRIFT_PCT_RANGING': eff_get('ENTRY_MAX_DRIFT_PCT_RANGING', 0.3),
|
||
|
||
# 动态过滤优化
|
||
'BETA_FILTER_ENABLED': eff_get('BETA_FILTER_ENABLED', True), # 大盘共振过滤:BTC/ETH下跌时屏蔽多单
|
||
'BETA_FILTER_THRESHOLD': eff_get('BETA_FILTER_THRESHOLD', -0.005), # -0.5%(2026-01-27优化:更敏感地过滤大盘风险,15分钟内跌幅超过0.5%即屏蔽多单)
|
||
|
||
# 趋势尾部入场过滤 & 15m 短周期方向过滤开关(由 profile 控制默认值)
|
||
'ENTRY_SHORT_INTERVAL': eff_get('ENTRY_SHORT_INTERVAL', '15m'),
|
||
'ENTRY_SHORT_TREND_FILTER_ENABLED': eff_get('ENTRY_SHORT_TREND_FILTER_ENABLED', short_filter_default),
|
||
'ENTRY_SHORT_TREND_MIN_PCT': eff_get('ENTRY_SHORT_TREND_MIN_PCT', 0.003),
|
||
'ENTRY_SHORT_CONFIRM_CANDLES': eff_get('ENTRY_SHORT_CONFIRM_CANDLES', 3),
|
||
'USE_TREND_ENTRY_FILTER': eff_get('USE_TREND_ENTRY_FILTER', True),
|
||
# ⚠️ 2026-01-29优化:收紧趋势尾部过滤(稳健模式从0.05到0.04),更严格避免追高杀跌
|
||
'MAX_TREND_MOVE_BEFORE_ENTRY': eff_get('MAX_TREND_MOVE_BEFORE_ENTRY', max_trend_move_default), # 快速模式0.08,稳健模式0.04
|
||
'TREND_STATE_TTL_SEC': eff_get('TREND_STATE_TTL_SEC', 3600),
|
||
'RECO_USE_TREND_ENTRY_FILTER': eff_get('RECO_USE_TREND_ENTRY_FILTER', True),
|
||
'RECO_MAX_TREND_MOVE_BEFORE_ENTRY': eff_get('RECO_MAX_TREND_MOVE_BEFORE_ENTRY', 0.04),
|
||
|
||
# 当前交易预设(让 trading_system 能知道是哪种模式)
|
||
'TRADING_PROFILE': profile,
|
||
|
||
}
|
||
|
||
def _sync_to_redis(self):
|
||
"""将配置同步到Redis缓存(账号维度)"""
|
||
if not self._redis_connected or not self._redis_client:
|
||
return
|
||
try:
|
||
payload = {k: json.dumps(v) for k, v in self._cache.items()}
|
||
self._redis_client.hset(self._redis_hash_key, mapping=payload)
|
||
self._redis_client.expire(self._redis_hash_key, 3600)
|
||
if self._legacy_hash_key:
|
||
self._redis_client.hset(self._legacy_hash_key, mapping=payload)
|
||
self._redis_client.expire(self._legacy_hash_key, 3600)
|
||
except Exception as e:
|
||
logger.debug(f"同步配置到Redis失败: {e}")
|
||
|
||
# 全局配置管理器实例(默认账号;trading_system 进程可通过 ATS_ACCOUNT_ID 指定)
|
||
try:
|
||
_default_account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1)
|
||
except Exception:
|
||
_default_account_id = 1
|
||
config_manager = ConfigManager.for_account(_default_account_id)
|
||
|
||
# 兼容原有config.py的接口
|
||
def get_config(key, default=None):
|
||
"""获取配置(兼容函数)"""
|
||
return config_manager.get(key, default)
|
||
|
||
def get_trading_config():
|
||
"""获取交易配置(兼容函数)"""
|
||
return config_manager.get_trading_config()
|