auto_trade_sys/trading_system/position_manager.py
薇薇安 7ec1ae32d7 a
2026-01-22 23:03:32 +08:00

2848 lines
162 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓位管理模块 - 管理持仓和订单
"""
import asyncio
import logging
import json
import aiohttp
import time
from typing import Dict, List, Optional
from datetime import datetime
try:
from .binance_client import BinanceClient
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型(如果可用)
DB_AVAILABLE = False
Trade = None
get_beijing_time = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade, get_beijing_time
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
# 如果没有导入get_beijing_time创建一个本地版本
if get_beijing_time is None:
from datetime import datetime, timezone, timedelta
BEIJING_TZ = timezone(timedelta(hours=8))
def get_beijing_time():
"""获取当前北京时间UTC+8"""
return datetime.now(BEIJING_TZ).replace(tzinfo=None)
class PositionManager:
"""仓位管理类"""
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
"""
初始化仓位管理器
Args:
client: 币安客户端
risk_manager: 风险管理器
"""
self.client = client
self.risk_manager = risk_manager
self.active_positions: Dict[str, Dict] = {}
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
self._monitoring_enabled = True # 是否启用实时监控
self._pending_entry_orders: Dict[str, Dict] = {} # 未成交的入场订单(避免重复挂单)
self._last_entry_attempt_ms: Dict[str, int] = {} # 每个symbol的最近一次尝试冷却/去抖)
# 自动平仓去抖/限流(避免止损触发后反复下单/刷屏)
self._last_auto_close_attempt_ms: Dict[str, int] = {}
self._last_auto_close_fail_log_ms: Dict[str, int] = {}
@staticmethod
def _pct_like_to_ratio(v: float) -> float:
"""
将“看起来像百分比”的值转换为比例0~1
兼容两种来源:
- 前端/后端按“比例”存储0.006 表示 0.6%
- 历史/默认值按“百分比数值”存储0.6 表示 0.6%
经验规则:
- v > 1: 认为是 60/100 这种百分数除以100
- 0.05 < v <= 1: 也更可能是“0.6% 这种写法”除以100
- v <= 0.05: 更可能已经是比例(<=5%
"""
try:
x = float(v or 0.0)
except Exception:
x = 0.0
if x <= 0:
return 0.0
if x > 1.0:
return x / 100.0
if x > 0.05:
return x / 100.0
return x
def _calc_limit_entry_price(self, current_price: float, side: str, offset_ratio: float) -> float:
"""根据当前价与偏移比例计算限价入场价BUY: 下方回调SELL: 上方回调)"""
try:
cp = float(current_price)
except Exception:
cp = 0.0
try:
off = float(offset_ratio or 0.0)
except Exception:
off = 0.0
if cp <= 0:
return 0.0
if (side or "").upper() == "BUY":
return cp * (1 - off)
return cp * (1 + off)
async def _wait_for_order_filled(
self,
symbol: str,
order_id: int,
timeout_sec: int = 30,
poll_sec: float = 1.0,
) -> Dict:
"""
等待订单成交FILLED返回
{ ok, status, avg_price, executed_qty, raw }
"""
deadline = time.time() + max(1, int(timeout_sec or 1))
last_status = None
while time.time() < deadline:
try:
info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
status = info.get("status")
last_status = status
if status == "FILLED":
avg_price = float(info.get("avgPrice", 0) or 0) or float(info.get("price", 0) or 0)
executed_qty = float(info.get("executedQty", 0) or 0)
return {"ok": True, "status": status, "avg_price": avg_price, "executed_qty": executed_qty, "raw": info}
if status in ("CANCELED", "REJECTED", "EXPIRED"):
return {"ok": False, "status": status, "avg_price": 0, "executed_qty": float(info.get("executedQty", 0) or 0), "raw": info}
except Exception:
# 忽略单次失败,继续轮询
pass
await asyncio.sleep(max(0.2, float(poll_sec or 1.0)))
return {"ok": False, "status": last_status or "TIMEOUT", "avg_price": 0, "executed_qty": 0, "raw": None}
async def open_position(
self,
symbol: str,
change_percent: float,
leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
signal_strength: Optional[int] = None,
market_regime: Optional[str] = None,
trend_4h: Optional[str] = None,
atr: Optional[float] = None,
klines: Optional[List] = None,
bollinger: Optional[Dict] = None
) -> Optional[Dict]:
"""
开仓
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
leverage: 杠杆倍数
Returns:
订单信息失败返回None
"""
try:
# 0) 防止同一 symbol 重复挂入场单/快速重复尝试(去抖 + 冷却)
now_ms = int(time.time() * 1000)
cooldown_sec = int(config.TRADING_CONFIG.get("ENTRY_SYMBOL_COOLDOWN_SEC", 120) or 0)
last_ms = self._last_entry_attempt_ms.get(symbol)
if last_ms and cooldown_sec > 0 and now_ms - last_ms < cooldown_sec * 1000:
logger.info(f"{symbol} [入场] 冷却中({cooldown_sec}s跳过本次自动开仓")
return None
pending = self._pending_entry_orders.get(symbol)
if pending and pending.get("order_id"):
logger.info(f"{symbol} [入场] 已有未完成入场订单(orderId={pending.get('order_id')}),跳过重复开仓")
return None
# 标记本次尝试(无论最终是否成交,都避免短时间内反复开仓/挂单)
self._last_entry_attempt_ms[symbol] = now_ms
# 判断是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
return None
# 设置杠杆
await self.client.set_leverage(symbol, leverage)
# 计算仓位大小(传入实际使用的杠杆)
logger.info(f"开始为 {symbol} 计算仓位大小...")
quantity = await self.risk_manager.calculate_position_size(
symbol, change_percent, leverage=leverage
)
if quantity is None:
logger.warning(f"{symbol} 仓位计算失败,跳过交易")
logger.warning(f" 可能原因:")
logger.warning(f" 1. 账户余额不足")
logger.warning(f" 2. 单笔仓位超过限制")
logger.warning(f" 3. 总仓位超过限制")
logger.warning(f" 4. 无法获取价格数据")
logger.warning(f" 5. 保证金不足最小要求MIN_MARGIN_USDT")
logger.warning(f" 6. 名义价值小于0.2 USDT避免无意义的小单子")
return None
logger.info(f"{symbol} 仓位计算成功: {quantity:.4f}")
# 确定交易方向(优先使用技术指标信号)
if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
entry_price = ticker['price']
# 获取K线数据用于动态止损计算从symbol_info中获取如果可用
klines = None
bollinger = None
if 'klines' in locals() or 'symbol_info' in locals():
# 尝试从外部传入的symbol_info获取
pass
# 计算基于支撑/阻力的动态止损
# 优先使用技术结构(支撑/阻力位、布林带)
# 如果无法获取K线数据回退到ATR或固定止损
if not klines:
# 如果没有传入K线数据尝试获取
try:
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
klines_data = await self.client.get_klines(
symbol=symbol,
interval=primary_interval,
limit=20 # 获取最近20根K线用于计算支撑/阻力
)
klines = klines_data if len(klines_data) >= 10 else None
except Exception as e:
logger.debug(f"获取K线数据失败使用固定止损: {e}")
klines = None
# 在开仓前从Redis重新加载配置确保使用最新配置包括ATR参数
# 从Redis读取最新配置轻量级即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
logger.debug(f"{symbol} 开仓前已从Redis重新加载配置")
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
# ===== 智能入场方案C趋势强更少错过震荡更保守=====
smart_entry_enabled = bool(config.TRADING_CONFIG.get("SMART_ENTRY_ENABLED", True))
# LIMIT_ORDER_OFFSET_PCT兼容“比例/百分比”两种存储方式
limit_offset_ratio = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("LIMIT_ORDER_OFFSET_PCT", 0.5) or 0.0))
# 规则:趋势(trending) 或 4H共振明显 + 强信号 -> 允许“超时后市价兜底(有追价上限)”
mr = (market_regime or "").strip().lower() if market_regime else ""
t4 = (trend_4h or "").strip().lower() if trend_4h else ""
ss = int(signal_strength) if signal_strength is not None else 0
allow_market_fallback = False
if mr == "trending":
allow_market_fallback = True
if ss >= int(config.TRADING_CONFIG.get("SMART_ENTRY_STRONG_SIGNAL", 8) or 8) and t4 in ("up", "down"):
allow_market_fallback = True
# 追价上限:超过就不追,宁愿错过(避免回到“无脑追价高频打损”)
drift_ratio_trending = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_TRENDING", 0.6) or 0.6))
drift_ratio_ranging = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_RANGING", 0.3) or 0.3))
max_drift_ratio = drift_ratio_trending if allow_market_fallback else drift_ratio_ranging
# 总等待/追价参数
timeout_sec = int(config.TRADING_CONFIG.get("ENTRY_TIMEOUT_SEC", 180) or 180)
step_wait_sec = int(config.TRADING_CONFIG.get("ENTRY_STEP_WAIT_SEC", 15) or 15)
chase_steps = int(config.TRADING_CONFIG.get("ENTRY_CHASE_MAX_STEPS", 4) or 4)
market_fallback_after_sec = int(config.TRADING_CONFIG.get("ENTRY_MARKET_FALLBACK_AFTER_SEC", 45) or 45)
# 初始限价(回调入场)
current_px = float(entry_price)
initial_limit = self._calc_limit_entry_price(current_px, side, limit_offset_ratio)
if initial_limit <= 0:
return None
order = None
entry_order_id = None
order_status = None
actual_entry_price = None
filled_quantity = 0.0
entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase")
if not smart_entry_enabled:
# 根治方案:关闭智能入场后,回归“纯限价单模式”
# - 不追价
# - 不市价兜底
# - 未在确认时间内成交则撤单并跳过(属于策略未触发入场,不是系统错误)
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
logger.info(
f"{symbol} [纯限价入场] side={side} | 限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
f"确认超时={confirm_timeout}s未成交将撤单跳过"
)
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit
)
if not order:
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
else:
# 1) 先挂限价单
logger.info(
f"{symbol} [智能入场] 模式={entry_mode_used} | side={side} | "
f"marketRegime={market_regime} trend_4h={trend_4h} strength={ss}/10 | "
f"初始限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
f"追价上限={max_drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit)
if not order:
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
start_ts = time.time()
# 2) 分步等待 + 追价(逐步减少 offset并在趋势强时允许市价兜底有追价上限
for step in range(max(1, chase_steps)):
# 先等待一段时间看是否成交
wait_res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=step_wait_sec, poll_sec=1.0)
order_status = wait_res.get("status")
if wait_res.get("ok"):
actual_entry_price = float(wait_res.get("avg_price") or 0)
filled_quantity = float(wait_res.get("executed_qty") or 0)
break
# 未成交:如果超时太久且允许市价兜底,检查追价上限后转市价
elapsed = time.time() - start_ts
ticker2 = await self.client.get_ticker_24h(symbol)
cur2 = float(ticker2.get("price")) if ticker2 else current_px
drift_ratio = 0.0
try:
base = float(initial_limit) if float(initial_limit) > 0 else cur2
drift_ratio = abs((cur2 - base) / base)
except Exception:
drift_ratio = 0.0
if allow_market_fallback and elapsed >= market_fallback_after_sec:
if drift_ratio <= max_drift_ratio:
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底")
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET")
# 关键:转市价后必须更新 entry_order_id否则后续会继续查询“已取消的旧限价单”导致误判 CANCELED
try:
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
except Exception:
entry_order_id = None
break
else:
logger.info(f"{symbol} [智能入场] 限价超时,但偏离{drift_ratio*100:.2f}%>{max_drift_ratio*100:.2f}%,取消并放弃本次交易")
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
# 震荡/不允许市价兜底:尝试追价(减小 offset -> 更靠近当前价),但不突破追价上限
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
# offset 逐步减少到 0越追越接近当前价
step_ratio = (step + 1) / max(1, chase_steps)
cur_offset_ratio = max(0.0, limit_offset_ratio * (1.0 - step_ratio))
desired = self._calc_limit_entry_price(cur2, side, cur_offset_ratio)
if side == "BUY":
cap = initial_limit * (1 + max_drift_ratio)
desired = min(desired, cap)
else:
cap = initial_limit * (1 - max_drift_ratio)
desired = max(desired, cap)
if desired <= 0:
self._pending_entry_orders.pop(symbol, None)
return None
logger.info(
f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | "
f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired)
if not order:
self._pending_entry_orders.pop(symbol, None)
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
# 如果是市价兜底或最终限价成交,这里统一继续后续流程(下面会再查实际成交)
# ===== 统一处理:确认订单成交并获取实际成交价/数量 =====
if order:
if not entry_order_id:
entry_order_id = order.get("orderId")
if entry_order_id:
logger.info(f"{symbol} [开仓] 币安订单号: {entry_order_id}")
# 等待订单成交,检查订单状态并获取实际成交价格
# 只有在订单真正成交FILLED后才保存到数据库
if entry_order_id:
# 智能入场的限价订单可能需要更长等待,这里给一个总等待兜底(默认 30s
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=confirm_timeout, poll_sec=1.0)
order_status = res.get("status")
if res.get("ok"):
actual_entry_price = float(res.get("avg_price") or 0)
filled_quantity = float(res.get("executed_qty") or 0)
else:
# 未成交NEW/超时/CANCELED 等)属于“策略未触发入场”或“挂单没成交”
# 这不应当当作系统错误同时需要撤单best-effort避免留下悬挂委托造成后续混乱。
logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},跳过本次开仓并撤销挂单")
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
if not actual_entry_price or actual_entry_price <= 0:
logger.error(f"{symbol} [开仓] ❌ 无法获取实际成交价格,不保存到数据库")
self._pending_entry_orders.pop(symbol, None)
return None
if filled_quantity <= 0:
logger.error(f"{symbol} [开仓] ❌ 成交数量为0不保存到数据库")
self._pending_entry_orders.pop(symbol, None)
return None
# 使用实际成交价格和数量
original_entry_price = entry_price
entry_price = actual_entry_price
quantity = filled_quantity # 使用实际成交数量
logger.info(f"{symbol} [开仓] ✓ 使用实际成交价格: {entry_price:.4f} USDT (下单时价格: {original_entry_price:.4f}), 成交数量: {quantity:.4f}")
# 成交后清理 pending
self._pending_entry_orders.pop(symbol, None)
# ===== 成交后基于“实际成交价/数量”重新计算止损止盈(修复限价/滑点导致的偏差)=====
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.03)
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin,
klines=klines,
bollinger=bollinger,
atr=atr
)
stop_distance_for_tp = None
if side == 'BUY':
stop_distance_for_tp = entry_price - stop_loss_price
else:
stop_distance_for_tp = stop_loss_price - entry_price
if atr is not None and atr > 0 and entry_price > 0:
atr_percent = atr / entry_price
atr_multiplier = config.TRADING_CONFIG.get('ATR_STOP_LOSS_MULTIPLIER', 1.8)
stop_distance_for_tp = entry_price * atr_percent * atr_multiplier
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.30)
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = float(stop_loss_pct_margin or 0) * 3.0
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin,
atr=atr,
stop_distance=stop_distance_for_tp
)
# 分步止盈(基于“实际成交价 + 已计算的止损/止盈”)
if side == 'BUY':
take_profit_1 = entry_price + (entry_price - stop_loss_price) # 盈亏比1:1
else:
take_profit_1 = entry_price - (stop_loss_price - entry_price) # 盈亏比1:1
take_profit_2 = take_profit_price
# 交易规模:名义/保证金用于统计总交易量与UI展示
try:
notional_usdt = float(entry_price) * float(quantity)
except Exception:
notional_usdt = None
try:
margin_usdt = (float(notional_usdt) / float(leverage)) if notional_usdt is not None and float(leverage) > 0 else notional_usdt
except Exception:
margin_usdt = None
# 记录到数据库(只有在订单真正成交后才保存)
trade_id = None
if DB_AVAILABLE and Trade:
try:
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity, # 使用实际成交数量
entry_price=entry_price, # 使用实际成交价格
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id, # 保存币安订单号
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
return None
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法保存 {symbol} 交易记录")
# 记录持仓信息(包含动态止损止盈和分步止盈)
from datetime import datetime
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': change_percent,
'orderId': order.get('orderId'),
'tradeId': trade_id, # 数据库交易ID
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price, # 保留原始止盈价(第二目标)
'takeProfit1': take_profit_1, # 第一目标盈亏比1:1了结50%
'takeProfit2': take_profit_2, # 第二目标原始止盈价剩余50%
'partialProfitTaken': False, # 是否已部分止盈
'remainingQuantity': quantity, # 剩余仓位数量
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'entryTime': get_beijing_time(), # 记录入场时间(使用北京时间,用于计算持仓持续时间)
'strategyType': 'trend_following', # 策略类型(简化后只有趋势跟踪)
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
}
self.active_positions[symbol] = position_info
logger.info(
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
f"(涨跌幅: {change_percent:.2f}%)"
)
# 验证持仓是否真的在币安存在
try:
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
positions = await self.client.get_open_positions()
binance_position = next(
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
None
)
if binance_position:
logger.info(
f"{symbol} [开仓验证] ✓ 币安持仓确认: "
f"数量={float(binance_position.get('positionAmt', 0)):.4f}, "
f"入场价={float(binance_position.get('entryPrice', 0)):.4f}"
)
# 在币安侧挂“止损/止盈保护单”,避免仅依赖本地监控(服务重启/网络波动时更安全)
try:
current_mark = None
try:
current_mark = float(binance_position.get("markPrice", 0) or 0) or None
except Exception:
current_mark = None
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=current_mark)
except Exception as e:
logger.warning(f"{symbol} 挂币安止盈止损失败(不影响持仓监控): {e}")
else:
logger.warning(
f"{symbol} [开仓验证] ⚠️ 币安账户中没有持仓,可能订单未成交或被立即平仓"
)
# 清理本地记录
if symbol in self.active_positions:
del self.active_positions[symbol]
# 如果数据库已保存,标记为取消
if trade_id and DB_AVAILABLE and Trade:
try:
from database.connection import db
db.execute_update(
"UPDATE trades SET status = 'cancelled' WHERE id = %s",
(trade_id,)
)
logger.info(f"{symbol} [开仓验证] 已更新数据库状态为 cancelled (ID: {trade_id})")
except Exception as e:
logger.warning(f"{symbol} [开仓验证] 更新数据库状态失败: {e}")
return None
except Exception as verify_error:
logger.warning(f"{symbol} [开仓验证] 验证持仓时出错: {verify_error},继续使用本地记录")
# 启动WebSocket实时监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
# 记录“今日开仓次数”(用于用户风控旋钮 MAX_DAILY_ENTRIES
try:
await self.risk_manager.record_entry(symbol)
except Exception:
pass
return position_info
return None
except Exception as e:
logger.error(f"开仓失败 {symbol}: {e}")
return None
async def close_position(self, symbol: str, reason: str = 'manual') -> bool:
"""
平仓
Args:
symbol: 交易对
reason: 平仓原因manual, stop_loss, take_profit, trailing_stop, sync
Returns:
是否成功
"""
try:
logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})")
# 先取消币安侧的保护单(避免平仓后残留委托导致反向开仓/误触发)
try:
info0 = self.active_positions.get(symbol) if hasattr(self, "active_positions") else None
if info0 and isinstance(info0, dict):
for k in ("exchangeSlOrderId", "exchangeTpOrderId"):
oid = info0.get(k)
if oid:
try:
await self.client.futures_cancel_algo_order(int(oid))
except Exception:
pass
info0.pop("exchangeSlOrderId", None)
info0.pop("exchangeTpOrderId", None)
except Exception:
pass
# 获取当前持仓
positions = await self.client.get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if not position:
logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓")
# 即使币安没有持仓,也要更新数据库状态
updated = False
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...")
# 获取当前价格作为平仓价格
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice'])
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
if position_info['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
# 同步平仓时没有订单号设为None
# 计算持仓持续时间和策略类型
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None, # 同步平仓时没有订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
updated = True
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}")
# 清理本地记录
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
# 如果更新了数据库,返回成功;否则返回失败
return updated
# 确定平仓方向(与开仓相反)
position_amt = position['positionAmt']
side = 'SELL' if position_amt > 0 else 'BUY'
quantity = abs(position_amt)
position_side = 'LONG' if position_amt > 0 else 'SHORT'
# 二次校验:用币安实时持仓数量兜底,避免 reduceOnly 被拒绝(-2022
live_amt = await self._get_live_position_amt(symbol, position_side=position_side)
if live_amt is None or abs(live_amt) <= 0:
logger.warning(f"{symbol} [平仓] 实时查询到持仓已为0跳过下单并按已平仓处理")
# 复用“币安无持仓”的处理逻辑:走上面的分支
position = None
# 触发上方逻辑:直接返回 updated/清理
# 这里简单调用同步函数路径
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(position.get('entryPrice', 0) if position else 0)
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(f"{symbol} [平仓] 已清理本地记录(币安无持仓)")
return True
# 以币安实时持仓数量为准(并向下截断到不超过持仓)
quantity = min(quantity, abs(live_amt))
quantity = await self._adjust_close_quantity(symbol, quantity)
if quantity <= 0:
logger.warning(f"{symbol} [平仓] 数量调整后为0跳过下单并清理本地记录")
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
return True
logger.info(
f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET "
f"(持仓数量: {position_amt:.4f})"
)
# 平仓(使用 reduceOnly=True 确保只减少持仓,不增加反向持仓)
try:
logger.debug(f"{symbol} [平仓] 调用 place_order: {side} {quantity:.4f} @ MARKET (reduceOnly=True)")
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET',
reduce_only=True, # 平仓时使用 reduceOnly=True
position_side=position_side, # 兼容对冲模式Hedge必须指定 LONG/SHORT
)
logger.debug(f"{symbol} [平仓] place_order 返回: {order}")
except Exception as order_error:
logger.error(f"{symbol} [平仓] ❌ 下单失败: {order_error}")
logger.error(f" 下单参数: symbol={symbol}, side={side}, quantity={quantity:.4f}, order_type=MARKET")
logger.error(f" 错误类型: {type(order_error).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
raise # 重新抛出异常,让外层捕获
if order:
order_id = order.get('orderId')
logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order_id})")
# 等待订单成交,然后从币安获取实际成交价格
exit_price = None
try:
# 等待一小段时间让订单成交
await asyncio.sleep(1)
# 从币安获取订单详情,获取实际成交价格
try:
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
if order_info:
# 优先使用平均成交价格avgPrice如果没有则使用价格字段
exit_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0))
if exit_price > 0:
logger.info(f"{symbol} [平仓] 从币安订单获取实际成交价格: {exit_price:.4f} USDT")
else:
# 如果订单还没有完全成交,尝试从成交记录获取
if order_info.get('status') == 'FILLED' and order_info.get('fills'):
# 计算加权平均成交价格
total_qty = 0
total_value = 0
for fill in order_info.get('fills', []):
qty = float(fill.get('qty', 0))
price = float(fill.get('price', 0))
total_qty += qty
total_value += qty * price
if total_qty > 0:
exit_price = total_value / total_qty
logger.info(f"{symbol} [平仓] 从成交记录计算平均成交价格: {exit_price:.4f} USDT")
except Exception as order_error:
logger.warning(f"{symbol} [平仓] 获取订单详情失败: {order_error},使用备用方法")
# 如果无法从订单获取价格,使用当前价格作为备用
if not exit_price or exit_price <= 0:
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
exit_price = float(ticker['price'])
logger.warning(f"{symbol} [平仓] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
else:
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [平仓] 无法获取平仓价格,使用订单价格字段")
exit_price = float(order.get('price', 0))
except Exception as price_error:
logger.warning(f"{symbol} [平仓] 获取成交价格时出错: {price_error},使用当前价格")
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(order.get('price', 0))
# 更新数据库记录
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
# 计算盈亏确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity_float = float(quantity)
exit_price_float = float(exit_price)
if position_info['side'] == 'BUY':
pnl = (exit_price_float - entry_price) * quantity_float
pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price_float) * quantity_float
pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100
# 获取平仓订单号
exit_order_id = order.get('orderId')
if exit_order_id:
logger.info(f"{symbol} [平仓] 币安订单号: {exit_order_id}")
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型(从开仓原因或持仓信息中获取)
strategy_type = position_info.get('strategyType', 'trend_following') # 默认趋势跟踪
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price_float,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=exit_order_id, # 保存币安平仓订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
logger.info(
f"{symbol} [平仓] ✓ 数据库记录已更新 "
f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})"
)
except Exception as e:
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
else:
logger.warning(f"{symbol} 没有关联的数据库交易ID无法更新平仓记录")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法更新 {symbol} 平仓记录")
# 停止WebSocket监控
await self._stop_position_monitoring(symbol)
# 移除持仓记录
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(
f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} "
f"(原因: {reason})"
)
return True
else:
# place_order 返回 None可能是 -2022ReduceOnly rejected等竞态场景
# 兜底再查一次实时持仓如果已经为0则当作“已平仓”处理避免刷屏与误判失败
try:
live2 = await self._get_live_position_amt(symbol, position_side=position_side)
except Exception:
live2 = None
if live2 is None or abs(live2) <= 0:
logger.warning(f"{symbol} [平仓] 下单返回None但实时持仓已为0按已平仓处理可能竞态/手动平仓)")
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
return True
logger.error(f"{symbol} [平仓] ❌ 下单返回 None实时持仓仍存在: {live2}),可能的原因:")
logger.error(f" 1. ReduceOnly 被拒绝(-2022但持仓未同步")
logger.error(f" 2. 数量精度调整后为 0 或负数")
logger.error(f" 3. 无法获取价格信息")
logger.error(f" 4. 其他下单错误(已在 place_order 中记录)")
logger.error(f" 持仓信息: {side} {quantity:.4f} @ MARKET")
# 尝试获取更多诊断信息
try:
symbol_info = await self.client.get_symbol_info(symbol)
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
notional_value = quantity * current_price
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
logger.error(f" 当前价格: {current_price:.4f} USDT")
logger.error(f" 订单名义价值: {notional_value:.2f} USDT")
logger.error(f" 最小名义价值: {min_notional:.2f} USDT")
if notional_value < min_notional:
logger.error(f" ⚠ 订单名义价值不足,无法平仓")
except Exception as diag_error:
logger.warning(f" 无法获取诊断信息: {diag_error}")
return False
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
# 关键:平仓失败时不要盲目清理本地持仓/停止监控,否则会导致“仍有仓位但不再监控/不再自动止损止盈”
# 仅当确认币安已无持仓或实时持仓为0才清理本地记录。
try:
amt0 = await self._get_live_position_amt(symbol, position_side=None)
except Exception:
amt0 = None
if amt0 is not None and abs(amt0) <= 0:
try:
await self._stop_position_monitoring(symbol)
except Exception:
pass
try:
if symbol in self.active_positions:
del self.active_positions[symbol]
except Exception:
pass
logger.warning(f"{symbol} [平仓] 异常后检查币安持仓已为0已清理本地记录")
else:
logger.warning(
f"{symbol} [平仓] 异常后检查:币安持仓仍可能存在(amt={amt0}),保留本地记录与监控,等待下次同步/重试"
)
return False
async def _get_live_position_amt(self, symbol: str, position_side: Optional[str] = None) -> Optional[float]:
"""
从币安原始接口读取持仓数量(避免本地状态/缓存不一致导致 reduceOnly 被拒绝)。
- 单向模式:通常只有一个 net 持仓
- 对冲模式:可能同时有 LONG/SHORT两条腿用 positionSide 区分
"""
try:
if not getattr(self.client, "client", None):
return None
res = await self.client.client.futures_position_information(symbol=symbol)
if not isinstance(res, list):
return None
ps = (position_side or "").upper()
nonzero = []
for p in res:
if not isinstance(p, dict):
continue
try:
amt = float(p.get("positionAmt", 0))
except Exception:
continue
if abs(amt) <= 0:
continue
nonzero.append((amt, p))
if not nonzero:
return 0.0
if ps in ("LONG", "SHORT"):
for amt, p in nonzero:
pps = (p.get("positionSide") or "").upper()
if pps == ps:
return amt
# 如果没匹配到 positionSide退化为按符号推断
if ps == "LONG":
cand = next((amt for amt, _ in nonzero if amt > 0), None)
return cand if cand is not None else 0.0
if ps == "SHORT":
cand = next((amt for amt, _ in nonzero if amt < 0), None)
return cand if cand is not None else 0.0
# 没提供 position_side返回净持仓单向模式
return sum([amt for amt, _ in nonzero])
except Exception as e:
logger.debug(f"{symbol} 读取实时持仓失败: {e}")
return None
async def _adjust_close_quantity(self, symbol: str, quantity: float) -> float:
"""
平仓数量调整:只允许向下取整(避免超过持仓导致 reduceOnly 被拒绝)。
"""
try:
symbol_info = await self.client.get_symbol_info(symbol)
except Exception:
symbol_info = None
try:
q = float(quantity)
except Exception:
return 0.0
if not symbol_info:
return max(0.0, q)
try:
step_size = float(symbol_info.get("stepSize", 0) or 0)
except Exception:
step_size = 0.0
qty_precision = int(symbol_info.get("quantityPrecision", 8) or 8)
if step_size and step_size > 0:
q = float(int(q / step_size)) * step_size
else:
q = round(q, qty_precision)
q = round(q, qty_precision)
return max(0.0, q)
async def _ensure_exchange_sltp_orders(self, symbol: str, position_info: Dict, current_price: Optional[float] = None) -> None:
"""
在币安侧挂止损/止盈保护单STOP_MARKET + TAKE_PROFIT_MARKET
目的:
- 服务重启/网络波动时仍有交易所级别保护
- 用户在币安界面能看到止损/止盈委托
"""
try:
enabled = bool(config.TRADING_CONFIG.get("EXCHANGE_SLTP_ENABLED", True))
except Exception:
enabled = True
if not enabled:
return
if not position_info or not isinstance(position_info, dict):
return
side = (position_info.get("side") or "").upper()
if side not in {"BUY", "SELL"}:
return
stop_loss = position_info.get("stopLoss")
take_profit = position_info.get("takeProfit2") or position_info.get("takeProfit")
try:
stop_loss = float(stop_loss) if stop_loss is not None else None
except Exception:
stop_loss = None
try:
take_profit = float(take_profit) if take_profit is not None else None
except Exception:
take_profit = None
if not stop_loss or not take_profit:
return
# 防重复:先取消旧的保护单(仅取消特定类型,避免误伤普通挂单)
try:
await self.client.cancel_open_algo_orders_by_order_types(
symbol, {"STOP_MARKET", "TAKE_PROFIT_MARKET", "TRAILING_STOP_MARKET"}
)
except Exception:
pass
sl_order = await self.client.place_trigger_close_position_order(
symbol=symbol,
position_direction=side,
trigger_type="STOP_MARKET",
stop_price=stop_loss,
current_price=current_price,
working_type="MARK_PRICE",
)
tp_order = await self.client.place_trigger_close_position_order(
symbol=symbol,
position_direction=side,
trigger_type="TAKE_PROFIT_MARKET",
stop_price=take_profit,
current_price=current_price,
working_type="MARK_PRICE",
)
try:
# Algo 接口返回 algoId
position_info["exchangeSlOrderId"] = sl_order.get("algoId") if isinstance(sl_order, dict) else None
except Exception:
position_info["exchangeSlOrderId"] = None
try:
position_info["exchangeTpOrderId"] = tp_order.get("algoId") if isinstance(tp_order, dict) else None
except Exception:
position_info["exchangeTpOrderId"] = None
if position_info.get("exchangeSlOrderId") or position_info.get("exchangeTpOrderId"):
logger.info(
f"{symbol} 已挂币安保护单: "
f"SL={position_info.get('exchangeSlOrderId') or '-'} "
f"TP={position_info.get('exchangeTpOrderId') or '-'}"
)
async def check_stop_loss_take_profit(self) -> List[str]:
"""
检查止损止盈
Returns:
需要平仓的交易对列表
"""
closed_positions = []
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
if symbol not in position_dict:
# 持仓已不存在,移除记录
del self.active_positions[symbol]
continue
current_position = position_dict[symbol]
entry_price = position_info['entryPrice']
quantity = position_info['quantity'] # 修复获取quantity
# 获取当前标记价格
current_price = current_position.get('markPrice', 0)
if current_price == 0:
# 如果标记价格为0尝试从ticker获取
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
else:
current_price = entry_price
# 计算当前盈亏(基于保证金)
leverage = position_info.get('leverage', 10)
position_value = entry_price * quantity
margin = position_value / leverage if leverage > 0 else position_value
# 计算盈亏金额
if position_info['side'] == 'BUY':
pnl_amount = (current_price - entry_price) * quantity
else:
pnl_amount = (entry_price - current_price) * quantity
# 计算盈亏百分比(相对于保证金,与币安一致)
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
# 也计算价格百分比(用于显示和移动止损)
if position_info['side'] == 'BUY':
pnl_percent_price = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent_price = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利(基于保证金)
if pnl_percent_margin > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent_margin
# 移动止损逻辑(盈利后保护利润,基于保证金)
# 每次检查时从Redis重新加载配置确保配置修改能即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
# 检查是否启用移动止损默认False需要显式启用
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', False)
if use_trailing:
logger.debug(f"{symbol} [移动止损] 已启用,将检查移动止损逻辑")
else:
logger.debug(f"{symbol} [移动止损] 已禁用USE_TRAILING_STOP=False跳过移动止损检查")
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后(相对于保证金),激活移动止损
if pnl_percent_margin > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
)
else:
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
# 如果已经部分止盈,使用剩余仓位计算
if position_info.get('partialProfitTaken', False):
remaining_quantity = position_info.get('remainingQuantity', quantity)
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
protect_amount = remaining_margin * trailing_protect
# 计算剩余仓位的盈亏
if position_info['side'] == 'BUY':
remaining_pnl = (current_price - entry_price) * remaining_quantity
else:
remaining_pnl = (entry_price - current_price) * remaining_quantity
# 计算新的止损价(基于剩余仓位)
if position_info['side'] == 'BUY':
new_stop_loss = entry_price + (remaining_pnl - protect_amount) / remaining_quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
f"剩余数量: {remaining_quantity:.4f})"
)
else:
# 做空:止损价 = 开仓价 + (剩余盈亏 - 保护金额) / 剩余数量
# 注意:对于做空,止损价应该高于开仓价,所以用加法
# 移动止损只应该在盈利时激活
new_stop_loss = entry_price + (remaining_pnl - protect_amount) / remaining_quantity
# 对于做空,止损价应该越来越高(更宽松),所以检查 new_stop_loss > 当前止损
# 同时,移动止损只应该在盈利时激活
if new_stop_loss > position_info['stopLoss'] and remaining_pnl > 0:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
f"剩余数量: {remaining_quantity:.4f})"
)
else:
# 未部分止盈,使用原始仓位计算
protect_amount = margin * trailing_protect
# 计算对应的止损价
if position_info['side'] == 'BUY':
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
else:
# 做空:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
# 注意:对于做空,止损价应该高于开仓价,所以用加法
# 当盈利时pnl_amount > 0止损价应该往上移更宽松
# 当亏损时pnl_amount < 0不应该移动止损保持初始止损
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
# 对于做空,止损价应该越来越高(更宽松),所以检查 new_stop_loss > 当前止损
# 同时,移动止损只应该在盈利时激活,不应该在亏损时把止损往下移
if new_stop_loss > position_info['stopLoss'] and pnl_amount > 0:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
# 检查止损(使用更新后的止损价,基于保证金收益比)
# ⚠️ 重要:止损检查应该在时间锁之前,止损必须立即执行
stop_loss = position_info.get('stopLoss')
should_close_due_to_sl = False
exit_reason_sl = None
if stop_loss is None:
logger.warning(f"{symbol} 止损价未设置,跳过止损检查")
elif stop_loss is not None:
# 计算止损对应的保证金百分比目标
if position_info['side'] == 'BUY':
stop_loss_amount = (entry_price - stop_loss) * quantity
else: # SELL
stop_loss_amount = (stop_loss - entry_price) * quantity
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与止损目标(基于保证金)
if pnl_percent_margin <= -stop_loss_pct_margin:
should_close_due_to_sl = True
exit_reason_sl = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
# 计算持仓时间
entry_time = position_info.get('entryTime')
hold_time_minutes = 0
if entry_time:
try:
if isinstance(entry_time, datetime):
hold_time_sec = int((get_beijing_time() - entry_time).total_seconds())
else:
hold_time_sec = int(time.time() - (float(entry_time) if isinstance(entry_time, (int, float)) else 0))
hold_time_minutes = hold_time_sec / 60.0
except Exception:
hold_time_minutes = 0
# 详细诊断日志:记录平仓时的所有关键信息
logger.warning("=" * 80)
logger.warning(f"{symbol} [平仓诊断日志] ===== 触发止损平仓 =====")
logger.warning(f" 平仓原因: {exit_reason_sl}")
logger.warning(f" 入场价格: {entry_price:.6f} USDT")
logger.warning(f" 当前价格: {current_price:.4f} USDT")
logger.warning(f" 止损价格: {stop_loss:.4f} USDT")
logger.warning(f" 持仓数量: {quantity:.4f}")
logger.warning(f" 持仓时间: {hold_time_minutes:.1f} 分钟")
logger.warning(f" 入场时间: {entry_time}")
logger.warning(f" 当前盈亏: {pnl_percent_margin:.2f}% of margin")
logger.warning(f" 止损目标: -{stop_loss_pct_margin*100:.2f}% of margin")
logger.warning(f" 亏损金额: {abs(pnl_amount):.4f} USDT")
if position_info.get('trailingStopActivated'):
logger.warning(f" 移动止损: 已激活(从初始止损 {position_info.get('initialStopLoss', 'N/A')} 调整)")
logger.warning("=" * 80)
# 止损必须立即执行,不受时间锁限制
# 更新数据库
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_amount,
pnl_percent=pnl_percent_margin,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
# ⚠️ 关键修复:止损必须立即执行,不受时间锁限制
if await self.close_position(symbol, reason=exit_reason_sl):
closed_positions.append(symbol)
continue # 止损已执行,跳过后续止盈检查
# 检查分步止盈(基于保证金收益比)
# ⚠️ 优化:已移除止盈时间锁,止盈可以立即执行(与止损一致)
# 理由1) 止损已不受时间锁限制,止盈也应该一致
# 2) 分步止盈策略本身已提供利润保护50%在1:1止盈剩余保本
# 3) 交易所级别止盈单已提供保护
# 4) 及时止盈可以保护利润,避免价格回落
take_profit_1 = position_info.get('takeProfit1') # 第一目标盈亏比1:1
take_profit_2 = position_info.get('takeProfit2', position_info.get('takeProfit')) # 第二目标
partial_profit_taken = position_info.get('partialProfitTaken', False)
remaining_quantity = position_info.get('remainingQuantity', quantity)
# 第一目标盈亏比1:1了结50%仓位
# ✅ 已移除时间锁限制,可以立即执行
if not partial_profit_taken and take_profit_1 is not None:
# 计算第一目标对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_1_amount = (take_profit_1 - entry_price) * quantity
else: # SELL
take_profit_1_amount = (entry_price - take_profit_1) * quantity
take_profit_1_pct_margin = (take_profit_1_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与第一目标(基于保证金)
if pnl_percent_margin >= take_profit_1_pct_margin:
logger.info(
f"{symbol} 触发第一目标止盈盈亏比1:1基于保证金: "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_1_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 目标价={take_profit_1:.4f}"
)
# 部分平仓50%
partial_quantity = quantity * 0.5
try:
# 部分平仓
close_side = 'SELL' if position_info['side'] == 'BUY' else 'BUY'
close_position_side = 'LONG' if position_info['side'] == 'BUY' else 'SHORT'
# 二次校验并截断数量,避免 reduceOnly 被拒绝(-2022
live_amt = await self._get_live_position_amt(symbol, position_side=close_position_side)
if live_amt is None or abs(live_amt) <= 0:
logger.warning(f"{symbol} 部分止盈实时持仓已为0跳过部分平仓")
continue
partial_quantity = min(partial_quantity, abs(live_amt))
partial_quantity = await self._adjust_close_quantity(symbol, partial_quantity)
if partial_quantity <= 0:
logger.warning(f"{symbol} 部分止盈数量调整后为0跳过")
continue
partial_order = await self.client.place_order(
symbol=symbol,
side=close_side,
quantity=partial_quantity,
order_type='MARKET',
reduce_only=True, # 部分止盈必须 reduceOnly避免反向开仓
position_side=close_position_side, # 兼容对冲模式:指定要减少的持仓方向
)
if partial_order:
position_info['partialProfitTaken'] = True
position_info['remainingQuantity'] = remaining_quantity - partial_quantity
logger.info(
f"{symbol} 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}"
)
# 分步止盈后的“保本”处理:
# - 若启用 USE_TRAILING_STOP允许把剩余仓位止损移至成本价并进入移动止损阶段
# - 若关闭 USE_TRAILING_STOP严格不自动移动止损避免你说的“仍然保本/仍然移动止损”)
# 无论是否启用移动止损,分步止盈后都将剩余仓位止损移至成本价(保本)
# 这样既不错失后续行情,又彻底杜绝了该笔交易亏损的可能
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 部分止盈后:剩余仓位止损移至成本价 {entry_price:.4f}(保本),"
f"剩余50%仓位追求1.5:1止盈目标"
)
else:
# 兜底:可能遇到 -2022reduceOnly rejected等竞态重新查一次持仓
try:
live2 = await self._get_live_position_amt(symbol, position_side=close_position_side)
except Exception:
live2 = None
if live2 is None or abs(live2) <= 0:
logger.warning(f"{symbol} 部分止盈下单返回None但实时持仓已为0跳过")
continue
logger.warning(f"{symbol} 部分止盈下单返回None实时持仓仍存在: {live2}),稍后将继续由止损/止盈逻辑处理")
except Exception as e:
logger.error(f"{symbol} 部分止盈失败: {e}")
# 第二目标:原始止盈价,平掉剩余仓位(基于保证金收益比)
# ✅ 已移除时间锁限制,可以立即执行
if partial_profit_taken and take_profit_2 is not None:
# 计算第二目标对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_2_amount = (take_profit_2 - entry_price) * remaining_quantity
else: # SELL
take_profit_2_amount = (entry_price - take_profit_2) * remaining_quantity
# 使用剩余仓位的保证金
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
take_profit_2_pct_margin = (take_profit_2_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 计算剩余仓位的当前盈亏
if position_info['side'] == 'BUY':
remaining_pnl_amount = (current_price - entry_price) * remaining_quantity
else:
remaining_pnl_amount = (entry_price - current_price) * remaining_quantity
remaining_pnl_pct_margin = (remaining_pnl_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 直接比较剩余仓位盈亏百分比与第二目标(基于保证金)
if remaining_pnl_pct_margin >= take_profit_2_pct_margin:
logger.info(
f"{symbol} 触发第二目标止盈(基于保证金): "
f"剩余仓位盈亏={remaining_pnl_pct_margin:.2f}% of margin >= 目标={take_profit_2_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 目标价={take_profit_2:.4f}, "
f"剩余数量={remaining_quantity:.4f}"
)
exit_reason = 'take_profit'
# 计算总盈亏(原始仓位 + 部分止盈的盈亏)
# 部分止盈时的价格需要从数据库或记录中获取,这里简化处理
total_pnl_amount = remaining_pnl_amount # 简化:只计算剩余仓位盈亏
total_pnl_percent = (total_pnl_amount / margin * 100) if margin > 0 else 0
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=total_pnl_amount,
pnl_percent=total_pnl_percent,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol, reason=exit_reason):
closed_positions.append(symbol)
continue
else:
# 如果未部分止盈,但达到止盈目标,直接全部平仓(基于保证金收益比)
# ✅ 已移除时间锁限制,可以立即执行
take_profit = position_info.get('takeProfit')
if take_profit is not None:
# 计算止盈对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_amount = (take_profit - entry_price) * quantity
else: # SELL
take_profit_amount = (entry_price - take_profit) * quantity
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
if pnl_percent_margin >= take_profit_pct_margin:
logger.info(
f"{symbol} 触发止盈(基于保证金): "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 止盈价={take_profit:.4f}"
)
exit_reason = 'take_profit'
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间和策略类型
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
from datetime import datetime
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_amount,
pnl_percent=pnl_percent_margin,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol, reason=exit_reason):
closed_positions.append(symbol)
continue
except Exception as e:
logger.error(f"检查止损止盈失败: {e}")
return closed_positions
async def get_position_summary(self) -> Dict:
"""
获取持仓摘要
Returns:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
return {
'totalPositions': len(positions),
'totalBalance': balance.get('total', 0),
'availableBalance': balance.get('available', 0),
'totalPnL': total_pnl,
'positions': [
{
'symbol': p['symbol'],
'positionAmt': p['positionAmt'],
'entryPrice': p['entryPrice'],
'pnl': p['unRealizedProfit']
}
for p in positions
]
}
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {}
async def sync_positions_with_binance(self):
"""
同步币安实际持仓状态与数据库状态
检查哪些持仓在数据库中还是open状态但在币安已经不存在了
"""
if not DB_AVAILABLE or not Trade:
logger.debug("数据库不可用,跳过持仓状态同步")
return
try:
logger.info("开始同步币安持仓状态与数据库...")
# 1. 获取币安实际持仓
binance_positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions}
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
# 2. 获取数据库中状态为open的交易记录
db_open_trades = Trade.get_all(status='open')
db_open_symbols = {t['symbol'] for t in db_open_trades}
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else ''})")
# 3. 找出在数据库中open但在币安已不存在的持仓
missing_in_binance = db_open_symbols - binance_symbols
if missing_in_binance:
logger.warning(
f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态但在币安已不存在: "
f"{', '.join(missing_in_binance)}"
)
# 4. 更新这些持仓的状态
for symbol in missing_in_binance:
try:
trades = Trade.get_by_symbol(symbol, status='open')
if not trades:
logger.warning(f"{symbol} [状态同步] ⚠️ 数据库中没有找到open状态的交易记录跳过")
continue
logger.info(f"{symbol} [状态同步] 找到 {len(trades)} 条open状态的交易记录开始更新...")
except Exception as get_trades_error:
logger.error(
f"{symbol} [状态同步] ❌ 获取交易记录失败: "
f"错误类型={type(get_trades_error).__name__}, 错误消息={str(get_trades_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
continue
for trade in trades:
trade_id = trade.get('id')
if not trade_id:
logger.warning(f"{symbol} [状态同步] ⚠️ 交易记录缺少ID字段跳过: {trade}")
continue
try:
logger.info(
f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})... | "
f"入场价: {trade.get('entry_price', 'N/A')}, "
f"数量: {trade.get('quantity', 'N/A')}, "
f"方向: {trade.get('side', 'N/A')}"
)
# 尝试从币安历史订单获取实际平仓价格
exit_price = None
close_orders = []
try:
# 获取最近的平仓订单reduceOnly=True的订单
import time
end_time = int(time.time() * 1000) # 当前时间(毫秒)
start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天
logger.debug(
f"{symbol} [状态同步] 获取历史订单: "
f"symbol={symbol}, startTime={start_time}, endTime={end_time}"
)
# 获取历史订单
orders = await self.client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time,
endTime=end_time
)
# 验证 orders 的类型
if not isinstance(orders, list):
logger.warning(
f"{symbol} [状态同步] ⚠️ futures_get_all_orders 返回的不是列表: "
f"类型={type(orders)}, 值={orders}"
)
orders = [] # 设置为空列表,避免后续错误
if not orders:
logger.debug(f"{symbol} [状态同步] 未找到历史订单")
else:
logger.debug(f"{symbol} [状态同步] 找到 {len(orders)} 个历史订单")
# 查找最近的平仓订单reduceOnly=True且已成交
close_orders = []
for o in orders:
try:
if isinstance(o, dict) and o.get('reduceOnly') == True and o.get('status') == 'FILLED':
close_orders.append(o)
except (AttributeError, TypeError) as e:
logger.warning(
f"{symbol} [状态同步] ⚠️ 处理订单数据时出错: "
f"错误类型={type(e).__name__}, 错误消息={str(e)}, "
f"订单数据类型={type(o)}, 订单数据={o}"
)
continue
if close_orders:
# 按时间倒序排序,取最近的
close_orders.sort(key=lambda x: x.get('updateTime', 0), reverse=True)
latest_order = close_orders[0]
# 获取平均成交价格
exit_price = float(latest_order.get('avgPrice', 0))
if exit_price > 0:
logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT")
else:
logger.warning(
f"{symbol} [状态同步] 历史订单中没有有效的avgPrice: {latest_order}"
)
except KeyError as key_error:
# KeyError 可能是访问 orders[symbol] 或其他字典访问错误
logger.error(
f"{symbol} [状态同步] ❌ 获取历史订单时KeyError: "
f"错误key={key_error}, 错误类型={type(key_error).__name__}"
)
import traceback
logger.debug(f"{symbol} [状态同步] KeyError详情:\n{traceback.format_exc()}")
except Exception as order_error:
logger.warning(
f"{symbol} [状态同步] 获取历史订单失败: "
f"错误类型={type(order_error).__name__}, 错误消息={str(order_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
# 如果无法从订单获取,使用当前价格
if not exit_price or exit_price <= 0:
try:
ticker = await self.client.get_ticker_24h(symbol)
if ticker and ticker.get('price'):
exit_price = float(ticker['price'])
logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
else:
exit_price = float(trade.get('entry_price', 0))
logger.warning(
f"{symbol} [状态同步] 无法获取当前价格ticker={ticker}"
f"使用入场价: {exit_price:.4f} USDT"
)
except KeyError as key_error:
# KeyError 可能是访问 ticker['price'] 时出错
logger.error(
f"{symbol} [状态同步] ❌ 获取当前价格时KeyError: {key_error}, "
f"ticker数据: {ticker if 'ticker' in locals() else 'N/A'}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
except Exception as ticker_error:
logger.warning(
f"{symbol} [状态同步] 获取当前价格失败: "
f"错误类型={type(ticker_error).__name__}, 错误消息={str(ticker_error)}"
f"使用入场价: {trade.get('entry_price', 'N/A')}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
# 计算盈亏确保所有值都是float类型避免Decimal类型问题
try:
entry_price = float(trade.get('entry_price', 0))
quantity = float(trade.get('quantity', 0))
if entry_price <= 0 or quantity <= 0:
logger.error(
f"{symbol} [状态同步] ❌ 交易记录数据无效: "
f"入场价={entry_price}, 数量={quantity}, 跳过更新"
)
continue
if trade.get('side') == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
logger.debug(
f"{symbol} [状态同步] 盈亏计算: "
f"入场价={entry_price:.4f}, 平仓价={exit_price:.4f}, "
f"数量={quantity:.4f}, 方向={trade.get('side', 'N/A')}, "
f"盈亏={pnl:.2f} USDT ({pnl_percent:.2f}%)"
)
except (ValueError, TypeError, KeyError) as calc_error:
logger.error(
f"{symbol} [状态同步] ❌ 计算盈亏失败 (ID: {trade_id}): "
f"错误类型={type(calc_error).__name__}, 错误消息={str(calc_error)}, "
f"交易记录数据: entry_price={trade.get('entry_price', 'N/A')}, "
f"quantity={trade.get('quantity', 'N/A')}, side={trade.get('side', 'N/A')}"
)
continue
# 从历史订单中获取平仓订单号
exit_order_id = None
latest_close_order = None
if close_orders:
exit_order_id = close_orders[0].get('orderId')
latest_close_order = close_orders[0]
if exit_order_id:
logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}")
# 使用 try-except 包裹,确保异常被正确处理
try:
# 计算持仓持续时间和策略类型
# exit_reason 细分:优先看币安平仓订单类型,其次用价格接近止损/止盈价做兜底
exit_reason = "sync"
exit_time_ts = None
try:
if latest_close_order and isinstance(latest_close_order, dict):
otype = str(
latest_close_order.get("type")
or latest_close_order.get("origType")
or ""
).upper()
# 检查订单的 reduceOnly 字段:如果是 true说明是自动平仓不应该标记为 manual
is_reduce_only = latest_close_order.get("reduceOnly", False)
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
# STOP / STOP_MARKET 通常对应止损触发
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
# 如果是 reduceOnly 订单,说明是自动平仓(可能是保护单触发的),先标记为 sync后续用价格判断
if is_reduce_only:
exit_reason = "sync" # 临时标记,后续用价格判断
else:
exit_reason = "manual" # 非 reduceOnly 的 MARKET/LIMIT 订单才是真正的手动平仓
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
try:
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
except Exception:
# 保持默认 sync
pass
# 价格兜底:如果能明显命中止损/止盈价,则覆盖 exit_reason
# 这对于保护单触发的 MARKET 订单特别重要(币安的保护单触发后会生成 MARKET 订单)
try:
def _close_to(a: float, b: float, max_pct: float = 0.02) -> bool: # 放宽到2%,因为滑点可能导致价格不完全一致
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(exit_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
# 优先检查止损(因为止损更关键)
if sl is not None and _close_to(ep, float(sl), max_pct=0.02):
exit_reason = "stop_loss"
# 然后检查止盈
elif tp is not None and _close_to(ep, float(tp), max_pct=0.02):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.02):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.02):
exit_reason = "take_profit"
# 如果之前标记为 sync 且是 reduceOnly 订单,但价格不匹配止损/止盈,可能是其他自动平仓(如移动止损)
elif exit_reason == "sync" and is_reduce_only:
# 检查是否是移动止损:如果价格接近入场价,可能是移动止损触发的
entry_price_val = float(trade.get("entry_price", 0) or 0)
if entry_price_val > 0 and _close_to(ep, entry_price_val, max_pct=0.01):
exit_reason = "trailing_stop"
except Exception:
pass
# 持仓持续时间(分钟):优先用币安订单时间,否则用当前时间
entry_time = trade.get("entry_time")
duration_minutes = None
try:
et = None
if isinstance(entry_time, (int, float)):
et = int(entry_time)
elif isinstance(entry_time, str) and entry_time.strip():
# 兼容旧格式:字符串时间戳/日期字符串
s = entry_time.strip()
if s.isdigit():
et = int(s)
else:
from datetime import datetime
et = int(datetime.fromisoformat(s).timestamp())
xt = int(exit_time_ts) if exit_time_ts is not None else int(get_beijing_time())
if et is not None and xt >= et:
duration_minutes = int((xt - et) / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = 'trend_following' # 默认策略类型
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=exit_order_id, # 保存币安平仓订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
except Exception as update_error:
# update_exit 内部已经有异常处理,但如果仍然失败,记录错误但不中断同步流程
error_str = str(update_error)
if "Duplicate entry" in error_str and "exit_order_id" in error_str:
logger.warning(
f"{symbol} [状态同步] ⚠️ exit_order_id {exit_order_id} 唯一约束冲突,"
f"update_exit 内部处理失败,尝试不更新 exit_order_id"
)
# 再次尝试,不更新 exit_order_id
try:
from database.connection import db
from database.models import get_beijing_time
exit_time = int(exit_time_ts) if exit_time_ts is not None else get_beijing_time()
db.execute_update(
"""UPDATE trades
SET exit_price = %s, exit_time = %s,
exit_reason = %s, pnl = %s, pnl_percent = %s, status = 'closed'
WHERE id = %s""",
(exit_price, exit_time, exit_reason, pnl, pnl_percent, trade_id)
)
logger.info(f"{symbol} [状态同步] ✓ 已更新(跳过 exit_order_id")
except Exception as retry_error:
logger.error(f"{symbol} [状态同步] ❌ 重试更新也失败: {retry_error}")
raise
else:
# 其他错误,重新抛出
raise
logger.info(
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "
f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)"
)
# 清理本地记录
if symbol in self.active_positions:
await self._stop_position_monitoring(symbol)
del self.active_positions[symbol]
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
except Exception as e:
# 详细记录错误信息,包括异常类型、错误消息和堆栈跟踪
import traceback
error_type = type(e).__name__
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(
f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): "
f"错误类型={error_type}, 错误消息={error_msg}"
)
logger.debug(
f"{symbol} [状态同步] 错误详情:\n{error_traceback}"
)
# 如果是数据库相关错误,记录更多信息
if "Duplicate entry" in error_msg or "1062" in error_msg:
logger.error(
f"{symbol} [状态同步] 数据库唯一约束冲突,"
f"可能原因: exit_order_id={exit_order_id} 已被其他交易记录使用"
)
elif "database" in error_msg.lower() or "connection" in error_msg.lower():
logger.error(
f"{symbol} [状态同步] 数据库连接或执行错误,"
f"请检查数据库连接状态"
)
else:
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")
# 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的)
missing_in_db = binance_symbols - db_open_symbols
if missing_in_db:
logger.info(
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: "
f"{', '.join(missing_in_db)} (可能是手动开仓)"
)
# 为手动开仓的持仓创建数据库记录并启动监控
for symbol in missing_in_db:
try:
# 获取币安持仓详情
binance_position = next(
(p for p in binance_positions if p['symbol'] == symbol),
None
)
if not binance_position:
continue
position_amt = binance_position['positionAmt']
entry_price = binance_position['entryPrice']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
logger.info(
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
# 创建数据库记录
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=binance_position.get('leverage', 10),
entry_reason='manual_entry', # 标记为手动开仓
# 手动开仓无法拿到策略侧ATR/分步止盈,这里尽量补齐“规模字段”
notional_usdt=(float(entry_price) * float(quantity)) if entry_price and quantity else None,
margin_usdt=((float(entry_price) * float(quantity)) / float(binance_position.get('leverage', 10))) if entry_price and quantity and float(binance_position.get('leverage', 10) or 0) > 0 else None,
)
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
# 创建本地持仓记录(用于监控)
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# 计算止损止盈(基于保证金)
leverage = binance_position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
# 如果配置中没有设置止盈则使用止损的2倍作为默认
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0, # 手动开仓,无法计算涨跌幅
'orderId': None,
'tradeId': trade_id,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': leverage,
'entryReason': 'manual_entry',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
# 启动WebSocket监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控")
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
logger.info("持仓状态同步完成")
except Exception as e:
logger.error(f"同步持仓状态失败: {e}")
import traceback
logger.error(f"错误详情:\n{traceback.format_exc()}")
async def start_all_position_monitoring(self):
"""
启动所有持仓的WebSocket实时监控
"""
if not self._monitoring_enabled:
logger.info("实时监控已禁用,跳过启动")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning("客户端未初始化,无法启动实时监控")
return
# 获取当前所有持仓
positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in positions}
active_symbols = set(self.active_positions.keys())
logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else ''})")
# 为所有币安持仓启动监控即使不在active_positions中可能是手动开仓的
for position in positions:
symbol = position['symbol']
if symbol not in self._monitor_tasks:
# 如果不在active_positions中先创建记录
if symbol not in self.active_positions:
logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...")
# 这里会通过sync_positions_with_binance来处理但先启动监控
try:
entry_price = position.get('entryPrice', 0)
position_amt = position['positionAmt']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
# 创建临时记录用于监控
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# 计算止损止盈(基于保证金)
leverage = position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
# 如果配置中没有设置止盈则使用止损的2倍作为默认
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0,
'orderId': None,
'tradeId': None,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': leverage,
'entryReason': 'manual_entry_temp',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
logger.info(f"{symbol} 已创建临时持仓记录用于监控")
# 也为“现有持仓”补挂交易所保护单(重启/掉线更安全)
try:
mp = None
try:
mp = float(position.get("markPrice", 0) or 0) or None
except Exception:
mp = None
await self._ensure_exchange_sltp_orders(symbol, position_info, current_price=mp or current_price)
except Exception as e:
logger.warning(f"{symbol} 补挂币安止盈止损失败(不影响监控): {e}")
except Exception as e:
logger.error(f"{symbol} 创建临时持仓记录失败: {e}")
await self._start_position_monitoring(symbol)
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
async def stop_all_position_monitoring(self):
"""
停止所有持仓的WebSocket监控
"""
symbols = list(self._monitor_tasks.keys())
for symbol in symbols:
await self._stop_position_monitoring(symbol)
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
async def _start_position_monitoring(self, symbol: str):
"""
启动单个持仓的WebSocket价格监控
Args:
symbol: 交易对
"""
if symbol in self._monitor_tasks:
logger.debug(f"{symbol} 监控任务已存在,跳过")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning(f"{symbol} 客户端未初始化,无法启动监控")
return
try:
task = asyncio.create_task(self._monitor_position_price(symbol))
self._monitor_tasks[symbol] = task
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
except Exception as e:
logger.error(f"启动 {symbol} 监控失败: {e}")
async def _stop_position_monitoring(self, symbol: str):
"""
停止单个持仓的WebSocket监控
Args:
symbol: 交易对
"""
# 幂等:可能会被多处/并发调用,先 pop 再处理,避免 KeyError
task = self._monitor_tasks.pop(symbol, None)
if task is None:
return
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.debug(f"已停止 {symbol} 的WebSocket监控")
async def _monitor_position_price(self, symbol: str):
"""
监控单个持仓的价格WebSocket实时推送
Args:
symbol: 交易对
"""
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
# 使用WebSocket订阅价格流
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
logger.debug(f"{symbol} WebSocket连接已建立开始接收价格更新")
retry_count = 0 # 连接成功,重置重试计数
async for msg in ws:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
# 根据文档ticker 流包含 'c' 字段(最后价格)
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
current_price = float(data['c'])
# 立即检查止损止盈
await self._check_single_position(symbol, current_price)
elif 'data' in data:
# 兼容组合流格式(如果使用 /stream 端点)
if isinstance(data['data'], dict) and 'c' in data['data']:
current_price = float(data['data']['c'])
await self._check_single_position(symbol, current_price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"{symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"{symbol} WebSocket连接关闭")
break
except asyncio.CancelledError:
logger.info(f"{symbol} 监控任务已取消")
break
except Exception as e:
retry_count += 1
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
if retry_count < max_retries:
# 指数退避重试
wait_time = min(2 ** retry_count, 30)
logger.info(f"{symbol} {wait_time}秒后重试连接...")
await asyncio.sleep(wait_time)
else:
logger.error(f"{symbol} WebSocket监控失败已达到最大重试次数")
# 回退到定时检查模式
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
break
# 清理任务
if symbol in self._monitor_tasks:
del self._monitor_tasks[symbol]
async def _check_single_position(self, symbol: str, current_price: float):
"""
检查单个持仓的止损止盈(实时检查)
Args:
symbol: 交易对
current_price: 当前价格
"""
position_info = self.active_positions.get(symbol)
if not position_info:
return
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
current_price_float = float(current_price)
# 计算当前盈亏(基于保证金)
leverage = position_info.get('leverage', 10)
position_value = entry_price * quantity
margin = position_value / leverage if leverage > 0 else position_value
# 计算盈亏金额
if position_info['side'] == 'BUY':
pnl_amount = (current_price_float - entry_price) * quantity
else: # SELL
pnl_amount = (entry_price - current_price_float) * quantity
# 计算盈亏百分比(相对于保证金,与币安一致)
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
# 也计算价格百分比(用于显示)
if position_info['side'] == 'BUY':
pnl_percent_price = ((current_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl_percent_price = ((entry_price - current_price_float) / entry_price) * 100
# 更新最大盈利(基于保证金)
if pnl_percent_margin > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent_margin
# 移动止损逻辑(盈利后保护利润,基于保证金)
# 每次检查时从Redis重新加载配置确保配置修改能即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
# ⚠️ 优化:已完全移除时间锁限制
# 理由1) 止损和止盈都应该立即执行,不受时间限制
# 2) 交易所级别的止损/止盈单已提供保护
# 3) 分步止盈策略本身已提供利润保护
# 4) 及时执行止损/止盈可以保护资金和利润
# 注意如果需要防止秒级平仓可以通过提高入场信号质量MIN_SIGNAL_STRENGTH来实现
# 检查是否启用移动止损默认False需要显式启用
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', False)
if use_trailing:
logger.debug(f"{symbol} [实时监控-移动止损] 已启用,将检查移动止损逻辑")
else:
logger.debug(f"{symbol} [实时监控-移动止损] 已禁用USE_TRAILING_STOP=False跳过移动止损检查")
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后(相对于保证金),激活移动止损
if pnl_percent_margin > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
)
else:
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
# 计算需要保护的利润金额
protect_amount = margin * trailing_protect
# 计算对应的止损价
if position_info['side'] == 'BUY':
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
else: # SELL
# 做空:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
# 注意:对于做空,止损价应该高于开仓价,所以用加法
# 当盈利时pnl_amount > 0止损价应该往上移更宽松
# 当亏损时pnl_amount < 0不应该移动止损保持初始止损
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
# 对于做空,止损价应该越来越高(更宽松),所以检查 new_stop_loss > 当前止损
# 同时,移动止损只应该在盈利时激活,不应该在亏损时把止损往下移
if new_stop_loss > position_info['stopLoss'] and pnl_amount > 0:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
# 检查止损(基于保证金收益比)
# ⚠️ 重要:止损检查应该在时间锁之前,止损必须立即执行
stop_loss = position_info.get('stopLoss')
should_close_due_to_sl = False
exit_reason_sl = None
if stop_loss is not None:
# 计算止损对应的保证金百分比目标
# 止损金额 = (开仓价 - 止损价) × 数量 或 (止损价 - 开仓价) × 数量
if position_info['side'] == 'BUY':
stop_loss_amount = (entry_price - stop_loss) * quantity
else: # SELL
stop_loss_amount = (stop_loss - entry_price) * quantity
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
# 每5%亏损记录一次诊断日志(帮助排查问题)
if pnl_percent_margin <= -5.0:
should_log = (int(abs(pnl_percent_margin)) % 5 == 0) or (pnl_percent_margin <= -10.0 and pnl_percent_margin > -10.5)
if should_log:
trigger_condition = pnl_percent_margin <= -stop_loss_pct_margin
logger.warning(
f"{symbol} [实时监控] 诊断: 亏损{pnl_percent_margin:.2f}% of margin | "
f"当前价: {current_price_float:.4f} | "
f"入场价: {entry_price:.4f} | "
f"止损价: {stop_loss:.4f} (目标: -{stop_loss_pct_margin:.2f}% of margin) | "
f"方向: {position_info['side']} | "
f"是否触发: {trigger_condition} | "
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
)
# 直接比较当前盈亏百分比与止损目标(基于保证金)
if pnl_percent_margin <= -stop_loss_pct_margin:
should_close_due_to_sl = True
exit_reason_sl = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
# 计算持仓时间
entry_time = position_info.get('entryTime')
hold_time_minutes = 0
if entry_time:
try:
if isinstance(entry_time, datetime):
hold_time_sec = int((get_beijing_time() - entry_time).total_seconds())
else:
hold_time_sec = int(time.time() - (float(entry_time) if isinstance(entry_time, (int, float)) else 0))
hold_time_minutes = hold_time_sec / 60.0
except Exception:
hold_time_minutes = 0
# 详细诊断日志:记录平仓时的所有关键信息
logger.warning("=" * 80)
logger.warning(f"{symbol} [实时监控-平仓诊断日志] ===== 触发止损平仓 =====")
logger.warning(f" 平仓原因: {exit_reason_sl}")
logger.warning(f" 入场价格: {entry_price:.6f} USDT")
logger.warning(f" 当前价格: {current_price_float:.6f} USDT")
logger.warning(f" 止损价格: {stop_loss:.4f} USDT")
logger.warning(f" 持仓数量: {quantity:.4f}")
logger.warning(f" 持仓时间: {hold_time_minutes:.1f} 分钟")
logger.warning(f" 入场时间: {entry_time}")
logger.warning(f" 当前盈亏: {pnl_percent_margin:.2f}% of margin")
logger.warning(f" 止损目标: -{stop_loss_pct_margin:.2f}% of margin")
logger.warning(f" 亏损金额: {abs(pnl_amount):.4f} USDT")
if position_info.get('trailingStopActivated'):
logger.warning(f" 移动止损: 已激活(从初始止损 {position_info.get('initialStopLoss', 'N/A')} 调整)")
logger.warning("=" * 80)
# ⚠️ 关键修复:止损必须立即执行,不受时间锁限制
if await self.close_position(symbol, reason=exit_reason_sl):
logger.info(f"{symbol} [实时监控] 止损平仓成功(不受时间锁限制)")
return # 止损已执行,跳过后续止盈检查
# 检查分步止盈(实时监控)
# ⚠️ 优化:已移除止盈时间锁,止盈可以立即执行(与止损一致)
# 理由1) 止损已不受时间锁限制,止盈也应该一致
# 2) 分步止盈策略本身已提供利润保护50%在1:1止盈剩余保本
# 3) 交易所级别止盈单已提供保护
# 4) 及时止盈可以保护利润,避免价格回落
should_close = False
take_profit_1 = position_info.get('takeProfit1') # 第一目标盈亏比1:1
take_profit_2 = position_info.get('takeProfit2', position_info.get('takeProfit')) # 第二目标1.5:1
partial_profit_taken = position_info.get('partialProfitTaken', False)
remaining_quantity = position_info.get('remainingQuantity', quantity)
# 第一目标盈亏比1:1了结50%仓位
if not partial_profit_taken and take_profit_1 is not None:
# 计算第一目标对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_1_amount = (take_profit_1 - entry_price) * quantity
else: # SELL
take_profit_1_amount = (entry_price - take_profit_1) * quantity
take_profit_1_pct_margin = (take_profit_1_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与第一目标(基于保证金)
if pnl_percent_margin >= take_profit_1_pct_margin:
logger.info(
f"{symbol} [实时监控] 触发第一目标止盈盈亏比1:1基于保证金: "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_1_pct_margin:.2f}% of margin | "
f"当前价={current_price_float:.4f}, 目标价={take_profit_1:.4f}"
)
# 部分平仓50%
partial_quantity = quantity * 0.5
try:
close_side = 'SELL' if position_info['side'] == 'BUY' else 'BUY'
close_position_side = 'LONG' if position_info['side'] == 'BUY' else 'SHORT'
live_amt = await self._get_live_position_amt(symbol, position_side=close_position_side)
if live_amt is None or abs(live_amt) <= 0:
logger.warning(f"{symbol} [实时监控] 部分止盈实时持仓已为0跳过部分平仓")
else:
partial_quantity = min(partial_quantity, abs(live_amt))
partial_quantity = await self._adjust_close_quantity(symbol, partial_quantity)
if partial_quantity > 0:
partial_order = await self.client.place_order(
symbol=symbol,
side=close_side,
quantity=partial_quantity,
order_type='MARKET',
reduce_only=True,
position_side=close_position_side,
)
if partial_order:
position_info['partialProfitTaken'] = True
position_info['remainingQuantity'] = remaining_quantity - partial_quantity
logger.info(
f"{symbol} [实时监控] 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}"
)
# 分步止盈后的"保本"处理:将剩余仓位止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} [实时监控] 部分止盈后:剩余仓位止损移至成本价 {entry_price:.4f}(保本),"
f"剩余50%仓位追求1.5:1止盈目标"
)
except Exception as e:
logger.error(f"{symbol} [实时监控] 部分止盈失败: {e}")
# 第二目标1.5:1止盈平掉剩余仓位
if partial_profit_taken and take_profit_2 is not None and not should_close:
# 计算第二目标对应的保证金百分比(基于剩余仓位)
if position_info['side'] == 'BUY':
take_profit_2_amount = (take_profit_2 - entry_price) * remaining_quantity
else: # SELL
take_profit_2_amount = (entry_price - take_profit_2) * remaining_quantity
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
take_profit_2_pct_margin = (take_profit_2_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 计算剩余仓位的当前盈亏
if position_info['side'] == 'BUY':
remaining_pnl_amount = (current_price_float - entry_price) * remaining_quantity
else:
remaining_pnl_amount = (entry_price - current_price_float) * remaining_quantity
remaining_pnl_pct_margin = (remaining_pnl_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 直接比较剩余仓位盈亏百分比与第二目标(基于保证金)
if remaining_pnl_pct_margin >= take_profit_2_pct_margin:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发第二目标止盈1.5:1基于保证金: "
f"剩余仓位盈亏={remaining_pnl_pct_margin:.2f}% of margin >= 目标={take_profit_2_pct_margin:.2f}% of margin | "
f"当前价={current_price_float:.4f}, 目标价={take_profit_2:.4f}, "
f"剩余数量={remaining_quantity:.4f}"
)
# 检查止盈(基于保证金收益比)- 用于未启用分步止盈的情况
if not should_close:
take_profit = position_info.get('takeProfit')
if take_profit is not None:
# 计算止盈对应的保证金百分比目标
# 止盈金额 = (止盈价 - 开仓价) × 数量 或 (开仓价 - 止盈价) × 数量
if position_info['side'] == 'BUY':
take_profit_amount = (take_profit - entry_price) * quantity
else: # SELL
take_profit_amount = (entry_price - take_profit) * quantity
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
# 每5%盈利记录一次诊断日志(帮助排查问题)
if pnl_percent_margin >= 5.0:
should_log = (int(pnl_percent_margin) % 5 == 0) or (pnl_percent_margin >= 10.0 and pnl_percent_margin < 10.5)
if should_log:
trigger_condition = pnl_percent_margin >= take_profit_pct_margin
logger.warning(
f"{symbol} [实时监控] 诊断: 盈利{pnl_percent_margin:.2f}% of margin | "
f"当前价: {current_price_float:.4f} | "
f"入场价: {entry_price:.4f} | "
f"止盈价: {take_profit:.4f} (目标: {take_profit_pct_margin:.2f}% of margin) | "
f"方向: {position_info['side']} | "
f"是否触发: {trigger_condition} | "
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
)
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
if pnl_percent_margin >= take_profit_pct_margin:
should_close = True
exit_reason = 'take_profit'
# 详细诊断日志:记录平仓时的所有关键信息
logger.info("=" * 80)
logger.info(f"{symbol} [实时监控-平仓诊断日志] ===== 触发止盈平仓 =====")
logger.info(f" 平仓原因: {exit_reason}")
logger.info(f" 入场价格: {entry_price:.6f} USDT")
logger.info(f" 当前价格: {current_price_float:.6f} USDT")
logger.info(f" 止盈价格: {take_profit:.4f} USDT")
logger.info(f" 持仓数量: {quantity:.4f}")
logger.info(f" 持仓时间: {hold_time_minutes:.1f} 分钟")
logger.info(f" 入场时间: {entry_time}")
logger.info(f" 当前盈亏: {pnl_percent_margin:.2f}% of margin")
logger.info(f" 止盈目标: {take_profit_pct_margin:.2f}% of margin")
logger.info(f" 盈利金额: {pnl_amount:.4f} USDT")
logger.info("=" * 80)
# 如果触发止损止盈,执行平仓
if should_close:
# 自动平仓限流:避免同一 symbol 短时间内反复触发平仓请求WebSocket 高频推送下很常见)
try:
now_ms = int(time.time() * 1000)
except Exception:
now_ms = None
if now_ms is not None:
cooldown_sec = int(config.TRADING_CONFIG.get("AUTO_CLOSE_COOLDOWN_SEC", 20) or 0)
last_ms = self._last_auto_close_attempt_ms.get(symbol)
if last_ms and cooldown_sec > 0 and now_ms - last_ms < cooldown_sec * 1000:
# 不重复刷屏:仅 debug
logger.debug(f"{symbol} [自动平仓] 冷却中({cooldown_sec}s跳过重复平仓尝试")
return
self._last_auto_close_attempt_ms[symbol] = now_ms
logger.info(
f"{symbol} [自动平仓] 开始执行平仓操作 | "
f"原因: {exit_reason} | "
f"入场价: {entry_price:.4f} | "
f"当前价: {current_price_float:.4f} | "
f"盈亏: {pnl_percent_margin:.2f}% of margin ({pnl_amount:.4f} USDT) | "
f"数量: {quantity:.4f}"
)
# 执行平仓(让 close_position 统一处理数据库更新,避免重复更新和状态不一致)
logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...")
success = await self.close_position(symbol, reason=exit_reason)
if success:
logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成")
# 平仓成功后,立即触发一次状态同步,确保数据库状态与币安一致
try:
await asyncio.sleep(2) # 等待2秒让币安订单完全成交
await self.sync_positions_with_binance()
logger.debug(f"{symbol} [自动平仓] 已触发状态同步")
except Exception as sync_error:
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
else:
# 平仓失败:先二次核对币安是否已无仓位(常见于竞态/网络抖动/幂等场景)
live_amt = None
try:
live_amt = await self._get_live_position_amt(symbol, position_side=None)
except Exception:
live_amt = None
if live_amt is not None and abs(live_amt) <= 0:
logger.warning(f"{symbol} [自动平仓] 平仓返回失败但币安持仓已为0按已平仓处理避免误报")
# 尝试同步一次让DB与界面尽快一致失败也不刷屏
try:
await self.sync_positions_with_binance()
except Exception:
pass
return
# 仍有仓位:减少刷屏(按时间窗口合并/限流)
should_log = True
if now_ms is not None:
log_cd = int(config.TRADING_CONFIG.get("AUTO_CLOSE_FAIL_LOG_COOLDOWN_SEC", 600) or 0)
last_log = self._last_auto_close_fail_log_ms.get(symbol)
if last_log and log_cd > 0 and now_ms - last_log < log_cd * 1000:
should_log = False
else:
self._last_auto_close_fail_log_ms[symbol] = now_ms
if should_log:
logger.error(f"{symbol} [自动平仓] ❌ 平仓失败(币安持仓仍存在: {live_amt}")
else:
logger.warning(f"{symbol} [自动平仓] 平仓仍失败(已在短时间内记录,暂不重复输出)")
# 即使平仓失败,也尝试同步状态(可能币安已经平仓了)
try:
await self.sync_positions_with_binance()
except Exception as sync_error:
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
async def diagnose_position(self, symbol: str):
"""
诊断持仓状态(用于排查为什么没有自动平仓)
Args:
symbol: 交易对
"""
try:
logger.info(f"{symbol} [诊断] 开始诊断持仓状态...")
# 1. 检查是否在active_positions中
if symbol not in self.active_positions:
logger.warning(f"{symbol} [诊断] ❌ 不在本地持仓记录中 (active_positions)")
logger.warning(f" 可能原因: 手动开仓或系统重启后未同步")
logger.warning(f" 解决方案: 等待下次状态同步或手动触发同步")
else:
position_info = self.active_positions[symbol]
logger.info(f"{symbol} [诊断] ✓ 在本地持仓记录中")
logger.info(f" 入场价: {position_info['entryPrice']:.4f}")
logger.info(f" 方向: {position_info['side']}")
logger.info(f" 数量: {position_info['quantity']:.4f}")
logger.info(f" 止损价: {position_info['stopLoss']:.4f}")
logger.info(f" 止盈价: {position_info['takeProfit']:.4f}")
# 2. 检查WebSocket监控状态
if symbol in self._monitor_tasks:
task = self._monitor_tasks[symbol]
if task.done():
logger.warning(f"{symbol} [诊断] ⚠ WebSocket监控任务已结束")
try:
await task # 获取异常信息
except Exception as e:
logger.warning(f" 任务异常: {e}")
else:
logger.info(f"{symbol} [诊断] ✓ WebSocket监控任务运行中")
else:
logger.warning(f"{symbol} [诊断] ❌ 没有WebSocket监控任务")
logger.warning(f" 可能原因: 监控未启动或已停止")
# 3. 获取币安实际持仓
positions = await self.client.get_open_positions()
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
if not binance_position:
logger.warning(f"{symbol} [诊断] ❌ 币安账户中没有持仓")
return
logger.info(f"{symbol} [诊断] ✓ 币安账户中有持仓")
entry_price_binance = binance_position.get('entryPrice', 0)
mark_price = binance_position.get('markPrice', 0)
unrealized_pnl = binance_position.get('unRealizedProfit', 0)
logger.info(f" 币安入场价: {entry_price_binance:.4f}")
logger.info(f" 标记价格: {mark_price:.4f}")
logger.info(f" 未实现盈亏: {unrealized_pnl:.2f} USDT")
# 4. 计算实际盈亏
if symbol in self.active_positions:
position_info = self.active_positions[symbol]
entry_price = float(position_info['entryPrice'])
take_profit = float(position_info['takeProfit'])
if position_info['side'] == 'BUY':
pnl_percent = ((mark_price - entry_price) / entry_price) * 100
take_profit_pct = ((take_profit - entry_price) / entry_price) * 100
should_trigger = mark_price >= take_profit
else: # SELL
pnl_percent = ((entry_price - mark_price) / entry_price) * 100
take_profit_pct = ((entry_price - take_profit) / entry_price) * 100
should_trigger = mark_price <= take_profit
logger.info(f"{symbol} [诊断] 盈亏分析:")
logger.info(f" 当前盈亏: {pnl_percent:.2f}%")
logger.info(f" 止盈目标: {take_profit_pct:.2f}%")
logger.info(f" 当前价: {mark_price:.4f}")
logger.info(f" 止盈价: {take_profit:.4f}")
logger.info(f" 价格差: {abs(mark_price - take_profit):.4f}")
logger.info(f" 应该触发: {should_trigger}")
if pnl_percent > take_profit_pct and not should_trigger:
logger.error(f"{symbol} [诊断] ❌ 异常: 盈亏{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!")
logger.error(f" 可能原因: 浮点数精度问题或止盈价格计算错误")
logger.info(f"{symbol} [诊断] 诊断完成")
except Exception as e:
logger.error(f"{symbol} [诊断] 诊断失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")