diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index e9771b4..5bbb012 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -1,6 +1,7 @@ """ 仓位管理模块 - 管理持仓和订单 """ +import asyncio import logging from typing import Dict, List, Optional try: @@ -54,6 +55,8 @@ class PositionManager: self.client = client self.risk_manager = risk_manager self.active_positions: Dict[str, Dict] = {} + self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典 + self._monitoring_enabled = True # 是否启用实时监控 async def open_position( self, @@ -191,6 +194,10 @@ class PositionManager: f"(涨跌幅: {change_percent:.2f}%)" ) + # 启动WebSocket实时监控 + if self._monitoring_enabled: + await self._start_position_monitoring(symbol) + return position_info return None @@ -279,6 +286,9 @@ class PositionManager: elif not Trade: logger.warning(f"Trade模型未导入,无法更新 {symbol} 平仓记录") + # 停止WebSocket监控 + await self._stop_position_monitoring(symbol) + # 移除持仓记录 if symbol in self.active_positions: del self.active_positions[symbol] @@ -314,6 +324,7 @@ class PositionManager: current_position = position_dict[symbol] entry_price = position_info['entryPrice'] + quantity = position_info['quantity'] # 修复:获取quantity # 获取当前标记价格 current_price = current_position.get('markPrice', 0) if current_price == 0: @@ -504,3 +515,257 @@ class PositionManager: except Exception as e: logger.error(f"获取持仓摘要失败: {e}") return {} + + async def start_all_position_monitoring(self): + """ + 启动所有持仓的WebSocket实时监控 + """ + if not self._monitoring_enabled: + logger.info("实时监控已禁用,跳过启动") + return + + if not self.client.socket_manager: + logger.warning("WebSocket未初始化,无法启动实时监控") + return + + # 获取当前所有持仓 + positions = await self.client.get_open_positions() + for position in positions: + symbol = position['symbol'] + if symbol not in self._monitor_tasks and symbol in self.active_positions: + await self._start_position_monitoring(symbol) + + logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控") + + async def stop_all_position_monitoring(self): + """ + 停止所有持仓的WebSocket监控 + """ + symbols = list(self._monitor_tasks.keys()) + for symbol in symbols: + await self._stop_position_monitoring(symbol) + logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)") + + async def _start_position_monitoring(self, symbol: str): + """ + 启动单个持仓的WebSocket价格监控 + + Args: + symbol: 交易对 + """ + if symbol in self._monitor_tasks: + logger.debug(f"{symbol} 监控任务已存在,跳过") + return + + if not self.client.socket_manager: + logger.warning(f"{symbol} WebSocket未初始化,无法启动监控") + return + + try: + task = asyncio.create_task(self._monitor_position_price(symbol)) + self._monitor_tasks[symbol] = task + logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控") + except Exception as e: + logger.error(f"启动 {symbol} 监控失败: {e}") + + async def _stop_position_monitoring(self, symbol: str): + """ + 停止单个持仓的WebSocket监控 + + Args: + symbol: 交易对 + """ + if symbol not in self._monitor_tasks: + return + + task = self._monitor_tasks[symbol] + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + del self._monitor_tasks[symbol] + logger.debug(f"已停止 {symbol} 的WebSocket监控") + + async def _monitor_position_price(self, symbol: str): + """ + 监控单个持仓的价格(WebSocket实时推送) + + Args: + symbol: 交易对 + """ + retry_count = 0 + max_retries = 5 + + while retry_count < max_retries: + try: + if symbol not in self.active_positions: + logger.info(f"{symbol} 持仓已不存在,停止监控") + break + + # 使用WebSocket订阅价格流 + async with self.client.socket_manager.futures_socket(symbol.lower()) as stream: + logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新") + retry_count = 0 # 连接成功,重置重试计数 + + async for msg in stream: + if symbol not in self.active_positions: + logger.info(f"{symbol} 持仓已不存在,停止监控") + break + + if 'data' in msg: + try: + current_price = float(msg['data']['c']) # 最新价格 + # 立即检查止损止盈 + await self._check_single_position(symbol, current_price) + except (KeyError, ValueError, TypeError) as e: + logger.debug(f"{symbol} 解析价格数据失败: {e}") + continue + + except asyncio.CancelledError: + logger.info(f"{symbol} 监控任务已取消") + break + except Exception as e: + retry_count += 1 + logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}") + + if retry_count < max_retries: + # 指数退避重试 + wait_time = min(2 ** retry_count, 30) + logger.info(f"{symbol} {wait_time}秒后重试连接...") + await asyncio.sleep(wait_time) + else: + logger.error(f"{symbol} WebSocket监控失败,已达到最大重试次数") + # 回退到定时检查模式 + logger.info(f"{symbol} 将使用定时检查模式(非实时)") + break + + # 清理任务 + if symbol in self._monitor_tasks: + del self._monitor_tasks[symbol] + + async def _check_single_position(self, symbol: str, current_price: float): + """ + 检查单个持仓的止损止盈(实时检查) + + Args: + symbol: 交易对 + current_price: 当前价格 + """ + position_info = self.active_positions.get(symbol) + if not position_info: + return + + entry_price = position_info['entryPrice'] + quantity = position_info['quantity'] + + # 计算当前盈亏 + if position_info['side'] == 'BUY': + pnl_percent = ((current_price - entry_price) / entry_price) * 100 + else: # SELL + pnl_percent = ((entry_price - current_price) / entry_price) * 100 + + # 更新最大盈利 + if pnl_percent > position_info.get('maxProfit', 0): + position_info['maxProfit'] = pnl_percent + + # 移动止损逻辑(盈利后保护利润) + use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True) + if use_trailing: + trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) + trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) + + if not position_info.get('trailingStopActivated', False): + # 盈利超过阈值后,激活移动止损 + if pnl_percent > trailing_activation * 100: + position_info['trailingStopActivated'] = True + # 将止损移至成本价(保本) + position_info['stopLoss'] = entry_price + logger.info( + f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} " + f"(盈利: {pnl_percent:.2f}%)" + ) + else: + # 盈利超过2%后,止损移至保护利润位 + if pnl_percent > 2.0: + if position_info['side'] == 'BUY': + new_stop_loss = entry_price * (1 + trailing_protect) + if new_stop_loss > position_info['stopLoss']: + position_info['stopLoss'] = new_stop_loss + logger.info( + f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} " + f"(保护{trailing_protect*100:.1f}%利润)" + ) + else: # SELL + new_stop_loss = entry_price * (1 - trailing_protect) + if new_stop_loss < position_info['stopLoss']: + position_info['stopLoss'] = new_stop_loss + logger.info( + f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} " + f"(保护{trailing_protect*100:.1f}%利润)" + ) + + # 检查止损 + stop_loss = position_info['stopLoss'] + should_close = False + exit_reason = None + + if position_info['side'] == 'BUY' and current_price <= stop_loss: + should_close = True + exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' + logger.warning( + f"{symbol} [实时监控] 触发止损: {current_price:.4f} <= {stop_loss:.4f} " + f"(盈亏: {pnl_percent:.2f}%)" + ) + elif position_info['side'] == 'SELL' and current_price >= stop_loss: + should_close = True + exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' + logger.warning( + f"{symbol} [实时监控] 触发止损: {current_price:.4f} >= {stop_loss:.4f} " + f"(盈亏: {pnl_percent:.2f}%)" + ) + + # 检查止盈 + if not should_close: + take_profit = position_info['takeProfit'] + if position_info['side'] == 'BUY' and current_price >= take_profit: + should_close = True + exit_reason = 'take_profit' + logger.info( + f"{symbol} [实时监控] 触发止盈: {current_price:.4f} >= {take_profit:.4f} " + f"(盈亏: {pnl_percent:.2f}%)" + ) + elif position_info['side'] == 'SELL' and current_price <= take_profit: + should_close = True + exit_reason = 'take_profit' + logger.info( + f"{symbol} [实时监控] 触发止盈: {current_price:.4f} <= {take_profit:.4f} " + f"(盈亏: {pnl_percent:.2f}%)" + ) + + # 如果触发止损止盈,执行平仓 + if should_close: + # 更新数据库 + if DB_AVAILABLE and Trade: + trade_id = position_info.get('tradeId') + if trade_id: + try: + if position_info['side'] == 'BUY': + pnl = (current_price - entry_price) * quantity + else: # SELL + pnl = (entry_price - current_price) * quantity + + Trade.update_exit( + trade_id=trade_id, + exit_price=current_price, + exit_reason=exit_reason, + pnl=pnl, + pnl_percent=pnl_percent + ) + except Exception as e: + logger.warning(f"更新{exit_reason}记录失败: {e}") + + # 执行平仓(注意:这里会停止监控,所以先更新数据库) + await self.close_position(symbol) \ No newline at end of file diff --git a/trading_system/strategy.py b/trading_system/strategy.py index 438f29f..bf69866 100644 --- a/trading_system/strategy.py +++ b/trading_system/strategy.py @@ -44,6 +44,7 @@ class TradingStrategy: self.risk_manager = risk_manager self.position_manager = position_manager self.running = False + self._monitoring_started = False # 是否已启动监控 async def execute_strategy(self): """ @@ -52,6 +53,14 @@ class TradingStrategy: self.running = True logger.info("交易策略开始执行...") + # 启动所有现有持仓的WebSocket实时监控 + if not self._monitoring_started: + try: + await self.position_manager.start_all_position_monitoring() + self._monitoring_started = True + except Exception as e: + logger.warning(f"启动持仓监控失败: {e},将使用定时检查模式") + try: while self.running: # 1. 扫描市场,找出涨跌幅最大的前N个货币对 @@ -138,14 +147,18 @@ class TradingStrategy: if position: logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})") + # 开仓成功后,WebSocket监控会在position_manager中自动启动 else: logger.warning(f"{symbol} 开仓失败") # 避免同时处理太多交易对 await asyncio.sleep(1) - # 3. 检查止损止盈 - await self.position_manager.check_stop_loss_take_profit() + # 3. 检查止损止盈(作为备用检查,WebSocket实时监控是主要方式) + # 注意:如果启用了WebSocket实时监控,这里主要是作为备用检查 + closed = await self.position_manager.check_stop_loss_take_profit() + if closed: + logger.info(f"定时检查触发平仓: {', '.join(closed)}") # 4. 打印持仓摘要并记录账户快照 summary = await self.position_manager.get_position_summary() @@ -197,6 +210,11 @@ class TradingStrategy: logger.error(f"策略执行出错: {e}", exc_info=True) finally: self.running = False + # 停止所有持仓的WebSocket监控 + try: + await self.position_manager.stop_all_position_monitoring() + except Exception as e: + logger.warning(f"停止持仓监控时出错: {e}") logger.info("交易策略已停止") async def _check_volume_confirmation(self, symbol_info: Dict) -> bool: