auto_trade_sys/trading_system/main.py
薇薇安 3b0ff0227e a
2026-01-21 17:15:58 +08:00

326 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
主程序 - 币安自动交易系统入口
"""
import asyncio
import logging
import sys
from pathlib import Path
# 启动方式兼容:
# - python trading_system/main.py__package__ 为空,需从同目录导入)
# - python -m trading_system.main__package__='trading_system',必须用相对导入)
if __package__ in (None, ""):
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from position_manager import PositionManager
from strategy import TradingStrategy
import config
else:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .position_manager import PositionManager
from .strategy import TradingStrategy
from . import config
# 配置日志(支持相对路径)
log_file = config.LOG_FILE
if not Path(log_file).is_absolute():
# 如果是相对路径,相对于项目根目录
project_root = Path(__file__).parent.parent
log_file = project_root / log_file
# 设置日志时间格式为北京时间UTC+8
import time
from datetime import timezone, timedelta
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
# 转换为北京时间UTC+8
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
from datetime import datetime
# 创建格式化器
formatter = BeijingTimeFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 创建处理器
file_handler = logging.FileHandler(str(log_file), encoding='utf-8')
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 配置日志
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL),
handlers=[file_handler, console_handler]
)
# 追加:将 ERROR 日志写入 Redis不影响现有文件/控制台日志)
try:
# 兼容两种启动方式:
# - 直接运行python trading_system/main.py
# - 模块运行python -m trading_system.main
try:
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
except Exception:
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_cfg = RedisLogConfig(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
service="trading_system",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
# 让 handler 自己按组筛选error/warning/info这里只需要放宽到 INFO
redis_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(redis_handler)
# 诊断:启动时快速检测一次 Redis 可用性(失败不影响运行)
try:
client = redis_handler._get_redis() # noqa: SLF001仅用于诊断
if client is None:
logger = logging.getLogger(__name__)
logger.warning(
f"⚠ Redis 日志写入未启用无法连接或缺少依赖。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
)
else:
logger = logging.getLogger(__name__)
logger.info(
f"✓ Redis 日志写入已启用。REDIS_URL={getattr(config, 'REDIS_URL', None)}"
)
except Exception:
pass
except Exception:
# Redis handler 仅用于增强监控,失败不影响交易系统启动
pass
logger = logging.getLogger(__name__)
async def main():
"""主函数"""
logger.info("=" * 60)
logger.info("币安自动交易系统启动")
logger.info("=" * 60)
# 检查配置管理器状态
logger.info("检查配置管理器状态...")
# 强制重新初始化配置管理器(确保能读取到数据库配置)
try:
logger.info("重新初始化配置管理器...")
# 重置全局变量,强制重新初始化
config._config_manager = None
config.USE_DB_CONFIG = False
config._init_config_manager()
if config._config_manager:
config._config_manager.reload()
logger.info(f"✓ 配置管理器初始化成功,已加载 {len(config._config_manager._cache)} 个配置项")
# 打印一些关键配置项,用于调试
test_key = config._config_manager.get('BINANCE_API_KEY')
test_secret = config._config_manager.get('BINANCE_API_SECRET')
logger.info(f"从数据库读取: BINANCE_API_KEY存在={bool(test_key)}, BINANCE_API_SECRET存在={bool(test_secret)}")
if test_key:
logger.info(f"BINANCE_API_KEY前4位: {test_key[:4]}...")
else:
logger.warning("⚠ 配置管理器初始化失败返回None")
except Exception as e:
logger.error(f"配置管理器初始化异常: {e}", exc_info=True)
import traceback
logger.error(traceback.format_exc())
if config.USE_DB_CONFIG:
logger.info("✓ 使用数据库配置")
# 尝试重新加载配置(确保获取最新值)
try:
config.reload_config()
logger.info("配置已重新加载")
except Exception as e:
logger.warning(f"重新加载配置失败: {e}", exc_info=True)
else:
logger.warning("⚠ 未使用数据库配置,将使用环境变量和默认配置")
logger.warning("如果前端已配置API密钥请检查")
logger.warning("1. backend目录是否存在")
logger.warning("2. 数据库连接是否正常")
logger.warning("3. config_manager模块是否可以正常导入")
logger.warning("4. 数据库配置表中是否有BINANCE_API_KEY和BINANCE_API_SECRET")
# 强制重新加载配置(确保使用最新值)
try:
config.reload_config()
logger.info("配置已重新加载")
except Exception as e:
logger.warning(f"重新加载配置失败: {e}", exc_info=True)
# 检查API密钥重新获取确保是最新值
api_key = config._get_config_value('BINANCE_API_KEY', '')
api_secret = config._get_config_value('BINANCE_API_SECRET', '')
# 如果从配置管理器获取失败尝试直接从config_manager获取
if (not api_key or api_key == 'your_api_key_here') and config._config_manager:
try:
api_key = config._config_manager.get('BINANCE_API_KEY', '')
logger.info(f"直接从config_manager获取API_KEY: 存在={bool(api_key)}")
except Exception as e:
logger.warning(f"从config_manager获取API_KEY失败: {e}")
if (not api_secret or api_secret == 'your_api_secret_here') and config._config_manager:
try:
api_secret = config._config_manager.get('BINANCE_API_SECRET', '')
logger.info(f"直接从config_manager获取API_SECRET: 存在={bool(api_secret)}")
except Exception as e:
logger.warning(f"从config_manager获取API_SECRET失败: {e}")
logger.info(f"API密钥检查: KEY存在={bool(api_key)}, SECRET存在={bool(api_secret)}")
if api_key and api_key != 'your_api_key_here' and len(api_key) > 4:
logger.info(f"API密钥前4位: {api_key[:4]}...")
else:
logger.warning(f"⚠ API密钥未正确加载当前值: {api_key}")
if config._config_manager:
logger.info(f"配置管理器缓存中的键: {list(config._config_manager._cache.keys())[:10]}")
if not api_key or not api_secret or api_key == 'your_api_key_here' or api_secret == 'your_api_secret_here':
logger.error("=" * 60)
logger.error("API密钥未配置")
logger.error("=" * 60)
if config.USE_DB_CONFIG:
logger.error("配置管理器已启用但未从数据库读取到API密钥")
logger.error("请检查:")
logger.error("1. 前端配置界面是否已设置BINANCE_API_KEY和BINANCE_API_SECRET")
logger.error("2. 数据库trading_config表中是否有这些配置项")
logger.error("3. 数据库连接是否正常")
else:
logger.error("请设置 BINANCE_API_KEY 和 BINANCE_API_SECRET 环境变量")
logger.error("或在 config.py 中直接配置")
logger.error("或确保backend目录存在以便从数据库读取配置")
logger.error("=" * 60)
return
# 更新config模块的API密钥确保使用最新值
config.BINANCE_API_KEY = api_key
config.BINANCE_API_SECRET = api_secret
import os
config.USE_TESTNET = config._get_config_value('USE_TESTNET', False) if config._get_config_value('USE_TESTNET') is not None else os.getenv('USE_TESTNET', 'False').lower() == 'true'
logger.info(f"最终使用的API密钥: KEY前4位={api_key[:4] if api_key and len(api_key) > 4 else 'N/A'}..., SECRET前4位={api_secret[:4] if api_secret and len(api_secret) > 4 else 'N/A'}..., 测试网={config.USE_TESTNET}")
# 初始化组件
client = None
try:
# 1. 初始化币安客户端
logger.info("初始化币安客户端...")
# 再次确认API密钥使用最新值
api_key = config._get_config_value('BINANCE_API_KEY', config.BINANCE_API_KEY)
api_secret = config._get_config_value('BINANCE_API_SECRET', config.BINANCE_API_SECRET)
use_testnet = config._get_config_value('USE_TESTNET', config.USE_TESTNET)
if isinstance(use_testnet, str):
use_testnet = use_testnet.lower() in ('true', '1', 'yes', 'on')
elif not isinstance(use_testnet, bool):
use_testnet = bool(use_testnet)
logger.info(f"测试网模式: {use_testnet}")
logger.info(f"连接超时: {config.CONNECTION_TIMEOUT}")
logger.info(f"重试次数: {config.CONNECTION_RETRIES}")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
await client.connect()
# 2. 检查账户余额
logger.info("检查账户余额...")
balance = await client.get_account_balance()
if balance['total'] == 0 and balance['available'] == 0:
logger.error("=" * 60)
logger.error("无法获取账户余额可能是API权限问题")
logger.error("请检查:")
logger.error("1. API密钥是否正确")
logger.error("2. API密钥是否启用了'合约交易'权限")
logger.error("3. IP地址是否在白名单中如果设置了IP限制")
logger.error("4. 测试网/生产网环境是否匹配")
logger.error("=" * 60)
return
logger.info(
f"账户余额: 总余额 {balance['total']:.2f} USDT, "
f"可用余额 {balance['available']:.2f} USDT"
)
if balance['available'] <= 0:
logger.error("账户可用余额不足,无法交易")
logger.error(f"总余额: {balance['total']:.2f} USDT")
logger.error("请先充值到合约账户")
return
# 4. 初始化各个模块
logger.info("初始化交易模块...")
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
position_manager = PositionManager(client, risk_manager)
# 初始化推荐器(用于自动生成推荐)
recommender = None
try:
from trade_recommender import TradeRecommender
recommender = TradeRecommender(client, scanner, risk_manager)
logger.info("✓ 推荐器已初始化,将自动生成交易推荐")
except Exception as e:
logger.warning(f"⚠ 推荐器初始化失败: {e},将不生成推荐")
logger.debug(f"错误详情: {type(e).__name__}: {e}")
strategy = TradingStrategy(client, scanner, risk_manager, position_manager, recommender=recommender)
# 4. 打印配置信息
logger.info("交易配置:")
logger.info(f" 单笔最大仓位: {config.TRADING_CONFIG['MAX_POSITION_PERCENT']*100:.1f}%")
logger.info(f" 总仓位上限: {config.TRADING_CONFIG['MAX_TOTAL_POSITION_PERCENT']*100:.1f}%")
logger.info(f" 最小涨跌幅阈值: {config.TRADING_CONFIG['MIN_CHANGE_PERCENT']:.1f}%")
logger.info(f" 扫描间隔: {config.TRADING_CONFIG['SCAN_INTERVAL']}")
logger.info(f" 扫描交易对数量: {config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500)} (0=全部)")
logger.info(f" 处理交易对数量: {config.TRADING_CONFIG['TOP_N_SYMBOLS']}")
logger.info(f" 止损: {config.TRADING_CONFIG['STOP_LOSS_PERCENT']*100:.1f}%")
logger.info(f" 止盈: {config.TRADING_CONFIG['TAKE_PROFIT_PERCENT']*100:.1f}%")
logger.info(f" 测试网模式: {config.USE_TESTNET}")
logger.info("=" * 60)
# 5. 启动交易策略
logger.info("启动交易策略...")
await strategy.execute_strategy()
except KeyboardInterrupt:
logger.info("收到停止信号,正在关闭...")
except Exception as e:
logger.error(f"程序运行出错: {e}", exc_info=True)
finally:
# 清理资源
if client:
await client.disconnect()
logger.info("程序已退出")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序被用户中断")
except Exception as e:
logger.error(f"程序异常退出: {e}", exc_info=True)