""" 交易策略模块 - 实现交易逻辑和优化 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from .position_manager import PositionManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from position_manager import PositionManager import config logger = logging.getLogger(__name__) class TradingStrategy: """交易策略类""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager, position_manager: PositionManager, recommender=None # 推荐器(可选) ): """ 初始化交易策略 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 position_manager: 仓位管理器 recommender: 推荐器(可选,如果提供则自动生成推荐) """ self.client = client self.scanner = scanner self.risk_manager = risk_manager self.position_manager = position_manager self.recommender = recommender # 推荐器 self.running = False self._monitoring_started = False # 是否已启动监控 self._sync_task = None # 定期同步任务 async def execute_strategy(self): """ 执行交易策略 """ self.running = True logger.info("交易策略开始执行...") # 启动所有现有持仓的WebSocket实时监控 if not self._monitoring_started: try: await self.position_manager.start_all_position_monitoring() self._monitoring_started = True except Exception as e: logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式") # 启动定期同步任务(独立于市场扫描) self._sync_task = asyncio.create_task(self._periodic_sync_positions()) logger.info("定期持仓同步任务已启动") try: while self.running: # 0. 定期从Redis重新加载配置(确保配置修改能即时生效) # 每次循环开始时从Redis重新加载,确保使用最新配置 try: if config._config_manager: config._config_manager.reload_from_redis() config.TRADING_CONFIG = config._get_trading_config() logger.debug("配置已从Redis重新加载") except Exception as e: logger.warning(f"从Redis重新加载配置失败: {e}") # 1. 扫描市场,找出涨跌幅最大的前N个货币对 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对,等待下次扫描...") await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) continue # 2. 对每个交易对执行交易逻辑(使用技术指标) for symbol_info in top_symbols: if not self.running: break symbol = symbol_info['symbol'] change_percent = symbol_info['changePercent'] direction = symbol_info['direction'] market_regime = symbol_info.get('marketRegime', 'unknown') logger.info( f"处理交易对: {symbol} " f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})" ) # 使用技术指标判断交易信号(高胜率策略) trade_signal = await self._analyze_trade_signal(symbol_info) # 记录交易信号到数据库(只有当有明确方向时才记录) signal_direction = trade_signal.get('direction') if signal_direction: # 只有当方向不为空时才记录 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradingSignal TradingSignal.create( symbol=symbol, signal_direction=signal_direction, signal_strength=trade_signal.get('strength', 0), signal_reason=trade_signal.get('reason', ''), rsi=symbol_info.get('rsi'), macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, market_regime=symbol_info.get('marketRegime') ) except Exception as e: logger.debug(f"记录交易信号失败: {e}") # 自动生成推荐(即使不满足自动交易条件,只要信号强度>=5就生成推荐) # 注意:推荐生成不检查是否已有持仓,因为推荐是供手动参考的 if self.recommender and signal_direction and trade_signal.get('strength', 0) >= 5: try: await self.recommender._create_recommendation(symbol_info, trade_signal) logger.debug(f"✓ {symbol} 自动生成推荐成功 (信号强度: {trade_signal.get('strength', 0)}/10)") except Exception as e: logger.warning(f"自动生成推荐失败 {symbol}: {e}") # 用户风险旋钮:自动交易总开关(关闭则只生成推荐) auto_enabled = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ENABLED", True)) if not auto_enabled: logger.info(f"{symbol} 自动交易已关闭(AUTO_TRADE_ENABLED=false),跳过自动下单(推荐已生成)") continue # 提升胜率:可配置的“仅 trending 自动交易”过滤 only_trending = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ONLY_TRENDING", True)) if only_trending and market_regime != 'trending': logger.info( f"{symbol} 市场状态={market_regime},跳过自动交易(仅生成推荐)" f"|原因:AUTO_TRADE_ONLY_TRENDING=true" ) continue # 检查是否应该自动交易(已有持仓则跳过自动交易,但推荐已生成) if not await self.risk_manager.should_trade(symbol, change_percent): continue # 优化:结合成交量确认 if not await self._check_volume_confirmation(symbol_info): logger.info(f"{symbol} 成交量确认失败,跳过") continue if not trade_signal['should_trade']: logger.info( f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过自动交易" ) continue # 确定交易方向(基于技术指标) trade_direction = trade_signal['direction'] entry_reason = trade_signal['reason'] signal_strength = trade_signal.get('strength', 0) logger.info( f"{symbol} 交易信号: {trade_direction} | " f"原因: {entry_reason} | " f"信号强度: {signal_strength}/10" ) # 根据信号强度计算动态杠杆(高质量信号使用更高杠杆) # ⚠️ 优化:同时检查交易对支持的最大杠杆限制和小众币限制 dynamic_leverage = await self.risk_manager.calculate_dynamic_leverage( signal_strength, symbol, atr=symbol_info.get('atr'), entry_price=symbol_info.get('price') ) logger.info( f"{symbol} 使用动态杠杆: {dynamic_leverage}x " f"(信号强度: {signal_strength}/10)" ) # 开仓(使用改进的仓位管理) position = await self.position_manager.open_position( symbol=symbol, change_percent=change_percent, leverage=dynamic_leverage, trade_direction=trade_direction, entry_reason=entry_reason, signal_strength=signal_strength, market_regime=market_regime, trend_4h=trade_signal.get('trend_4h'), atr=symbol_info.get('atr'), klines=symbol_info.get('klines'), # 传递K线数据用于动态止损 bollinger=symbol_info.get('bollinger') # 传递布林带数据用于动态止损 ) if position: logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})") # 开仓成功后,WebSocket监控会在position_manager中自动启动 else: logger.warning(f"{symbol} 开仓失败") # 避免同时处理太多交易对 await asyncio.sleep(1) # 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致) try: await self.position_manager.sync_positions_with_binance() # 同步后,确保所有持仓都有监控(包括手动开仓的) await self.position_manager.start_all_position_monitoring() except Exception as e: logger.warning(f"持仓状态同步失败: {e}") # 4. 检查止损止盈(作为备用检查,WebSocket实时监控是主要方式) # 注意:如果启用了WebSocket实时监控,这里主要是作为备用检查 closed = await self.position_manager.check_stop_loss_take_profit() if closed: logger.info(f"定时检查触发平仓: {', '.join(closed)}") # 5. 打印持仓摘要并记录账户快照 summary = await self.position_manager.get_position_summary() if summary: logger.info( f"持仓摘要: {summary['totalPositions']} 个持仓, " f"总盈亏: {summary['totalPnL']:.2f} USDT, " f"可用余额: {summary['availableBalance']:.2f} USDT" ) # 输出价格缓存统计 cache_size = len(self.client._price_cache) if cache_size > 0: logger.info(f"价格缓存: {cache_size}个交易对") # 记录账户快照到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import AccountSnapshot AccountSnapshot.create( total_balance=summary['totalBalance'], available_balance=summary['availableBalance'], total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])), total_pnl=summary['totalPnL'], open_positions=summary['totalPositions'] ) except Exception as e: logger.debug(f"记录账户快照失败: {e}") # 重新加载配置(确保使用最新配置) try: if hasattr(config, 'reload_config'): config.reload_config() logger.info("配置已重新加载,将使用最新配置进行下次扫描") except Exception as e: logger.debug(f"重新加载配置失败: {e}") # 等待下次扫描 scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600) logger.info(f"等待 {scan_interval} 秒后进行下次扫描...") await asyncio.sleep(scan_interval) except Exception as e: logger.error(f"策略执行出错: {e}", exc_info=True) finally: self.running = False # 停止定期同步任务 if self._sync_task: self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass logger.info("定期持仓同步任务已停止") # 停止所有持仓的WebSocket监控 try: await self.position_manager.stop_all_position_monitoring() except Exception as e: logger.warning(f"停止持仓监控时出错: {e}") logger.info("交易策略已停止") async def _check_volume_confirmation(self, symbol_info: Dict) -> bool: """ 成交量确认 - 确保有足够的成交量支撑 Args: symbol_info: 交易对信息 Returns: 是否通过确认 """ volume_24h = symbol_info.get('volume24h', 0) min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H'] if volume_24h < min_volume: return False return True async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: """ 使用技术指标分析交易信号(高胜率策略) 实施多周期共振:4H定方向,1H和15min找点位 Args: symbol_info: 交易对信息(包含技术指标) Returns: 交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int} """ symbol = symbol_info['symbol'] # ⚠️ 优化1:大盘共振(Beta Filter)- 当BTC/ETH剧烈下跌时,屏蔽所有多单 beta_filter_enabled = config.TRADING_CONFIG.get('BETA_FILTER_ENABLED', True) if beta_filter_enabled: beta_filter_result = await self._check_beta_filter() if beta_filter_result['should_block_buy']: # 如果大盘暴跌,屏蔽所有多单 if symbol_info.get('direction') == 'BUY' or (symbol_info.get('changePercent', 0) > 0): return { 'should_trade': False, 'direction': None, 'reason': f"❌ 大盘共振过滤:{beta_filter_result['reason']},屏蔽所有多单", 'strength': 0, 'trend_4h': symbol_info.get('trend_4h') } logger.debug(f"{symbol} 大盘共振检查通过(做空信号不受影响)") current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') # 多周期共振检查:4H定方向(使用多指标投票机制) price_4h = symbol_info.get('price_4h', current_price) ema20_4h = symbol_info.get('ema20_4h') ema50_4h = symbol_info.get('ema50_4h') macd_4h = symbol_info.get('macd_4h') # 判断4H周期趋势方向(多指标投票) trend_4h = self._judge_trend_4h(price_4h, ema20_4h, ema50_4h, macd_4h) signal_strength = 0 reasons = [] direction = None # 多周期共振检查:4H定方向,禁止逆势交易 if trend_4h == 'up': # 4H趋势向上,只允许做多信号 logger.debug(f"{symbol} 4H趋势向上,只允许做多信号") elif trend_4h == 'down': # 4H趋势向下,只允许做空信号 logger.debug(f"{symbol} 4H趋势向下,只允许做空信号") elif trend_4h == 'neutral': # 4H趋势中性,记录警告 logger.warning(f"{symbol} 4H趋势中性,信号质量可能降低") # 简化策略:只做趋势跟踪,移除均值回归策略(避免信号冲突) # 策略权重配置(根据文档建议) TREND_SIGNAL_WEIGHTS = { 'macd_cross': 5, # MACD金叉/死叉 'ema_cross': 4, # EMA20上穿/下穿EMA50 'price_above_ema20': 3, # 价格在EMA20之上/下 '4h_trend_confirmation': 2, # 4H趋势确认 } # 趋势跟踪策略(不再区分市场状态,统一使用趋势指标) # MACD金叉/死叉(权重最高) if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0: # MACD金叉,做多信号(需4H趋势向上或中性) if trend_4h in ('up', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] reasons.append("MACD金叉") if direction is None: direction = 'BUY' else: reasons.append("MACD金叉但4H趋势向下,禁止逆势做多") elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0: # MACD死叉,做空信号(需4H趋势向下或中性) if trend_4h in ('down', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross'] reasons.append("MACD死叉") if direction is None: direction = 'SELL' else: reasons.append("MACD死叉但4H趋势向上,禁止逆势做空") # EMA均线系统 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 if trend_4h in ('up', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] reasons.append("EMA20上穿EMA50,上升趋势") if direction is None: direction = 'BUY' else: reasons.append("1H均线向上但4H趋势向下,禁止逆势做多") elif current_price < ema20 < ema50: # 下降趋势 if trend_4h in ('down', 'neutral', None): signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross'] reasons.append("EMA20下穿EMA50,下降趋势") if direction is None: direction = 'SELL' else: reasons.append("1H均线向下但4H趋势向上,禁止逆势做空") # 价格与EMA20关系 if ema20: if current_price > ema20: if trend_4h in ('up', 'neutral', None) and direction == 'BUY': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] reasons.append("价格在EMA20之上") elif current_price < ema20: if trend_4h in ('down', 'neutral', None) and direction == 'SELL': signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20'] reasons.append("价格在EMA20之下") # 4H趋势确认加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation'] reasons.append("4H周期共振确认") elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): # 逆势信号,直接拒绝 signal_strength = 0 reasons.append("❌ 逆4H趋势,拒绝交易") # 判断是否应该交易(信号强度 >= MIN_SIGNAL_STRENGTH 才交易,提高胜率) min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) # 强度上限归一到 0-10,避免出现 12/10 这种误导显示 try: signal_strength = max(0, min(int(signal_strength), 10)) except Exception: pass should_trade = signal_strength >= min_signal_strength and direction is not None # ===== 15m 短周期方向过滤:避免“上涨中做空 / 下跌中做多” ===== # 思路: # - 使用最近 N 根 15m K 线的总涨跌幅来确认短期方向 # - 做多(BUY):要求短期合计涨幅 >= ENTRY_SHORT_TREND_MIN_PCT # - 做空(SELL):要求短期合计跌幅 <= -ENTRY_SHORT_TREND_MIN_PCT # - 否则认为短期方向与计划方向不一致,降低 should_trade,并在 reason 中标记 try: short_filter_enabled = bool(config.TRADING_CONFIG.get('ENTRY_SHORT_TREND_FILTER_ENABLED', False)) if short_filter_enabled and should_trade and direction in ('BUY', 'SELL'): short_interval = config.TRADING_CONFIG.get('ENTRY_SHORT_INTERVAL', '15m') periods = int(config.TRADING_CONFIG.get('ENTRY_SHORT_CONFIRM_CANDLES', 3) or 3) min_pct = float(config.TRADING_CONFIG.get('ENTRY_SHORT_TREND_MIN_PCT', 0.003) or 0.003) # 仅在4H趋势已经明确向上/向下时启用短周期过滤;中性趋势本身就不适合自动交易 if trend_4h in ('up', 'down'): recent_change = await self._get_symbol_change_period(symbol, short_interval, periods) if recent_change is not None: # recent_change 是这段时间内的总涨跌幅比例(正为上涨,负为下跌) if direction == 'BUY': # 做多:要求短期是上涨或至少不明显下跌 if recent_change < min_pct: should_trade = False reasons.append( f"❌ 短周期({short_interval}×{periods})合计跌幅/涨幅不足:{recent_change*100:.2f}% " f"(做多需≥{min_pct*100:.2f}%)" ) elif direction == 'SELL': # 做空:要求短期是下跌或至少不明显上涨 if recent_change > -min_pct: should_trade = False reasons.append( f"❌ 短周期({short_interval}×{periods})合计涨幅/跌幅不足:{recent_change*100:.2f}% " f"(做空需≤-{min_pct*100:.2f}%)" ) except Exception as e: logger.debug(f"{symbol} 短周期方向过滤失败(忽略,不影响其它过滤): {e}") # 提升胜率:4H趋势中性时不做自动交易(只保留推荐/观察) # 经验上,中性趋势下“趋势跟踪”信号更容易被来回扫损,导致胜率显著降低与交易次数激增。 allow_neutral = bool(config.TRADING_CONFIG.get("AUTO_TRADE_ALLOW_4H_NEUTRAL", False)) if (trend_4h == 'neutral') and (not allow_neutral): if should_trade: reasons.append("❌ 4H趋势中性(为提升胜率:仅生成推荐,不自动交易)") should_trade = False # 如果信号方向与4H趋势相反,直接拒绝交易(所有模式) if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): should_trade = False reasons.append("❌ 禁止逆4H趋势交易") # 只在“真的强度不足”时追加该原因;避免出现“强度>=阈值但被其它过滤挡住”仍提示强度不足 if direction and (signal_strength < min_signal_strength): reasons.append(f"信号强度不足({signal_strength}/10,需要≥{min_signal_strength})") return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength, 'trend_4h': trend_4h, # 添加4H趋势信息,供推荐器使用 'strategy_type': 'trend_following' # 标记策略类型 } async def _check_beta_filter(self) -> Dict: """ 大盘共振(Beta Filter)检查 当BTC或ETH在15min/1h周期剧烈下跌时,返回应该屏蔽多单的信号 Returns: {'should_block_buy': bool, 'reason': str} """ beta_filter_threshold = config.TRADING_CONFIG.get('BETA_FILTER_THRESHOLD', -0.03) # 默认-3% try: # 检查BTC和ETH的15min和1h周期 btcusdt_15m = await self._get_symbol_change_period('BTCUSDT', '15m', 5) # 最近5根K线 btcusdt_1h = await self._get_symbol_change_period('BTCUSDT', '1h', 3) # 最近3根K线 ethusdt_15m = await self._get_symbol_change_period('ETHUSDT', '15m', 5) ethusdt_1h = await self._get_symbol_change_period('ETHUSDT', '1h', 3) # 检查是否有剧烈下跌 btc_15m_down = btcusdt_15m and btcusdt_15m < beta_filter_threshold btc_1h_down = btcusdt_1h and btcusdt_1h < beta_filter_threshold eth_15m_down = ethusdt_15m and ethusdt_15m < beta_filter_threshold eth_1h_down = ethusdt_1h and ethusdt_1h < beta_filter_threshold if btc_15m_down or btc_1h_down or eth_15m_down or eth_1h_down: reasons = [] if btc_15m_down: reasons.append(f"BTC 15m下跌{btcusdt_15m*100:.2f}%") if btc_1h_down: reasons.append(f"BTC 1h下跌{btcusdt_1h*100:.2f}%") if eth_15m_down: reasons.append(f"ETH 15m下跌{ethusdt_15m*100:.2f}%") if eth_1h_down: reasons.append(f"ETH 1h下跌{ethusdt_1h*100:.2f}%") return { 'should_block_buy': True, 'reason': f"大盘暴跌:{', '.join(reasons)}" } return { 'should_block_buy': False, 'reason': '大盘正常' } except Exception as e: logger.warning(f"大盘共振检查失败: {e},允许交易(容错)") return { 'should_block_buy': False, 'reason': f'检查失败: {e}' } async def _get_symbol_change_period(self, symbol: str, interval: str, periods: int) -> Optional[float]: """ 获取指定交易对在指定周期内的涨跌幅 Args: symbol: 交易对 interval: K线周期(15m, 1h等) periods: 检查最近N根K线 Returns: 涨跌幅(负数表示下跌),如果获取失败返回None """ try: klines = await self.client.get_klines(symbol, interval, limit=periods + 1) if not klines or len(klines) < 2: return None # 计算最近N根K线的总涨跌幅 first_close = float(klines[0][4]) # 第一根K线的收盘价 last_close = float(klines[-1][4]) # 最后一根K线的收盘价 change = (last_close - first_close) / first_close return change except Exception as e: logger.debug(f"获取{symbol} {interval}周期涨跌幅失败: {e}") return None def _judge_trend_4h(self, price_4h: float, ema20_4h: Optional[float], ema50_4h: Optional[float], macd_4h: Optional[Dict]) -> str: """ 使用多指标投票机制判断4H趋势(避免单一指标误导) Args: price_4h: 4H周期价格 ema20_4h: 4H周期EMA20 ema50_4h: 4H周期EMA50 macd_4h: 4H周期MACD数据 Returns: 'up', 'down', 'neutral' """ if ema20_4h is None: return 'neutral' score = 0 total_indicators = 0 # 指标1:价格 vs EMA20 if price_4h > ema20_4h: score += 1 elif price_4h < ema20_4h: score -= 1 total_indicators += 1 # 指标2:EMA20 vs EMA50 if ema50_4h is not None: if ema20_4h > ema50_4h: score += 1 elif ema20_4h < ema50_4h: score -= 1 total_indicators += 1 # 指标3:MACD histogram if macd_4h and isinstance(macd_4h, dict): macd_hist = macd_4h.get('histogram', 0) if macd_hist > 0: score += 1 elif macd_hist < 0: score -= 1 total_indicators += 1 # 多数投票:如果score >= 2,趋势向上;如果score <= -2,趋势向下;否则中性 if total_indicators == 0: return 'neutral' # 至少需要2个指标确认 if score >= 2: return 'up' elif score <= -2: return 'down' else: return 'neutral' async def _periodic_sync_positions(self): """ 定期同步币安持仓状态(独立任务,不依赖市场扫描) """ sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟 logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}秒") while self.running: try: await asyncio.sleep(sync_interval) if not self.running: break logger.debug("执行定期持仓状态同步...") await self.position_manager.sync_positions_with_binance() except asyncio.CancelledError: logger.info("定期持仓同步任务已取消") break except Exception as e: logger.error(f"定期持仓同步任务出错: {e}") # 出错后等待一段时间再继续 await asyncio.sleep(60) def stop(self): """停止策略""" self.running = False logger.info("正在停止交易策略...")