""" 仓位管理模块 - 管理持仓和订单 """ import asyncio import logging import json import aiohttp from typing import Dict, List, Optional try: from .binance_client import BinanceClient from .risk_manager import RiskManager from . import config except ImportError: from binance_client import BinanceClient from risk_manager import RiskManager import config logger = logging.getLogger(__name__) # 尝试导入数据库模型(如果可用) DB_AVAILABLE = False Trade = None try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import Trade DB_AVAILABLE = True logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库") else: logger.warning("⚠ backend目录不存在,无法使用数据库功能") DB_AVAILABLE = False except ImportError as e: logger.warning(f"⚠ 无法导入数据库模型: {e}") logger.warning(" 交易记录将不会保存到数据库") DB_AVAILABLE = False except Exception as e: logger.warning(f"⚠ 数据库初始化失败: {e}") logger.warning(" 交易记录将不会保存到数据库") DB_AVAILABLE = False class PositionManager: """仓位管理类""" def __init__(self, client: BinanceClient, risk_manager: RiskManager): """ 初始化仓位管理器 Args: client: 币安客户端 risk_manager: 风险管理器 """ self.client = client self.risk_manager = risk_manager self.active_positions: Dict[str, Dict] = {} self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典 self._monitoring_enabled = True # 是否启用实时监控 async def open_position( self, symbol: str, change_percent: float, leverage: int = 10, trade_direction: Optional[str] = None, entry_reason: str = '', atr: Optional[float] = None, klines: Optional[List] = None, bollinger: Optional[Dict] = None ) -> Optional[Dict]: """ 开仓 Args: symbol: 交易对 change_percent: 涨跌幅百分比 leverage: 杠杆倍数 Returns: 订单信息,失败返回None """ try: # 判断是否应该交易 if not await self.risk_manager.should_trade(symbol, change_percent): return None # 设置杠杆 await self.client.set_leverage(symbol, leverage) # 计算仓位大小(传入实际使用的杠杆) logger.info(f"开始为 {symbol} 计算仓位大小...") quantity = await self.risk_manager.calculate_position_size( symbol, change_percent, leverage=leverage ) if quantity is None: logger.warning(f"❌ {symbol} 仓位计算失败,跳过交易") logger.warning(f" 可能原因:") logger.warning(f" 1. 账户余额不足") logger.warning(f" 2. 单笔仓位超过限制") logger.warning(f" 3. 总仓位超过限制") logger.warning(f" 4. 无法获取价格数据") logger.warning(f" 5. 保证金不足最小要求(MIN_MARGIN_USDT)") return None logger.info(f"✓ {symbol} 仓位计算成功: {quantity:.4f}") # 确定交易方向(优先使用技术指标信号) if trade_direction: side = trade_direction else: side = 'BUY' if change_percent > 0 else 'SELL' # 获取当前价格 ticker = await self.client.get_ticker_24h(symbol) if not ticker: return None entry_price = ticker['price'] # 获取K线数据用于动态止损计算(从symbol_info中获取,如果可用) klines = None bollinger = None if 'klines' in locals() or 'symbol_info' in locals(): # 尝试从外部传入的symbol_info获取 pass # 计算基于支撑/阻力的动态止损 # 优先使用技术结构(支撑/阻力位、布林带) # 如果无法获取K线数据,回退到ATR或固定止损 if not klines: # 如果没有传入K线数据,尝试获取 try: primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') klines_data = await self.client.get_klines( symbol=symbol, interval=primary_interval, limit=20 # 获取最近20根K线用于计算支撑/阻力 ) klines = klines_data if len(klines_data) >= 10 else None except Exception as e: logger.debug(f"获取K线数据失败,使用固定止损: {e}") klines = None # 使用基于支撑/阻力的动态止损 if klines or bollinger or atr: stop_loss_price = self.risk_manager.get_stop_loss_price( entry_price, side, klines=klines, bollinger=bollinger, atr=atr ) # 计算动态止损百分比(用于计算止盈) if side == 'BUY': stop_loss_pct = (entry_price - stop_loss_price) / entry_price else: stop_loss_pct = (stop_loss_price - entry_price) / entry_price # 止盈为止损的2-2.5倍(提高盈亏比) take_profit_pct = stop_loss_pct * 2.5 take_profit_price = self.risk_manager.get_take_profit_price( entry_price, side, take_profit_pct=take_profit_pct ) else: # 回退到固定止损止盈 stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side) take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side) # 下单 order = await self.client.place_order( symbol=symbol, side=side, quantity=quantity, order_type='MARKET' ) if order: # 获取开仓订单号 entry_order_id = order.get('orderId') if entry_order_id: logger.info(f"{symbol} [开仓] 币安订单号: {entry_order_id}") # 等待订单成交,然后从币安获取实际成交价格 actual_entry_price = None try: # 等待一小段时间让订单成交 await asyncio.sleep(1) # 从币安获取订单详情,获取实际成交价格 try: order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=entry_order_id) if order_info: # 优先使用平均成交价格(avgPrice),如果没有则使用价格字段 actual_entry_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0)) if actual_entry_price > 0: logger.info(f"{symbol} [开仓] 从币安订单获取实际成交价格: {actual_entry_price:.4f} USDT") else: # 如果订单还没有完全成交,尝试从成交记录获取 if order_info.get('status') == 'FILLED' and order_info.get('fills'): # 计算加权平均成交价格 total_qty = 0 total_value = 0 for fill in order_info.get('fills', []): qty = float(fill.get('qty', 0)) price = float(fill.get('price', 0)) total_qty += qty total_value += qty * price if total_qty > 0: actual_entry_price = total_value / total_qty logger.info(f"{symbol} [开仓] 从成交记录计算平均成交价格: {actual_entry_price:.4f} USDT") except Exception as order_error: logger.warning(f"{symbol} [开仓] 获取订单详情失败: {order_error},使用备用方法") # 如果无法从订单获取价格,使用当前价格作为备用 if not actual_entry_price or actual_entry_price <= 0: ticker = await self.client.get_ticker_24h(symbol) if ticker: actual_entry_price = float(ticker['price']) logger.warning(f"{symbol} [开仓] 使用当前价格作为入场价格: {actual_entry_price:.4f} USDT") else: actual_entry_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0)) if actual_entry_price <= 0: logger.error(f"{symbol} [开仓] 无法获取入场价格,使用订单价格字段") actual_entry_price = float(order.get('price', 0)) or entry_price except Exception as price_error: logger.warning(f"{symbol} [开仓] 获取成交价格时出错: {price_error},使用当前价格") ticker = await self.client.get_ticker_24h(symbol) actual_entry_price = float(ticker['price']) if ticker else entry_price # 使用实际成交价格(如果获取成功) if actual_entry_price and actual_entry_price > 0: # 记录下单时的价格(用于对比) original_entry_price = entry_price entry_price = actual_entry_price logger.info(f"{symbol} [开仓] 使用实际成交价格: {entry_price:.4f} USDT (下单时价格: {original_entry_price:.4f})") # 记录到数据库(使用实际成交价格) trade_id = None if DB_AVAILABLE and Trade: try: logger.info(f"正在保存 {symbol} 交易记录到数据库...") trade_id = Trade.create( symbol=symbol, side=side, quantity=quantity, entry_price=entry_price, # 使用实际成交价格 leverage=leverage, entry_reason=entry_reason, entry_order_id=entry_order_id # 保存币安订单号 ) logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f})") except Exception as e: logger.error(f"❌ 保存交易记录到数据库失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") import traceback logger.error(f" 错误详情:\n{traceback.format_exc()}") elif not DB_AVAILABLE: logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录") elif not Trade: logger.warning(f"Trade模型未导入,无法保存 {symbol} 交易记录") # 计算分步止盈价格 # 第一目标:盈亏比1:1(保守,了结50%仓位) if side == 'BUY': take_profit_1 = entry_price + (entry_price - stop_loss_price) # 盈亏比1:1 else: take_profit_1 = entry_price - (stop_loss_price - entry_price) # 盈亏比1:1 # 第二目标:原始止盈价(激进,剩余50%仓位) take_profit_2 = take_profit_price # 记录持仓信息(包含动态止损止盈和分步止盈) position_info = { 'symbol': symbol, 'side': side, 'quantity': quantity, 'entryPrice': entry_price, 'changePercent': change_percent, 'orderId': order.get('orderId'), 'tradeId': trade_id, # 数据库交易ID 'stopLoss': stop_loss_price, 'takeProfit': take_profit_price, # 保留原始止盈价(第二目标) 'takeProfit1': take_profit_1, # 第一目标(盈亏比1:1,了结50%) 'takeProfit2': take_profit_2, # 第二目标(原始止盈价,剩余50%) 'partialProfitTaken': False, # 是否已部分止盈 'remainingQuantity': quantity, # 剩余仓位数量 'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损) 'leverage': leverage, 'entryReason': entry_reason, 'atr': atr, 'maxProfit': 0.0, # 记录最大盈利(用于移动止损) 'trailingStopActivated': False # 移动止损是否已激活 } self.active_positions[symbol] = position_info logger.info( f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} " f"(涨跌幅: {change_percent:.2f}%)" ) # 启动WebSocket实时监控 if self._monitoring_enabled: await self._start_position_monitoring(symbol) return position_info return None except Exception as e: logger.error(f"开仓失败 {symbol}: {e}") return None async def close_position(self, symbol: str, reason: str = 'manual') -> bool: """ 平仓 Args: symbol: 交易对 reason: 平仓原因(manual, stop_loss, take_profit, trailing_stop, sync) Returns: 是否成功 """ try: logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})") # 获取当前持仓 positions = await self.client.get_open_positions() position = next( (p for p in positions if p['symbol'] == symbol), None ) if not position: logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓") # 即使币安没有持仓,也要更新数据库状态 updated = False if DB_AVAILABLE and Trade and symbol in self.active_positions: position_info = self.active_positions[symbol] trade_id = position_info.get('tradeId') if trade_id: try: logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...") # 获取当前价格作为平仓价格 ticker = await self.client.get_ticker_24h(symbol) exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice']) # 确保所有值都是float类型 entry_price = float(position_info['entryPrice']) quantity = float(position_info['quantity']) if position_info['side'] == 'BUY': pnl = (exit_price - entry_price) * quantity pnl_percent = ((exit_price - entry_price) / entry_price) * 100 else: pnl = (entry_price - exit_price) * quantity pnl_percent = ((entry_price - exit_price) / entry_price) * 100 # 同步平仓时没有订单号,设为None Trade.update_exit( trade_id=trade_id, exit_price=exit_price, exit_reason=reason, pnl=pnl, pnl_percent=pnl_percent, exit_order_id=None # 同步平仓时没有订单号 ) logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新") updated = True except Exception as e: logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}") # 清理本地记录 await self._stop_position_monitoring(symbol) if symbol in self.active_positions: del self.active_positions[symbol] # 如果更新了数据库,返回成功;否则返回失败 return updated # 确定平仓方向(与开仓相反) position_amt = position['positionAmt'] side = 'SELL' if position_amt > 0 else 'BUY' quantity = abs(position_amt) logger.info( f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET " f"(持仓数量: {position_amt:.4f})" ) # 平仓(使用 reduceOnly=True 确保只减少持仓,不增加反向持仓) try: logger.debug(f"{symbol} [平仓] 调用 place_order: {side} {quantity:.4f} @ MARKET (reduceOnly=True)") order = await self.client.place_order( symbol=symbol, side=side, quantity=quantity, order_type='MARKET', reduce_only=True # 平仓时使用 reduceOnly=True ) logger.debug(f"{symbol} [平仓] place_order 返回: {order}") except Exception as order_error: logger.error(f"{symbol} [平仓] ❌ 下单失败: {order_error}") logger.error(f" 下单参数: symbol={symbol}, side={side}, quantity={quantity:.4f}, order_type=MARKET") logger.error(f" 错误类型: {type(order_error).__name__}") import traceback logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}") raise # 重新抛出异常,让外层捕获 if order: order_id = order.get('orderId') logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order_id})") # 等待订单成交,然后从币安获取实际成交价格 exit_price = None try: # 等待一小段时间让订单成交 await asyncio.sleep(1) # 从币安获取订单详情,获取实际成交价格 try: order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id) if order_info: # 优先使用平均成交价格(avgPrice),如果没有则使用价格字段 exit_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0)) if exit_price > 0: logger.info(f"{symbol} [平仓] 从币安订单获取实际成交价格: {exit_price:.4f} USDT") else: # 如果订单还没有完全成交,尝试从成交记录获取 if order_info.get('status') == 'FILLED' and order_info.get('fills'): # 计算加权平均成交价格 total_qty = 0 total_value = 0 for fill in order_info.get('fills', []): qty = float(fill.get('qty', 0)) price = float(fill.get('price', 0)) total_qty += qty total_value += qty * price if total_qty > 0: exit_price = total_value / total_qty logger.info(f"{symbol} [平仓] 从成交记录计算平均成交价格: {exit_price:.4f} USDT") except Exception as order_error: logger.warning(f"{symbol} [平仓] 获取订单详情失败: {order_error},使用备用方法") # 如果无法从订单获取价格,使用当前价格作为备用 if not exit_price or exit_price <= 0: ticker = await self.client.get_ticker_24h(symbol) if ticker: exit_price = float(ticker['price']) logger.warning(f"{symbol} [平仓] 使用当前价格作为平仓价格: {exit_price:.4f} USDT") else: exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0)) if exit_price <= 0: logger.error(f"{symbol} [平仓] 无法获取平仓价格,使用订单价格字段") exit_price = float(order.get('price', 0)) except Exception as price_error: logger.warning(f"{symbol} [平仓] 获取成交价格时出错: {price_error},使用当前价格") ticker = await self.client.get_ticker_24h(symbol) exit_price = float(ticker['price']) if ticker else float(order.get('price', 0)) # 更新数据库记录 if DB_AVAILABLE and Trade and symbol in self.active_positions: position_info = self.active_positions[symbol] trade_id = position_info.get('tradeId') if trade_id: try: logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...") # 计算盈亏(确保所有值都是float类型) entry_price = float(position_info['entryPrice']) quantity_float = float(quantity) exit_price_float = float(exit_price) if position_info['side'] == 'BUY': pnl = (exit_price_float - entry_price) * quantity_float pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100 else: # SELL pnl = (entry_price - exit_price_float) * quantity_float pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100 # 获取平仓订单号 exit_order_id = order.get('orderId') if exit_order_id: logger.info(f"{symbol} [平仓] 币安订单号: {exit_order_id}") Trade.update_exit( trade_id=trade_id, exit_price=exit_price_float, exit_reason=reason, pnl=pnl, pnl_percent=pnl_percent, exit_order_id=exit_order_id # 保存币安平仓订单号 ) logger.info( f"{symbol} [平仓] ✓ 数据库记录已更新 " f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})" ) except Exception as e: logger.error(f"❌ 更新平仓记录到数据库失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") import traceback logger.error(f" 错误详情:\n{traceback.format_exc()}") else: logger.warning(f"{symbol} 没有关联的数据库交易ID,无法更新平仓记录") elif not DB_AVAILABLE: logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录") elif not Trade: logger.warning(f"Trade模型未导入,无法更新 {symbol} 平仓记录") # 停止WebSocket监控 await self._stop_position_monitoring(symbol) # 移除持仓记录 if symbol in self.active_positions: del self.active_positions[symbol] logger.info( f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} " f"(原因: {reason})" ) return True else: # place_order 返回 None,说明下单失败 logger.error(f"{symbol} [平仓] ❌ 下单返回 None,可能的原因:") logger.error(f" 1. 订单名义价值不足(小于最小要求)") logger.error(f" 2. 数量精度调整后为 0 或负数") logger.error(f" 3. 无法获取价格信息") logger.error(f" 4. 其他下单错误(已在 place_order 中记录)") logger.error(f" 持仓信息: {side} {quantity:.4f} @ MARKET") # 尝试获取更多诊断信息 try: symbol_info = await self.client.get_symbol_info(symbol) ticker = await self.client.get_ticker_24h(symbol) if ticker: current_price = ticker['price'] notional_value = quantity * current_price min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0 logger.error(f" 当前价格: {current_price:.4f} USDT") logger.error(f" 订单名义价值: {notional_value:.2f} USDT") logger.error(f" 最小名义价值: {min_notional:.2f} USDT") if notional_value < min_notional: logger.error(f" ⚠ 订单名义价值不足,无法平仓") except Exception as diag_error: logger.warning(f" 无法获取诊断信息: {diag_error}") return False except Exception as e: logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}") logger.error(f" 错误类型: {type(e).__name__}") import traceback logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}") # 尝试清理本地记录(即使平仓失败) try: await self._stop_position_monitoring(symbol) if symbol in self.active_positions: del self.active_positions[symbol] logger.info(f"{symbol} [平仓] 已清理本地持仓记录") except Exception as cleanup_error: logger.warning(f"{symbol} [平仓] 清理本地记录时出错: {cleanup_error}") return False async def check_stop_loss_take_profit(self) -> List[str]: """ 检查止损止盈 Returns: 需要平仓的交易对列表 """ closed_positions = [] try: # 获取当前持仓 positions = await self.client.get_open_positions() position_dict = {p['symbol']: p for p in positions} for symbol, position_info in list(self.active_positions.items()): if symbol not in position_dict: # 持仓已不存在,移除记录 del self.active_positions[symbol] continue current_position = position_dict[symbol] entry_price = position_info['entryPrice'] quantity = position_info['quantity'] # 修复:获取quantity # 获取当前标记价格 current_price = current_position.get('markPrice', 0) if current_price == 0: # 如果标记价格为0,尝试从ticker获取 ticker = await self.client.get_ticker_24h(symbol) if ticker: current_price = ticker['price'] else: current_price = entry_price # 计算当前盈亏 if position_info['side'] == 'BUY': pnl_percent = ((current_price - entry_price) / entry_price) * 100 else: pnl_percent = ((entry_price - current_price) / entry_price) * 100 # 更新最大盈利 if pnl_percent > position_info.get('maxProfit', 0): position_info['maxProfit'] = pnl_percent # 移动止损逻辑(盈利后保护利润) use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True) if use_trailing: trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) if not position_info.get('trailingStopActivated', False): # 盈利超过阈值后,激活移动止损 if pnl_percent > trailing_activation * 100: position_info['trailingStopActivated'] = True # 将止损移至成本价(保本) position_info['stopLoss'] = entry_price logger.info( f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} " f"(盈利: {pnl_percent:.2f}%)" ) else: # 盈利超过2%后,止损移至保护利润位 if pnl_percent > 2.0: if position_info['side'] == 'BUY': new_stop_loss = entry_price * (1 + trailing_protect) if new_stop_loss > position_info['stopLoss']: position_info['stopLoss'] = new_stop_loss logger.info( f"{symbol} 移动止损更新: {new_stop_loss:.4f} " f"(保护{trailing_protect*100:.1f}%利润)" ) else: new_stop_loss = entry_price * (1 - trailing_protect) if new_stop_loss < position_info['stopLoss']: position_info['stopLoss'] = new_stop_loss logger.info( f"{symbol} 移动止损更新: {new_stop_loss:.4f} " f"(保护{trailing_protect*100:.1f}%利润)" ) # 检查止损(使用更新后的止损价) stop_loss = position_info.get('stopLoss') if stop_loss is None: logger.warning(f"{symbol} 止损价未设置,跳过止损检查") elif position_info['side'] == 'BUY' and current_price <= stop_loss: logger.warning( f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) # 确定平仓原因 exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止损记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue if stop_loss is not None and position_info['side'] == 'SELL' and current_price >= stop_loss: logger.warning( f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) # 确定平仓原因 exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止损记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue # 检查分步止盈 take_profit_1 = position_info.get('takeProfit1') # 第一目标(盈亏比1:1) take_profit_2 = position_info.get('takeProfit2', position_info.get('takeProfit')) # 第二目标 partial_profit_taken = position_info.get('partialProfitTaken', False) remaining_quantity = position_info.get('remainingQuantity', quantity) # 第一目标:盈亏比1:1,了结50%仓位 if not partial_profit_taken and take_profit_1 is not None: if position_info['side'] == 'BUY' and current_price >= take_profit_1: logger.info( f"{symbol} 触发第一目标止盈(盈亏比1:1): {current_price:.4f} >= {take_profit_1:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) # 部分平仓50% partial_quantity = quantity * 0.5 try: # 部分平仓 close_side = 'SELL' if position_info['side'] == 'BUY' else 'BUY' partial_order = await self.client.place_order( symbol=symbol, side=close_side, quantity=partial_quantity, order_type='MARKET' ) if partial_order: position_info['partialProfitTaken'] = True position_info['remainingQuantity'] = remaining_quantity - partial_quantity logger.info( f"{symbol} 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}" ) # 更新止损为成本价(保护剩余仓位) position_info['stopLoss'] = entry_price position_info['trailingStopActivated'] = True logger.info(f"{symbol} 剩余仓位止损移至成本价,配合移动止损博取更大利润") except Exception as e: logger.error(f"{symbol} 部分止盈失败: {e}") elif take_profit_1 is not None and position_info['side'] == 'SELL' and current_price <= take_profit_1: logger.info( f"{symbol} 触发第一目标止盈(盈亏比1:1): {current_price:.4f} <= {take_profit_1:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) # 部分平仓50% partial_quantity = quantity * 0.5 try: # 部分平仓 partial_order = await self.client.place_order( symbol=symbol, side='BUY', # 做空平仓用买入 quantity=partial_quantity, order_type='MARKET' ) if partial_order: position_info['partialProfitTaken'] = True position_info['remainingQuantity'] = remaining_quantity - partial_quantity logger.info( f"{symbol} 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}" ) # 更新止损为成本价(保护剩余仓位) position_info['stopLoss'] = entry_price position_info['trailingStopActivated'] = True logger.info(f"{symbol} 剩余仓位止损移至成本价,配合移动止损博取更大利润") except Exception as e: logger.error(f"{symbol} 部分止盈失败: {e}") # 第二目标:原始止盈价,平掉剩余仓位 if partial_profit_taken and take_profit_2 is not None: if position_info['side'] == 'BUY' and current_price >= take_profit_2: logger.info( f"{symbol} 触发第二目标止盈: {current_price:.4f} >= {take_profit_2:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) exit_reason = 'take_profit' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止盈记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue elif take_profit_2 is not None and position_info['side'] == 'SELL' and current_price <= take_profit_2: logger.info( f"{symbol} 触发第二目标止盈: {current_price:.4f} <= {take_profit_2:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) exit_reason = 'take_profit' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止盈记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue else: # 如果未部分止盈,但达到第二目标,直接全部平仓 take_profit = position_info.get('takeProfit') if take_profit is not None and position_info['side'] == 'BUY' and current_price >= take_profit: logger.info( f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) exit_reason = 'take_profit' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止盈记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue if take_profit is not None and position_info['side'] == 'SELL' and current_price <= take_profit: logger.info( f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) exit_reason = 'take_profit' # 更新数据库 if DB_AVAILABLE: trade_id = position_info.get('tradeId') if trade_id: try: Trade.update_exit( trade_id=trade_id, exit_price=current_price, exit_reason=exit_reason, pnl=pnl_percent * entry_price * quantity / 100, pnl_percent=pnl_percent ) except Exception as e: logger.warning(f"更新止盈记录失败: {e}") if await self.close_position(symbol, reason=exit_reason): closed_positions.append(symbol) continue except Exception as e: logger.error(f"检查止损止盈失败: {e}") return closed_positions async def get_position_summary(self) -> Dict: """ 获取持仓摘要 Returns: 持仓摘要信息 """ try: positions = await self.client.get_open_positions() balance = await self.client.get_account_balance() total_pnl = sum(p['unRealizedProfit'] for p in positions) return { 'totalPositions': len(positions), 'totalBalance': balance.get('total', 0), 'availableBalance': balance.get('available', 0), 'totalPnL': total_pnl, 'positions': [ { 'symbol': p['symbol'], 'positionAmt': p['positionAmt'], 'entryPrice': p['entryPrice'], 'pnl': p['unRealizedProfit'] } for p in positions ] } except Exception as e: logger.error(f"获取持仓摘要失败: {e}") return {} async def sync_positions_with_binance(self): """ 同步币安实际持仓状态与数据库状态 检查哪些持仓在数据库中还是open状态,但在币安已经不存在了 """ if not DB_AVAILABLE or not Trade: logger.debug("数据库不可用,跳过持仓状态同步") return try: logger.info("开始同步币安持仓状态与数据库...") # 1. 获取币安实际持仓 binance_positions = await self.client.get_open_positions() binance_symbols = {p['symbol'] for p in binance_positions} logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})") # 2. 获取数据库中状态为open的交易记录 db_open_trades = Trade.get_all(status='open') db_open_symbols = {t['symbol'] for t in db_open_trades} logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else '无'})") # 3. 找出在数据库中open但在币安已不存在的持仓 missing_in_binance = db_open_symbols - binance_symbols if missing_in_binance: logger.warning( f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态,但在币安已不存在: " f"{', '.join(missing_in_binance)}" ) # 4. 更新这些持仓的状态 for symbol in missing_in_binance: trades = Trade.get_by_symbol(symbol, status='open') for trade in trades: trade_id = trade['id'] try: logger.info(f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})...") # 尝试从币安历史订单获取实际平仓价格 exit_price = None try: # 获取最近的平仓订单(reduceOnly=True的订单) import time end_time = int(time.time() * 1000) # 当前时间(毫秒) start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天 # 获取历史订单 orders = await self.client.client.futures_get_all_orders( symbol=symbol, startTime=start_time, endTime=end_time ) # 查找最近的平仓订单(reduceOnly=True且已成交) close_orders = [ o for o in orders if o.get('reduceOnly') == True and o.get('status') == 'FILLED' ] if close_orders: # 按时间倒序排序,取最近的 close_orders.sort(key=lambda x: x.get('updateTime', 0), reverse=True) latest_order = close_orders[0] # 获取平均成交价格 exit_price = float(latest_order.get('avgPrice', 0)) if exit_price > 0: logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT") except Exception as order_error: logger.debug(f"{symbol} [状态同步] 获取历史订单失败: {order_error}") # 如果无法从订单获取,使用当前价格 if not exit_price or exit_price <= 0: ticker = await self.client.get_ticker_24h(symbol) exit_price = float(ticker['price']) if ticker else float(trade['entry_price']) logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT") # 计算盈亏(确保所有值都是float类型,避免Decimal类型问题) entry_price = float(trade['entry_price']) quantity = float(trade['quantity']) if trade['side'] == 'BUY': pnl = (exit_price - entry_price) * quantity pnl_percent = ((exit_price - entry_price) / entry_price) * 100 else: # SELL pnl = (entry_price - exit_price) * quantity pnl_percent = ((entry_price - exit_price) / entry_price) * 100 # 从历史订单中获取平仓订单号 exit_order_id = None if close_orders: exit_order_id = close_orders[0].get('orderId') if exit_order_id: logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}") Trade.update_exit( trade_id=trade_id, exit_price=exit_price, exit_reason='sync', # 标记为同步更新 pnl=pnl, pnl_percent=pnl_percent, exit_order_id=exit_order_id # 保存币安平仓订单号 ) logger.info( f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, " f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)" ) # 清理本地记录 if symbol in self.active_positions: await self._stop_position_monitoring(symbol) del self.active_positions[symbol] logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录") except Exception as e: logger.error(f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): {e}") else: logger.info("✓ 持仓状态同步完成,数据库与币安状态一致") # 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的) missing_in_db = binance_symbols - db_open_symbols if missing_in_db: logger.info( f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: " f"{', '.join(missing_in_db)} (可能是手动开仓)" ) # 为手动开仓的持仓创建数据库记录并启动监控 for symbol in missing_in_db: try: # 获取币安持仓详情 binance_position = next( (p for p in binance_positions if p['symbol'] == symbol), None ) if not binance_position: continue position_amt = binance_position['positionAmt'] entry_price = binance_position['entryPrice'] quantity = abs(position_amt) side = 'BUY' if position_amt > 0 else 'SELL' logger.info( f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... " f"({side} {quantity:.4f} @ {entry_price:.4f})" ) # 创建数据库记录 trade_id = Trade.create( symbol=symbol, side=side, quantity=quantity, entry_price=entry_price, leverage=binance_position.get('leverage', 10), entry_reason='manual_entry' # 标记为手动开仓 ) logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})") # 创建本地持仓记录(用于监控) ticker = await self.client.get_ticker_24h(symbol) current_price = ticker['price'] if ticker else entry_price # 计算止损止盈 stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side) take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side) position_info = { 'symbol': symbol, 'side': side, 'quantity': quantity, 'entryPrice': entry_price, 'changePercent': 0, # 手动开仓,无法计算涨跌幅 'orderId': None, 'tradeId': trade_id, 'stopLoss': stop_loss_price, 'takeProfit': take_profit_price, 'initialStopLoss': stop_loss_price, 'leverage': binance_position.get('leverage', 10), 'entryReason': 'manual_entry', 'atr': None, 'maxProfit': 0.0, 'trailingStopActivated': False } self.active_positions[symbol] = position_info # 启动WebSocket监控 if self._monitoring_enabled: await self._start_position_monitoring(symbol) logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控") logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成") except Exception as e: logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}") import traceback logger.error(f" 错误详情:\n{traceback.format_exc()}") logger.info("持仓状态同步完成") except Exception as e: logger.error(f"同步持仓状态失败: {e}") import traceback logger.error(f"错误详情:\n{traceback.format_exc()}") async def start_all_position_monitoring(self): """ 启动所有持仓的WebSocket实时监控 """ if not self._monitoring_enabled: logger.info("实时监控已禁用,跳过启动") return # WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager if not self.client: logger.warning("客户端未初始化,无法启动实时监控") return # 获取当前所有持仓 positions = await self.client.get_open_positions() binance_symbols = {p['symbol'] for p in positions} active_symbols = set(self.active_positions.keys()) logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})") logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else '无'})") # 为所有币安持仓启动监控(即使不在active_positions中,可能是手动开仓的) for position in positions: symbol = position['symbol'] if symbol not in self._monitor_tasks: # 如果不在active_positions中,先创建记录 if symbol not in self.active_positions: logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...") # 这里会通过sync_positions_with_binance来处理,但先启动监控 try: entry_price = position.get('entryPrice', 0) position_amt = position['positionAmt'] quantity = abs(position_amt) side = 'BUY' if position_amt > 0 else 'SELL' # 创建临时记录用于监控 ticker = await self.client.get_ticker_24h(symbol) current_price = ticker['price'] if ticker else entry_price stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side) take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side) position_info = { 'symbol': symbol, 'side': side, 'quantity': quantity, 'entryPrice': entry_price, 'changePercent': 0, 'orderId': None, 'tradeId': None, 'stopLoss': stop_loss_price, 'takeProfit': take_profit_price, 'initialStopLoss': stop_loss_price, 'leverage': position.get('leverage', 10), 'entryReason': 'manual_entry_temp', 'atr': None, 'maxProfit': 0.0, 'trailingStopActivated': False } self.active_positions[symbol] = position_info logger.info(f"{symbol} 已创建临时持仓记录用于监控") except Exception as e: logger.error(f"{symbol} 创建临时持仓记录失败: {e}") await self._start_position_monitoring(symbol) logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控") async def stop_all_position_monitoring(self): """ 停止所有持仓的WebSocket监控 """ symbols = list(self._monitor_tasks.keys()) for symbol in symbols: await self._stop_position_monitoring(symbol) logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)") async def _start_position_monitoring(self, symbol: str): """ 启动单个持仓的WebSocket价格监控 Args: symbol: 交易对 """ if symbol in self._monitor_tasks: logger.debug(f"{symbol} 监控任务已存在,跳过") return # WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager if not self.client: logger.warning(f"{symbol} 客户端未初始化,无法启动监控") return try: task = asyncio.create_task(self._monitor_position_price(symbol)) self._monitor_tasks[symbol] = task logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控") except Exception as e: logger.error(f"启动 {symbol} 监控失败: {e}") async def _stop_position_monitoring(self, symbol: str): """ 停止单个持仓的WebSocket监控 Args: symbol: 交易对 """ if symbol not in self._monitor_tasks: return task = self._monitor_tasks[symbol] if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass del self._monitor_tasks[symbol] logger.debug(f"已停止 {symbol} 的WebSocket监控") async def _monitor_position_price(self, symbol: str): """ 监控单个持仓的价格(WebSocket实时推送) Args: symbol: 交易对 """ retry_count = 0 max_retries = 5 while retry_count < max_retries: try: if symbol not in self.active_positions: logger.info(f"{symbol} 持仓已不存在,停止监控") break # 使用WebSocket订阅价格流 # 直接使用 aiohttp 连接 Binance 期货 WebSocket API # 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams # 端点:wss://fstream.binance.com/ws/@ticker ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker" async with aiohttp.ClientSession() as session: async with session.ws_connect(ws_url) as ws: logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新") retry_count = 0 # 连接成功,重置重试计数 async for msg in ws: if symbol not in self.active_positions: logger.info(f"{symbol} 持仓已不存在,停止监控") break if msg.type == aiohttp.WSMsgType.TEXT: try: # 解析 JSON 消息 data = json.loads(msg.data) # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} # 根据文档,ticker 流包含 'c' 字段(最后价格) if isinstance(data, dict): if 'c' in data: # 'c' 是当前价格 current_price = float(data['c']) # 立即检查止损止盈 await self._check_single_position(symbol, current_price) elif 'data' in data: # 兼容组合流格式(如果使用 /stream 端点) if isinstance(data['data'], dict) and 'c' in data['data']: current_price = float(data['data']['c']) await self._check_single_position(symbol, current_price) except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e: logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}") continue elif msg.type == aiohttp.WSMsgType.ERROR: logger.warning(f"{symbol} WebSocket错误: {ws.exception()}") break elif msg.type == aiohttp.WSMsgType.CLOSE: logger.info(f"{symbol} WebSocket连接关闭") break except asyncio.CancelledError: logger.info(f"{symbol} 监控任务已取消") break except Exception as e: retry_count += 1 logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}") if retry_count < max_retries: # 指数退避重试 wait_time = min(2 ** retry_count, 30) logger.info(f"{symbol} {wait_time}秒后重试连接...") await asyncio.sleep(wait_time) else: logger.error(f"{symbol} WebSocket监控失败,已达到最大重试次数") # 回退到定时检查模式 logger.info(f"{symbol} 将使用定时检查模式(非实时)") break # 清理任务 if symbol in self._monitor_tasks: del self._monitor_tasks[symbol] async def _check_single_position(self, symbol: str, current_price: float): """ 检查单个持仓的止损止盈(实时检查) Args: symbol: 交易对 current_price: 当前价格 """ position_info = self.active_positions.get(symbol) if not position_info: return # 确保所有值都是float类型 entry_price = float(position_info['entryPrice']) quantity = float(position_info['quantity']) current_price_float = float(current_price) # 计算当前盈亏 if position_info['side'] == 'BUY': pnl_percent = ((current_price_float - entry_price) / entry_price) * 100 else: # SELL pnl_percent = ((entry_price - current_price_float) / entry_price) * 100 # 更新最大盈利 if pnl_percent > position_info.get('maxProfit', 0): position_info['maxProfit'] = pnl_percent # 移动止损逻辑(盈利后保护利润) use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True) if use_trailing: trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) if not position_info.get('trailingStopActivated', False): # 盈利超过阈值后,激活移动止损 if pnl_percent > trailing_activation * 100: position_info['trailingStopActivated'] = True # 将止损移至成本价(保本) position_info['stopLoss'] = entry_price logger.info( f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} " f"(盈利: {pnl_percent:.2f}%)" ) else: # 盈利超过2%后,止损移至保护利润位 if pnl_percent > 2.0: if position_info['side'] == 'BUY': new_stop_loss = entry_price * (1 + trailing_protect) if new_stop_loss > position_info['stopLoss']: position_info['stopLoss'] = new_stop_loss logger.info( f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} " f"(保护{trailing_protect*100:.1f}%利润)" ) else: # SELL new_stop_loss = entry_price * (1 - trailing_protect) if new_stop_loss < position_info['stopLoss']: position_info['stopLoss'] = new_stop_loss logger.info( f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} " f"(保护{trailing_protect*100:.1f}%利润)" ) # 检查止损 stop_loss = position_info['stopLoss'] should_close = False exit_reason = None if position_info['side'] == 'BUY' and current_price <= stop_loss: should_close = True exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' logger.warning( f"{symbol} [实时监控] 触发止损: {current_price:.4f} <= {stop_loss:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) elif position_info['side'] == 'SELL' and current_price >= stop_loss: should_close = True exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss' logger.warning( f"{symbol} [实时监控] 触发止损: {current_price:.4f} >= {stop_loss:.4f} " f"(盈亏: {pnl_percent:.2f}%)" ) # 检查止盈 if not should_close: take_profit = float(position_info['takeProfit']) # 计算止盈百分比(用于诊断) if position_info['side'] == 'BUY': take_profit_pct = ((take_profit - entry_price) / entry_price) * 100 else: # SELL take_profit_pct = ((entry_price - take_profit) / entry_price) * 100 # 每5%盈利记录一次诊断日志(帮助排查问题) # 使用更宽松的条件,避免因为浮点数精度问题导致日志不输出 if pnl_percent >= 5.0: # 每5%记录一次,但允许一些容差 should_log = (int(pnl_percent) % 5 == 0) or (pnl_percent >= 10.0 and pnl_percent < 10.5) if should_log: trigger_condition = current_price_float >= take_profit if position_info['side'] == 'BUY' else current_price_float <= take_profit logger.warning( f"{symbol} [实时监控] 诊断: 盈利{pnl_percent:.2f}% | " f"当前价: {current_price_float:.4f} | " f"入场价: {entry_price:.4f} | " f"止盈价: {take_profit:.4f} ({take_profit_pct:.2f}%) | " f"方向: {position_info['side']} | " f"是否触发: {trigger_condition} | " f"价格差: {abs(current_price_float - take_profit):.4f} | " f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}" ) # 如果盈利超过止盈目标但未触发,记录警告 if pnl_percent > take_profit_pct and not trigger_condition: logger.error( f"{symbol} [实时监控] ⚠️ 异常: 盈利{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!" ) price_diff = current_price_float - take_profit if position_info['side'] == 'BUY' else take_profit - current_price_float logger.error(f" 当前价: {current_price_float:.4f}, 止盈价: {take_profit:.4f}, 价格差: {price_diff:.4f}") if position_info['side'] == 'BUY' and current_price_float >= take_profit: should_close = True exit_reason = 'take_profit' logger.info( f"{symbol} [实时监控] 触发止盈: {current_price_float:.4f} >= {take_profit:.4f} " f"(盈亏: {pnl_percent:.2f}%, 止盈目标: {take_profit_pct:.2f}%)" ) elif position_info['side'] == 'SELL' and current_price_float <= take_profit: should_close = True exit_reason = 'take_profit' logger.info( f"{symbol} [实时监控] 触发止盈: {current_price_float:.4f} <= {take_profit:.4f} " f"(盈亏: {pnl_percent:.2f}%, 止盈目标: {take_profit_pct:.2f}%)" ) # 如果触发止损止盈,执行平仓 if should_close: logger.info( f"{symbol} [自动平仓] 开始执行平仓操作 | " f"原因: {exit_reason} | " f"入场价: {entry_price:.4f} | " f"当前价: {current_price:.4f} | " f"盈亏: {pnl_percent:.2f}% | " f"数量: {quantity:.4f}" ) # 更新数据库 if DB_AVAILABLE and Trade: trade_id = position_info.get('tradeId') if trade_id: try: if position_info['side'] == 'BUY': pnl = (current_price_float - entry_price) * quantity else: # SELL pnl = (entry_price - current_price_float) * quantity logger.info(f"{symbol} [自动平仓] 更新数据库记录 (ID: {trade_id})...") Trade.update_exit( trade_id=trade_id, exit_price=current_price_float, exit_reason=exit_reason, pnl=pnl, pnl_percent=pnl_percent ) logger.info(f"{symbol} [自动平仓] ✓ 数据库记录已更新 (盈亏: {pnl:.2f} USDT)") except Exception as e: logger.error(f"{symbol} [自动平仓] ❌ 更新数据库记录失败: {e}") import traceback logger.error(f" 错误详情:\n{traceback.format_exc()}") # 执行平仓(注意:这里会停止监控,所以先更新数据库) logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...") success = await self.close_position(symbol, reason=exit_reason) if success: logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成") else: logger.error(f"{symbol} [自动平仓] ❌ 平仓失败") async def diagnose_position(self, symbol: str): """ 诊断持仓状态(用于排查为什么没有自动平仓) Args: symbol: 交易对 """ try: logger.info(f"{symbol} [诊断] 开始诊断持仓状态...") # 1. 检查是否在active_positions中 if symbol not in self.active_positions: logger.warning(f"{symbol} [诊断] ❌ 不在本地持仓记录中 (active_positions)") logger.warning(f" 可能原因: 手动开仓或系统重启后未同步") logger.warning(f" 解决方案: 等待下次状态同步或手动触发同步") else: position_info = self.active_positions[symbol] logger.info(f"{symbol} [诊断] ✓ 在本地持仓记录中") logger.info(f" 入场价: {position_info['entryPrice']:.4f}") logger.info(f" 方向: {position_info['side']}") logger.info(f" 数量: {position_info['quantity']:.4f}") logger.info(f" 止损价: {position_info['stopLoss']:.4f}") logger.info(f" 止盈价: {position_info['takeProfit']:.4f}") # 2. 检查WebSocket监控状态 if symbol in self._monitor_tasks: task = self._monitor_tasks[symbol] if task.done(): logger.warning(f"{symbol} [诊断] ⚠ WebSocket监控任务已结束") try: await task # 获取异常信息 except Exception as e: logger.warning(f" 任务异常: {e}") else: logger.info(f"{symbol} [诊断] ✓ WebSocket监控任务运行中") else: logger.warning(f"{symbol} [诊断] ❌ 没有WebSocket监控任务") logger.warning(f" 可能原因: 监控未启动或已停止") # 3. 获取币安实际持仓 positions = await self.client.get_open_positions() binance_position = next((p for p in positions if p['symbol'] == symbol), None) if not binance_position: logger.warning(f"{symbol} [诊断] ❌ 币安账户中没有持仓") return logger.info(f"{symbol} [诊断] ✓ 币安账户中有持仓") entry_price_binance = binance_position.get('entryPrice', 0) mark_price = binance_position.get('markPrice', 0) unrealized_pnl = binance_position.get('unRealizedProfit', 0) logger.info(f" 币安入场价: {entry_price_binance:.4f}") logger.info(f" 标记价格: {mark_price:.4f}") logger.info(f" 未实现盈亏: {unrealized_pnl:.2f} USDT") # 4. 计算实际盈亏 if symbol in self.active_positions: position_info = self.active_positions[symbol] entry_price = float(position_info['entryPrice']) take_profit = float(position_info['takeProfit']) if position_info['side'] == 'BUY': pnl_percent = ((mark_price - entry_price) / entry_price) * 100 take_profit_pct = ((take_profit - entry_price) / entry_price) * 100 should_trigger = mark_price >= take_profit else: # SELL pnl_percent = ((entry_price - mark_price) / entry_price) * 100 take_profit_pct = ((entry_price - take_profit) / entry_price) * 100 should_trigger = mark_price <= take_profit logger.info(f"{symbol} [诊断] 盈亏分析:") logger.info(f" 当前盈亏: {pnl_percent:.2f}%") logger.info(f" 止盈目标: {take_profit_pct:.2f}%") logger.info(f" 当前价: {mark_price:.4f}") logger.info(f" 止盈价: {take_profit:.4f}") logger.info(f" 价格差: {abs(mark_price - take_profit):.4f}") logger.info(f" 应该触发: {should_trigger}") if pnl_percent > take_profit_pct and not should_trigger: logger.error(f"{symbol} [诊断] ❌ 异常: 盈亏{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!") logger.error(f" 可能原因: 浮点数精度问题或止盈价格计算错误") logger.info(f"{symbol} [诊断] 诊断完成") except Exception as e: logger.error(f"{symbol} [诊断] 诊断失败: {e}") import traceback logger.error(f" 错误详情:\n{traceback.format_exc()}")