""" 交易策略模块 - 实现交易逻辑和优化 """ import asyncio import logging from typing import List, Dict, Optional from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from position_manager import PositionManager import config logger = logging.getLogger(__name__) class TradingStrategy: """交易策略类""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager, position_manager: PositionManager ): """ 初始化交易策略 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 position_manager: 仓位管理器 """ self.client = client self.scanner = scanner self.risk_manager = risk_manager self.position_manager = position_manager self.running = False async def execute_strategy(self): """ 执行交易策略 """ self.running = True logger.info("交易策略开始执行...") try: while self.running: # 1. 扫描市场,找出涨跌幅最大的前N个货币对 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对,等待下次扫描...") await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) continue # 2. 对每个交易对执行交易逻辑 for symbol_info in top_symbols: if not self.running: break symbol = symbol_info['symbol'] change_percent = symbol_info['changePercent'] direction = symbol_info['direction'] logger.info( f"处理交易对: {symbol} " f"({direction} {change_percent:.2f}%)" ) # 检查是否应该交易 if not await self.risk_manager.should_trade(symbol, change_percent): continue # 优化:结合成交量确认 if not await self._check_volume_confirmation(symbol_info): logger.info(f"{symbol} 成交量确认失败,跳过") continue # 优化:趋势确认(可选) if not await self._check_trend_confirmation(symbol, change_percent): logger.info(f"{symbol} 趋势确认失败,跳过") continue # 开仓 position = await self.position_manager.open_position( symbol=symbol, change_percent=change_percent, leverage=10 # 默认10倍杠杆 ) if position: logger.info(f"{symbol} 开仓成功") else: logger.warning(f"{symbol} 开仓失败") # 避免同时处理太多交易对 await asyncio.sleep(1) # 3. 检查止损止盈 await self.position_manager.check_stop_loss_take_profit() # 4. 打印持仓摘要 summary = await self.position_manager.get_position_summary() if summary: logger.info( f"持仓摘要: {summary['totalPositions']} 个持仓, " f"总盈亏: {summary['totalPnL']:.2f} USDT, " f"可用余额: {summary['availableBalance']:.2f} USDT" ) # 等待下次扫描 logger.info(f"等待 {config.TRADING_CONFIG['SCAN_INTERVAL']} 秒后进行下次扫描...") await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) except Exception as e: logger.error(f"策略执行出错: {e}", exc_info=True) finally: self.running = False logger.info("交易策略已停止") async def _check_volume_confirmation(self, symbol_info: Dict) -> bool: """ 成交量确认 - 确保有足够的成交量支撑 Args: symbol_info: 交易对信息 Returns: 是否通过确认 """ volume_24h = symbol_info.get('volume24h', 0) min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H'] if volume_24h < min_volume: return False return True async def _check_trend_confirmation( self, symbol: str, change_percent: float ) -> bool: """ 趋势确认 - 结合多时间周期确认趋势 Args: symbol: 交易对 change_percent: 5分钟涨跌幅 Returns: 是否通过确认 """ try: # 获取15分钟K线数据,确认更大周期的趋势 klines_15m = await self.client.get_klines( symbol=symbol, interval='15m', limit=2 ) if len(klines_15m) < 2: return True # 如果无法获取,默认通过 # 计算15分钟涨跌幅 current_price_15m = float(klines_15m[-1][4]) prev_price_15m = float(klines_15m[-2][4]) if prev_price_15m == 0: return True change_15m = ((current_price_15m - prev_price_15m) / prev_price_15m) * 100 # 5分钟和15分钟趋势一致时,确认通过 if (change_percent > 0 and change_15m > 0) or (change_percent < 0 and change_15m < 0): return True # 如果5分钟涨跌幅很大(>5%),即使15分钟不一致也允许 if abs(change_percent) > 5: return True return False except Exception as e: logger.debug(f"趋势确认失败 {symbol}: {e}") return True # 出错时默认通过 def stop(self): """停止策略""" self.running = False logger.info("正在停止交易策略...")