auto_trade_sys/backend/config_manager.py
薇薇安 ed994e6e8e a
2026-01-26 15:50:06 +08:00

844 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
配置管理器 - 从数据库读取配置兼容原有config.py
支持Redis缓存实现配置即时生效
"""
import sys
import os
import json
import re
from pathlib import Path
from typing import Optional, Any
# 加载.env文件
try:
from dotenv import load_dotenv
backend_dir = Path(__file__).parent
project_root = backend_dir.parent
env_files = [
backend_dir / '.env',
project_root / '.env',
]
for env_file in env_files:
if env_file.exists():
load_dotenv(env_file, override=True)
break
else:
load_dotenv(project_root / '.env', override=False)
except ImportError:
pass
except Exception:
pass
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 延迟导入避免在trading_system中导入时因为缺少依赖而失败
try:
from database.models import TradingConfig, Account
except ImportError as e:
TradingConfig = None
Account = None
import logging
logger = logging.getLogger(__name__)
logger.warning(f"无法导入TradingConfig: {e},配置管理器将无法使用数据库")
import logging
logger = logging.getLogger(__name__)
# 平台兜底策略核心使用全局配置表global_strategy_config普通用户账号只允许调整“风险旋钮”
# - 风险旋钮:每个账号独立(仓位/频次等)
# - 其它策略参数:统一从全局配置表读取,避免每个用户乱改导致策略不可控
# 注意不再依赖account_id=1全局配置存储在独立的global_strategy_config表中
RISK_KNOBS_KEYS = {
"MIN_MARGIN_USDT",
"MIN_POSITION_PERCENT",
"MAX_POSITION_PERCENT",
"MAX_TOTAL_POSITION_PERCENT",
"AUTO_TRADE_ENABLED",
"MAX_OPEN_POSITIONS",
"MAX_DAILY_ENTRIES",
}
# 尝试导入同步Redis客户端用于配置缓存
try:
import redis
REDIS_SYNC_AVAILABLE = True
except ImportError:
REDIS_SYNC_AVAILABLE = False
redis = None
class GlobalStrategyConfigManager:
"""全局策略配置管理器(独立于账户,管理员专用)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self._initialized = True
self._cache = {}
self._redis_client: Optional[redis.Redis] = None
self._redis_connected = False
self._redis_hash_key = "global_strategy_config" # 独立的Redis键
self._init_redis()
self._load_from_db()
def _init_redis(self):
"""初始化Redis客户端同步"""
if not REDIS_SYNC_AVAILABLE:
logger.debug("redis-py未安装全局配置缓存将不使用Redis")
return
try:
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'
redis_username = os.getenv('REDIS_USERNAME', None)
redis_password = os.getenv('REDIS_PASSWORD', None)
if not redis_url or not isinstance(redis_url, str):
redis_url = 'redis://localhost:6379'
if redis_use_tls and not redis_url.startswith('rediss://'):
if redis_url.startswith('redis://'):
redis_url = redis_url.replace('redis://', 'rediss://', 1)
connection_kwargs = {
'username': redis_username,
'password': redis_password,
'decode_responses': True
}
if redis_url.startswith('rediss://') or redis_use_tls:
ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required')
ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None)
connection_kwargs['select'] = int(os.getenv('REDIS_SELECT', 0))
connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
if ssl_ca_certs:
connection_kwargs['ssl_ca_certs'] = ssl_ca_certs
if ssl_cert_reqs == 'none':
connection_kwargs['ssl_check_hostname'] = False
elif ssl_cert_reqs == 'required':
connection_kwargs['ssl_check_hostname'] = True
else:
connection_kwargs['ssl_check_hostname'] = False
self._redis_client = redis.from_url(redis_url, **connection_kwargs)
self._redis_client.ping()
self._redis_connected = True
logger.info("✓ 全局策略配置Redis缓存连接成功")
except Exception as e:
logger.debug(f"全局策略配置Redis缓存连接失败: {e},将使用数据库缓存")
self._redis_client = None
self._redis_connected = False
def _get_from_redis(self, key: str) -> Optional[Any]:
"""从Redis获取全局配置值"""
if not self._redis_connected or not self._redis_client:
return None
try:
value = self._redis_client.hget(self._redis_hash_key, key)
if value is not None and value != '':
return ConfigManager._coerce_redis_value(value)
except Exception as e:
logger.debug(f"从Redis获取全局配置失败 {key}: {e}")
try:
self._redis_client.ping()
self._redis_connected = True
except:
self._redis_connected = False
return None
def _set_to_redis(self, key: str, value: Any):
"""设置全局配置到Redis"""
if not self._redis_connected or not self._redis_client:
return False
try:
if isinstance(value, (dict, list, bool, int, float)):
value_str = json.dumps(value, ensure_ascii=False)
else:
value_str = str(value)
self._redis_client.hset(self._redis_hash_key, key, value_str)
self._redis_client.expire(self._redis_hash_key, 3600)
return True
except Exception as e:
logger.debug(f"设置全局配置到Redis失败 {key}: {e}")
try:
self._redis_client.ping()
self._redis_connected = True
except:
self._redis_connected = False
return False
def _load_from_db(self):
"""从数据库加载全局配置"""
try:
from database.models import GlobalStrategyConfig
except ImportError:
logger.warning("GlobalStrategyConfig未导入无法从数据库加载全局配置")
self._cache = {}
return
try:
# 先尝试从Redis加载
if self._redis_connected and self._redis_client:
try:
self._redis_client.ping()
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
if redis_configs and len(redis_configs) > 0:
for key, value_str in redis_configs.items():
self._cache[key] = ConfigManager._coerce_redis_value(value_str)
logger.info(f"从Redis加载了 {len(self._cache)} 个全局配置项")
return
except Exception as e:
logger.debug(f"从Redis加载全局配置失败: {e},回退到数据库")
try:
self._redis_client.ping()
except:
self._redis_connected = False
# 从数据库加载
configs = GlobalStrategyConfig.get_all()
for config in configs:
key = config['config_key']
# 使用TradingConfig的转换方法GlobalStrategyConfig复用
from database.models import TradingConfig
value = TradingConfig._convert_value(
config['config_value'],
config['config_type']
)
self._cache[key] = value
self._set_to_redis(key, value)
logger.info(f"从数据库加载了 {len(self._cache)} 个全局配置项已同步到Redis")
except Exception as e:
logger.warning(f"从数据库加载全局配置失败,使用默认配置: {e}")
self._cache = {}
def get(self, key: str, default: Any = None) -> Any:
"""获取全局配置值"""
# 1. 优先从Redis缓存读取
if self._redis_connected and self._redis_client:
redis_value = self._get_from_redis(key)
if redis_value is not None:
self._cache[key] = redis_value
return redis_value
# 2. 从本地缓存读取
if key in self._cache:
return self._cache[key]
# 3. 从数据库读取
try:
from database.models import GlobalStrategyConfig
db_value = GlobalStrategyConfig.get_value(key)
if db_value is not None:
self._cache[key] = db_value
self._set_to_redis(key, db_value)
return db_value
except Exception:
pass
# 4. 从环境变量读取
env_value = os.getenv(key)
if env_value is not None:
return env_value
# 5. 返回默认值
return default
def reload_from_redis(self):
"""强制从Redis重新加载全局配置"""
if not self._redis_connected or not self._redis_client:
return
try:
self._redis_client.ping()
except Exception as e:
logger.debug(f"Redis连接不可用: {e}跳过从Redis重新加载")
self._redis_connected = False
return
try:
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
if redis_configs and len(redis_configs) > 0:
self._cache = {}
for key, value_str in redis_configs.items():
self._cache[key] = ConfigManager._coerce_redis_value(value_str)
logger.debug(f"从Redis重新加载了 {len(self._cache)} 个全局配置项")
except Exception as e:
logger.debug(f"从Redis重新加载全局配置失败: {e},保持现有缓存")
class ConfigManager:
"""配置管理器 - 优先从Redis缓存读取其次从数据库读取回退到环境变量和默认值"""
_instances = {}
def __init__(self, account_id: int = 1):
self.account_id = int(account_id or 1)
self._cache = {}
self._redis_client: Optional[redis.Redis] = None
self._redis_connected = False
self._redis_hash_key = f"trading_config:{self.account_id}"
self._legacy_hash_key = "trading_config" if self.account_id == 1 else None
self._init_redis()
self._load_from_db()
@classmethod
def for_account(cls, account_id: int):
aid = int(account_id or 1)
inst = cls._instances.get(aid)
if inst:
return inst
inst = cls(account_id=aid)
cls._instances[aid] = inst
return inst
def _init_redis(self):
"""初始化Redis客户端同步"""
if not REDIS_SYNC_AVAILABLE:
logger.debug("redis-py未安装配置缓存将不使用Redis")
return
try:
# 从环境变量或配置获取Redis连接信息
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
redis_use_tls = os.getenv('REDIS_USE_TLS', 'False').lower() == 'true'
redis_username = os.getenv('REDIS_USERNAME', None)
redis_password = os.getenv('REDIS_PASSWORD', None)
# 验证URL格式
if not redis_url or not isinstance(redis_url, str):
logger.warning(f"Redis URL无效: {redis_url},使用默认值")
redis_url = 'redis://localhost:6379'
# 如果使用TLS但URL不是rediss://,自动转换
if redis_use_tls and not redis_url.startswith('rediss://'):
if redis_url.startswith('redis://'):
redis_url = redis_url.replace('redis://', 'rediss://', 1)
else:
# 如果URL格式不正确构建新的URL
logger.warning(f"Redis URL格式不正确且需要TLS: {redis_url}尝试构建rediss://URL")
# 尝试从URL中提取主机和端口
if '://' in redis_url:
parts = redis_url.split('://', 1)
host_port = parts[1].split('/')[0] if '/' in parts[1] else parts[1]
redis_url = f"rediss://{host_port}"
else:
# 如果完全没有scheme添加rediss://
redis_url = f"rediss://{redis_url}"
logger.info(f"已自动转换Redis URL为TLS格式: {redis_url}")
# 解析Redis URL
# redis-py的同步客户端也支持通过ssl_cert_reqs等参数配置TLS
# 当URL是rediss://时会自动启用SSL
connection_kwargs = {
'username': redis_username,
'password': redis_password,
'decode_responses': True
}
if redis_url.startswith('rediss://') or redis_use_tls:
# TLS连接 - 使用redis-py支持的SSL参数
# 从环境变量获取SSL配置如果未设置使用默认值
ssl_cert_reqs = os.getenv('REDIS_SSL_CERT_REQS', 'required')
ssl_ca_certs = os.getenv('REDIS_SSL_CA_CERTS', None)
connection_kwargs['select'] = os.getenv('REDIS_SELECT', 0)
if connection_kwargs['select'] is not None:
connection_kwargs['select'] = int(connection_kwargs['select'])
else:
connection_kwargs['select'] = 0
logger.info(f"使用 Redis 数据库: {connection_kwargs['select']}")
# 设置SSL参数
connection_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
if ssl_ca_certs:
connection_kwargs['ssl_ca_certs'] = ssl_ca_certs
# 根据ssl_cert_reqs设置主机名验证
if ssl_cert_reqs == 'none':
connection_kwargs['ssl_check_hostname'] = False
elif ssl_cert_reqs == 'required':
connection_kwargs['ssl_check_hostname'] = True
else: # optional
connection_kwargs['ssl_check_hostname'] = False
logger.info(f"使用 TLS 连接 Redis: {redis_url} (ssl_cert_reqs={ssl_cert_reqs})")
# 创建Redis客户端同步
self._redis_client = redis.from_url(
redis_url,
**connection_kwargs
)
# 测试连接
self._redis_client.ping()
self._redis_connected = True
logger.info("✓ Redis配置缓存连接成功")
except Exception as e:
logger.debug(f"Redis配置缓存连接失败: {e},将使用数据库缓存")
self._redis_client = None
self._redis_connected = False
def _get_from_redis(self, key: str) -> Optional[Any]:
"""从Redis获取配置值"""
if not self._redis_connected or not self._redis_client:
return None
try:
# 使用账号维度 Hash 存储所有配置
value = self._redis_client.hget(self._redis_hash_key, key)
if (value is None or value == '') and self._legacy_hash_key:
value = self._redis_client.hget(self._legacy_hash_key, key)
if value is not None and value != '':
return self._coerce_redis_value(value)
except Exception as e:
logger.debug(f"从Redis获取配置失败 {key}: {e}")
# 连接失败时,尝试重新连接
try:
self._redis_client.ping()
self._redis_connected = True
except:
self._redis_connected = False
return None
@staticmethod
def _coerce_redis_value(value_str: Any) -> Any:
"""
将 Redis Hash 中的字符串值尽量还原为正确类型。
兼容历史写法:
- 之前 bool 会被写成 "True"/"False"(非 JSON导致读取后变成字符串进而在逻辑判断里永远为 True。
- 现在会优先把 bool/int/float/dict/list 用 JSON 序列化写入(见 _set_to_redis
"""
if value_str is None:
return None
# redis-py decode_responses=True 时是 str否则可能是 bytes
if isinstance(value_str, bytes):
try:
value_str = value_str.decode('utf-8', errors='ignore')
except Exception:
value_str = str(value_str)
s = str(value_str).strip()
if s == '':
return ''
# 1) JSON 优先dict/list/bool/number 都能覆盖)
try:
return json.loads(s)
except Exception:
pass
sl = s.lower()
# 2) bool 兼容(历史 "True"/"False"
if sl in ('true', 'false', '1', '0', 'yes', 'no', 'on', 'off'):
return sl in ('true', '1', 'yes', 'on')
# 3) number 兼容(避免把 "1h" 之类误判)
# int: -?\d+
if re.fullmatch(r'-?\d+', s):
try:
return int(s)
except Exception:
return s
# float: -?\d+\.\d+
if re.fullmatch(r'-?\d+\.\d+', s):
try:
return float(s)
except Exception:
return s
return s
def _set_to_redis(self, key: str, value: Any):
"""设置配置到Redis账号维度 + legacy兼容"""
if not self._redis_connected or not self._redis_client:
return False
try:
# 将值序列化:复杂类型/基础类型使用 JSON避免 bool 被写成 "False" 字符串后逻辑误判
if isinstance(value, (dict, list, bool, int, float)):
value_str = json.dumps(value, ensure_ascii=False)
else:
value_str = str(value)
self._redis_client.hset(self._redis_hash_key, key, value_str)
self._redis_client.expire(self._redis_hash_key, 3600)
if self._legacy_hash_key:
self._redis_client.hset(self._legacy_hash_key, key, value_str)
self._redis_client.expire(self._legacy_hash_key, 3600)
return True
except Exception as e:
logger.debug(f"设置配置到Redis失败 {key}: {e}")
# 连接失败时,尝试重新连接
try:
self._redis_client.ping()
self._redis_connected = True
# 重试一次
if isinstance(value, (dict, list, bool, int, float)):
value_str = json.dumps(value, ensure_ascii=False)
else:
value_str = str(value)
self._redis_client.hset(self._redis_hash_key, key, value_str)
self._redis_client.expire(self._redis_hash_key, 3600)
if self._legacy_hash_key:
self._redis_client.hset(self._legacy_hash_key, key, value_str)
self._redis_client.expire(self._legacy_hash_key, 3600)
return True
except:
self._redis_connected = False
return False
def _load_all_to_redis(self):
"""将所有配置加载到Redis"""
if not self._redis_connected or not self._redis_client:
return
try:
# 批量设置所有配置到Redis账号维度
pipe = self._redis_client.pipeline()
for key, value in self._cache.items():
if isinstance(value, (dict, list, bool, int, float)):
value_str = json.dumps(value, ensure_ascii=False)
else:
value_str = str(value)
pipe.hset(self._redis_hash_key, key, value_str)
pipe.expire(self._redis_hash_key, 3600)
if self._legacy_hash_key:
for key, value in self._cache.items():
if isinstance(value, (dict, list, bool, int, float)):
value_str = json.dumps(value, ensure_ascii=False)
else:
value_str = str(value)
pipe.hset(self._legacy_hash_key, key, value_str)
pipe.expire(self._legacy_hash_key, 3600)
pipe.execute()
logger.debug(f"已将 {len(self._cache)} 个配置项同步到Redis")
except Exception as e:
logger.debug(f"同步配置到Redis失败: {e}")
def _load_from_db(self):
"""从数据库加载配置如果Redis中没有"""
if TradingConfig is None:
logger.warning("TradingConfig未导入无法从数据库加载配置")
self._cache = {}
return
try:
# 先尝试从Redis加载所有配置
if self._redis_connected and self._redis_client:
try:
# 测试连接是否真正可用
self._redis_client.ping()
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
if (not redis_configs) and self._legacy_hash_key:
redis_configs = self._redis_client.hgetall(self._legacy_hash_key)
if redis_configs and len(redis_configs) > 0:
# 解析Redis中的配置
for key, value_str in redis_configs.items():
self._cache[key] = self._coerce_redis_value(value_str)
logger.info(f"从Redis加载了 {len(self._cache)} 个配置项")
return
else:
# Redis中没有配置需要从数据库加载并同步到Redis
logger.debug("Redis中没有配置数据从数据库加载并同步到Redis")
except Exception as e:
logger.debug(f"从Redis加载配置失败: {e},回退到数据库")
# 连接失败时,标记为未连接
try:
self._redis_client.ping()
except:
self._redis_connected = False
# 从数据库加载配置仅在Redis不可用或Redis中没有数据时
configs = TradingConfig.get_all(account_id=self.account_id)
for config in configs:
key = config['config_key']
value = TradingConfig._convert_value(
config['config_value'],
config['config_type']
)
self._cache[key] = value
# 同时写入Redis缓存
self._set_to_redis(key, value)
logger.info(f"从数据库加载了 {len(self._cache)} 个配置项已同步到Redis")
except Exception as e:
logger.warning(f"从数据库加载配置失败,使用默认配置: {e}")
self._cache = {}
def get(self, key, default=None):
"""获取配置值"""
# 账号私有API Key/Secret/Testnet 从 accounts 表读取(不走 trading_config
if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None:
try:
api_key, api_secret, use_testnet, status = Account.get_credentials(self.account_id)
logger.debug(f"ConfigManager.get({key}, account_id={self.account_id}): api_key存在={bool(api_key)}, api_secret存在={bool(api_secret)}, status={status}")
if key == "BINANCE_API_KEY":
# 如果 api_key 为空字符串,返回 None 而不是 default避免返回 'your_api_key_here'
if not api_key or api_key.strip() == "":
logger.warning(f"ConfigManager.get(BINANCE_API_KEY, account_id={self.account_id}): API密钥为空字符串")
return None # 返回 None让调用方知道密钥未配置
return api_key
if key == "BINANCE_API_SECRET":
# 如果 api_secret 为空字符串,返回 None 而不是 default避免返回 'your_api_secret_here'
if not api_secret or api_secret.strip() == "":
logger.warning(f"ConfigManager.get(BINANCE_API_SECRET, account_id={self.account_id}): API密钥Secret为空字符串")
return None # 返回 None让调用方知道密钥未配置
return api_secret
return bool(use_testnet)
except Exception as e:
# 回退到后续逻辑(旧数据/无表)
logger.warning(f"ConfigManager.get({key}, account_id={self.account_id}): Account.get_credentials 失败: {e}")
pass
# 1. 优先从Redis缓存读取最新
# 注意只在Redis连接正常时尝试读取避免频繁连接失败
if self._redis_connected and self._redis_client:
redis_value = self._get_from_redis(key)
if redis_value is not None:
# 同时更新本地缓存
self._cache[key] = redis_value
return redis_value
# 2. 从本地缓存读取
if key in self._cache:
return self._cache[key]
# 3. 从环境变量读取
env_value = os.getenv(key)
if env_value is not None:
return env_value
# 4. 返回默认值
return default
def set(self, key, value, config_type='string', category='general', description=None):
"""设置配置同时更新数据库、Redis缓存和本地缓存"""
# 账号私有API Key/Secret/Testnet 写入 accounts 表
if key in ("BINANCE_API_KEY", "BINANCE_API_SECRET", "USE_TESTNET") and Account is not None:
try:
if key == "BINANCE_API_KEY":
Account.update_credentials(self.account_id, api_key=str(value or ""))
elif key == "BINANCE_API_SECRET":
Account.update_credentials(self.account_id, api_secret=str(value or ""))
else:
Account.update_credentials(self.account_id, use_testnet=bool(value))
self._cache[key] = value
return
except Exception as e:
logger.error(f"更新账号API配置失败: {e}")
raise
if TradingConfig is None:
logger.warning("TradingConfig未导入无法更新数据库配置")
self._cache[key] = value
# 仍然尝试更新Redis
self._set_to_redis(key, value)
return
try:
# 1. 更新数据库
TradingConfig.set(key, value, config_type, category, description, account_id=self.account_id)
# 2. 更新本地缓存
self._cache[key] = value
# 3. 更新Redis缓存确保trading_system能立即读取到最新配置
self._set_to_redis(key, value)
logger.info(f"配置已更新: {key} = {value} (已同步到数据库和Redis)")
except Exception as e:
logger.error(f"更新配置失败: {e}")
raise
def reload(self):
"""重新加载配置优先从Redis其次从数据库"""
self._cache = {}
self._load_from_db()
def reload_from_redis(self):
"""强制从Redis重新加载配置用于trading_system即时获取最新配置"""
# 如果Redis不可用不重新加载保持现有缓存
if not self._redis_connected or not self._redis_client:
logger.debug("Redis未连接跳过从Redis重新加载保持现有缓存")
return
try:
# 测试连接是否真正可用
self._redis_client.ping()
except Exception as e:
logger.debug(f"Redis连接不可用: {e}跳过从Redis重新加载")
self._redis_connected = False
return
try:
redis_configs = self._redis_client.hgetall(self._redis_hash_key)
if (not redis_configs) and self._legacy_hash_key:
redis_configs = self._redis_client.hgetall(self._legacy_hash_key)
if redis_configs and len(redis_configs) > 0:
self._cache = {} # 清空缓存
for key, value_str in redis_configs.items():
self._cache[key] = self._coerce_redis_value(value_str)
logger.debug(f"从Redis重新加载了 {len(self._cache)} 个配置项")
else:
# Redis中没有配置但不回退到数据库避免频繁从数据库加载
logger.debug("Redis中没有配置数据保持现有缓存")
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e},保持现有缓存")
# 连接失败时,标记为未连接
try:
self._redis_client.ping()
except:
self._redis_connected = False
def get_trading_config(self):
"""获取交易配置字典兼容原有config.py的TRADING_CONFIG"""
# 全局策略配置管理器从独立的global_strategy_config表读取
global_config_mgr = GlobalStrategyConfigManager()
try:
global_config_mgr.reload_from_redis()
except Exception:
pass
def eff_get(key: str, default: Any):
"""
策略核心从全局配置表读取global_strategy_config
风险旋钮:从当前账号读取。
"""
# API key/secret/testnet 永远按账号读取(在 get() 内部已处理)
if key in RISK_KNOBS_KEYS:
return self.get(key, default)
# 从全局配置表读取
try:
return global_config_mgr.get(key, default)
except Exception:
return self.get(key, default)
return {
# 仓位控制
'MAX_POSITION_PERCENT': eff_get('MAX_POSITION_PERCENT', 0.08), # 单笔最大保证金占比
'MAX_TOTAL_POSITION_PERCENT': eff_get('MAX_TOTAL_POSITION_PERCENT', 0.40), # 总保证金占比上限
'MIN_POSITION_PERCENT': eff_get('MIN_POSITION_PERCENT', 0.02), # 最小保证金占比
'MIN_MARGIN_USDT': eff_get('MIN_MARGIN_USDT', 5.0), # 最小保证金USDT
# 用户风险旋钮:自动交易开关/频次控制
'AUTO_TRADE_ENABLED': eff_get('AUTO_TRADE_ENABLED', True),
'MAX_OPEN_POSITIONS': eff_get('MAX_OPEN_POSITIONS', 3),
'MAX_DAILY_ENTRIES': eff_get('MAX_DAILY_ENTRIES', 8),
# 涨跌幅阈值
'MIN_CHANGE_PERCENT': eff_get('MIN_CHANGE_PERCENT', 2.0),
# 风险控制
'STOP_LOSS_PERCENT': eff_get('STOP_LOSS_PERCENT', 0.10), # 默认10%
'TAKE_PROFIT_PERCENT': eff_get('TAKE_PROFIT_PERCENT', 0.30), # 默认30%(降低止盈目标,更容易触发平仓)
'MIN_STOP_LOSS_PRICE_PCT': eff_get('MIN_STOP_LOSS_PRICE_PCT', 0.02), # 默认2%
'MIN_TAKE_PROFIT_PRICE_PCT': eff_get('MIN_TAKE_PROFIT_PRICE_PCT', 0.02), # 默认2%防止ATR过小时计算出不切实际的微小止盈距离
'USE_ATR_STOP_LOSS': eff_get('USE_ATR_STOP_LOSS', True), # 是否使用ATR动态止损
'ATR_STOP_LOSS_MULTIPLIER': eff_get('ATR_STOP_LOSS_MULTIPLIER', 2.0), # ATR止损倍数2.0(容忍山寨币高波动)
'ATR_TAKE_PROFIT_MULTIPLIER': eff_get('ATR_TAKE_PROFIT_MULTIPLIER', 4.0), # ATR止盈倍数4.0配合RISK_REWARD_RATIO 4.0
'RISK_REWARD_RATIO': eff_get('RISK_REWARD_RATIO', 4.0), # 盈亏比4:1山寨币必须追求大赢家
'ATR_PERIOD': eff_get('ATR_PERIOD', 14), # ATR计算周期
'USE_DYNAMIC_ATR_MULTIPLIER': eff_get('USE_DYNAMIC_ATR_MULTIPLIER', False), # 是否根据波动率动态调整ATR倍数
'ATR_MULTIPLIER_MIN': eff_get('ATR_MULTIPLIER_MIN', 1.5), # 动态ATR倍数最小值
'ATR_MULTIPLIER_MAX': eff_get('ATR_MULTIPLIER_MAX', 2.5), # 动态ATR倍数最大值
# 固定风险百分比仓位计算(凯利公式)
'USE_FIXED_RISK_SIZING': eff_get('USE_FIXED_RISK_SIZING', True), # 使用固定风险百分比计算仓位
'FIXED_RISK_PERCENT': eff_get('FIXED_RISK_PERCENT', 0.02), # 每笔单子承受的风险2%
# 市场扫描30分钟主周期
'SCAN_INTERVAL': eff_get('SCAN_INTERVAL', 1800), # 30分钟增加交易机会
'TOP_N_SYMBOLS': eff_get('TOP_N_SYMBOLS', 8), # 每次扫描后处理的交易对数量增加到8给更多选择余地
'MAX_SCAN_SYMBOLS': eff_get('MAX_SCAN_SYMBOLS', 250), # 扫描的最大交易对数量增加到250提升覆盖率到46%
'EXCLUDE_MAJOR_COINS': eff_get('EXCLUDE_MAJOR_COINS', True), # 是否排除主流币BTC、ETH、BNB等专注于山寨币
'KLINE_INTERVAL': eff_get('KLINE_INTERVAL', '1h'),
'PRIMARY_INTERVAL': eff_get('PRIMARY_INTERVAL', '1h'),
'CONFIRM_INTERVAL': eff_get('CONFIRM_INTERVAL', '4h'),
'ENTRY_INTERVAL': eff_get('ENTRY_INTERVAL', '15m'),
# 过滤条件
'MIN_VOLUME_24H': eff_get('MIN_VOLUME_24H', 10000000),
'MIN_VOLATILITY': eff_get('MIN_VOLATILITY', 0.02),
# 高胜率策略参数
'MIN_SIGNAL_STRENGTH': eff_get('MIN_SIGNAL_STRENGTH', 5),
'LEVERAGE': eff_get('LEVERAGE', 10),
'USE_DYNAMIC_LEVERAGE': eff_get('USE_DYNAMIC_LEVERAGE', True),
'MAX_LEVERAGE': eff_get('MAX_LEVERAGE', 15), # 降低到15更保守配合更大的保证金
# 移动止损:默认关闭(避免过早截断利润,让利润奔跑)
'USE_TRAILING_STOP': eff_get('USE_TRAILING_STOP', False),
'TRAILING_STOP_ACTIVATION': eff_get('TRAILING_STOP_ACTIVATION', 0.10), # 默认10%(给趋势更多空间)
'TRAILING_STOP_PROTECT': eff_get('TRAILING_STOP_PROTECT', 0.05), # 默认5%(保护更多利润)
# 最小持仓时间锁(强制波段持仓纪律,避免分钟级平仓)
'MIN_HOLD_TIME_SEC': eff_get('MIN_HOLD_TIME_SEC', 1800), # 默认30分钟1800秒
# 自动交易过滤(用于提升胜率/控频)
# 说明:这两个 key 需要出现在 TRADING_CONFIG 中,否则 trading_system 在每次 reload_from_redis 后会丢失它们,
# 导致始终按默认值拦截自动交易(用户在配置页怎么开都没用)。
'AUTO_TRADE_ONLY_TRENDING': eff_get('AUTO_TRADE_ONLY_TRENDING', True),
'AUTO_TRADE_ALLOW_4H_NEUTRAL': eff_get('AUTO_TRADE_ALLOW_4H_NEUTRAL', False),
# 智能入场/限价偏移(部分逻辑会直接读取 TRADING_CONFIG
'LIMIT_ORDER_OFFSET_PCT': eff_get('LIMIT_ORDER_OFFSET_PCT', 0.5),
'SMART_ENTRY_ENABLED': eff_get('SMART_ENTRY_ENABLED', False),
'SMART_ENTRY_STRONG_SIGNAL': eff_get('SMART_ENTRY_STRONG_SIGNAL', 8),
'ENTRY_SYMBOL_COOLDOWN_SEC': eff_get('ENTRY_SYMBOL_COOLDOWN_SEC', 120),
'ENTRY_TIMEOUT_SEC': eff_get('ENTRY_TIMEOUT_SEC', 180),
'ENTRY_STEP_WAIT_SEC': eff_get('ENTRY_STEP_WAIT_SEC', 15),
'ENTRY_CHASE_MAX_STEPS': eff_get('ENTRY_CHASE_MAX_STEPS', 4),
'ENTRY_MARKET_FALLBACK_AFTER_SEC': eff_get('ENTRY_MARKET_FALLBACK_AFTER_SEC', 45),
'ENTRY_CONFIRM_TIMEOUT_SEC': eff_get('ENTRY_CONFIRM_TIMEOUT_SEC', 30),
'ENTRY_MAX_DRIFT_PCT_TRENDING': eff_get('ENTRY_MAX_DRIFT_PCT_TRENDING', 0.6),
'ENTRY_MAX_DRIFT_PCT_RANGING': eff_get('ENTRY_MAX_DRIFT_PCT_RANGING', 0.3),
}
def _sync_to_redis(self):
"""将配置同步到Redis缓存账号维度"""
if not self._redis_connected or not self._redis_client:
return
try:
payload = {k: json.dumps(v) for k, v in self._cache.items()}
self._redis_client.hset(self._redis_hash_key, mapping=payload)
self._redis_client.expire(self._redis_hash_key, 3600)
if self._legacy_hash_key:
self._redis_client.hset(self._legacy_hash_key, mapping=payload)
self._redis_client.expire(self._legacy_hash_key, 3600)
except Exception as e:
logger.debug(f"同步配置到Redis失败: {e}")
# 全局配置管理器实例默认账号trading_system 进程可通过 ATS_ACCOUNT_ID 指定)
try:
_default_account_id = int(os.getenv("ATS_ACCOUNT_ID") or os.getenv("ACCOUNT_ID") or 1)
except Exception:
_default_account_id = 1
config_manager = ConfigManager.for_account(_default_account_id)
# 兼容原有config.py的接口
def get_config(key, default=None):
"""获取配置(兼容函数)"""
return config_manager.get(key, default)
def get_trading_config():
"""获取交易配置(兼容函数)"""
return config_manager.get_trading_config()