auto_trade_sys/trading_system/main.py
薇薇安 87e7865cbb a
2026-01-21 22:48:01 +08:00

356 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
主程序 - 币安自动交易系统入口
"""
import asyncio
import logging
import sys
from pathlib import Path
# 启动方式兼容(更鲁棒):
# - supervisor 推荐python -m trading_system.main相对导入
# - 手动调试python trading_system/main.py同目录导入
# - 其它非常规启动方式:尽量通过补齐 sys.path 避免本地模块找不到
try:
from .binance_client import BinanceClient # type: ignore
from .market_scanner import MarketScanner # type: ignore
from .risk_manager import RiskManager # type: ignore
from .position_manager import PositionManager # type: ignore
from .strategy import TradingStrategy # type: ignore
from . import config # type: ignore
except Exception:
_here = Path(__file__).resolve().parent
_root = _here.parent
# 某些 supervisor/启动脚本可能会导致 sys.path 没包含 trading_system 目录
if str(_here) not in sys.path:
sys.path.insert(0, str(_here))
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
from binance_client import BinanceClient # type: ignore
from market_scanner import MarketScanner # type: ignore
from risk_manager import RiskManager # type: ignore
from position_manager import PositionManager # type: ignore
from strategy import TradingStrategy # type: ignore
import config # type: ignore
# 配置日志(支持相对路径)
log_file = config.LOG_FILE
if not Path(log_file).is_absolute():
# 如果是相对路径,相对于项目根目录
project_root = Path(__file__).parent.parent
log_file = project_root / log_file
# 设置日志时间格式为北京时间UTC+8
import time
from datetime import timezone, timedelta
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
# 转换为北京时间UTC+8
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
from datetime import datetime
# 创建格式化器
formatter = BeijingTimeFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 创建处理器
file_handler = logging.FileHandler(str(log_file), encoding='utf-8')
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 配置日志
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL),
handlers=[file_handler, console_handler]
)
# 追加:将 ERROR 日志写入 Redis不影响现有文件/控制台日志)
try:
# 兼容两种启动方式:
# - 直接运行python trading_system/main.py
# - 模块运行python -m trading_system.main
try:
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
except Exception:
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_cfg = RedisLogConfig(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
service="trading_system",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
# 让 handler 自己按组筛选error/warning/info这里只需要放宽到 INFO
redis_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(redis_handler)
# 诊断:启动时快速检测一次 Redis 可用性(失败不影响运行)
try:
client = redis_handler._get_redis() # noqa: SLF001仅用于诊断
if client is None:
logger = logging.getLogger(__name__)
logger.warning(
f"⚠ Redis 日志写入未启用无法连接或缺少依赖。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
)
else:
logger = logging.getLogger(__name__)
logger.info(
f"✓ Redis 日志写入已启用。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
)
except Exception:
pass
except Exception:
# Redis handler 仅用于增强监控,失败不影响交易系统启动
pass
logger = logging.getLogger(__name__)
async def main():
"""主函数"""
logger.info("=" * 60)
logger.info("币安自动交易系统启动")
logger.info("=" * 60)
# 检查配置管理器状态
logger.info("检查配置管理器状态...")
# 强制重新初始化配置管理器(确保能读取到数据库配置)
try:
logger.info("重新初始化配置管理器...")
# 重置全局变量,强制重新初始化
config._config_manager = None
config.USE_DB_CONFIG = False
config._init_config_manager()
if config._config_manager:
config._config_manager.reload()
logger.info(f"✓ 配置管理器初始化成功,已加载 {len(config._config_manager._cache)} 个配置项")
# 打印一些关键配置项,用于调试
test_key = config._config_manager.get('BINANCE_API_KEY')
test_secret = config._config_manager.get('BINANCE_API_SECRET')
logger.info(f"从数据库读取: BINANCE_API_KEY存在={bool(test_key)}, BINANCE_API_SECRET存在={bool(test_secret)}")
if test_key:
logger.info(f"BINANCE_API_KEY前4位: {test_key[:4]}...")
else:
logger.warning("⚠ 配置管理器初始化失败返回None")
except Exception as e:
logger.error(f"配置管理器初始化异常: {e}", exc_info=True)
import traceback
logger.error(traceback.format_exc())
if config.USE_DB_CONFIG:
logger.info("✓ 使用数据库配置")
# 尝试重新加载配置(确保获取最新值)
try:
config.reload_config()
logger.info("配置已重新加载")
except Exception as e:
logger.warning(f"重新加载配置失败: {e}", exc_info=True)
else:
logger.warning("⚠ 未使用数据库配置,将使用环境变量和默认配置")
logger.warning("如果前端已配置API密钥请检查")
logger.warning("1. backend目录是否存在")
logger.warning("2. 数据库连接是否正常")
logger.warning("3. config_manager模块是否可以正常导入")
logger.warning("4. 数据库配置表中是否有BINANCE_API_KEY和BINANCE_API_SECRET")
# 强制重新加载配置(确保使用最新值)
try:
config.reload_config()
logger.info("配置已重新加载")
except Exception as e:
logger.warning(f"重新加载配置失败: {e}", exc_info=True)
# 检查API密钥重新获取确保是最新值
api_key = config._get_config_value('BINANCE_API_KEY', '')
api_secret = config._get_config_value('BINANCE_API_SECRET', '')
# 如果从配置管理器获取失败尝试直接从config_manager获取
if (not api_key or api_key == 'your_api_key_here') and config._config_manager:
try:
api_key = config._config_manager.get('BINANCE_API_KEY', '')
logger.info(f"直接从config_manager获取API_KEY: 存在={bool(api_key)}")
except Exception as e:
logger.warning(f"从config_manager获取API_KEY失败: {e}")
if (not api_secret or api_secret == 'your_api_secret_here') and config._config_manager:
try:
api_secret = config._config_manager.get('BINANCE_API_SECRET', '')
logger.info(f"直接从config_manager获取API_SECRET: 存在={bool(api_secret)}")
except Exception as e:
logger.warning(f"从config_manager获取API_SECRET失败: {e}")
logger.info(f"API密钥检查: KEY存在={bool(api_key)}, SECRET存在={bool(api_secret)}")
if api_key and api_key != 'your_api_key_here' and len(api_key) > 4:
logger.info(f"API密钥前4位: {api_key[:4]}...")
else:
logger.warning(f"⚠ API密钥未正确加载当前值: {api_key}")
if config._config_manager:
logger.info(f"配置管理器缓存中的键: {list(config._config_manager._cache.keys())[:10]}")
if not api_key or not api_secret or api_key == 'your_api_key_here' or api_secret == 'your_api_secret_here':
logger.error("=" * 60)
logger.error("API密钥未配置")
logger.error("=" * 60)
if config.USE_DB_CONFIG:
logger.error("配置管理器已启用但未从数据库读取到API密钥")
logger.error("请检查:")
logger.error("1. 前端配置界面是否已设置BINANCE_API_KEY和BINANCE_API_SECRET")
logger.error("2. 数据库trading_config表中是否有这些配置项")
logger.error("3. 数据库连接是否正常")
else:
logger.error("请设置 BINANCE_API_KEY 和 BINANCE_API_SECRET 环境变量")
logger.error("或在 config.py 中直接配置")
logger.error("或确保backend目录存在以便从数据库读取配置")
logger.error("=" * 60)
return
# 更新config模块的API密钥确保使用最新值
config.BINANCE_API_KEY = api_key
config.BINANCE_API_SECRET = api_secret
import os
config.USE_TESTNET = config._get_config_value('USE_TESTNET', False) if config._get_config_value('USE_TESTNET') is not None else os.getenv('USE_TESTNET', 'False').lower() == 'true'
logger.info(f"最终使用的API密钥: KEY前4位={api_key[:4] if api_key and len(api_key) > 4 else 'N/A'}..., SECRET前4位={api_secret[:4] if api_secret and len(api_secret) > 4 else 'N/A'}..., 测试网={config.USE_TESTNET}")
# 初始化组件
client = None
try:
# 1. 初始化币安客户端
logger.info("初始化币安客户端...")
# 再次确认API密钥使用最新值
api_key = config._get_config_value('BINANCE_API_KEY', config.BINANCE_API_KEY)
api_secret = config._get_config_value('BINANCE_API_SECRET', config.BINANCE_API_SECRET)
use_testnet = config._get_config_value('USE_TESTNET', config.USE_TESTNET)
if isinstance(use_testnet, str):
use_testnet = use_testnet.lower() in ('true', '1', 'yes', 'on')
elif not isinstance(use_testnet, bool):
use_testnet = bool(use_testnet)
logger.info(f"测试网模式: {use_testnet}")
logger.info(f"连接超时: {config.CONNECTION_TIMEOUT}")
logger.info(f"重试次数: {config.CONNECTION_RETRIES}")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
await client.connect()
# 2. 检查账户余额/权限
logger.info("检查账户余额...")
balance = await client.get_account_balance()
# 若底层调用失败(例如 -2015 / -1022 / IP白名单这里给出明确错误并以 code=2 退出supervisor 不会反复拉起)
if isinstance(balance, dict) and balance.get("ok") is False:
code = balance.get("error_code")
msg = balance.get("error_msg") or ""
logger.error("=" * 60)
logger.error(f"获取账户余额失败(可能是权限/白名单/环境不匹配。error_code={code}, error={msg}")
logger.error("请检查:")
logger.error("1) API Key/Secret 是否正确(不要有空格/换行)")
logger.error("2) API Key 是否启用了【合约交易USDT-M Futures】权限")
logger.error("3) 若设置了 IP 白名单,请把服务器出口 IP 加进去")
logger.error("4) 测试网/生产网是否匹配(账号的 USE_TESTNET 设置要与 Key 所属环境一致)")
logger.error("=" * 60)
raise SystemExit(2)
total = float((balance or {}).get("total") or 0.0)
available = float((balance or {}).get("available") or 0.0)
logger.info(f"账户余额: 总余额 {total:.2f} USDT, 可用余额 {available:.2f} USDT")
# 若余额为 0不直接退出保持进程运行并周期性重试便于充值后自动恢复
if available <= 0:
logger.error("=" * 60)
logger.error("账户可用余额不足(<=0交易策略不会启动将每 60 秒重试一次余额读取。")
logger.error(f"当前余额: total={total:.2f} USDT, available={available:.2f} USDT")
logger.error("提示:若你确信余额不为 0但仍显示为 0请优先检查 API 权限/IP 白名单/测试网配置。")
logger.error("=" * 60)
while True:
await asyncio.sleep(60)
try:
b2 = await client.get_account_balance()
if isinstance(b2, dict) and b2.get("ok") is False:
logger.error(f"余额重试失败error_code={b2.get('error_code')}, error={b2.get('error_msg')}")
continue
total = float((b2 or {}).get("total") or 0.0)
available = float((b2 or {}).get("available") or 0.0)
logger.info(f"余额重试: total={total:.2f}, available={available:.2f}")
if available > 0:
logger.info("检测到可用余额 > 0开始启动交易策略。")
break
except Exception as e:
logger.error(f"余额重试异常: {e}", exc_info=True)
continue
# 4. 初始化各个模块
logger.info("初始化交易模块...")
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
position_manager = PositionManager(client, risk_manager)
# 初始化推荐器(用于自动生成推荐)
recommender = None
try:
from trade_recommender import TradeRecommender
recommender = TradeRecommender(client, scanner, risk_manager)
logger.info("✓ 推荐器已初始化,将自动生成交易推荐")
except Exception as e:
logger.warning(f"⚠ 推荐器初始化失败: {e},将不生成推荐")
logger.debug(f"错误详情: {type(e).__name__}: {e}")
strategy = TradingStrategy(client, scanner, risk_manager, position_manager, recommender=recommender)
# 4. 打印配置信息
logger.info("交易配置:")
logger.info(f" 单笔最大仓位: {config.TRADING_CONFIG['MAX_POSITION_PERCENT']*100:.1f}%")
logger.info(f" 总仓位上限: {config.TRADING_CONFIG['MAX_TOTAL_POSITION_PERCENT']*100:.1f}%")
logger.info(f" 最小涨跌幅阈值: {config.TRADING_CONFIG['MIN_CHANGE_PERCENT']:.1f}%")
logger.info(f" 扫描间隔: {config.TRADING_CONFIG['SCAN_INTERVAL']}")
logger.info(f" 扫描交易对数量: {config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500)} (0=全部)")
logger.info(f" 处理交易对数量: {config.TRADING_CONFIG['TOP_N_SYMBOLS']}")
logger.info(f" 止损: {config.TRADING_CONFIG['STOP_LOSS_PERCENT']*100:.1f}%")
logger.info(f" 止盈: {config.TRADING_CONFIG['TAKE_PROFIT_PERCENT']*100:.1f}%")
logger.info(f" 测试网模式: {config.USE_TESTNET}")
logger.info("=" * 60)
# 5. 启动交易策略
logger.info("启动交易策略...")
await strategy.execute_strategy()
except KeyboardInterrupt:
logger.info("收到停止信号,正在关闭...")
except Exception as e:
logger.error(f"程序运行出错: {e}", exc_info=True)
raise
finally:
# 清理资源
if client:
await client.disconnect()
logger.info("程序已退出")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序被用户中断")
except Exception as e:
logger.error(f"程序异常退出: {e}", exc_info=True)
raise