""" 交易策略模块 - 实现交易逻辑和优化 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from .position_manager import PositionManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from position_manager import PositionManager import config logger = logging.getLogger(__name__) class TradingStrategy: """交易策略类""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager, position_manager: PositionManager ): """ 初始化交易策略 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 position_manager: 仓位管理器 """ self.client = client self.scanner = scanner self.risk_manager = risk_manager self.position_manager = position_manager self.running = False self._monitoring_started = False # 是否已启动监控 self._sync_task = None # 定期同步任务 async def execute_strategy(self): """ 执行交易策略 """ self.running = True logger.info("交易策略开始执行...") # 启动所有现有持仓的WebSocket实时监控 if not self._monitoring_started: try: await self.position_manager.start_all_position_monitoring() self._monitoring_started = True except Exception as e: logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式") # 启动定期同步任务(独立于市场扫描) self._sync_task = asyncio.create_task(self._periodic_sync_positions()) logger.info("定期持仓同步任务已启动") try: while self.running: # 1. 扫描市场,找出涨跌幅最大的前N个货币对 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对,等待下次扫描...") await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) continue # 2. 对每个交易对执行交易逻辑(使用技术指标) for symbol_info in top_symbols: if not self.running: break symbol = symbol_info['symbol'] change_percent = symbol_info['changePercent'] direction = symbol_info['direction'] market_regime = symbol_info.get('marketRegime', 'unknown') logger.info( f"处理交易对: {symbol} " f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})" ) # 检查是否应该交易 if not await self.risk_manager.should_trade(symbol, change_percent): continue # 优化:结合成交量确认 if not await self._check_volume_confirmation(symbol_info): logger.info(f"{symbol} 成交量确认失败,跳过") continue # 使用技术指标判断交易信号(高胜率策略) trade_signal = await self._analyze_trade_signal(symbol_info) # 记录交易信号到数据库(只有当有明确方向时才记录) signal_direction = trade_signal.get('direction') if signal_direction: # 只有当方向不为空时才记录 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradingSignal TradingSignal.create( symbol=symbol, signal_direction=signal_direction, signal_strength=trade_signal.get('strength', 0), signal_reason=trade_signal.get('reason', ''), rsi=symbol_info.get('rsi'), macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, market_regime=symbol_info.get('marketRegime') ) except Exception as e: logger.debug(f"记录交易信号失败: {e}") if not trade_signal['should_trade']: logger.info( f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过" ) continue # 确定交易方向(基于技术指标) trade_direction = trade_signal['direction'] entry_reason = trade_signal['reason'] logger.info( f"{symbol} 交易信号: {trade_direction} | " f"原因: {entry_reason} | " f"信号强度: {trade_signal.get('strength', 0)}/10" ) # 开仓(使用改进的仓位管理) position = await self.position_manager.open_position( symbol=symbol, change_percent=change_percent, leverage=config.TRADING_CONFIG.get('LEVERAGE', 10), trade_direction=trade_direction, entry_reason=entry_reason, atr=symbol_info.get('atr') ) if position: logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})") # 开仓成功后,WebSocket监控会在position_manager中自动启动 else: logger.warning(f"{symbol} 开仓失败") # 避免同时处理太多交易对 await asyncio.sleep(1) # 3. 同步币安实际持仓状态与数据库(定期同步,确保状态一致) try: await self.position_manager.sync_positions_with_binance() # 同步后,确保所有持仓都有监控(包括手动开仓的) await self.position_manager.start_all_position_monitoring() except Exception as e: logger.warning(f"持仓状态同步失败: {e}") # 4. 检查止损止盈(作为备用检查,WebSocket实时监控是主要方式) # 注意:如果启用了WebSocket实时监控,这里主要是作为备用检查 closed = await self.position_manager.check_stop_loss_take_profit() if closed: logger.info(f"定时检查触发平仓: {', '.join(closed)}") # 5. 打印持仓摘要并记录账户快照 summary = await self.position_manager.get_position_summary() if summary: logger.info( f"持仓摘要: {summary['totalPositions']} 个持仓, " f"总盈亏: {summary['totalPnL']:.2f} USDT, " f"可用余额: {summary['availableBalance']:.2f} USDT" ) # 输出价格缓存统计 cache_size = len(self.client._price_cache) if cache_size > 0: logger.info(f"价格缓存: {cache_size}个交易对") # 记录账户快照到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import AccountSnapshot AccountSnapshot.create( total_balance=summary['totalBalance'], available_balance=summary['availableBalance'], total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])), total_pnl=summary['totalPnL'], open_positions=summary['totalPositions'] ) except Exception as e: logger.debug(f"记录账户快照失败: {e}") # 重新加载配置(确保使用最新配置) try: if hasattr(config, 'reload_config'): config.reload_config() logger.info("配置已重新加载,将使用最新配置进行下次扫描") except Exception as e: logger.debug(f"重新加载配置失败: {e}") # 等待下次扫描 scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600) logger.info(f"等待 {scan_interval} 秒后进行下次扫描...") await asyncio.sleep(scan_interval) except Exception as e: logger.error(f"策略执行出错: {e}", exc_info=True) finally: self.running = False # 停止定期同步任务 if self._sync_task: self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass logger.info("定期持仓同步任务已停止") # 停止所有持仓的WebSocket监控 try: await self.position_manager.stop_all_position_monitoring() except Exception as e: logger.warning(f"停止持仓监控时出错: {e}") logger.info("交易策略已停止") async def _check_volume_confirmation(self, symbol_info: Dict) -> bool: """ 成交量确认 - 确保有足够的成交量支撑 Args: symbol_info: 交易对信息 Returns: 是否通过确认 """ volume_24h = symbol_info.get('volume24h', 0) min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H'] if volume_24h < min_volume: return False return True async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: """ 使用技术指标分析交易信号(高胜率策略) Args: symbol_info: 交易对信息(包含技术指标) Returns: 交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int} """ symbol = symbol_info['symbol'] current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') signal_strength = 0 reasons = [] direction = None # 策略1:均值回归(震荡市场,高胜率) if market_regime == 'ranging': # RSI超卖,做多信号 if rsi and rsi < 30: signal_strength += 4 reasons.append(f"RSI超卖({rsi:.1f})") if direction is None: direction = 'BUY' # RSI超买,做空信号 elif rsi and rsi > 70: signal_strength += 4 reasons.append(f"RSI超买({rsi:.1f})") if direction is None: direction = 'SELL' # 布林带下轨,做多信号 if bollinger and current_price <= bollinger['lower']: signal_strength += 3 reasons.append("触及布林带下轨") if direction is None: direction = 'BUY' # 布林带上轨,做空信号 elif bollinger and current_price >= bollinger['upper']: signal_strength += 3 reasons.append("触及布林带上轨") if direction is None: direction = 'SELL' # 策略2:趋势跟踪(趋势市场) elif market_regime == 'trending': # MACD金叉,做多信号 if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0: signal_strength += 3 reasons.append("MACD金叉") if direction is None: direction = 'BUY' # MACD死叉,做空信号 elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0: signal_strength += 3 reasons.append("MACD死叉") if direction is None: direction = 'SELL' # 均线系统 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 signal_strength += 2 reasons.append("价格在均线之上") if direction is None: direction = 'BUY' elif current_price < ema20 < ema50: # 下降趋势 signal_strength += 2 reasons.append("价格在均线之下") if direction is None: direction = 'SELL' # 策略3:综合信号(提高胜率) # 多个指标同时确认时,信号更强 confirmations = 0 # RSI确认 if rsi: if direction == 'BUY' and rsi < 50: confirmations += 1 elif direction == 'SELL' and rsi > 50: confirmations += 1 # MACD确认 if macd: if direction == 'BUY' and macd['histogram'] > 0: confirmations += 1 elif direction == 'SELL' and macd['histogram'] < 0: confirmations += 1 # 布林带确认 if bollinger: if direction == 'BUY' and current_price < bollinger['middle']: confirmations += 1 elif direction == 'SELL' and current_price > bollinger['middle']: confirmations += 1 # 多个指标确认时,增加信号强度 if confirmations >= 2: signal_strength += 2 reasons.append(f"{confirmations}个指标确认") # 判断是否应该交易(信号强度 >= 5 才交易,提高胜率) should_trade = signal_strength >= config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 5) if not should_trade and direction: reasons.append(f"信号强度不足({signal_strength}/10)") return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength } async def _periodic_sync_positions(self): """ 定期同步币安持仓状态(独立任务,不依赖市场扫描) """ sync_interval = config.TRADING_CONFIG.get('POSITION_SYNC_INTERVAL', 300) # 默认5分钟 logger.info(f"定期持仓同步任务启动,同步间隔: {sync_interval}秒") while self.running: try: await asyncio.sleep(sync_interval) if not self.running: break logger.debug("执行定期持仓状态同步...") await self.position_manager.sync_positions_with_binance() except asyncio.CancelledError: logger.info("定期持仓同步任务已取消") break except Exception as e: logger.error(f"定期持仓同步任务出错: {e}") # 出错后等待一段时间再继续 await asyncio.sleep(60) def stop(self): """停止策略""" self.running = False logger.info("正在停止交易策略...")