547 lines
25 KiB
Python
547 lines
25 KiB
Python
"""
|
||
交易策略模块 - 实现交易逻辑和优化
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import List, Dict, Optional
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .market_scanner import MarketScanner
|
||
from .risk_manager import RiskManager
|
||
from .position_manager import PositionManager
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from market_scanner import MarketScanner
|
||
from risk_manager import RiskManager
|
||
from position_manager import PositionManager
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TradingStrategy:
|
||
"""交易策略类"""
|
||
|
||
def __init__(
|
||
self,
|
||
client: BinanceClient,
|
||
scanner: MarketScanner,
|
||
risk_manager: RiskManager,
|
||
position_manager: PositionManager,
|
||
recommender=None # 推荐器(可选)
|
||
):
|
||
"""
|
||
初始化交易策略
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
scanner: 市场扫描器
|
||
risk_manager: 风险管理器
|
||
position_manager: 仓位管理器
|
||
recommender: 推荐器(可选,如果提供则自动生成推荐)
|
||
"""
|
||
self.client = client
|
||
self.scanner = scanner
|
||
self.risk_manager = risk_manager
|
||
self.position_manager = position_manager
|
||
self.recommender = recommender # 推荐器
|
||
self.running = False
|
||
self._monitoring_started = False # 是否已启动监控
|
||
self._sync_task = None # 定期同步任务
|
||
|
||
async def execute_strategy(self):
|
||
"""
|
||
执行交易策略
|
||
"""
|
||
self.running = True
|
||
logger.info("交易策略开始执行...")
|
||
|
||
# 启动所有现有持仓的WebSocket实时监控
|
||
if not self._monitoring_started:
|
||
try:
|
||
await self.position_manager.start_all_position_monitoring()
|
||
self._monitoring_started = True
|
||
except Exception as e:
|
||
logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式")
|
||
|
||
# 启动定期同步任务(独立于市场扫描)
|
||
self._sync_task = asyncio.create_task(self._periodic_sync_positions())
|
||
logger.info("定期持仓同步任务已启动")
|
||
|
||
try:
|
||
while self.running:
|
||
# 0. 定期从Redis重新加载配置(确保配置修改能即时生效)
|
||
# 每次循环开始时从Redis重新加载,确保使用最新配置
|
||
try:
|
||
if config._config_manager:
|
||
config._config_manager.reload_from_redis()
|
||
config.TRADING_CONFIG = config._get_trading_config()
|
||
logger.debug("配置已从Redis重新加载")
|
||
except Exception as e:
|
||
logger.warning(f"从Redis重新加载配置失败: {e}")
|
||
|
||
# 1. 扫描市场,找出涨跌幅最大的前N个货币对
|
||
top_symbols = await self.scanner.scan_market()
|
||
|
||
if not top_symbols:
|
||
logger.warning("未找到符合条件的交易对,等待下次扫描...")
|
||
await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL'])
|
||
continue
|
||
|
||
# 2. 对每个交易对执行交易逻辑(使用技术指标)
|
||
for symbol_info in top_symbols:
|
||
if not self.running:
|
||
break
|
||
|
||
symbol = symbol_info['symbol']
|
||
change_percent = symbol_info['changePercent']
|
||
direction = symbol_info['direction']
|
||
market_regime = symbol_info.get('marketRegime', 'unknown')
|
||
|
||
logger.info(
|
||
f"处理交易对: {symbol} "
|
||
f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})"
|
||
)
|
||
|
||
# 使用技术指标判断交易信号(高胜率策略)
|
||
trade_signal = await self._analyze_trade_signal(symbol_info)
|
||
|
||
# 记录交易信号到数据库(只有当有明确方向时才记录)
|
||
signal_direction = trade_signal.get('direction')
|
||
if signal_direction: # 只有当方向不为空时才记录
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import TradingSignal
|
||
TradingSignal.create(
|
||
symbol=symbol,
|
||
signal_direction=signal_direction,
|
||
signal_strength=trade_signal.get('strength', 0),
|
||
signal_reason=trade_signal.get('reason', ''),
|
||
rsi=symbol_info.get('rsi'),
|
||
macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
|
||
market_regime=symbol_info.get('marketRegime')
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"记录交易信号失败: {e}")
|
||
|
||
# 自动生成推荐(即使不满足自动交易条件,只要信号强度>=5就生成推荐)
|
||
# 注意:推荐生成不检查是否已有持仓,因为推荐是供手动参考的
|
||
if self.recommender and signal_direction and trade_signal.get('strength', 0) >= 5:
|
||
try:
|
||
await self.recommender._create_recommendation(symbol_info, trade_signal)
|
||
logger.debug(f"✓ {symbol} 自动生成推荐成功 (信号强度: {trade_signal.get('strength', 0)}/10)")
|
||
except Exception as e:
|
||
logger.warning(f"自动生成推荐失败 {symbol}: {e}")
|
||
|
||
# 用户风险旋钮:自动交易总开关(关闭则只生成推荐)
|
||
auto_enabled = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ENABLED", True))
|
||
if not auto_enabled:
|
||
logger.info(f"{symbol} 自动交易已关闭(AUTO_TRADE_ENABLED=false),跳过自动下单(推荐已生成)")
|
||
continue
|
||
|
||
# 提升胜率:可配置的“仅 trending 自动交易”过滤
|
||
only_trending = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ONLY_TRENDING", True))
|
||
if only_trending and market_regime != 'trending':
|
||
logger.info(
|
||
f"{symbol} 市场状态={market_regime},跳过自动交易(仅生成推荐)"
|
||
f"|原因:AUTO_TRADE_ONLY_TRENDING=true"
|
||
)
|
||
continue
|
||
|
||
# 检查是否应该自动交易(已有持仓则跳过自动交易,但推荐已生成)
|
||
if not await self.risk_manager.should_trade(symbol, change_percent):
|
||
continue
|
||
|
||
# 优化:结合成交量确认
|
||
if not await self._check_volume_confirmation(symbol_info):
|
||
logger.info(f"{symbol} 成交量确认失败,跳过")
|
||
continue
|
||
|
||
if not trade_signal['should_trade']:
|
||
logger.info(
|
||
f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过自动交易"
|
||
)
|
||
continue
|
||
|
||
# 确定交易方向(基于技术指标)
|
||
trade_direction = trade_signal['direction']
|
||
entry_reason = trade_signal['reason']
|
||
|
||
signal_strength = trade_signal.get('strength', 0)
|
||
logger.info(
|
||
f"{symbol} 交易信号: {trade_direction} | "
|
||
f"原因: {entry_reason} | "
|
||
f"信号强度: {signal_strength}/10"
|
||
)
|
||
|
||
# 根据信号强度计算动态杠杆(高质量信号使用更高杠杆)
|
||
# 同时检查交易对支持的最大杠杆限制
|
||
dynamic_leverage = await self.risk_manager.calculate_dynamic_leverage(signal_strength, symbol)
|
||
logger.info(
|
||
f"{symbol} 使用动态杠杆: {dynamic_leverage}x "
|
||
f"(信号强度: {signal_strength}/10)"
|
||
)
|
||
|
||
# 开仓(使用改进的仓位管理)
|
||
position = await self.position_manager.open_position(
|
||
symbol=symbol,
|
||
change_percent=change_percent,
|
||
leverage=dynamic_leverage,
|
||
trade_direction=trade_direction,
|
||
entry_reason=entry_reason,
|
||
signal_strength=signal_strength,
|
||
market_regime=market_regime,
|
||
trend_4h=trade_signal.get('trend_4h'),
|
||
atr=symbol_info.get('atr'),
|
||
klines=symbol_info.get('klines'), # 传递K线数据用于动态止损
|
||
bollinger=symbol_info.get('bollinger') # 传递布林带数据用于动态止损
|
||
)
|
||
|
||
if position:
|
||
logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})")
|
||
# 开仓成功后,WebSocket监控会在position_manager中自动启动
|
||
else:
|
||
logger.warning(f"{symbol} 开仓失败")
|
||
|
||
# 避免同时处理太多交易对
|
||
await asyncio.sleep(1)
|
||
|
||
# 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致)
|
||
try:
|
||
await self.position_manager.sync_positions_with_binance()
|
||
# 同步后,确保所有持仓都有监控(包括手动开仓的)
|
||
await self.position_manager.start_all_position_monitoring()
|
||
except Exception as e:
|
||
logger.warning(f"持仓状态同步失败: {e}")
|
||
|
||
# 4. 检查止损止盈(作为备用检查,WebSocket实时监控是主要方式)
|
||
# 注意:如果启用了WebSocket实时监控,这里主要是作为备用检查
|
||
closed = await self.position_manager.check_stop_loss_take_profit()
|
||
if closed:
|
||
logger.info(f"定时检查触发平仓: {', '.join(closed)}")
|
||
|
||
# 5. 打印持仓摘要并记录账户快照
|
||
summary = await self.position_manager.get_position_summary()
|
||
if summary:
|
||
logger.info(
|
||
f"持仓摘要: {summary['totalPositions']} 个持仓, "
|
||
f"总盈亏: {summary['totalPnL']:.2f} USDT, "
|
||
f"可用余额: {summary['availableBalance']:.2f} USDT"
|
||
)
|
||
|
||
# 输出价格缓存统计
|
||
cache_size = len(self.client._price_cache)
|
||
if cache_size > 0:
|
||
logger.info(f"价格缓存: {cache_size}个交易对")
|
||
|
||
# 记录账户快照到数据库
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import AccountSnapshot
|
||
AccountSnapshot.create(
|
||
total_balance=summary['totalBalance'],
|
||
available_balance=summary['availableBalance'],
|
||
total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])),
|
||
total_pnl=summary['totalPnL'],
|
||
open_positions=summary['totalPositions']
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"记录账户快照失败: {e}")
|
||
|
||
# 重新加载配置(确保使用最新配置)
|
||
try:
|
||
if hasattr(config, 'reload_config'):
|
||
config.reload_config()
|
||
logger.info("配置已重新加载,将使用最新配置进行下次扫描")
|
||
except Exception as e:
|
||
logger.debug(f"重新加载配置失败: {e}")
|
||
|
||
# 等待下次扫描
|
||
scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600)
|
||
logger.info(f"等待 {scan_interval} 秒后进行下次扫描...")
|
||
await asyncio.sleep(scan_interval)
|
||
|
||
except Exception as e:
|
||
logger.error(f"策略执行出错: {e}", exc_info=True)
|
||
finally:
|
||
self.running = False
|
||
# 停止定期同步任务
|
||
if self._sync_task:
|
||
self._sync_task.cancel()
|
||
try:
|
||
await self._sync_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
logger.info("定期持仓同步任务已停止")
|
||
# 停止所有持仓的WebSocket监控
|
||
try:
|
||
await self.position_manager.stop_all_position_monitoring()
|
||
except Exception as e:
|
||
logger.warning(f"停止持仓监控时出错: {e}")
|
||
logger.info("交易策略已停止")
|
||
|
||
async def _check_volume_confirmation(self, symbol_info: Dict) -> bool:
|
||
"""
|
||
成交量确认 - 确保有足够的成交量支撑
|
||
|
||
Args:
|
||
symbol_info: 交易对信息
|
||
|
||
Returns:
|
||
是否通过确认
|
||
"""
|
||
volume_24h = symbol_info.get('volume24h', 0)
|
||
min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H']
|
||
|
||
if volume_24h < min_volume:
|
||
return False
|
||
|
||
return True
|
||
|
||
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
|
||
"""
|
||
使用技术指标分析交易信号(高胜率策略)
|
||
实施多周期共振:4H定方向,1H和15min找点位
|
||
|
||
Args:
|
||
symbol_info: 交易对信息(包含技术指标)
|
||
|
||
Returns:
|
||
交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int}
|
||
"""
|
||
symbol = symbol_info['symbol']
|
||
current_price = symbol_info['price']
|
||
rsi = symbol_info.get('rsi')
|
||
macd = symbol_info.get('macd')
|
||
bollinger = symbol_info.get('bollinger')
|
||
market_regime = symbol_info.get('marketRegime', 'unknown')
|
||
ema20 = symbol_info.get('ema20')
|
||
ema50 = symbol_info.get('ema50')
|
||
|
||
# 多周期共振检查:4H定方向(使用多指标投票机制)
|
||
price_4h = symbol_info.get('price_4h', current_price)
|
||
ema20_4h = symbol_info.get('ema20_4h')
|
||
ema50_4h = symbol_info.get('ema50_4h')
|
||
macd_4h = symbol_info.get('macd_4h')
|
||
|
||
# 判断4H周期趋势方向(多指标投票)
|
||
trend_4h = self._judge_trend_4h(price_4h, ema20_4h, ema50_4h, macd_4h)
|
||
|
||
signal_strength = 0
|
||
reasons = []
|
||
direction = None
|
||
|
||
# 多周期共振检查:4H定方向,禁止逆势交易
|
||
if trend_4h == 'up':
|
||
# 4H趋势向上,只允许做多信号
|
||
logger.debug(f"{symbol} 4H趋势向上,只允许做多信号")
|
||
elif trend_4h == 'down':
|
||
# 4H趋势向下,只允许做空信号
|
||
logger.debug(f"{symbol} 4H趋势向下,只允许做空信号")
|
||
elif trend_4h == 'neutral':
|
||
# 4H趋势中性,记录警告
|
||
logger.warning(f"{symbol} 4H趋势中性,信号质量可能降低")
|
||
|
||
# 简化策略:只做趋势跟踪,移除均值回归策略(避免信号冲突)
|
||
# 策略权重配置(根据文档建议)
|
||
TREND_SIGNAL_WEIGHTS = {
|
||
'macd_cross': 5, # MACD金叉/死叉
|
||
'ema_cross': 4, # EMA20上穿/下穿EMA50
|
||
'price_above_ema20': 3, # 价格在EMA20之上/下
|
||
'4h_trend_confirmation': 2, # 4H趋势确认
|
||
}
|
||
|
||
# 趋势跟踪策略(不再区分市场状态,统一使用趋势指标)
|
||
# MACD金叉/死叉(权重最高)
|
||
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
|
||
# MACD金叉,做多信号(需4H趋势向上或中性)
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
|
||
reasons.append("MACD金叉")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
else:
|
||
reasons.append("MACD金叉但4H趋势向下,禁止逆势做多")
|
||
|
||
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
|
||
# MACD死叉,做空信号(需4H趋势向下或中性)
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
|
||
reasons.append("MACD死叉")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
else:
|
||
reasons.append("MACD死叉但4H趋势向上,禁止逆势做空")
|
||
|
||
# EMA均线系统
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50: # 上升趋势
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
|
||
reasons.append("EMA20上穿EMA50,上升趋势")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
else:
|
||
reasons.append("1H均线向上但4H趋势向下,禁止逆势做多")
|
||
elif current_price < ema20 < ema50: # 下降趋势
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
|
||
reasons.append("EMA20下穿EMA50,下降趋势")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
else:
|
||
reasons.append("1H均线向下但4H趋势向上,禁止逆势做空")
|
||
|
||
# 价格与EMA20关系
|
||
if ema20:
|
||
if current_price > ema20:
|
||
if trend_4h in ('up', 'neutral', None) and direction == 'BUY':
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
|
||
reasons.append("价格在EMA20之上")
|
||
elif current_price < ema20:
|
||
if trend_4h in ('down', 'neutral', None) and direction == 'SELL':
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
|
||
reasons.append("价格在EMA20之下")
|
||
|
||
# 4H趋势确认加分
|
||
if direction and trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
|
||
signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation']
|
||
reasons.append("4H周期共振确认")
|
||
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
# 逆势信号,直接拒绝
|
||
signal_strength = 0
|
||
reasons.append("❌ 逆4H趋势,拒绝交易")
|
||
|
||
# 判断是否应该交易(信号强度 >= 7 才交易,提高胜率)
|
||
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
|
||
# 强度上限归一到 0-10,避免出现 12/10 这种误导显示
|
||
try:
|
||
signal_strength = max(0, min(int(signal_strength), 10))
|
||
except Exception:
|
||
pass
|
||
should_trade = signal_strength >= min_signal_strength and direction is not None
|
||
|
||
# 提升胜率:4H趋势中性时不做自动交易(只保留推荐/观察)
|
||
# 经验上,中性趋势下“趋势跟踪”信号更容易被来回扫损,导致胜率显著降低与交易次数激增。
|
||
allow_neutral = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ALLOW_4H_NEUTRAL", False))
|
||
if (trend_4h == 'neutral') and (not allow_neutral):
|
||
if should_trade:
|
||
reasons.append("❌ 4H趋势中性(为提升胜率:仅生成推荐,不自动交易)")
|
||
should_trade = False
|
||
|
||
# 如果信号方向与4H趋势相反,直接拒绝交易
|
||
if direction and trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
should_trade = False
|
||
reasons.append("❌ 禁止逆4H趋势交易")
|
||
|
||
# 只在“真的强度不足”时追加该原因;避免出现“强度>=阈值但被其它过滤挡住”仍提示强度不足
|
||
if direction and (signal_strength < min_signal_strength):
|
||
reasons.append(f"信号强度不足({signal_strength}/10,需要≥{min_signal_strength})")
|
||
|
||
return {
|
||
'should_trade': should_trade,
|
||
'direction': direction,
|
||
'reason': ', '.join(reasons) if reasons else '无明确信号',
|
||
'strength': signal_strength,
|
||
'trend_4h': trend_4h, # 添加4H趋势信息,供推荐器使用
|
||
'strategy_type': 'trend_following' # 标记策略类型
|
||
}
|
||
|
||
def _judge_trend_4h(self, price_4h: float, ema20_4h: Optional[float], ema50_4h: Optional[float], macd_4h: Optional[Dict]) -> str:
|
||
"""
|
||
使用多指标投票机制判断4H趋势(避免单一指标误导)
|
||
|
||
Args:
|
||
price_4h: 4H周期价格
|
||
ema20_4h: 4H周期EMA20
|
||
ema50_4h: 4H周期EMA50
|
||
macd_4h: 4H周期MACD数据
|
||
|
||
Returns:
|
||
'up', 'down', 'neutral'
|
||
"""
|
||
if ema20_4h is None:
|
||
return 'neutral'
|
||
|
||
score = 0
|
||
total_indicators = 0
|
||
|
||
# 指标1:价格 vs EMA20
|
||
if price_4h > ema20_4h:
|
||
score += 1
|
||
elif price_4h < ema20_4h:
|
||
score -= 1
|
||
total_indicators += 1
|
||
|
||
# 指标2:EMA20 vs EMA50
|
||
if ema50_4h is not None:
|
||
if ema20_4h > ema50_4h:
|
||
score += 1
|
||
elif ema20_4h < ema50_4h:
|
||
score -= 1
|
||
total_indicators += 1
|
||
|
||
# 指标3:MACD histogram
|
||
if macd_4h and isinstance(macd_4h, dict):
|
||
macd_hist = macd_4h.get('histogram', 0)
|
||
if macd_hist > 0:
|
||
score += 1
|
||
elif macd_hist < 0:
|
||
score -= 1
|
||
total_indicators += 1
|
||
|
||
# 多数投票:如果score >= 2,趋势向上;如果score <= -2,趋势向下;否则中性
|
||
if total_indicators == 0:
|
||
return 'neutral'
|
||
|
||
# 至少需要2个指标确认
|
||
if score >= 2:
|
||
return 'up'
|
||
elif score <= -2:
|
||
return 'down'
|
||
else:
|
||
return 'neutral'
|
||
|
||
async def _periodic_sync_positions(self):
|
||
"""
|
||
定期同步币安持仓状态(独立任务,不依赖市场扫描)
|
||
"""
|
||
sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟
|
||
logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}秒")
|
||
|
||
while self.running:
|
||
try:
|
||
await asyncio.sleep(sync_interval)
|
||
|
||
if not self.running:
|
||
break
|
||
|
||
logger.debug("执行定期持仓状态同步...")
|
||
await self.position_manager.sync_positions_with_binance()
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("定期持仓同步任务已取消")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"定期持仓同步任务出错: {e}")
|
||
# 出错后等待一段时间再继续
|
||
await asyncio.sleep(60)
|
||
|
||
def stop(self):
|
||
"""停止策略"""
|
||
self.running = False
|
||
logger.info("正在停止交易策略...")
|