auto_trade_sys/trading_system/strategy.py
薇薇安 b00bce9650 a
2026-01-15 14:50:04 +08:00

506 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易策略模块 - 实现交易逻辑和优化
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .position_manager import PositionManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from position_manager import PositionManager
import config
logger = logging.getLogger(__name__)
class TradingStrategy:
"""交易策略类"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager,
position_manager: PositionManager,
recommender=None # 推荐器(可选)
):
"""
初始化交易策略
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
position_manager: 仓位管理器
recommender: 推荐器(可选,如果提供则自动生成推荐)
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
self.position_manager = position_manager
self.recommender = recommender # 推荐器
self.running = False
self._monitoring_started = False # 是否已启动监控
self._sync_task = None # 定期同步任务
async def execute_strategy(self):
"""
执行交易策略
"""
self.running = True
logger.info("交易策略开始执行...")
# 启动所有现有持仓的WebSocket实时监控
if not self._monitoring_started:
try:
await self.position_manager.start_all_position_monitoring()
self._monitoring_started = True
except Exception as e:
logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式")
# 启动定期同步任务(独立于市场扫描)
self._sync_task = asyncio.create_task(self._periodic_sync_positions())
logger.info("定期持仓同步任务已启动")
try:
while self.running:
# 1. 扫描市场找出涨跌幅最大的前N个货币对
top_symbols = await self.scanner.scan_market()
if not top_symbols:
logger.warning("未找到符合条件的交易对,等待下次扫描...")
await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL'])
continue
# 2. 对每个交易对执行交易逻辑(使用技术指标)
for symbol_info in top_symbols:
if not self.running:
break
symbol = symbol_info['symbol']
change_percent = symbol_info['changePercent']
direction = symbol_info['direction']
market_regime = symbol_info.get('marketRegime', 'unknown')
logger.info(
f"处理交易对: {symbol} "
f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})"
)
# 使用技术指标判断交易信号(高胜率策略)
trade_signal = await self._analyze_trade_signal(symbol_info)
# 记录交易信号到数据库(只有当有明确方向时才记录)
signal_direction = trade_signal.get('direction')
if signal_direction: # 只有当方向不为空时才记录
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradingSignal
TradingSignal.create(
symbol=symbol,
signal_direction=signal_direction,
signal_strength=trade_signal.get('strength', 0),
signal_reason=trade_signal.get('reason', ''),
rsi=symbol_info.get('rsi'),
macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
market_regime=symbol_info.get('marketRegime')
)
except Exception as e:
logger.debug(f"记录交易信号失败: {e}")
# 自动生成推荐(即使不满足自动交易条件,只要信号强度>=5就生成推荐
# 注意:推荐生成不检查是否已有持仓,因为推荐是供手动参考的
if self.recommender and signal_direction and trade_signal.get('strength', 0) >= 5:
try:
await self.recommender._create_recommendation(symbol_info, trade_signal)
logger.debug(f"{symbol} 自动生成推荐成功 (信号强度: {trade_signal.get('strength', 0)}/10)")
except Exception as e:
logger.warning(f"自动生成推荐失败 {symbol}: {e}")
# 检查是否应该自动交易(已有持仓则跳过自动交易,但推荐已生成)
if not await self.risk_manager.should_trade(symbol, change_percent):
continue
# 优化:结合成交量确认
if not await self._check_volume_confirmation(symbol_info):
logger.info(f"{symbol} 成交量确认失败,跳过")
continue
if not trade_signal['should_trade']:
logger.info(
f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过自动交易"
)
continue
# 确定交易方向(基于技术指标)
trade_direction = trade_signal['direction']
entry_reason = trade_signal['reason']
signal_strength = trade_signal.get('strength', 0)
logger.info(
f"{symbol} 交易信号: {trade_direction} | "
f"原因: {entry_reason} | "
f"信号强度: {signal_strength}/10"
)
# 根据信号强度计算动态杠杆(高质量信号使用更高杠杆)
# 同时检查交易对支持的最大杠杆限制
dynamic_leverage = await self.risk_manager.calculate_dynamic_leverage(signal_strength, symbol)
logger.info(
f"{symbol} 使用动态杠杆: {dynamic_leverage}x "
f"(信号强度: {signal_strength}/10)"
)
# 开仓(使用改进的仓位管理)
position = await self.position_manager.open_position(
symbol=symbol,
change_percent=change_percent,
leverage=dynamic_leverage,
trade_direction=trade_direction,
entry_reason=entry_reason,
atr=symbol_info.get('atr'),
klines=symbol_info.get('klines'), # 传递K线数据用于动态止损
bollinger=symbol_info.get('bollinger') # 传递布林带数据用于动态止损
)
if position:
logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})")
# 开仓成功后WebSocket监控会在position_manager中自动启动
else:
logger.warning(f"{symbol} 开仓失败")
# 避免同时处理太多交易对
await asyncio.sleep(1)
# 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致)
try:
await self.position_manager.sync_positions_with_binance()
# 同步后,确保所有持仓都有监控(包括手动开仓的)
await self.position_manager.start_all_position_monitoring()
except Exception as e:
logger.warning(f"持仓状态同步失败: {e}")
# 4. 检查止损止盈作为备用检查WebSocket实时监控是主要方式
# 注意如果启用了WebSocket实时监控这里主要是作为备用检查
closed = await self.position_manager.check_stop_loss_take_profit()
if closed:
logger.info(f"定时检查触发平仓: {', '.join(closed)}")
# 5. 打印持仓摘要并记录账户快照
summary = await self.position_manager.get_position_summary()
if summary:
logger.info(
f"持仓摘要: {summary['totalPositions']} 个持仓, "
f"总盈亏: {summary['totalPnL']:.2f} USDT, "
f"可用余额: {summary['availableBalance']:.2f} USDT"
)
# 输出价格缓存统计
cache_size = len(self.client._price_cache)
if cache_size > 0:
logger.info(f"价格缓存: {cache_size}个交易对")
# 记录账户快照到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import AccountSnapshot
AccountSnapshot.create(
total_balance=summary['totalBalance'],
available_balance=summary['availableBalance'],
total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])),
total_pnl=summary['totalPnL'],
open_positions=summary['totalPositions']
)
except Exception as e:
logger.debug(f"记录账户快照失败: {e}")
# 重新加载配置(确保使用最新配置)
try:
if hasattr(config, 'reload_config'):
config.reload_config()
logger.info("配置已重新加载,将使用最新配置进行下次扫描")
except Exception as e:
logger.debug(f"重新加载配置失败: {e}")
# 等待下次扫描
scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600)
logger.info(f"等待 {scan_interval} 秒后进行下次扫描...")
await asyncio.sleep(scan_interval)
except Exception as e:
logger.error(f"策略执行出错: {e}", exc_info=True)
finally:
self.running = False
# 停止定期同步任务
if self._sync_task:
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
pass
logger.info("定期持仓同步任务已停止")
# 停止所有持仓的WebSocket监控
try:
await self.position_manager.stop_all_position_monitoring()
except Exception as e:
logger.warning(f"停止持仓监控时出错: {e}")
logger.info("交易策略已停止")
async def _check_volume_confirmation(self, symbol_info: Dict) -> bool:
"""
成交量确认 - 确保有足够的成交量支撑
Args:
symbol_info: 交易对信息
Returns:
是否通过确认
"""
volume_24h = symbol_info.get('volume24h', 0)
min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H']
if volume_24h < min_volume:
return False
return True
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
"""
使用技术指标分析交易信号(高胜率策略)
实施多周期共振4H定方向1H和15min找点位
Args:
symbol_info: 交易对信息(包含技术指标)
Returns:
交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int}
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
# 多周期共振检查4H定方向
price_4h = symbol_info.get('price_4h', current_price)
ema20_4h = symbol_info.get('ema20_4h')
# 判断4H周期趋势方向
trend_4h = None # 'up', 'down', 'neutral'
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
signal_strength = 0
reasons = []
direction = None
# 多周期共振检查4H定方向禁止逆势交易
if trend_4h == 'up':
# 4H趋势向上只允许做多信号
logger.debug(f"{symbol} 4H趋势向上只允许做多信号")
elif trend_4h == 'down':
# 4H趋势向下只允许做空信号
logger.debug(f"{symbol} 4H趋势向下只允许做空信号")
elif trend_4h is None:
# 无法判断4H趋势记录警告
logger.warning(f"{symbol} 无法判断4H趋势可能影响信号质量")
# 策略1均值回归震荡市场高胜率
if market_regime == 'ranging':
# RSI超卖做多信号需4H趋势向上或中性
if rsi and rsi < 30:
if trend_4h in ('up', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
else:
reasons.append(f"RSI超卖但4H趋势向下禁止逆势做多")
# RSI超买做空信号需4H趋势向下或中性
elif rsi and rsi > 70:
if trend_4h in ('down', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
else:
reasons.append(f"RSI超买但4H趋势向上禁止逆势做空")
# 布林带下轨做多信号需4H趋势向上或中性
if bollinger and current_price <= bollinger['lower']:
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
else:
reasons.append("触及布林带下轨但4H趋势向下禁止逆势做多")
# 布林带上轨做空信号需4H趋势向下或中性
elif bollinger and current_price >= bollinger['upper']:
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
else:
reasons.append("触及布林带上轨但4H趋势向上禁止逆势做空")
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
# MACD金叉做多信号需4H趋势向上或中性
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
else:
reasons.append("MACD金叉但4H趋势向下禁止逆势做多")
# MACD死叉做空信号需4H趋势向下或中性
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
else:
reasons.append("MACD死叉但4H趋势向上禁止逆势做空")
# 均线系统
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
if trend_4h in ('up', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
else:
reasons.append("1H均线向上但4H趋势向下禁止逆势做多")
elif current_price < ema20 < ema50: # 下降趋势
if trend_4h in ('down', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
else:
reasons.append("1H均线向下但4H趋势向上禁止逆势做空")
# 策略3综合信号提高胜率
# 多个指标同时确认时,信号更强
confirmations = 0
# RSI确认
if rsi:
if direction == 'BUY' and rsi < 50:
confirmations += 1
elif direction == 'SELL' and rsi > 50:
confirmations += 1
# MACD确认
if macd:
if direction == 'BUY' and macd['histogram'] > 0:
confirmations += 1
elif direction == 'SELL' and macd['histogram'] < 0:
confirmations += 1
# 布林带确认
if bollinger:
if direction == 'BUY' and current_price < bollinger['middle']:
confirmations += 1
elif direction == 'SELL' and current_price > bollinger['middle']:
confirmations += 1
# 多个指标确认时,增加信号强度
if confirmations >= 2:
signal_strength += 2
reasons.append(f"{confirmations}个指标确认")
# 多周期共振加分如果4H趋势与信号方向一致增加信号强度
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += 2
reasons.append("4H周期共振确认")
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
# 逆势信号,降低信号强度或直接拒绝
signal_strength -= 3
reasons.append("⚠️ 逆4H趋势信号强度降低")
# 判断是否应该交易(信号强度 >= 7 才交易,提高胜率)
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
should_trade = signal_strength >= min_signal_strength
# 如果信号方向与4H趋势相反直接拒绝交易
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
should_trade = False
reasons.append("❌ 禁止逆4H趋势交易")
if not should_trade and direction:
reasons.append(f"信号强度不足({signal_strength}/10)")
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength,
'trend_4h': trend_4h # 添加4H趋势信息供推荐器使用
}
async def _periodic_sync_positions(self):
"""
定期同步币安持仓状态(独立任务,不依赖市场扫描)
"""
sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟
logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}")
while self.running:
try:
await asyncio.sleep(sync_interval)
if not self.running:
break
logger.debug("执行定期持仓状态同步...")
await self.position_manager.sync_positions_with_binance()
except asyncio.CancelledError:
logger.info("定期持仓同步任务已取消")
break
except Exception as e:
logger.error(f"定期持仓同步任务出错: {e}")
# 出错后等待一段时间再继续
await asyncio.sleep(60)
def stop(self):
"""停止策略"""
self.running = False
logger.info("正在停止交易策略...")