auto_trade_sys/trading_system/strategy.py
薇薇安 75653be44c a
2026-01-14 19:27:04 +08:00

413 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易策略模块 - 实现交易逻辑和优化
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .position_manager import PositionManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from position_manager import PositionManager
import config
logger = logging.getLogger(__name__)
class TradingStrategy:
"""交易策略类"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager,
position_manager: PositionManager
):
"""
初始化交易策略
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
position_manager: 仓位管理器
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
self.position_manager = position_manager
self.running = False
self._monitoring_started = False # 是否已启动监控
self._sync_task = None # 定期同步任务
async def execute_strategy(self):
"""
执行交易策略
"""
self.running = True
logger.info("交易策略开始执行...")
# 启动所有现有持仓的WebSocket实时监控
if not self._monitoring_started:
try:
await self.position_manager.start_all_position_monitoring()
self._monitoring_started = True
except Exception as e:
logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式")
# 启动定期同步任务(独立于市场扫描)
self._sync_task = asyncio.create_task(self._periodic_sync_positions())
logger.info("定期持仓同步任务已启动")
try:
while self.running:
# 1. 扫描市场找出涨跌幅最大的前N个货币对
top_symbols = await self.scanner.scan_market()
if not top_symbols:
logger.warning("未找到符合条件的交易对,等待下次扫描...")
await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL'])
continue
# 2. 对每个交易对执行交易逻辑(使用技术指标)
for symbol_info in top_symbols:
if not self.running:
break
symbol = symbol_info['symbol']
change_percent = symbol_info['changePercent']
direction = symbol_info['direction']
market_regime = symbol_info.get('marketRegime', 'unknown')
logger.info(
f"处理交易对: {symbol} "
f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})"
)
# 检查是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
continue
# 优化:结合成交量确认
if not await self._check_volume_confirmation(symbol_info):
logger.info(f"{symbol} 成交量确认失败,跳过")
continue
# 使用技术指标判断交易信号(高胜率策略)
trade_signal = await self._analyze_trade_signal(symbol_info)
# 记录交易信号到数据库(只有当有明确方向时才记录)
signal_direction = trade_signal.get('direction')
if signal_direction: # 只有当方向不为空时才记录
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradingSignal
TradingSignal.create(
symbol=symbol,
signal_direction=signal_direction,
signal_strength=trade_signal.get('strength', 0),
signal_reason=trade_signal.get('reason', ''),
rsi=symbol_info.get('rsi'),
macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
market_regime=symbol_info.get('marketRegime')
)
except Exception as e:
logger.debug(f"记录交易信号失败: {e}")
if not trade_signal['should_trade']:
logger.info(
f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过"
)
continue
# 确定交易方向(基于技术指标)
trade_direction = trade_signal['direction']
entry_reason = trade_signal['reason']
logger.info(
f"{symbol} 交易信号: {trade_direction} | "
f"原因: {entry_reason} | "
f"信号强度: {trade_signal.get('strength', 0)}/10"
)
# 开仓(使用改进的仓位管理)
position = await self.position_manager.open_position(
symbol=symbol,
change_percent=change_percent,
leverage=config.TRADING_CONFIG.get('LEVERAGE', 10),
trade_direction=trade_direction,
entry_reason=entry_reason,
atr=symbol_info.get('atr')
)
if position:
logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})")
# 开仓成功后WebSocket监控会在position_manager中自动启动
else:
logger.warning(f"{symbol} 开仓失败")
# 避免同时处理太多交易对
await asyncio.sleep(1)
# 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致)
try:
await self.position_manager.sync_positions_with_binance()
except Exception as e:
logger.warning(f"持仓状态同步失败: {e}")
# 4. 检查止损止盈作为备用检查WebSocket实时监控是主要方式
# 注意如果启用了WebSocket实时监控这里主要是作为备用检查
closed = await self.position_manager.check_stop_loss_take_profit()
if closed:
logger.info(f"定时检查触发平仓: {', '.join(closed)}")
# 5. 打印持仓摘要并记录账户快照
summary = await self.position_manager.get_position_summary()
if summary:
logger.info(
f"持仓摘要: {summary['totalPositions']} 个持仓, "
f"总盈亏: {summary['totalPnL']:.2f} USDT, "
f"可用余额: {summary['availableBalance']:.2f} USDT"
)
# 输出价格缓存统计
cache_size = len(self.client._price_cache)
if cache_size > 0:
logger.info(f"价格缓存: {cache_size}个交易对")
# 记录账户快照到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import AccountSnapshot
AccountSnapshot.create(
total_balance=summary['totalBalance'],
available_balance=summary['availableBalance'],
total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])),
total_pnl=summary['totalPnL'],
open_positions=summary['totalPositions']
)
except Exception as e:
logger.debug(f"记录账户快照失败: {e}")
# 重新加载配置(确保使用最新配置)
try:
if hasattr(config, 'reload_config'):
config.reload_config()
logger.info("配置已重新加载,将使用最新配置进行下次扫描")
except Exception as e:
logger.debug(f"重新加载配置失败: {e}")
# 等待下次扫描
scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600)
logger.info(f"等待 {scan_interval} 秒后进行下次扫描...")
await asyncio.sleep(scan_interval)
except Exception as e:
logger.error(f"策略执行出错: {e}", exc_info=True)
finally:
self.running = False
# 停止定期同步任务
if self._sync_task:
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
pass
logger.info("定期持仓同步任务已停止")
# 停止所有持仓的WebSocket监控
try:
await self.position_manager.stop_all_position_monitoring()
except Exception as e:
logger.warning(f"停止持仓监控时出错: {e}")
logger.info("交易策略已停止")
async def _check_volume_confirmation(self, symbol_info: Dict) -> bool:
"""
成交量确认 - 确保有足够的成交量支撑
Args:
symbol_info: 交易对信息
Returns:
是否通过确认
"""
volume_24h = symbol_info.get('volume24h', 0)
min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H']
if volume_24h < min_volume:
return False
return True
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
"""
使用技术指标分析交易信号(高胜率策略)
Args:
symbol_info: 交易对信息(包含技术指标)
Returns:
交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int}
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
signal_strength = 0
reasons = []
direction = None
# 策略1均值回归震荡市场高胜率
if market_regime == 'ranging':
# RSI超卖做多信号
if rsi and rsi < 30:
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
# RSI超买做空信号
elif rsi and rsi > 70:
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
# 布林带下轨,做多信号
if bollinger and current_price <= bollinger['lower']:
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
# 布林带上轨,做空信号
elif bollinger and current_price >= bollinger['upper']:
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
# MACD金叉做多信号
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
# MACD死叉做空信号
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
# 均线系统
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50: # 下降趋势
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
# 策略3综合信号提高胜率
# 多个指标同时确认时,信号更强
confirmations = 0
# RSI确认
if rsi:
if direction == 'BUY' and rsi < 50:
confirmations += 1
elif direction == 'SELL' and rsi > 50:
confirmations += 1
# MACD确认
if macd:
if direction == 'BUY' and macd['histogram'] > 0:
confirmations += 1
elif direction == 'SELL' and macd['histogram'] < 0:
confirmations += 1
# 布林带确认
if bollinger:
if direction == 'BUY' and current_price < bollinger['middle']:
confirmations += 1
elif direction == 'SELL' and current_price > bollinger['middle']:
confirmations += 1
# 多个指标确认时,增加信号强度
if confirmations >= 2:
signal_strength += 2
reasons.append(f"{confirmations}个指标确认")
# 判断是否应该交易(信号强度 >= 5 才交易,提高胜率)
should_trade = signal_strength >= config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 5)
if not should_trade and direction:
reasons.append(f"信号强度不足({signal_strength}/10)")
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength
}
async def _periodic_sync_positions(self):
"""
定期同步币安持仓状态(独立任务,不依赖市场扫描)
"""
sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟
logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}")
while self.running:
try:
await asyncio.sleep(sync_interval)
if not self.running:
break
logger.debug("执行定期持仓状态同步...")
await self.position_manager.sync_positions_with_binance()
except asyncio.CancelledError:
logger.info("定期持仓同步任务已取消")
break
except Exception as e:
logger.error(f"定期持仓同步任务出错: {e}")
# 出错后等待一段时间再继续
await asyncio.sleep(60)
def stop(self):
"""停止策略"""
self.running = False
logger.info("正在停止交易策略...")