""" 交易策略模块 - 实现交易逻辑和优化 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from .position_manager import PositionManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from position_manager import PositionManager import config logger = logging.getLogger(__name__) class TradingStrategy: """交易策略类""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager, position_manager: PositionManager, recommender=None # 推荐器(可选) ): """ 初始化交易策略 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 position_manager: 仓位管理器 recommender: 推荐器(可选,如果提供则自动生成推荐) """ self.client = client self.scanner = scanner self.risk_manager = risk_manager self.position_manager = position_manager self.recommender = recommender # 推荐器 self.running = False self._monitoring_started = False # 是否已启动监控 self._sync_task = None # 定期同步任务 async def execute_strategy(self): """ 执行交易策略 """ self.running = True logger.info("交易策略开始执行...") # 启动所有现有持仓的WebSocket实时监控 if not self._monitoring_started: try: await self.position_manager.start_all_position_monitoring() self._monitoring_started = True except Exception as e: logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式") # 启动定期同步任务(独立于市场扫描) self._sync_task = asyncio.create_task(self._periodic_sync_positions()) logger.info("定期持仓同步任务已启动") try: while self.running: # 0. 定期从Redis重新加载配置(确保配置修改能即时生效) # 每次循环开始时从Redis重新加载,确保使用最新配置 try: if config._config_manager: config._config_manager.reload_from_redis() config.TRADING_CONFIG = config._get_trading_config() logger.debug("配置已从Redis重新加载") except Exception as e: logger.warning(f"从Redis重新加载配置失败: {e}") # 1. 扫描市场,找出涨跌幅最大的前N个货币对 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对,等待下次扫描...") await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) continue # 2. 对每个交易对执行交易逻辑(使用技术指标) for symbol_info in top_symbols: if not self.running: break symbol = symbol_info['symbol'] change_percent = symbol_info['changePercent'] direction = symbol_info['direction'] market_regime = symbol_info.get('marketRegime', 'unknown') logger.info( f"处理交易对: {symbol} " f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})" ) # 使用技术指标判断交易信号(高胜率策略) trade_signal = await self._analyze_trade_signal(symbol_info) # 记录交易信号到数据库(只有当有明确方向时才记录) signal_direction = trade_signal.get('direction') if signal_direction: # 只有当方向不为空时才记录 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradingSignal TradingSignal.create( symbol=symbol, signal_direction=signal_direction, signal_strength=trade_signal.get('strength', 0), signal_reason=trade_signal.get('reason', ''), rsi=symbol_info.get('rsi'), macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, market_regime=symbol_info.get('marketRegime') ) except Exception as e: logger.debug(f"记录交易信号失败: {e}") # 自动生成推荐(即使不满足自动交易条件,只要信号强度>=5就生成推荐) # 注意:推荐生成不检查是否已有持仓,因为推荐是供手动参考的 if self.recommender and signal_direction and trade_signal.get('strength', 0) >= 5: try: await self.recommender._create_recommendation(symbol_info, trade_signal) logger.debug(f"✓ {symbol} 自动生成推荐成功 (信号强度: {trade_signal.get('strength', 0)}/10)") except Exception as e: logger.warning(f"自动生成推荐失败 {symbol}: {e}") # 提升胜率:可配置的“仅 trending 自动交易”过滤 only_trending = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ONLY_TRENDING", True)) if only_trending and market_regime != 'trending': logger.info(f"{symbol} 市场状态={market_regime},跳过自动交易(仅生成推荐)") continue # 检查是否应该自动交易(已有持仓则跳过自动交易,但推荐已生成) if not await self.risk_manager.should_trade(symbol, change_percent): continue # 优化:结合成交量确认 if not await self._check_volume_confirmation(symbol_info): logger.info(f"{symbol} 成交量确认失败,跳过") continue if not trade_signal['should_trade']: logger.info( f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过自动交易" ) continue # 确定交易方向(基于技术指标) trade_direction = trade_signal['direction'] entry_reason = trade_signal['reason'] signal_strength = trade_signal.get('strength', 0) logger.info( f"{symbol} 交易信号: {trade_direction} | " f"原因: {entry_reason} | " f"信号强度: {signal_strength}/10" ) # 根据信号强度计算动态杠杆(高质量信号使用更高杠杆) # 同时检查交易对支持的最大杠杆限制 dynamic_leverage = await self.risk_manager.calculate_dynamic_leverage(signal_strength, symbol) logger.info( f"{symbol} 使用动态杠杆: {dynamic_leverage}x " f"(信号强度: {signal_strength}/10)" ) # 开仓(使用改进的仓位管理) position = await self.position_manager.open_position( symbol=symbol, change_percent=change_percent, leverage=dynamic_leverage, trade_direction=trade_direction, entry_reason=entry_reason, signal_strength=signal_strength, market_regime=market_regime, trend_4h=trade_signal.get('trend_4h'), atr=symbol_info.get('atr'), klines=symbol_info.get('klines'), # 传递K线数据用于动态止损 bollinger=symbol_info.get('bollinger') # 传递布林带数据用于动态止损 ) if position: logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})") # 开仓成功后,WebSocket监控会在position_manager中自动启动 else: logger.warning(f"{symbol} 开仓失败") # 避免同时处理太多交易对 await asyncio.sleep(1) # 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致) try: await self.position_manager.sync_positions_with_binance() # 同步后,确保所有持仓都有监控(包括手动开仓的) await self.position_manager.start_all_position_monitoring() except Exception as e: logger.warning(f"持仓状态同步失败: {e}") # 4. 检查止损止盈(作为备用检查,WebSocket实时监控是主要方式) # 注意:如果启用了WebSocket实时监控,这里主要是作为备用检查 closed = await self.position_manager.check_stop_loss_take_profit() if closed: logger.info(f"定时检查触发平仓: {', '.join(closed)}") # 5. 打印持仓摘要并记录账户快照 summary = await self.position_manager.get_position_summary() if summary: logger.info( f"持仓摘要: {summary['totalPositions']} 个持仓, " f"总盈亏: {summary['totalPnL']:.2f} USDT, " f"可用余额: {summary['availableBalance']:.2f} USDT" ) # 输出价格缓存统计 cache_size = len(self.client._price_cache) if cache_size > 0: logger.info(f"价格缓存: {cache_size}个交易对") # 记录账户快照到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import AccountSnapshot AccountSnapshot.create( total_balance=summary['totalBalance'], available_balance=summary['availableBalance'], total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])), total_pnl=summary['totalPnL'], open_positions=summary['totalPositions'] ) except Exception as e: logger.debug(f"记录账户快照失败: {e}") # 重新加载配置(确保使用最新配置) try: if hasattr(config, 'reload_config'): config.reload_config() logger.info("配置已重新加载,将使用最新配置进行下次扫描") except Exception as e: logger.debug(f"重新加载配置失败: {e}") # 等待下次扫描 scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600) logger.info(f"等待 {scan_interval} 秒后进行下次扫描...") await asyncio.sleep(scan_interval) except Exception as e: logger.error(f"策略执行出错: {e}", exc_info=True) finally: self.running = False # 停止定期同步任务 if self._sync_task: self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass logger.info("定期持仓同步任务已停止") # 停止所有持仓的WebSocket监控 try: await self.position_manager.stop_all_position_monitoring() except Exception as e: logger.warning(f"停止持仓监控时出错: {e}") logger.info("交易策略已停止") async def _check_volume_confirmation(self, symbol_info: Dict) -> bool: """ 成交量确认 - 确保有足够的成交量支撑 Args: symbol_info: 交易对信息 Returns: 是否通过确认 """ volume_24h = symbol_info.get('volume24h', 0) min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H'] if volume_24h < min_volume: return False return True async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: """ 使用技术指标分析交易信号(高胜率策略) 实施多周期共振:4H定方向,1H和15min找点位 Args: symbol_info: 交易对信息(包含技术指标) Returns: 交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int} """ symbol = symbol_info['symbol'] current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') # 多周期共振检查:4H定方向(使用多指标投票机制) price_4h = symbol_info.get('price_4h', current_price) ema20_4h = symbol_info.get('ema20_4h') ema50_4h = symbol_info.get('ema50_4h') macd_4h = symbol_info.get('macd_4h') # 判断4H周期趋势方向(多指标投票) trend_4h = self._judge_trend_4h(price_4h, ema20_4h, ema50_4h, macd_4h) signal_strength = 0 reasons = [] direction = None # 多周期共振检查:4H定方向,禁止逆势交易 if trend_4h == 'up': # 4H趋势向上,只允许做多信号 logger.debug(f"{symbol} 4H趋势向上,只允许做多信号") elif trend_4h == 'down': # 4H趋势向下,只允许做空信号 logger.debug(f"{symbol} 4H趋势向下,只允许做空信号") elif trend_4h == 'neutral': # 4H趋势中性,记录警告 logger.warning(f"{symbol} 4H趋势中性,信号质量可能降低") # 简化策略:只做趋势跟踪,移除均值回归策略(避免信号冲突) # 策略权重配置(根据文档建议) TREND_SIGNAL_WEIGHTS = { 'macd_cross': 5, # MACD金叉/死叉 'ema_cross': 4, # EMA20上穿/下穿EMA50 'price_above_ema20': 3, # 价格在EMA20之上/下 '4h_trend_confirmation': 2, # 4H趋势确认 } # 趋势跟踪策略(不再区分市场状态,统一使用趋势指标) # MACD金叉/死叉(权重最高) if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0: # MACD金叉,做多信号(需4H趋势向上或中性) if trend_4h in ('up', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] reasons.append("MACD金叉") if direction is None: direction = 'BUY' else: reasons.append("MACD金叉但4H趋势向下,禁止逆势做多") elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0: # MACD死叉,做空信号(需4H趋势向下或中性) if trend_4h in ('down', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] reasons.append("MACD死叉") if direction is None: direction = 'SELL' else: reasons.append("MACD死叉但4H趋势向上,禁止逆势做空") # EMA均线系统 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 if trend_4h in ('up', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] reasons.append("EMA20上穿EMA50,上升趋势") if direction is None: direction = 'BUY' else: reasons.append("1H均线向上但4H趋势向下,禁止逆势做多") elif current_price < ema20 < ema50: # 下降趋势 if trend_4h in ('down', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] reasons.append("EMA20下穿EMA50,下降趋势") if direction is None: direction = 'SELL' else: reasons.append("1H均线向下但4H趋势向上,禁止逆势做空") # 价格与EMA20关系 if ema20: if current_price > ema20: if trend_4h in ('up', 'neutral', None) and direction == 'BUY': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] reasons.append("价格在EMA20之上") elif current_price < ema20: if trend_4h in ('down', 'neutral', None) and direction == 'SELL': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] reasons.append("价格在EMA20之下") # 4H趋势确认加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation'] reasons.append("4H周期共振确认") elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): # 逆势信号,直接拒绝 signal_strength = 0 reasons.append("❌ 逆4H趋势,拒绝交易") # 判断是否应该交易(信号强度 >= 7 才交易,提高胜率) min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) # 强度上限归一到 0-10,避免出现 12/10 这种误导显示 try: signal_strength = max(0, min(int(signal_strength), 10)) except Exception: pass should_trade = signal_strength >= min_signal_strength and direction is not None # 提升胜率:4H趋势中性时不做自动交易(只保留推荐/观察) # 经验上,中性趋势下“趋势跟踪”信号更容易被来回扫损,导致胜率显著降低与交易次数激增。 allow_neutral = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ALLOW_4H_NEUTRAL", False)) if (trend_4h == 'neutral') and (not allow_neutral): if should_trade: reasons.append("❌ 4H趋势中性(为提升胜率:仅生成推荐,不自动交易)") should_trade = False # 如果信号方向与4H趋势相反,直接拒绝交易 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): should_trade = False reasons.append("❌ 禁止逆4H趋势交易") # 只在“真的强度不足”时追加该原因;避免出现“强度>=阈值但被其它过滤挡住”仍提示强度不足 if direction and (signal_strength < min_signal_strength): reasons.append(f"信号强度不足({signal_strength}/10,需要≥{min_signal_strength})") return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength, 'trend_4h': trend_4h, # 添加4H趋势信息,供推荐器使用 'strategy_type': 'trend_following' # 标记策略类型 } def _judge_trend_4h(self, price_4h: float, ema20_4h: Optional[float], ema50_4h: Optional[float], macd_4h: Optional[Dict]) -> str: """ 使用多指标投票机制判断4H趋势(避免单一指标误导) Args: price_4h: 4H周期价格 ema20_4h: 4H周期EMA20 ema50_4h: 4H周期EMA50 macd_4h: 4H周期MACD数据 Returns: 'up', 'down', 'neutral' """ if ema20_4h is None: return 'neutral' score = 0 total_indicators = 0 # 指标1:价格 vs EMA20 if price_4h > ema20_4h: score += 1 elif price_4h < ema20_4h: score -= 1 total_indicators += 1 # 指标2:EMA20 vs EMA50 if ema50_4h is not None: if ema20_4h > ema50_4h: score += 1 elif ema20_4h < ema50_4h: score -= 1 total_indicators += 1 # 指标3:MACD histogram if macd_4h and isinstance(macd_4h, dict): macd_hist = macd_4h.get('histogram', 0) if macd_hist > 0: score += 1 elif macd_hist < 0: score -= 1 total_indicators += 1 # 多数投票:如果score >= 2,趋势向上;如果score <= -2,趋势向下;否则中性 if total_indicators == 0: return 'neutral' # 至少需要2个指标确认 if score >= 2: return 'up' elif score <= -2: return 'down' else: return 'neutral' async def _periodic_sync_positions(self): """ 定期同步币安持仓状态(独立任务,不依赖市场扫描) """ sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟 logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}秒") while self.running: try: await asyncio.sleep(sync_interval) if not self.running: break logger.debug("执行定期持仓状态同步...") await self.position_manager.sync_positions_with_binance() except asyncio.CancelledError: logger.info("定期持仓同步任务已取消") break except Exception as e: logger.error(f"定期持仓同步任务出错: {e}") # 出错后等待一段时间再继续 await asyncio.sleep(60) def stop(self): """停止策略""" self.running = False logger.info("正在停止交易策略...")