2450 lines
137 KiB
Python
2450 lines
137 KiB
Python
"""
|
||
仓位管理模块 - 管理持仓和订单
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import json
|
||
import aiohttp
|
||
import time
|
||
from typing import Dict, List, Optional
|
||
from datetime import datetime
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .risk_manager import RiskManager
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from risk_manager import RiskManager
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试导入数据库模型(如果可用)
|
||
DB_AVAILABLE = False
|
||
Trade = None
|
||
get_beijing_time = None
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import Trade, get_beijing_time
|
||
DB_AVAILABLE = True
|
||
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
|
||
else:
|
||
logger.warning("⚠ backend目录不存在,无法使用数据库功能")
|
||
DB_AVAILABLE = False
|
||
except ImportError as e:
|
||
logger.warning(f"⚠ 无法导入数据库模型: {e}")
|
||
logger.warning(" 交易记录将不会保存到数据库")
|
||
DB_AVAILABLE = False
|
||
except Exception as e:
|
||
logger.warning(f"⚠ 数据库初始化失败: {e}")
|
||
logger.warning(" 交易记录将不会保存到数据库")
|
||
DB_AVAILABLE = False
|
||
|
||
# 如果没有导入get_beijing_time,创建一个本地版本
|
||
if get_beijing_time is None:
|
||
from datetime import datetime, timezone, timedelta
|
||
BEIJING_TZ = timezone(timedelta(hours=8))
|
||
def get_beijing_time():
|
||
"""获取当前北京时间(UTC+8)"""
|
||
return datetime.now(BEIJING_TZ).replace(tzinfo=None)
|
||
|
||
|
||
class PositionManager:
|
||
"""仓位管理类"""
|
||
|
||
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
|
||
"""
|
||
初始化仓位管理器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
risk_manager: 风险管理器
|
||
"""
|
||
self.client = client
|
||
self.risk_manager = risk_manager
|
||
self.active_positions: Dict[str, Dict] = {}
|
||
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
|
||
self._monitoring_enabled = True # 是否启用实时监控
|
||
self._pending_entry_orders: Dict[str, Dict] = {} # 未成交的入场订单(避免重复挂单)
|
||
self._last_entry_attempt_ms: Dict[str, int] = {} # 每个symbol的最近一次尝试(冷却/去抖)
|
||
|
||
@staticmethod
|
||
def _pct_like_to_ratio(v: float) -> float:
|
||
"""
|
||
将“看起来像百分比”的值转换为比例(0~1)。
|
||
|
||
兼容两种来源:
|
||
- 前端/后端按“比例”存储:0.006 表示 0.6%
|
||
- 历史/默认值按“百分比数值”存储:0.6 表示 0.6%
|
||
|
||
经验规则:
|
||
- v > 1: 认为是 60/100 这种百分数,除以100
|
||
- 0.05 < v <= 1: 也更可能是“0.6% 这种写法”,除以100
|
||
- v <= 0.05: 更可能已经是比例(<=5%)
|
||
"""
|
||
try:
|
||
x = float(v or 0.0)
|
||
except Exception:
|
||
x = 0.0
|
||
if x <= 0:
|
||
return 0.0
|
||
if x > 1.0:
|
||
return x / 100.0
|
||
if x > 0.05:
|
||
return x / 100.0
|
||
return x
|
||
|
||
def _calc_limit_entry_price(self, current_price: float, side: str, offset_ratio: float) -> float:
|
||
"""根据当前价与偏移比例,计算限价入场价(BUY: 下方回调;SELL: 上方回调)"""
|
||
try:
|
||
cp = float(current_price)
|
||
except Exception:
|
||
cp = 0.0
|
||
try:
|
||
off = float(offset_ratio or 0.0)
|
||
except Exception:
|
||
off = 0.0
|
||
if cp <= 0:
|
||
return 0.0
|
||
if (side or "").upper() == "BUY":
|
||
return cp * (1 - off)
|
||
return cp * (1 + off)
|
||
|
||
async def _wait_for_order_filled(
|
||
self,
|
||
symbol: str,
|
||
order_id: int,
|
||
timeout_sec: int = 30,
|
||
poll_sec: float = 1.0,
|
||
) -> Dict:
|
||
"""
|
||
等待订单成交(FILLED),返回:
|
||
{ ok, status, avg_price, executed_qty, raw }
|
||
"""
|
||
deadline = time.time() + max(1, int(timeout_sec or 1))
|
||
last_status = None
|
||
while time.time() < deadline:
|
||
try:
|
||
info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
|
||
status = info.get("status")
|
||
last_status = status
|
||
if status == "FILLED":
|
||
avg_price = float(info.get("avgPrice", 0) or 0) or float(info.get("price", 0) or 0)
|
||
executed_qty = float(info.get("executedQty", 0) or 0)
|
||
return {"ok": True, "status": status, "avg_price": avg_price, "executed_qty": executed_qty, "raw": info}
|
||
if status in ("CANCELED", "REJECTED", "EXPIRED"):
|
||
return {"ok": False, "status": status, "avg_price": 0, "executed_qty": float(info.get("executedQty", 0) or 0), "raw": info}
|
||
except Exception:
|
||
# 忽略单次失败,继续轮询
|
||
pass
|
||
await asyncio.sleep(max(0.2, float(poll_sec or 1.0)))
|
||
return {"ok": False, "status": last_status or "TIMEOUT", "avg_price": 0, "executed_qty": 0, "raw": None}
|
||
|
||
async def open_position(
|
||
self,
|
||
symbol: str,
|
||
change_percent: float,
|
||
leverage: int = 10,
|
||
trade_direction: Optional[str] = None,
|
||
entry_reason: str = '',
|
||
signal_strength: Optional[int] = None,
|
||
market_regime: Optional[str] = None,
|
||
trend_4h: Optional[str] = None,
|
||
atr: Optional[float] = None,
|
||
klines: Optional[List] = None,
|
||
bollinger: Optional[Dict] = None
|
||
) -> Optional[Dict]:
|
||
"""
|
||
开仓
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
change_percent: 涨跌幅百分比
|
||
leverage: 杠杆倍数
|
||
|
||
Returns:
|
||
订单信息,失败返回None
|
||
"""
|
||
try:
|
||
# 0) 防止同一 symbol 重复挂入场单/快速重复尝试(去抖 + 冷却)
|
||
now_ms = int(time.time() * 1000)
|
||
cooldown_sec = int(config.TRADING_CONFIG.get("ENTRY_SYMBOL_COOLDOWN_SEC", 120) or 0)
|
||
last_ms = self._last_entry_attempt_ms.get(symbol)
|
||
if last_ms and cooldown_sec > 0 and now_ms - last_ms < cooldown_sec * 1000:
|
||
logger.info(f"{symbol} [入场] 冷却中({cooldown_sec}s),跳过本次自动开仓")
|
||
return None
|
||
|
||
pending = self._pending_entry_orders.get(symbol)
|
||
if pending and pending.get("order_id"):
|
||
logger.info(f"{symbol} [入场] 已有未完成入场订单(orderId={pending.get('order_id')}),跳过重复开仓")
|
||
return None
|
||
|
||
# 标记本次尝试(无论最终是否成交,都避免短时间内反复开仓/挂单)
|
||
self._last_entry_attempt_ms[symbol] = now_ms
|
||
|
||
# 判断是否应该交易
|
||
if not await self.risk_manager.should_trade(symbol, change_percent):
|
||
return None
|
||
|
||
# 设置杠杆
|
||
await self.client.set_leverage(symbol, leverage)
|
||
|
||
# 计算仓位大小(传入实际使用的杠杆)
|
||
logger.info(f"开始为 {symbol} 计算仓位大小...")
|
||
quantity = await self.risk_manager.calculate_position_size(
|
||
symbol, change_percent, leverage=leverage
|
||
)
|
||
|
||
if quantity is None:
|
||
logger.warning(f"❌ {symbol} 仓位计算失败,跳过交易")
|
||
logger.warning(f" 可能原因:")
|
||
logger.warning(f" 1. 账户余额不足")
|
||
logger.warning(f" 2. 单笔仓位超过限制")
|
||
logger.warning(f" 3. 总仓位超过限制")
|
||
logger.warning(f" 4. 无法获取价格数据")
|
||
logger.warning(f" 5. 保证金不足最小要求(MIN_MARGIN_USDT)")
|
||
logger.warning(f" 6. 名义价值小于0.2 USDT(避免无意义的小单子)")
|
||
return None
|
||
|
||
logger.info(f"✓ {symbol} 仓位计算成功: {quantity:.4f}")
|
||
|
||
# 确定交易方向(优先使用技术指标信号)
|
||
if trade_direction:
|
||
side = trade_direction
|
||
else:
|
||
side = 'BUY' if change_percent > 0 else 'SELL'
|
||
|
||
# 获取当前价格
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
return None
|
||
|
||
entry_price = ticker['price']
|
||
|
||
# 获取K线数据用于动态止损计算(从symbol_info中获取,如果可用)
|
||
klines = None
|
||
bollinger = None
|
||
if 'klines' in locals() or 'symbol_info' in locals():
|
||
# 尝试从外部传入的symbol_info获取
|
||
pass
|
||
|
||
# 计算基于支撑/阻力的动态止损
|
||
# 优先使用技术结构(支撑/阻力位、布林带)
|
||
# 如果无法获取K线数据,回退到ATR或固定止损
|
||
if not klines:
|
||
# 如果没有传入K线数据,尝试获取
|
||
try:
|
||
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
|
||
klines_data = await self.client.get_klines(
|
||
symbol=symbol,
|
||
interval=primary_interval,
|
||
limit=20 # 获取最近20根K线用于计算支撑/阻力
|
||
)
|
||
klines = klines_data if len(klines_data) >= 10 else None
|
||
except Exception as e:
|
||
logger.debug(f"获取K线数据失败,使用固定止损: {e}")
|
||
klines = None
|
||
|
||
# 在开仓前从Redis重新加载配置,确保使用最新配置(包括ATR参数)
|
||
# 从Redis读取最新配置(轻量级,即时生效)
|
||
try:
|
||
if config._config_manager:
|
||
config._config_manager.reload_from_redis()
|
||
config.TRADING_CONFIG = config._get_trading_config()
|
||
logger.debug(f"{symbol} 开仓前已从Redis重新加载配置")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载配置失败: {e}")
|
||
|
||
# ===== 智能入场(方案C:趋势强更少错过,震荡更保守)=====
|
||
smart_entry_enabled = bool(config.TRADING_CONFIG.get("SMART_ENTRY_ENABLED", True))
|
||
# LIMIT_ORDER_OFFSET_PCT:兼容“比例/百分比”两种存储方式
|
||
limit_offset_ratio = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("LIMIT_ORDER_OFFSET_PCT", 0.5) or 0.0))
|
||
|
||
# 规则:趋势(trending) 或 4H共振明显 + 强信号 -> 允许“超时后市价兜底(有追价上限)”
|
||
mr = (market_regime or "").strip().lower() if market_regime else ""
|
||
t4 = (trend_4h or "").strip().lower() if trend_4h else ""
|
||
ss = int(signal_strength) if signal_strength is not None else 0
|
||
|
||
allow_market_fallback = False
|
||
if mr == "trending":
|
||
allow_market_fallback = True
|
||
if ss >= int(config.TRADING_CONFIG.get("SMART_ENTRY_STRONG_SIGNAL", 8) or 8) and t4 in ("up", "down"):
|
||
allow_market_fallback = True
|
||
|
||
# 追价上限:超过就不追,宁愿错过(避免回到“无脑追价高频打损”)
|
||
drift_ratio_trending = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_TRENDING", 0.6) or 0.6))
|
||
drift_ratio_ranging = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_RANGING", 0.3) or 0.3))
|
||
max_drift_ratio = drift_ratio_trending if allow_market_fallback else drift_ratio_ranging
|
||
|
||
# 总等待/追价参数
|
||
timeout_sec = int(config.TRADING_CONFIG.get("ENTRY_TIMEOUT_SEC", 180) or 180)
|
||
step_wait_sec = int(config.TRADING_CONFIG.get("ENTRY_STEP_WAIT_SEC", 15) or 15)
|
||
chase_steps = int(config.TRADING_CONFIG.get("ENTRY_CHASE_MAX_STEPS", 4) or 4)
|
||
market_fallback_after_sec = int(config.TRADING_CONFIG.get("ENTRY_MARKET_FALLBACK_AFTER_SEC", 45) or 45)
|
||
|
||
# 初始限价(回调入场)
|
||
current_px = float(entry_price)
|
||
initial_limit = self._calc_limit_entry_price(current_px, side, limit_offset_ratio)
|
||
if initial_limit <= 0:
|
||
return None
|
||
|
||
order = None
|
||
entry_order_id = None
|
||
order_status = None
|
||
actual_entry_price = None
|
||
filled_quantity = 0.0
|
||
entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase")
|
||
|
||
if not smart_entry_enabled:
|
||
# 根治方案:关闭智能入场后,回归“纯限价单模式”
|
||
# - 不追价
|
||
# - 不市价兜底
|
||
# - 未在确认时间内成交则撤单并跳过(属于策略未触发入场,不是系统错误)
|
||
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
|
||
logger.info(
|
||
f"{symbol} [纯限价入场] side={side} | 限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
|
||
f"确认超时={confirm_timeout}s(未成交将撤单跳过)"
|
||
)
|
||
order = await self.client.place_order(
|
||
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit
|
||
)
|
||
if not order:
|
||
return None
|
||
entry_order_id = order.get("orderId")
|
||
if entry_order_id:
|
||
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
|
||
else:
|
||
# 1) 先挂限价单
|
||
logger.info(
|
||
f"{symbol} [智能入场] 模式={entry_mode_used} | side={side} | "
|
||
f"marketRegime={market_regime} trend_4h={trend_4h} strength={ss}/10 | "
|
||
f"初始限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
|
||
f"追价上限={max_drift_ratio*100:.2f}%"
|
||
)
|
||
|
||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit)
|
||
if not order:
|
||
return None
|
||
entry_order_id = order.get("orderId")
|
||
if entry_order_id:
|
||
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
|
||
|
||
start_ts = time.time()
|
||
# 2) 分步等待 + 追价(逐步减少 offset),并在趋势强时允许市价兜底(有追价上限)
|
||
for step in range(max(1, chase_steps)):
|
||
# 先等待一段时间看是否成交
|
||
wait_res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=step_wait_sec, poll_sec=1.0)
|
||
order_status = wait_res.get("status")
|
||
if wait_res.get("ok"):
|
||
actual_entry_price = float(wait_res.get("avg_price") or 0)
|
||
filled_quantity = float(wait_res.get("executed_qty") or 0)
|
||
break
|
||
|
||
# 未成交:如果超时太久且允许市价兜底,检查追价上限后转市价
|
||
elapsed = time.time() - start_ts
|
||
ticker2 = await self.client.get_ticker_24h(symbol)
|
||
cur2 = float(ticker2.get("price")) if ticker2 else current_px
|
||
drift_ratio = 0.0
|
||
try:
|
||
base = float(initial_limit) if float(initial_limit) > 0 else cur2
|
||
drift_ratio = abs((cur2 - base) / base)
|
||
except Exception:
|
||
drift_ratio = 0.0
|
||
|
||
if allow_market_fallback and elapsed >= market_fallback_after_sec:
|
||
if drift_ratio <= max_drift_ratio:
|
||
try:
|
||
await self.client.cancel_order(symbol, int(entry_order_id))
|
||
except Exception:
|
||
pass
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底")
|
||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET")
|
||
# 关键:转市价后必须更新 entry_order_id,否则后续会继续查询“已取消的旧限价单”,导致误判 CANCELED
|
||
try:
|
||
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
|
||
except Exception:
|
||
entry_order_id = None
|
||
break
|
||
else:
|
||
logger.info(f"{symbol} [智能入场] 限价超时,但偏离{drift_ratio*100:.2f}%>{max_drift_ratio*100:.2f}%,取消并放弃本次交易")
|
||
try:
|
||
await self.client.cancel_order(symbol, int(entry_order_id))
|
||
except Exception:
|
||
pass
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
|
||
# 震荡/不允许市价兜底:尝试追价(减小 offset -> 更靠近当前价),但不突破追价上限
|
||
try:
|
||
await self.client.cancel_order(symbol, int(entry_order_id))
|
||
except Exception:
|
||
pass
|
||
|
||
# offset 逐步减少到 0(越追越接近当前价)
|
||
step_ratio = (step + 1) / max(1, chase_steps)
|
||
cur_offset_ratio = max(0.0, limit_offset_ratio * (1.0 - step_ratio))
|
||
desired = self._calc_limit_entry_price(cur2, side, cur_offset_ratio)
|
||
if side == "BUY":
|
||
cap = initial_limit * (1 + max_drift_ratio)
|
||
desired = min(desired, cap)
|
||
else:
|
||
cap = initial_limit * (1 - max_drift_ratio)
|
||
desired = max(desired, cap)
|
||
|
||
if desired <= 0:
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
|
||
logger.info(
|
||
f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | "
|
||
f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%"
|
||
)
|
||
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired)
|
||
if not order:
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
entry_order_id = order.get("orderId")
|
||
if entry_order_id:
|
||
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
|
||
|
||
# 如果是市价兜底或最终限价成交,这里统一继续后续流程(下面会再查实际成交)
|
||
|
||
# ===== 统一处理:确认订单成交并获取实际成交价/数量 =====
|
||
if order:
|
||
if not entry_order_id:
|
||
entry_order_id = order.get("orderId")
|
||
if entry_order_id:
|
||
logger.info(f"{symbol} [开仓] 币安订单号: {entry_order_id}")
|
||
|
||
# 等待订单成交,检查订单状态并获取实际成交价格
|
||
# 只有在订单真正成交(FILLED)后才保存到数据库
|
||
if entry_order_id:
|
||
# 智能入场的限价订单可能需要更长等待,这里给一个总等待兜底(默认 30s)
|
||
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
|
||
res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=confirm_timeout, poll_sec=1.0)
|
||
order_status = res.get("status")
|
||
if res.get("ok"):
|
||
actual_entry_price = float(res.get("avg_price") or 0)
|
||
filled_quantity = float(res.get("executed_qty") or 0)
|
||
else:
|
||
# 未成交(NEW/超时/CANCELED 等)属于“策略未触发入场”或“挂单没成交”
|
||
# 这不应当当作系统错误;同时需要撤单(best-effort),避免留下悬挂委托造成后续混乱。
|
||
logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},跳过本次开仓并撤销挂单")
|
||
try:
|
||
await self.client.cancel_order(symbol, int(entry_order_id))
|
||
except Exception:
|
||
pass
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
|
||
if not actual_entry_price or actual_entry_price <= 0:
|
||
logger.error(f"{symbol} [开仓] ❌ 无法获取实际成交价格,不保存到数据库")
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
|
||
if filled_quantity <= 0:
|
||
logger.error(f"{symbol} [开仓] ❌ 成交数量为0,不保存到数据库")
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
return None
|
||
|
||
# 使用实际成交价格和数量
|
||
original_entry_price = entry_price
|
||
entry_price = actual_entry_price
|
||
quantity = filled_quantity # 使用实际成交数量
|
||
logger.info(f"{symbol} [开仓] ✓ 使用实际成交价格: {entry_price:.4f} USDT (下单时价格: {original_entry_price:.4f}), 成交数量: {quantity:.4f}")
|
||
|
||
# 成交后清理 pending
|
||
self._pending_entry_orders.pop(symbol, None)
|
||
|
||
# ===== 成交后基于“实际成交价/数量”重新计算止损止盈(修复限价/滑点导致的偏差)=====
|
||
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.03)
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(
|
||
entry_price, side, quantity, leverage,
|
||
stop_loss_pct=stop_loss_pct_margin,
|
||
klines=klines,
|
||
bollinger=bollinger,
|
||
atr=atr
|
||
)
|
||
|
||
stop_distance_for_tp = None
|
||
if side == 'BUY':
|
||
stop_distance_for_tp = entry_price - stop_loss_price
|
||
else:
|
||
stop_distance_for_tp = stop_loss_price - entry_price
|
||
|
||
if atr is not None and atr > 0 and entry_price > 0:
|
||
atr_percent = atr / entry_price
|
||
atr_multiplier = config.TRADING_CONFIG.get('ATR_STOP_LOSS_MULTIPLIER', 1.8)
|
||
stop_distance_for_tp = entry_price * atr_percent * atr_multiplier
|
||
|
||
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.30)
|
||
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
|
||
take_profit_pct_margin = float(stop_loss_pct_margin or 0) * 3.0
|
||
take_profit_price = self.risk_manager.get_take_profit_price(
|
||
entry_price, side, quantity, leverage,
|
||
take_profit_pct=take_profit_pct_margin,
|
||
atr=atr,
|
||
stop_distance=stop_distance_for_tp
|
||
)
|
||
|
||
# 分步止盈(基于“实际成交价 + 已计算的止损/止盈”)
|
||
if side == 'BUY':
|
||
take_profit_1 = entry_price + (entry_price - stop_loss_price) # 盈亏比1:1
|
||
else:
|
||
take_profit_1 = entry_price - (stop_loss_price - entry_price) # 盈亏比1:1
|
||
take_profit_2 = take_profit_price
|
||
|
||
# 交易规模:名义/保证金(用于统计总交易量与UI展示)
|
||
try:
|
||
notional_usdt = float(entry_price) * float(quantity)
|
||
except Exception:
|
||
notional_usdt = None
|
||
try:
|
||
margin_usdt = (float(notional_usdt) / float(leverage)) if notional_usdt is not None and float(leverage) > 0 else notional_usdt
|
||
except Exception:
|
||
margin_usdt = None
|
||
|
||
# 记录到数据库(只有在订单真正成交后才保存)
|
||
trade_id = None
|
||
if DB_AVAILABLE and Trade:
|
||
try:
|
||
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
|
||
trade_id = Trade.create(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity, # 使用实际成交数量
|
||
entry_price=entry_price, # 使用实际成交价格
|
||
leverage=leverage,
|
||
entry_reason=entry_reason,
|
||
entry_order_id=entry_order_id, # 保存币安订单号
|
||
stop_loss_price=stop_loss_price,
|
||
take_profit_price=take_profit_price,
|
||
take_profit_1=take_profit_1,
|
||
take_profit_2=take_profit_2,
|
||
atr=atr,
|
||
notional_usdt=notional_usdt,
|
||
margin_usdt=margin_usdt,
|
||
)
|
||
logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
|
||
except Exception as e:
|
||
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
return None
|
||
elif not DB_AVAILABLE:
|
||
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
|
||
elif not Trade:
|
||
logger.warning(f"Trade模型未导入,无法保存 {symbol} 交易记录")
|
||
|
||
# 记录持仓信息(包含动态止损止盈和分步止盈)
|
||
from datetime import datetime
|
||
position_info = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'quantity': quantity,
|
||
'entryPrice': entry_price,
|
||
'changePercent': change_percent,
|
||
'orderId': order.get('orderId'),
|
||
'tradeId': trade_id, # 数据库交易ID
|
||
'stopLoss': stop_loss_price,
|
||
'takeProfit': take_profit_price, # 保留原始止盈价(第二目标)
|
||
'takeProfit1': take_profit_1, # 第一目标(盈亏比1:1,了结50%)
|
||
'takeProfit2': take_profit_2, # 第二目标(原始止盈价,剩余50%)
|
||
'partialProfitTaken': False, # 是否已部分止盈
|
||
'remainingQuantity': quantity, # 剩余仓位数量
|
||
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
|
||
'leverage': leverage,
|
||
'entryReason': entry_reason,
|
||
'entryTime': get_beijing_time(), # 记录入场时间(使用北京时间,用于计算持仓持续时间)
|
||
'strategyType': 'trend_following', # 策略类型(简化后只有趋势跟踪)
|
||
'atr': atr,
|
||
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
|
||
'trailingStopActivated': False # 移动止损是否已激活
|
||
}
|
||
|
||
self.active_positions[symbol] = position_info
|
||
|
||
logger.info(
|
||
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
|
||
f"(涨跌幅: {change_percent:.2f}%)"
|
||
)
|
||
|
||
# 验证持仓是否真的在币安存在
|
||
try:
|
||
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
|
||
positions = await self.client.get_open_positions()
|
||
binance_position = next(
|
||
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
|
||
None
|
||
)
|
||
if binance_position:
|
||
logger.info(
|
||
f"{symbol} [开仓验证] ✓ 币安持仓确认: "
|
||
f"数量={float(binance_position.get('positionAmt', 0)):.4f}, "
|
||
f"入场价={float(binance_position.get('entryPrice', 0)):.4f}"
|
||
)
|
||
else:
|
||
logger.warning(
|
||
f"{symbol} [开仓验证] ⚠️ 币安账户中没有持仓,可能订单未成交或被立即平仓"
|
||
)
|
||
# 清理本地记录
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
# 如果数据库已保存,标记为取消
|
||
if trade_id and DB_AVAILABLE and Trade:
|
||
try:
|
||
from database.connection import db
|
||
db.execute_update(
|
||
"UPDATE trades SET status = 'cancelled' WHERE id = %s",
|
||
(trade_id,)
|
||
)
|
||
logger.info(f"{symbol} [开仓验证] 已更新数据库状态为 cancelled (ID: {trade_id})")
|
||
except Exception as e:
|
||
logger.warning(f"{symbol} [开仓验证] 更新数据库状态失败: {e}")
|
||
return None
|
||
except Exception as verify_error:
|
||
logger.warning(f"{symbol} [开仓验证] 验证持仓时出错: {verify_error},继续使用本地记录")
|
||
|
||
# 启动WebSocket实时监控
|
||
if self._monitoring_enabled:
|
||
await self._start_position_monitoring(symbol)
|
||
|
||
return position_info
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"开仓失败 {symbol}: {e}")
|
||
return None
|
||
|
||
async def close_position(self, symbol: str, reason: str = 'manual') -> bool:
|
||
"""
|
||
平仓
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
reason: 平仓原因(manual, stop_loss, take_profit, trailing_stop, sync)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
try:
|
||
logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})")
|
||
|
||
# 获取当前持仓
|
||
positions = await self.client.get_open_positions()
|
||
position = next(
|
||
(p for p in positions if p['symbol'] == symbol),
|
||
None
|
||
)
|
||
|
||
if not position:
|
||
logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓")
|
||
# 即使币安没有持仓,也要更新数据库状态
|
||
updated = False
|
||
if DB_AVAILABLE and Trade and symbol in self.active_positions:
|
||
position_info = self.active_positions[symbol]
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...")
|
||
# 获取当前价格作为平仓价格
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice'])
|
||
|
||
# 确保所有值都是float类型
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity = float(position_info['quantity'])
|
||
if position_info['side'] == 'BUY':
|
||
pnl = (exit_price - entry_price) * quantity
|
||
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
|
||
else:
|
||
pnl = (entry_price - exit_price) * quantity
|
||
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
|
||
|
||
# 同步平仓时没有订单号,设为None
|
||
# 计算持仓持续时间和策略类型
|
||
entry_time = position_info.get('entryTime')
|
||
duration_minutes = None
|
||
if entry_time:
|
||
try:
|
||
if isinstance(entry_time, str):
|
||
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
entry_dt = entry_time
|
||
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
|
||
duration = exit_dt - entry_dt
|
||
duration_minutes = int(duration.total_seconds() / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
strategy_type = position_info.get('strategyType', 'trend_following')
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price,
|
||
exit_reason=reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent,
|
||
exit_order_id=None, # 同步平仓时没有订单号
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes
|
||
)
|
||
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
|
||
updated = True
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}")
|
||
|
||
# 清理本地记录
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
|
||
# 如果更新了数据库,返回成功;否则返回失败
|
||
return updated
|
||
|
||
# 确定平仓方向(与开仓相反)
|
||
position_amt = position['positionAmt']
|
||
side = 'SELL' if position_amt > 0 else 'BUY'
|
||
quantity = abs(position_amt)
|
||
position_side = 'LONG' if position_amt > 0 else 'SHORT'
|
||
|
||
# 二次校验:用币安实时持仓数量兜底,避免 reduceOnly 被拒绝(-2022)
|
||
live_amt = await self._get_live_position_amt(symbol, position_side=position_side)
|
||
if live_amt is None or abs(live_amt) <= 0:
|
||
logger.warning(f"{symbol} [平仓] 实时查询到持仓已为0,跳过下单并按已平仓处理")
|
||
# 复用“币安无持仓”的处理逻辑:走上面的分支
|
||
position = None
|
||
# 触发上方逻辑:直接返回 updated/清理
|
||
# 这里简单调用同步函数路径
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
exit_price = float(ticker['price']) if ticker else float(position.get('entryPrice', 0) if position else 0)
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
logger.info(f"{symbol} [平仓] 已清理本地记录(币安无持仓)")
|
||
return True
|
||
|
||
# 以币安实时持仓数量为准(并向下截断到不超过持仓)
|
||
quantity = min(quantity, abs(live_amt))
|
||
quantity = await self._adjust_close_quantity(symbol, quantity)
|
||
if quantity <= 0:
|
||
logger.warning(f"{symbol} [平仓] 数量调整后为0,跳过下单并清理本地记录")
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
return True
|
||
|
||
logger.info(
|
||
f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET "
|
||
f"(持仓数量: {position_amt:.4f})"
|
||
)
|
||
|
||
# 平仓(使用 reduceOnly=True 确保只减少持仓,不增加反向持仓)
|
||
try:
|
||
logger.debug(f"{symbol} [平仓] 调用 place_order: {side} {quantity:.4f} @ MARKET (reduceOnly=True)")
|
||
order = await self.client.place_order(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
order_type='MARKET',
|
||
reduce_only=True, # 平仓时使用 reduceOnly=True
|
||
position_side=position_side, # 兼容对冲模式(Hedge):必须指定 LONG/SHORT
|
||
)
|
||
logger.debug(f"{symbol} [平仓] place_order 返回: {order}")
|
||
except Exception as order_error:
|
||
logger.error(f"{symbol} [平仓] ❌ 下单失败: {order_error}")
|
||
logger.error(f" 下单参数: symbol={symbol}, side={side}, quantity={quantity:.4f}, order_type=MARKET")
|
||
logger.error(f" 错误类型: {type(order_error).__name__}")
|
||
import traceback
|
||
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
|
||
raise # 重新抛出异常,让外层捕获
|
||
|
||
if order:
|
||
order_id = order.get('orderId')
|
||
logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order_id})")
|
||
|
||
# 等待订单成交,然后从币安获取实际成交价格
|
||
exit_price = None
|
||
try:
|
||
# 等待一小段时间让订单成交
|
||
await asyncio.sleep(1)
|
||
|
||
# 从币安获取订单详情,获取实际成交价格
|
||
try:
|
||
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
|
||
if order_info:
|
||
# 优先使用平均成交价格(avgPrice),如果没有则使用价格字段
|
||
exit_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0))
|
||
if exit_price > 0:
|
||
logger.info(f"{symbol} [平仓] 从币安订单获取实际成交价格: {exit_price:.4f} USDT")
|
||
else:
|
||
# 如果订单还没有完全成交,尝试从成交记录获取
|
||
if order_info.get('status') == 'FILLED' and order_info.get('fills'):
|
||
# 计算加权平均成交价格
|
||
total_qty = 0
|
||
total_value = 0
|
||
for fill in order_info.get('fills', []):
|
||
qty = float(fill.get('qty', 0))
|
||
price = float(fill.get('price', 0))
|
||
total_qty += qty
|
||
total_value += qty * price
|
||
if total_qty > 0:
|
||
exit_price = total_value / total_qty
|
||
logger.info(f"{symbol} [平仓] 从成交记录计算平均成交价格: {exit_price:.4f} USDT")
|
||
except Exception as order_error:
|
||
logger.warning(f"{symbol} [平仓] 获取订单详情失败: {order_error},使用备用方法")
|
||
|
||
# 如果无法从订单获取价格,使用当前价格作为备用
|
||
if not exit_price or exit_price <= 0:
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if ticker:
|
||
exit_price = float(ticker['price'])
|
||
logger.warning(f"{symbol} [平仓] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
|
||
else:
|
||
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
|
||
if exit_price <= 0:
|
||
logger.error(f"{symbol} [平仓] 无法获取平仓价格,使用订单价格字段")
|
||
exit_price = float(order.get('price', 0))
|
||
except Exception as price_error:
|
||
logger.warning(f"{symbol} [平仓] 获取成交价格时出错: {price_error},使用当前价格")
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
exit_price = float(ticker['price']) if ticker else float(order.get('price', 0))
|
||
|
||
# 更新数据库记录
|
||
if DB_AVAILABLE and Trade and symbol in self.active_positions:
|
||
position_info = self.active_positions[symbol]
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
|
||
# 计算盈亏(确保所有值都是float类型)
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity_float = float(quantity)
|
||
exit_price_float = float(exit_price)
|
||
if position_info['side'] == 'BUY':
|
||
pnl = (exit_price_float - entry_price) * quantity_float
|
||
pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl = (entry_price - exit_price_float) * quantity_float
|
||
pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100
|
||
|
||
# 获取平仓订单号
|
||
exit_order_id = order.get('orderId')
|
||
if exit_order_id:
|
||
logger.info(f"{symbol} [平仓] 币安订单号: {exit_order_id}")
|
||
|
||
# 计算持仓持续时间
|
||
entry_time = position_info.get('entryTime')
|
||
duration_minutes = None
|
||
if entry_time:
|
||
try:
|
||
if isinstance(entry_time, str):
|
||
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
entry_dt = entry_time
|
||
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
|
||
duration = exit_dt - entry_dt
|
||
duration_minutes = int(duration.total_seconds() / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
# 获取策略类型(从开仓原因或持仓信息中获取)
|
||
strategy_type = position_info.get('strategyType', 'trend_following') # 默认趋势跟踪
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price_float,
|
||
exit_reason=reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent,
|
||
exit_order_id=exit_order_id, # 保存币安平仓订单号
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes
|
||
)
|
||
logger.info(
|
||
f"{symbol} [平仓] ✓ 数据库记录已更新 "
|
||
f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
else:
|
||
logger.warning(f"{symbol} 没有关联的数据库交易ID,无法更新平仓记录")
|
||
elif not DB_AVAILABLE:
|
||
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
|
||
elif not Trade:
|
||
logger.warning(f"Trade模型未导入,无法更新 {symbol} 平仓记录")
|
||
|
||
# 停止WebSocket监控
|
||
await self._stop_position_monitoring(symbol)
|
||
|
||
# 移除持仓记录
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
|
||
logger.info(
|
||
f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} "
|
||
f"(原因: {reason})"
|
||
)
|
||
return True
|
||
else:
|
||
# place_order 返回 None:可能是 -2022(ReduceOnly rejected)等竞态场景
|
||
# 兜底:再查一次实时持仓,如果已经为0,则当作“已平仓”处理,避免刷屏与误判失败
|
||
try:
|
||
live2 = await self._get_live_position_amt(symbol, position_side=position_side)
|
||
except Exception:
|
||
live2 = None
|
||
if live2 is None or abs(live2) <= 0:
|
||
logger.warning(f"{symbol} [平仓] 下单返回None,但实时持仓已为0,按已平仓处理(可能竞态/手动平仓)")
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
return True
|
||
|
||
logger.error(f"{symbol} [平仓] ❌ 下单返回 None(实时持仓仍存在: {live2}),可能的原因:")
|
||
logger.error(f" 1. ReduceOnly 被拒绝(-2022)但持仓未同步")
|
||
logger.error(f" 2. 数量精度调整后为 0 或负数")
|
||
logger.error(f" 3. 无法获取价格信息")
|
||
logger.error(f" 4. 其他下单错误(已在 place_order 中记录)")
|
||
logger.error(f" 持仓信息: {side} {quantity:.4f} @ MARKET")
|
||
|
||
# 尝试获取更多诊断信息
|
||
try:
|
||
symbol_info = await self.client.get_symbol_info(symbol)
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if ticker:
|
||
current_price = ticker['price']
|
||
notional_value = quantity * current_price
|
||
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
|
||
logger.error(f" 当前价格: {current_price:.4f} USDT")
|
||
logger.error(f" 订单名义价值: {notional_value:.2f} USDT")
|
||
logger.error(f" 最小名义价值: {min_notional:.2f} USDT")
|
||
if notional_value < min_notional:
|
||
logger.error(f" ⚠ 订单名义价值不足,无法平仓")
|
||
except Exception as diag_error:
|
||
logger.warning(f" 无法获取诊断信息: {diag_error}")
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
|
||
|
||
# 尝试清理本地记录(即使平仓失败)
|
||
try:
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
logger.info(f"{symbol} [平仓] 已清理本地持仓记录")
|
||
except Exception as cleanup_error:
|
||
logger.warning(f"{symbol} [平仓] 清理本地记录时出错: {cleanup_error}")
|
||
|
||
return False
|
||
|
||
async def _get_live_position_amt(self, symbol: str, position_side: Optional[str] = None) -> Optional[float]:
|
||
"""
|
||
从币安原始接口读取持仓数量(避免本地状态/缓存不一致导致 reduceOnly 被拒绝)。
|
||
- 单向模式:通常只有一个 net 持仓
|
||
- 对冲模式:可能同时有 LONG/SHORT,两条腿用 positionSide 区分
|
||
"""
|
||
try:
|
||
if not getattr(self.client, "client", None):
|
||
return None
|
||
res = await self.client.client.futures_position_information(symbol=symbol)
|
||
if not isinstance(res, list):
|
||
return None
|
||
ps = (position_side or "").upper()
|
||
nonzero = []
|
||
for p in res:
|
||
if not isinstance(p, dict):
|
||
continue
|
||
try:
|
||
amt = float(p.get("positionAmt", 0))
|
||
except Exception:
|
||
continue
|
||
if abs(amt) <= 0:
|
||
continue
|
||
nonzero.append((amt, p))
|
||
if not nonzero:
|
||
return 0.0
|
||
if ps in ("LONG", "SHORT"):
|
||
for amt, p in nonzero:
|
||
pps = (p.get("positionSide") or "").upper()
|
||
if pps == ps:
|
||
return amt
|
||
# 如果没匹配到 positionSide,退化为按符号推断
|
||
if ps == "LONG":
|
||
cand = next((amt for amt, _ in nonzero if amt > 0), None)
|
||
return cand if cand is not None else 0.0
|
||
if ps == "SHORT":
|
||
cand = next((amt for amt, _ in nonzero if amt < 0), None)
|
||
return cand if cand is not None else 0.0
|
||
# 没提供 position_side:返回净持仓(单向模式)
|
||
return sum([amt for amt, _ in nonzero])
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 读取实时持仓失败: {e}")
|
||
return None
|
||
|
||
async def _adjust_close_quantity(self, symbol: str, quantity: float) -> float:
|
||
"""
|
||
平仓数量调整:只允许向下取整(避免超过持仓导致 reduceOnly 被拒绝)。
|
||
"""
|
||
try:
|
||
symbol_info = await self.client.get_symbol_info(symbol)
|
||
except Exception:
|
||
symbol_info = None
|
||
|
||
try:
|
||
q = float(quantity)
|
||
except Exception:
|
||
return 0.0
|
||
|
||
if not symbol_info:
|
||
return max(0.0, q)
|
||
|
||
try:
|
||
step_size = float(symbol_info.get("stepSize", 0) or 0)
|
||
except Exception:
|
||
step_size = 0.0
|
||
qty_precision = int(symbol_info.get("quantityPrecision", 8) or 8)
|
||
|
||
if step_size and step_size > 0:
|
||
q = float(int(q / step_size)) * step_size
|
||
else:
|
||
q = round(q, qty_precision)
|
||
q = round(q, qty_precision)
|
||
return max(0.0, q)
|
||
|
||
async def check_stop_loss_take_profit(self) -> List[str]:
|
||
"""
|
||
检查止损止盈
|
||
|
||
Returns:
|
||
需要平仓的交易对列表
|
||
"""
|
||
closed_positions = []
|
||
|
||
try:
|
||
# 获取当前持仓
|
||
positions = await self.client.get_open_positions()
|
||
position_dict = {p['symbol']: p for p in positions}
|
||
|
||
for symbol, position_info in list(self.active_positions.items()):
|
||
if symbol not in position_dict:
|
||
# 持仓已不存在,移除记录
|
||
del self.active_positions[symbol]
|
||
continue
|
||
|
||
current_position = position_dict[symbol]
|
||
entry_price = position_info['entryPrice']
|
||
quantity = position_info['quantity'] # 修复:获取quantity
|
||
# 获取当前标记价格
|
||
current_price = current_position.get('markPrice', 0)
|
||
if current_price == 0:
|
||
# 如果标记价格为0,尝试从ticker获取
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if ticker:
|
||
current_price = ticker['price']
|
||
else:
|
||
current_price = entry_price
|
||
|
||
# 计算当前盈亏(基于保证金)
|
||
leverage = position_info.get('leverage', 10)
|
||
position_value = entry_price * quantity
|
||
margin = position_value / leverage if leverage > 0 else position_value
|
||
|
||
# 计算盈亏金额
|
||
if position_info['side'] == 'BUY':
|
||
pnl_amount = (current_price - entry_price) * quantity
|
||
else:
|
||
pnl_amount = (entry_price - current_price) * quantity
|
||
|
||
# 计算盈亏百分比(相对于保证金,与币安一致)
|
||
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 也计算价格百分比(用于显示和移动止损)
|
||
if position_info['side'] == 'BUY':
|
||
pnl_percent_price = ((current_price - entry_price) / entry_price) * 100
|
||
else:
|
||
pnl_percent_price = ((entry_price - current_price) / entry_price) * 100
|
||
|
||
# 更新最大盈利(基于保证金)
|
||
if pnl_percent_margin > position_info.get('maxProfit', 0):
|
||
position_info['maxProfit'] = pnl_percent_margin
|
||
|
||
# 移动止损逻辑(盈利后保护利润,基于保证金)
|
||
# 每次检查时从Redis重新加载配置,确保配置修改能即时生效
|
||
try:
|
||
if config._config_manager:
|
||
config._config_manager.reload_from_redis()
|
||
config.TRADING_CONFIG = config._get_trading_config()
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载配置失败: {e}")
|
||
|
||
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
|
||
if use_trailing:
|
||
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
|
||
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
|
||
|
||
if not position_info.get('trailingStopActivated', False):
|
||
# 盈利超过阈值后(相对于保证金),激活移动止损
|
||
if pnl_percent_margin > trailing_activation * 100:
|
||
position_info['trailingStopActivated'] = True
|
||
# 将止损移至成本价(保本)
|
||
position_info['stopLoss'] = entry_price
|
||
logger.info(
|
||
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
|
||
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
|
||
)
|
||
else:
|
||
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
|
||
# 如果已经部分止盈,使用剩余仓位计算
|
||
if position_info.get('partialProfitTaken', False):
|
||
remaining_quantity = position_info.get('remainingQuantity', quantity)
|
||
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
|
||
protect_amount = remaining_margin * trailing_protect
|
||
|
||
# 计算剩余仓位的盈亏
|
||
if position_info['side'] == 'BUY':
|
||
remaining_pnl = (current_price - entry_price) * remaining_quantity
|
||
else:
|
||
remaining_pnl = (entry_price - current_price) * remaining_quantity
|
||
|
||
# 计算新的止损价(基于剩余仓位)
|
||
if position_info['side'] == 'BUY':
|
||
new_stop_loss = entry_price + (remaining_pnl - protect_amount) / remaining_quantity
|
||
if new_stop_loss > position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
|
||
f"剩余数量: {remaining_quantity:.4f})"
|
||
)
|
||
else:
|
||
new_stop_loss = entry_price - (remaining_pnl - protect_amount) / remaining_quantity
|
||
if new_stop_loss < position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
|
||
f"剩余数量: {remaining_quantity:.4f})"
|
||
)
|
||
else:
|
||
# 未部分止盈,使用原始仓位计算
|
||
protect_amount = margin * trailing_protect
|
||
# 计算对应的止损价
|
||
if position_info['side'] == 'BUY':
|
||
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
|
||
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
|
||
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
|
||
if new_stop_loss > position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
|
||
)
|
||
else:
|
||
# 做空:止损价 = 开仓价 - (当前盈亏 - 保护金额) / 数量
|
||
new_stop_loss = entry_price - (pnl_amount - protect_amount) / quantity
|
||
if new_stop_loss < position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
|
||
)
|
||
|
||
# 检查止损(使用更新后的止损价,基于保证金收益比)
|
||
stop_loss = position_info.get('stopLoss')
|
||
if stop_loss is None:
|
||
logger.warning(f"{symbol} 止损价未设置,跳过止损检查")
|
||
elif stop_loss is not None:
|
||
# 计算止损对应的保证金百分比目标
|
||
if position_info['side'] == 'BUY':
|
||
stop_loss_amount = (entry_price - stop_loss) * quantity
|
||
else: # SELL
|
||
stop_loss_amount = (stop_loss - entry_price) * quantity
|
||
|
||
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 直接比较当前盈亏百分比与止损目标(基于保证金)
|
||
if pnl_percent_margin <= -stop_loss_pct_margin:
|
||
logger.warning(
|
||
f"{symbol} 触发止损(基于保证金): "
|
||
f"当前盈亏={pnl_percent_margin:.2f}% of margin <= 止损目标=-{stop_loss_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price:.4f}, 止损价={stop_loss:.4f}"
|
||
)
|
||
# 确定平仓原因
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
# 计算持仓持续时间
|
||
entry_time = position_info.get('entryTime')
|
||
duration_minutes = None
|
||
if entry_time:
|
||
try:
|
||
if isinstance(entry_time, str):
|
||
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
entry_dt = entry_time
|
||
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
|
||
duration = exit_dt - entry_dt
|
||
duration_minutes = int(duration.total_seconds() / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
# 获取策略类型
|
||
strategy_type = position_info.get('strategyType', 'trend_following')
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl_amount,
|
||
pnl_percent=pnl_percent_margin,
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止损记录失败: {e}")
|
||
if await self.close_position(symbol, reason=exit_reason):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
# 检查分步止盈(基于保证金收益比)
|
||
take_profit_1 = position_info.get('takeProfit1') # 第一目标(盈亏比1:1)
|
||
take_profit_2 = position_info.get('takeProfit2', position_info.get('takeProfit')) # 第二目标
|
||
partial_profit_taken = position_info.get('partialProfitTaken', False)
|
||
remaining_quantity = position_info.get('remainingQuantity', quantity)
|
||
|
||
# 第一目标:盈亏比1:1,了结50%仓位
|
||
if not partial_profit_taken and take_profit_1 is not None:
|
||
# 计算第一目标对应的保证金百分比
|
||
if position_info['side'] == 'BUY':
|
||
take_profit_1_amount = (take_profit_1 - entry_price) * quantity
|
||
else: # SELL
|
||
take_profit_1_amount = (entry_price - take_profit_1) * quantity
|
||
take_profit_1_pct_margin = (take_profit_1_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 直接比较当前盈亏百分比与第一目标(基于保证金)
|
||
if pnl_percent_margin >= take_profit_1_pct_margin:
|
||
logger.info(
|
||
f"{symbol} 触发第一目标止盈(盈亏比1:1,基于保证金): "
|
||
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_1_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price:.4f}, 目标价={take_profit_1:.4f}"
|
||
)
|
||
# 部分平仓50%
|
||
partial_quantity = quantity * 0.5
|
||
try:
|
||
# 部分平仓
|
||
close_side = 'SELL' if position_info['side'] == 'BUY' else 'BUY'
|
||
close_position_side = 'LONG' if position_info['side'] == 'BUY' else 'SHORT'
|
||
# 二次校验并截断数量,避免 reduceOnly 被拒绝(-2022)
|
||
live_amt = await self._get_live_position_amt(symbol, position_side=close_position_side)
|
||
if live_amt is None or abs(live_amt) <= 0:
|
||
logger.warning(f"{symbol} 部分止盈:实时持仓已为0,跳过部分平仓")
|
||
continue
|
||
partial_quantity = min(partial_quantity, abs(live_amt))
|
||
partial_quantity = await self._adjust_close_quantity(symbol, partial_quantity)
|
||
if partial_quantity <= 0:
|
||
logger.warning(f"{symbol} 部分止盈:数量调整后为0,跳过")
|
||
continue
|
||
partial_order = await self.client.place_order(
|
||
symbol=symbol,
|
||
side=close_side,
|
||
quantity=partial_quantity,
|
||
order_type='MARKET',
|
||
reduce_only=True, # 部分止盈必须 reduceOnly,避免反向开仓
|
||
position_side=close_position_side, # 兼容对冲模式:指定要减少的持仓方向
|
||
)
|
||
if partial_order:
|
||
position_info['partialProfitTaken'] = True
|
||
position_info['remainingQuantity'] = remaining_quantity - partial_quantity
|
||
logger.info(
|
||
f"{symbol} 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}"
|
||
)
|
||
# 分步止盈后的“保本”处理:
|
||
# - 若启用 USE_TRAILING_STOP:允许把剩余仓位止损移至成本价,并进入移动止损阶段
|
||
# - 若关闭 USE_TRAILING_STOP:严格不自动移动止损(避免你说的“仍然保本/仍然移动止损”)
|
||
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
|
||
if use_trailing:
|
||
position_info['stopLoss'] = entry_price
|
||
position_info['trailingStopActivated'] = True
|
||
logger.info(f"{symbol} 剩余仓位止损移至成本价(保本),配合移动止损博取更大利润")
|
||
else:
|
||
logger.info(f"{symbol} 已部分止盈,但已关闭移动止损:不自动将止损移至成本价")
|
||
else:
|
||
# 兜底:可能遇到 -2022(reduceOnly rejected)等竞态,重新查一次持仓
|
||
try:
|
||
live2 = await self._get_live_position_amt(symbol, position_side=close_position_side)
|
||
except Exception:
|
||
live2 = None
|
||
if live2 is None or abs(live2) <= 0:
|
||
logger.warning(f"{symbol} 部分止盈下单返回None,但实时持仓已为0,跳过")
|
||
continue
|
||
logger.warning(f"{symbol} 部分止盈下单返回None(实时持仓仍存在: {live2}),稍后将继续由止损/止盈逻辑处理")
|
||
except Exception as e:
|
||
logger.error(f"{symbol} 部分止盈失败: {e}")
|
||
|
||
# 第二目标:原始止盈价,平掉剩余仓位(基于保证金收益比)
|
||
if partial_profit_taken and take_profit_2 is not None:
|
||
# 计算第二目标对应的保证金百分比
|
||
if position_info['side'] == 'BUY':
|
||
take_profit_2_amount = (take_profit_2 - entry_price) * remaining_quantity
|
||
else: # SELL
|
||
take_profit_2_amount = (entry_price - take_profit_2) * remaining_quantity
|
||
# 使用剩余仓位的保证金
|
||
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
|
||
take_profit_2_pct_margin = (take_profit_2_amount / remaining_margin * 100) if remaining_margin > 0 else 0
|
||
# 计算剩余仓位的当前盈亏
|
||
if position_info['side'] == 'BUY':
|
||
remaining_pnl_amount = (current_price - entry_price) * remaining_quantity
|
||
else:
|
||
remaining_pnl_amount = (entry_price - current_price) * remaining_quantity
|
||
remaining_pnl_pct_margin = (remaining_pnl_amount / remaining_margin * 100) if remaining_margin > 0 else 0
|
||
|
||
# 直接比较剩余仓位盈亏百分比与第二目标(基于保证金)
|
||
if remaining_pnl_pct_margin >= take_profit_2_pct_margin:
|
||
logger.info(
|
||
f"{symbol} 触发第二目标止盈(基于保证金): "
|
||
f"剩余仓位盈亏={remaining_pnl_pct_margin:.2f}% of margin >= 目标={take_profit_2_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price:.4f}, 目标价={take_profit_2:.4f}, "
|
||
f"剩余数量={remaining_quantity:.4f}"
|
||
)
|
||
exit_reason = 'take_profit'
|
||
# 计算总盈亏(原始仓位 + 部分止盈的盈亏)
|
||
# 部分止盈时的价格需要从数据库或记录中获取,这里简化处理
|
||
total_pnl_amount = remaining_pnl_amount # 简化:只计算剩余仓位盈亏
|
||
total_pnl_percent = (total_pnl_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
# 计算持仓持续时间
|
||
entry_time = position_info.get('entryTime')
|
||
duration_minutes = None
|
||
if entry_time:
|
||
try:
|
||
if isinstance(entry_time, str):
|
||
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
entry_dt = entry_time
|
||
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
|
||
duration = exit_dt - entry_dt
|
||
duration_minutes = int(duration.total_seconds() / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
# 获取策略类型
|
||
strategy_type = position_info.get('strategyType', 'trend_following')
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason=exit_reason,
|
||
pnl=total_pnl_amount,
|
||
pnl_percent=total_pnl_percent,
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止盈记录失败: {e}")
|
||
if await self.close_position(symbol, reason=exit_reason):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
else:
|
||
# 如果未部分止盈,但达到止盈目标,直接全部平仓(基于保证金收益比)
|
||
take_profit = position_info.get('takeProfit')
|
||
if take_profit is not None:
|
||
# 计算止盈对应的保证金百分比
|
||
if position_info['side'] == 'BUY':
|
||
take_profit_amount = (take_profit - entry_price) * quantity
|
||
else: # SELL
|
||
take_profit_amount = (entry_price - take_profit) * quantity
|
||
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
|
||
if pnl_percent_margin >= take_profit_pct_margin:
|
||
logger.info(
|
||
f"{symbol} 触发止盈(基于保证金): "
|
||
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price:.4f}, 止盈价={take_profit:.4f}"
|
||
)
|
||
exit_reason = 'take_profit'
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
# 计算持仓持续时间和策略类型
|
||
entry_time = position_info.get('entryTime')
|
||
duration_minutes = None
|
||
if entry_time:
|
||
try:
|
||
from datetime import datetime
|
||
if isinstance(entry_time, str):
|
||
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
entry_dt = entry_time
|
||
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
|
||
duration = exit_dt - entry_dt
|
||
duration_minutes = int(duration.total_seconds() / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
strategy_type = position_info.get('strategyType', 'trend_following')
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl_amount,
|
||
pnl_percent=pnl_percent_margin,
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止盈记录失败: {e}")
|
||
if await self.close_position(symbol, reason=exit_reason):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查止损止盈失败: {e}")
|
||
|
||
return closed_positions
|
||
|
||
async def get_position_summary(self) -> Dict:
|
||
"""
|
||
获取持仓摘要
|
||
|
||
Returns:
|
||
持仓摘要信息
|
||
"""
|
||
try:
|
||
positions = await self.client.get_open_positions()
|
||
balance = await self.client.get_account_balance()
|
||
|
||
total_pnl = sum(p['unRealizedProfit'] for p in positions)
|
||
|
||
return {
|
||
'totalPositions': len(positions),
|
||
'totalBalance': balance.get('total', 0),
|
||
'availableBalance': balance.get('available', 0),
|
||
'totalPnL': total_pnl,
|
||
'positions': [
|
||
{
|
||
'symbol': p['symbol'],
|
||
'positionAmt': p['positionAmt'],
|
||
'entryPrice': p['entryPrice'],
|
||
'pnl': p['unRealizedProfit']
|
||
}
|
||
for p in positions
|
||
]
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取持仓摘要失败: {e}")
|
||
return {}
|
||
|
||
async def sync_positions_with_binance(self):
|
||
"""
|
||
同步币安实际持仓状态与数据库状态
|
||
检查哪些持仓在数据库中还是open状态,但在币安已经不存在了
|
||
"""
|
||
if not DB_AVAILABLE or not Trade:
|
||
logger.debug("数据库不可用,跳过持仓状态同步")
|
||
return
|
||
|
||
try:
|
||
logger.info("开始同步币安持仓状态与数据库...")
|
||
|
||
# 1. 获取币安实际持仓
|
||
binance_positions = await self.client.get_open_positions()
|
||
binance_symbols = {p['symbol'] for p in binance_positions}
|
||
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})")
|
||
|
||
# 2. 获取数据库中状态为open的交易记录
|
||
db_open_trades = Trade.get_all(status='open')
|
||
db_open_symbols = {t['symbol'] for t in db_open_trades}
|
||
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else '无'})")
|
||
|
||
# 3. 找出在数据库中open但在币安已不存在的持仓
|
||
missing_in_binance = db_open_symbols - binance_symbols
|
||
|
||
if missing_in_binance:
|
||
logger.warning(
|
||
f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态,但在币安已不存在: "
|
||
f"{', '.join(missing_in_binance)}"
|
||
)
|
||
|
||
# 4. 更新这些持仓的状态
|
||
for symbol in missing_in_binance:
|
||
try:
|
||
trades = Trade.get_by_symbol(symbol, status='open')
|
||
if not trades:
|
||
logger.warning(f"{symbol} [状态同步] ⚠️ 数据库中没有找到open状态的交易记录,跳过")
|
||
continue
|
||
|
||
logger.info(f"{symbol} [状态同步] 找到 {len(trades)} 条open状态的交易记录,开始更新...")
|
||
except Exception as get_trades_error:
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 获取交易记录失败: "
|
||
f"错误类型={type(get_trades_error).__name__}, 错误消息={str(get_trades_error)}"
|
||
)
|
||
import traceback
|
||
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
|
||
continue
|
||
|
||
for trade in trades:
|
||
trade_id = trade.get('id')
|
||
if not trade_id:
|
||
logger.warning(f"{symbol} [状态同步] ⚠️ 交易记录缺少ID字段,跳过: {trade}")
|
||
continue
|
||
|
||
try:
|
||
logger.info(
|
||
f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})... | "
|
||
f"入场价: {trade.get('entry_price', 'N/A')}, "
|
||
f"数量: {trade.get('quantity', 'N/A')}, "
|
||
f"方向: {trade.get('side', 'N/A')}"
|
||
)
|
||
|
||
# 尝试从币安历史订单获取实际平仓价格
|
||
exit_price = None
|
||
close_orders = []
|
||
try:
|
||
# 获取最近的平仓订单(reduceOnly=True的订单)
|
||
import time
|
||
end_time = int(time.time() * 1000) # 当前时间(毫秒)
|
||
start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天
|
||
|
||
logger.debug(
|
||
f"{symbol} [状态同步] 获取历史订单: "
|
||
f"symbol={symbol}, startTime={start_time}, endTime={end_time}"
|
||
)
|
||
|
||
# 获取历史订单
|
||
orders = await self.client.client.futures_get_all_orders(
|
||
symbol=symbol,
|
||
startTime=start_time,
|
||
endTime=end_time
|
||
)
|
||
|
||
# 验证 orders 的类型
|
||
if not isinstance(orders, list):
|
||
logger.warning(
|
||
f"{symbol} [状态同步] ⚠️ futures_get_all_orders 返回的不是列表: "
|
||
f"类型={type(orders)}, 值={orders}"
|
||
)
|
||
orders = [] # 设置为空列表,避免后续错误
|
||
|
||
if not orders:
|
||
logger.debug(f"{symbol} [状态同步] 未找到历史订单")
|
||
else:
|
||
logger.debug(f"{symbol} [状态同步] 找到 {len(orders)} 个历史订单")
|
||
|
||
# 查找最近的平仓订单(reduceOnly=True且已成交)
|
||
close_orders = []
|
||
for o in orders:
|
||
try:
|
||
if isinstance(o, dict) and o.get('reduceOnly') == True and o.get('status') == 'FILLED':
|
||
close_orders.append(o)
|
||
except (AttributeError, TypeError) as e:
|
||
logger.warning(
|
||
f"{symbol} [状态同步] ⚠️ 处理订单数据时出错: "
|
||
f"错误类型={type(e).__name__}, 错误消息={str(e)}, "
|
||
f"订单数据类型={type(o)}, 订单数据={o}"
|
||
)
|
||
continue
|
||
|
||
if close_orders:
|
||
# 按时间倒序排序,取最近的
|
||
close_orders.sort(key=lambda x: x.get('updateTime', 0), reverse=True)
|
||
latest_order = close_orders[0]
|
||
|
||
# 获取平均成交价格
|
||
exit_price = float(latest_order.get('avgPrice', 0))
|
||
if exit_price > 0:
|
||
logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT")
|
||
else:
|
||
logger.warning(
|
||
f"{symbol} [状态同步] 历史订单中没有有效的avgPrice: {latest_order}"
|
||
)
|
||
except KeyError as key_error:
|
||
# KeyError 可能是访问 orders[symbol] 或其他字典访问错误
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 获取历史订单时KeyError: "
|
||
f"错误key={key_error}, 错误类型={type(key_error).__name__}"
|
||
)
|
||
import traceback
|
||
logger.debug(f"{symbol} [状态同步] KeyError详情:\n{traceback.format_exc()}")
|
||
except Exception as order_error:
|
||
logger.warning(
|
||
f"{symbol} [状态同步] 获取历史订单失败: "
|
||
f"错误类型={type(order_error).__name__}, 错误消息={str(order_error)}"
|
||
)
|
||
import traceback
|
||
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
|
||
|
||
# 如果无法从订单获取,使用当前价格
|
||
if not exit_price or exit_price <= 0:
|
||
try:
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if ticker and ticker.get('price'):
|
||
exit_price = float(ticker['price'])
|
||
logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
|
||
else:
|
||
exit_price = float(trade.get('entry_price', 0))
|
||
logger.warning(
|
||
f"{symbol} [状态同步] 无法获取当前价格(ticker={ticker}),"
|
||
f"使用入场价: {exit_price:.4f} USDT"
|
||
)
|
||
except KeyError as key_error:
|
||
# KeyError 可能是访问 ticker['price'] 时出错
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 获取当前价格时KeyError: {key_error}, "
|
||
f"ticker数据: {ticker if 'ticker' in locals() else 'N/A'}"
|
||
)
|
||
exit_price = float(trade.get('entry_price', 0))
|
||
if exit_price <= 0:
|
||
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
|
||
continue
|
||
except Exception as ticker_error:
|
||
logger.warning(
|
||
f"{symbol} [状态同步] 获取当前价格失败: "
|
||
f"错误类型={type(ticker_error).__name__}, 错误消息={str(ticker_error)},"
|
||
f"使用入场价: {trade.get('entry_price', 'N/A')}"
|
||
)
|
||
exit_price = float(trade.get('entry_price', 0))
|
||
if exit_price <= 0:
|
||
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
|
||
continue
|
||
|
||
# 计算盈亏(确保所有值都是float类型,避免Decimal类型问题)
|
||
try:
|
||
entry_price = float(trade.get('entry_price', 0))
|
||
quantity = float(trade.get('quantity', 0))
|
||
|
||
if entry_price <= 0 or quantity <= 0:
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 交易记录数据无效: "
|
||
f"入场价={entry_price}, 数量={quantity}, 跳过更新"
|
||
)
|
||
continue
|
||
|
||
if trade.get('side') == 'BUY':
|
||
pnl = (exit_price - entry_price) * quantity
|
||
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl = (entry_price - exit_price) * quantity
|
||
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
|
||
|
||
logger.debug(
|
||
f"{symbol} [状态同步] 盈亏计算: "
|
||
f"入场价={entry_price:.4f}, 平仓价={exit_price:.4f}, "
|
||
f"数量={quantity:.4f}, 方向={trade.get('side', 'N/A')}, "
|
||
f"盈亏={pnl:.2f} USDT ({pnl_percent:.2f}%)"
|
||
)
|
||
except (ValueError, TypeError, KeyError) as calc_error:
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 计算盈亏失败 (ID: {trade_id}): "
|
||
f"错误类型={type(calc_error).__name__}, 错误消息={str(calc_error)}, "
|
||
f"交易记录数据: entry_price={trade.get('entry_price', 'N/A')}, "
|
||
f"quantity={trade.get('quantity', 'N/A')}, side={trade.get('side', 'N/A')}"
|
||
)
|
||
continue
|
||
|
||
# 从历史订单中获取平仓订单号
|
||
exit_order_id = None
|
||
latest_close_order = None
|
||
if close_orders:
|
||
exit_order_id = close_orders[0].get('orderId')
|
||
latest_close_order = close_orders[0]
|
||
if exit_order_id:
|
||
logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}")
|
||
|
||
# 使用 try-except 包裹,确保异常被正确处理
|
||
try:
|
||
# 计算持仓持续时间和策略类型
|
||
# exit_reason 细分:优先看币安平仓订单类型,其次用价格接近止损/止盈价做兜底
|
||
exit_reason = "sync"
|
||
exit_time_ts = None
|
||
try:
|
||
if latest_close_order and isinstance(latest_close_order, dict):
|
||
otype = str(
|
||
latest_close_order.get("type")
|
||
or latest_close_order.get("origType")
|
||
or ""
|
||
).upper()
|
||
if "TRAILING" in otype:
|
||
exit_reason = "trailing_stop"
|
||
elif "TAKE_PROFIT" in otype:
|
||
exit_reason = "take_profit"
|
||
elif "STOP" in otype:
|
||
# STOP / STOP_MARKET 通常对应止损触发
|
||
exit_reason = "stop_loss"
|
||
elif otype in ("MARKET", "LIMIT"):
|
||
exit_reason = "manual"
|
||
|
||
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
|
||
try:
|
||
if ms:
|
||
exit_time_ts = int(int(ms) / 1000)
|
||
except Exception:
|
||
exit_time_ts = None
|
||
|
||
except Exception:
|
||
# 保持默认 sync
|
||
pass
|
||
|
||
# 价格兜底:如果能明显命中止损/止盈价,则覆盖 exit_reason
|
||
try:
|
||
def _close_to(a: float, b: float, max_pct: float = 0.01) -> bool:
|
||
if a <= 0 or b <= 0:
|
||
return False
|
||
return abs((a - b) / b) <= max_pct
|
||
|
||
ep = float(exit_price or 0)
|
||
if ep > 0:
|
||
sl = trade.get("stop_loss_price")
|
||
tp = trade.get("take_profit_price")
|
||
tp1 = trade.get("take_profit_1")
|
||
tp2 = trade.get("take_profit_2")
|
||
if sl is not None and _close_to(ep, float(sl), max_pct=0.01):
|
||
exit_reason = "stop_loss"
|
||
elif tp is not None and _close_to(ep, float(tp), max_pct=0.01):
|
||
exit_reason = "take_profit"
|
||
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.01):
|
||
exit_reason = "take_profit"
|
||
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.01):
|
||
exit_reason = "take_profit"
|
||
except Exception:
|
||
pass
|
||
|
||
# 持仓持续时间(分钟):优先用币安订单时间,否则用当前时间
|
||
entry_time = trade.get("entry_time")
|
||
duration_minutes = None
|
||
try:
|
||
et = None
|
||
if isinstance(entry_time, (int, float)):
|
||
et = int(entry_time)
|
||
elif isinstance(entry_time, str) and entry_time.strip():
|
||
# 兼容旧格式:字符串时间戳/日期字符串
|
||
s = entry_time.strip()
|
||
if s.isdigit():
|
||
et = int(s)
|
||
else:
|
||
from datetime import datetime
|
||
et = int(datetime.fromisoformat(s).timestamp())
|
||
|
||
xt = int(exit_time_ts) if exit_time_ts is not None else int(get_beijing_time())
|
||
if et is not None and xt >= et:
|
||
duration_minutes = int((xt - et) / 60)
|
||
except Exception as e:
|
||
logger.debug(f"计算持仓持续时间失败: {e}")
|
||
|
||
strategy_type = 'trend_following' # 默认策略类型
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent,
|
||
exit_order_id=exit_order_id, # 保存币安平仓订单号
|
||
strategy_type=strategy_type,
|
||
duration_minutes=duration_minutes,
|
||
exit_time_ts=exit_time_ts,
|
||
)
|
||
except Exception as update_error:
|
||
# update_exit 内部已经有异常处理,但如果仍然失败,记录错误但不中断同步流程
|
||
error_str = str(update_error)
|
||
if "Duplicate entry" in error_str and "exit_order_id" in error_str:
|
||
logger.warning(
|
||
f"{symbol} [状态同步] ⚠️ exit_order_id {exit_order_id} 唯一约束冲突,"
|
||
f"update_exit 内部处理失败,尝试不更新 exit_order_id"
|
||
)
|
||
# 再次尝试,不更新 exit_order_id
|
||
try:
|
||
from database.connection import db
|
||
from database.models import get_beijing_time
|
||
exit_time = int(exit_time_ts) if exit_time_ts is not None else get_beijing_time()
|
||
db.execute_update(
|
||
"""UPDATE trades
|
||
SET exit_price = %s, exit_time = %s,
|
||
exit_reason = %s, pnl = %s, pnl_percent = %s, status = 'closed'
|
||
WHERE id = %s""",
|
||
(exit_price, exit_time, exit_reason, pnl, pnl_percent, trade_id)
|
||
)
|
||
logger.info(f"{symbol} [状态同步] ✓ 已更新(跳过 exit_order_id)")
|
||
except Exception as retry_error:
|
||
logger.error(f"{symbol} [状态同步] ❌ 重试更新也失败: {retry_error}")
|
||
raise
|
||
else:
|
||
# 其他错误,重新抛出
|
||
raise
|
||
|
||
logger.info(
|
||
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "
|
||
f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)"
|
||
)
|
||
|
||
# 清理本地记录
|
||
if symbol in self.active_positions:
|
||
await self._stop_position_monitoring(symbol)
|
||
del self.active_positions[symbol]
|
||
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
|
||
|
||
except Exception as e:
|
||
# 详细记录错误信息,包括异常类型、错误消息和堆栈跟踪
|
||
import traceback
|
||
error_type = type(e).__name__
|
||
error_msg = str(e)
|
||
error_traceback = traceback.format_exc()
|
||
|
||
logger.error(
|
||
f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): "
|
||
f"错误类型={error_type}, 错误消息={error_msg}"
|
||
)
|
||
logger.debug(
|
||
f"{symbol} [状态同步] 错误详情:\n{error_traceback}"
|
||
)
|
||
|
||
# 如果是数据库相关错误,记录更多信息
|
||
if "Duplicate entry" in error_msg or "1062" in error_msg:
|
||
logger.error(
|
||
f"{symbol} [状态同步] 数据库唯一约束冲突,"
|
||
f"可能原因: exit_order_id={exit_order_id} 已被其他交易记录使用"
|
||
)
|
||
elif "database" in error_msg.lower() or "connection" in error_msg.lower():
|
||
logger.error(
|
||
f"{symbol} [状态同步] 数据库连接或执行错误,"
|
||
f"请检查数据库连接状态"
|
||
)
|
||
else:
|
||
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")
|
||
|
||
# 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的)
|
||
missing_in_db = binance_symbols - db_open_symbols
|
||
if missing_in_db:
|
||
logger.info(
|
||
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: "
|
||
f"{', '.join(missing_in_db)} (可能是手动开仓)"
|
||
)
|
||
|
||
# 为手动开仓的持仓创建数据库记录并启动监控
|
||
for symbol in missing_in_db:
|
||
try:
|
||
# 获取币安持仓详情
|
||
binance_position = next(
|
||
(p for p in binance_positions if p['symbol'] == symbol),
|
||
None
|
||
)
|
||
if not binance_position:
|
||
continue
|
||
|
||
position_amt = binance_position['positionAmt']
|
||
entry_price = binance_position['entryPrice']
|
||
quantity = abs(position_amt)
|
||
side = 'BUY' if position_amt > 0 else 'SELL'
|
||
|
||
logger.info(
|
||
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
|
||
f"({side} {quantity:.4f} @ {entry_price:.4f})"
|
||
)
|
||
|
||
# 创建数据库记录
|
||
trade_id = Trade.create(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
entry_price=entry_price,
|
||
leverage=binance_position.get('leverage', 10),
|
||
entry_reason='manual_entry', # 标记为手动开仓
|
||
# 手动开仓无法拿到策略侧ATR/分步止盈,这里尽量补齐“规模字段”
|
||
notional_usdt=(float(entry_price) * float(quantity)) if entry_price and quantity else None,
|
||
margin_usdt=((float(entry_price) * float(quantity)) / float(binance_position.get('leverage', 10))) if entry_price and quantity and float(binance_position.get('leverage', 10) or 0) > 0 else None,
|
||
)
|
||
|
||
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
|
||
|
||
# 创建本地持仓记录(用于监控)
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
current_price = ticker['price'] if ticker else entry_price
|
||
|
||
# 计算止损止盈(基于保证金)
|
||
leverage = binance_position.get('leverage', 10)
|
||
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
|
||
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
|
||
# 如果配置中没有设置止盈,则使用止损的2倍作为默认
|
||
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
|
||
take_profit_pct_margin = stop_loss_pct_margin * 2.0
|
||
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(
|
||
entry_price, side, quantity, leverage,
|
||
stop_loss_pct=stop_loss_pct_margin
|
||
)
|
||
take_profit_price = self.risk_manager.get_take_profit_price(
|
||
entry_price, side, quantity, leverage,
|
||
take_profit_pct=take_profit_pct_margin
|
||
)
|
||
|
||
position_info = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'quantity': quantity,
|
||
'entryPrice': entry_price,
|
||
'changePercent': 0, # 手动开仓,无法计算涨跌幅
|
||
'orderId': None,
|
||
'tradeId': trade_id,
|
||
'stopLoss': stop_loss_price,
|
||
'takeProfit': take_profit_price,
|
||
'initialStopLoss': stop_loss_price,
|
||
'leverage': leverage,
|
||
'entryReason': 'manual_entry',
|
||
'atr': None,
|
||
'maxProfit': 0.0,
|
||
'trailingStopActivated': False
|
||
}
|
||
|
||
self.active_positions[symbol] = position_info
|
||
|
||
# 启动WebSocket监控
|
||
if self._monitoring_enabled:
|
||
await self._start_position_monitoring(symbol)
|
||
logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控")
|
||
|
||
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
|
||
logger.info("持仓状态同步完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"同步持仓状态失败: {e}")
|
||
import traceback
|
||
logger.error(f"错误详情:\n{traceback.format_exc()}")
|
||
|
||
async def start_all_position_monitoring(self):
|
||
"""
|
||
启动所有持仓的WebSocket实时监控
|
||
"""
|
||
if not self._monitoring_enabled:
|
||
logger.info("实时监控已禁用,跳过启动")
|
||
return
|
||
|
||
# WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager
|
||
if not self.client:
|
||
logger.warning("客户端未初始化,无法启动实时监控")
|
||
return
|
||
|
||
# 获取当前所有持仓
|
||
positions = await self.client.get_open_positions()
|
||
binance_symbols = {p['symbol'] for p in positions}
|
||
active_symbols = set(self.active_positions.keys())
|
||
|
||
logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})")
|
||
logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else '无'})")
|
||
|
||
# 为所有币安持仓启动监控(即使不在active_positions中,可能是手动开仓的)
|
||
for position in positions:
|
||
symbol = position['symbol']
|
||
if symbol not in self._monitor_tasks:
|
||
# 如果不在active_positions中,先创建记录
|
||
if symbol not in self.active_positions:
|
||
logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...")
|
||
# 这里会通过sync_positions_with_binance来处理,但先启动监控
|
||
try:
|
||
entry_price = position.get('entryPrice', 0)
|
||
position_amt = position['positionAmt']
|
||
quantity = abs(position_amt)
|
||
side = 'BUY' if position_amt > 0 else 'SELL'
|
||
|
||
# 创建临时记录用于监控
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
current_price = ticker['price'] if ticker else entry_price
|
||
|
||
# 计算止损止盈(基于保证金)
|
||
leverage = position.get('leverage', 10)
|
||
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
|
||
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
|
||
# 如果配置中没有设置止盈,则使用止损的2倍作为默认
|
||
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
|
||
take_profit_pct_margin = stop_loss_pct_margin * 2.0
|
||
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(
|
||
entry_price, side, quantity, leverage,
|
||
stop_loss_pct=stop_loss_pct_margin
|
||
)
|
||
take_profit_price = self.risk_manager.get_take_profit_price(
|
||
entry_price, side, quantity, leverage,
|
||
take_profit_pct=take_profit_pct_margin
|
||
)
|
||
|
||
position_info = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'quantity': quantity,
|
||
'entryPrice': entry_price,
|
||
'changePercent': 0,
|
||
'orderId': None,
|
||
'tradeId': None,
|
||
'stopLoss': stop_loss_price,
|
||
'takeProfit': take_profit_price,
|
||
'initialStopLoss': stop_loss_price,
|
||
'leverage': leverage,
|
||
'entryReason': 'manual_entry_temp',
|
||
'atr': None,
|
||
'maxProfit': 0.0,
|
||
'trailingStopActivated': False
|
||
}
|
||
self.active_positions[symbol] = position_info
|
||
logger.info(f"{symbol} 已创建临时持仓记录用于监控")
|
||
except Exception as e:
|
||
logger.error(f"{symbol} 创建临时持仓记录失败: {e}")
|
||
|
||
await self._start_position_monitoring(symbol)
|
||
|
||
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
|
||
|
||
async def stop_all_position_monitoring(self):
|
||
"""
|
||
停止所有持仓的WebSocket监控
|
||
"""
|
||
symbols = list(self._monitor_tasks.keys())
|
||
for symbol in symbols:
|
||
await self._stop_position_monitoring(symbol)
|
||
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
|
||
|
||
async def _start_position_monitoring(self, symbol: str):
|
||
"""
|
||
启动单个持仓的WebSocket价格监控
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
if symbol in self._monitor_tasks:
|
||
logger.debug(f"{symbol} 监控任务已存在,跳过")
|
||
return
|
||
|
||
# WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager
|
||
if not self.client:
|
||
logger.warning(f"{symbol} 客户端未初始化,无法启动监控")
|
||
return
|
||
|
||
try:
|
||
task = asyncio.create_task(self._monitor_position_price(symbol))
|
||
self._monitor_tasks[symbol] = task
|
||
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
|
||
except Exception as e:
|
||
logger.error(f"启动 {symbol} 监控失败: {e}")
|
||
|
||
async def _stop_position_monitoring(self, symbol: str):
|
||
"""
|
||
停止单个持仓的WebSocket监控
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
# 幂等:可能会被多处/并发调用,先 pop 再处理,避免 KeyError
|
||
task = self._monitor_tasks.pop(symbol, None)
|
||
if task is None:
|
||
return
|
||
|
||
if not task.done():
|
||
task.cancel()
|
||
try:
|
||
await task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
logger.debug(f"已停止 {symbol} 的WebSocket监控")
|
||
|
||
async def _monitor_position_price(self, symbol: str):
|
||
"""
|
||
监控单个持仓的价格(WebSocket实时推送)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
retry_count = 0
|
||
max_retries = 5
|
||
|
||
while retry_count < max_retries:
|
||
try:
|
||
if symbol not in self.active_positions:
|
||
logger.info(f"{symbol} 持仓已不存在,停止监控")
|
||
break
|
||
|
||
# 使用WebSocket订阅价格流
|
||
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
|
||
# 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
|
||
# 端点:wss://fstream.binance.com/ws/<symbol>@ticker
|
||
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.ws_connect(ws_url) as ws:
|
||
logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新")
|
||
retry_count = 0 # 连接成功,重置重试计数
|
||
|
||
async for msg in ws:
|
||
if symbol not in self.active_positions:
|
||
logger.info(f"{symbol} 持仓已不存在,停止监控")
|
||
break
|
||
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
try:
|
||
# 解析 JSON 消息
|
||
data = json.loads(msg.data)
|
||
|
||
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
|
||
# 根据文档,ticker 流包含 'c' 字段(最后价格)
|
||
if isinstance(data, dict):
|
||
if 'c' in data: # 'c' 是当前价格
|
||
current_price = float(data['c'])
|
||
# 立即检查止损止盈
|
||
await self._check_single_position(symbol, current_price)
|
||
elif 'data' in data:
|
||
# 兼容组合流格式(如果使用 /stream 端点)
|
||
if isinstance(data['data'], dict) and 'c' in data['data']:
|
||
current_price = float(data['data']['c'])
|
||
await self._check_single_position(symbol, current_price)
|
||
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
|
||
logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}")
|
||
continue
|
||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||
logger.warning(f"{symbol} WebSocket错误: {ws.exception()}")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||
logger.info(f"{symbol} WebSocket连接关闭")
|
||
break
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info(f"{symbol} 监控任务已取消")
|
||
break
|
||
except Exception as e:
|
||
retry_count += 1
|
||
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
|
||
|
||
if retry_count < max_retries:
|
||
# 指数退避重试
|
||
wait_time = min(2 ** retry_count, 30)
|
||
logger.info(f"{symbol} {wait_time}秒后重试连接...")
|
||
await asyncio.sleep(wait_time)
|
||
else:
|
||
logger.error(f"{symbol} WebSocket监控失败,已达到最大重试次数")
|
||
# 回退到定时检查模式
|
||
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
|
||
break
|
||
|
||
# 清理任务
|
||
if symbol in self._monitor_tasks:
|
||
del self._monitor_tasks[symbol]
|
||
|
||
async def _check_single_position(self, symbol: str, current_price: float):
|
||
"""
|
||
检查单个持仓的止损止盈(实时检查)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
current_price: 当前价格
|
||
"""
|
||
position_info = self.active_positions.get(symbol)
|
||
if not position_info:
|
||
return
|
||
|
||
# 确保所有值都是float类型
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity = float(position_info['quantity'])
|
||
current_price_float = float(current_price)
|
||
|
||
# 计算当前盈亏(基于保证金)
|
||
leverage = position_info.get('leverage', 10)
|
||
position_value = entry_price * quantity
|
||
margin = position_value / leverage if leverage > 0 else position_value
|
||
|
||
# 计算盈亏金额
|
||
if position_info['side'] == 'BUY':
|
||
pnl_amount = (current_price_float - entry_price) * quantity
|
||
else: # SELL
|
||
pnl_amount = (entry_price - current_price_float) * quantity
|
||
|
||
# 计算盈亏百分比(相对于保证金,与币安一致)
|
||
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 也计算价格百分比(用于显示)
|
||
if position_info['side'] == 'BUY':
|
||
pnl_percent_price = ((current_price_float - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl_percent_price = ((entry_price - current_price_float) / entry_price) * 100
|
||
|
||
# 更新最大盈利(基于保证金)
|
||
if pnl_percent_margin > position_info.get('maxProfit', 0):
|
||
position_info['maxProfit'] = pnl_percent_margin
|
||
|
||
# 移动止损逻辑(盈利后保护利润,基于保证金)
|
||
# 每次检查时从Redis重新加载配置,确保配置修改能即时生效
|
||
try:
|
||
if config._config_manager:
|
||
config._config_manager.reload_from_redis()
|
||
config.TRADING_CONFIG = config._get_trading_config()
|
||
except Exception as e:
|
||
logger.debug(f"从Redis重新加载配置失败: {e}")
|
||
|
||
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
|
||
if use_trailing:
|
||
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
|
||
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
|
||
|
||
if not position_info.get('trailingStopActivated', False):
|
||
# 盈利超过阈值后(相对于保证金),激活移动止损
|
||
if pnl_percent_margin > trailing_activation * 100:
|
||
position_info['trailingStopActivated'] = True
|
||
# 将止损移至成本价(保本)
|
||
position_info['stopLoss'] = entry_price
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
|
||
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
|
||
)
|
||
else:
|
||
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
|
||
# 计算需要保护的利润金额
|
||
protect_amount = margin * trailing_protect
|
||
# 计算对应的止损价
|
||
if position_info['side'] == 'BUY':
|
||
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
|
||
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
|
||
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
|
||
if new_stop_loss > position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
|
||
)
|
||
else: # SELL
|
||
# 做空:止损价 = 开仓价 - (当前盈亏 - 保护金额) / 数量
|
||
new_stop_loss = entry_price - (pnl_amount - protect_amount) / quantity
|
||
if new_stop_loss < position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
|
||
)
|
||
|
||
# 检查止损(基于保证金收益比)
|
||
stop_loss = position_info.get('stopLoss')
|
||
should_close = False
|
||
exit_reason = None
|
||
|
||
if stop_loss is not None:
|
||
# 计算止损对应的保证金百分比目标
|
||
# 止损金额 = (开仓价 - 止损价) × 数量 或 (止损价 - 开仓价) × 数量
|
||
if position_info['side'] == 'BUY':
|
||
stop_loss_amount = (entry_price - stop_loss) * quantity
|
||
else: # SELL
|
||
stop_loss_amount = (stop_loss - entry_price) * quantity
|
||
|
||
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 每5%亏损记录一次诊断日志(帮助排查问题)
|
||
if pnl_percent_margin <= -5.0:
|
||
should_log = (int(abs(pnl_percent_margin)) % 5 == 0) or (pnl_percent_margin <= -10.0 and pnl_percent_margin > -10.5)
|
||
if should_log:
|
||
trigger_condition = pnl_percent_margin <= -stop_loss_pct_margin
|
||
logger.warning(
|
||
f"{symbol} [实时监控] 诊断: 亏损{pnl_percent_margin:.2f}% of margin | "
|
||
f"当前价: {current_price_float:.4f} | "
|
||
f"入场价: {entry_price:.4f} | "
|
||
f"止损价: {stop_loss:.4f} (目标: -{stop_loss_pct_margin:.2f}% of margin) | "
|
||
f"方向: {position_info['side']} | "
|
||
f"是否触发: {trigger_condition} | "
|
||
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
|
||
)
|
||
|
||
# 直接比较当前盈亏百分比与止损目标(基于保证金)
|
||
if pnl_percent_margin <= -stop_loss_pct_margin:
|
||
should_close = True
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
logger.warning(
|
||
f"{symbol} [实时监控] ⚠⚠⚠ 触发止损(基于保证金): "
|
||
f"当前盈亏={pnl_percent_margin:.2f}% of margin <= 止损目标=-{stop_loss_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price_float:.4f}, 止损价={stop_loss:.4f} | "
|
||
f"保证金={margin:.4f} USDT, 亏损金额={pnl_amount:.4f} USDT"
|
||
)
|
||
|
||
# 检查止盈(基于保证金收益比)
|
||
if not should_close:
|
||
take_profit = position_info.get('takeProfit')
|
||
if take_profit is not None:
|
||
# 计算止盈对应的保证金百分比目标
|
||
# 止盈金额 = (止盈价 - 开仓价) × 数量 或 (开仓价 - 止盈价) × 数量
|
||
if position_info['side'] == 'BUY':
|
||
take_profit_amount = (take_profit - entry_price) * quantity
|
||
else: # SELL
|
||
take_profit_amount = (entry_price - take_profit) * quantity
|
||
|
||
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
|
||
|
||
# 每5%盈利记录一次诊断日志(帮助排查问题)
|
||
if pnl_percent_margin >= 5.0:
|
||
should_log = (int(pnl_percent_margin) % 5 == 0) or (pnl_percent_margin >= 10.0 and pnl_percent_margin < 10.5)
|
||
if should_log:
|
||
trigger_condition = pnl_percent_margin >= take_profit_pct_margin
|
||
logger.warning(
|
||
f"{symbol} [实时监控] 诊断: 盈利{pnl_percent_margin:.2f}% of margin | "
|
||
f"当前价: {current_price_float:.4f} | "
|
||
f"入场价: {entry_price:.4f} | "
|
||
f"止盈价: {take_profit:.4f} (目标: {take_profit_pct_margin:.2f}% of margin) | "
|
||
f"方向: {position_info['side']} | "
|
||
f"是否触发: {trigger_condition} | "
|
||
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
|
||
)
|
||
|
||
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
|
||
if pnl_percent_margin >= take_profit_pct_margin:
|
||
should_close = True
|
||
exit_reason = 'take_profit'
|
||
logger.info(
|
||
f"{symbol} [实时监控] 触发止盈(基于保证金): "
|
||
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 止盈目标={take_profit_pct_margin:.2f}% of margin | "
|
||
f"当前价={current_price_float:.4f}, 止盈价={take_profit:.4f}"
|
||
)
|
||
|
||
# 如果触发止损止盈,执行平仓
|
||
if should_close:
|
||
logger.info(
|
||
f"{symbol} [自动平仓] 开始执行平仓操作 | "
|
||
f"原因: {exit_reason} | "
|
||
f"入场价: {entry_price:.4f} | "
|
||
f"当前价: {current_price_float:.4f} | "
|
||
f"盈亏: {pnl_percent_margin:.2f}% of margin ({pnl_amount:.4f} USDT) | "
|
||
f"数量: {quantity:.4f}"
|
||
)
|
||
|
||
# 执行平仓(让 close_position 统一处理数据库更新,避免重复更新和状态不一致)
|
||
logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...")
|
||
success = await self.close_position(symbol, reason=exit_reason)
|
||
if success:
|
||
logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成")
|
||
# 平仓成功后,立即触发一次状态同步,确保数据库状态与币安一致
|
||
try:
|
||
await asyncio.sleep(2) # 等待2秒让币安订单完全成交
|
||
await self.sync_positions_with_binance()
|
||
logger.debug(f"{symbol} [自动平仓] 已触发状态同步")
|
||
except Exception as sync_error:
|
||
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
|
||
else:
|
||
logger.error(f"{symbol} [自动平仓] ❌ 平仓失败")
|
||
# 即使平仓失败,也尝试同步状态(可能币安已经平仓了)
|
||
try:
|
||
await self.sync_positions_with_binance()
|
||
except Exception as sync_error:
|
||
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
|
||
|
||
async def diagnose_position(self, symbol: str):
|
||
"""
|
||
诊断持仓状态(用于排查为什么没有自动平仓)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
try:
|
||
logger.info(f"{symbol} [诊断] 开始诊断持仓状态...")
|
||
|
||
# 1. 检查是否在active_positions中
|
||
if symbol not in self.active_positions:
|
||
logger.warning(f"{symbol} [诊断] ❌ 不在本地持仓记录中 (active_positions)")
|
||
logger.warning(f" 可能原因: 手动开仓或系统重启后未同步")
|
||
logger.warning(f" 解决方案: 等待下次状态同步或手动触发同步")
|
||
else:
|
||
position_info = self.active_positions[symbol]
|
||
logger.info(f"{symbol} [诊断] ✓ 在本地持仓记录中")
|
||
logger.info(f" 入场价: {position_info['entryPrice']:.4f}")
|
||
logger.info(f" 方向: {position_info['side']}")
|
||
logger.info(f" 数量: {position_info['quantity']:.4f}")
|
||
logger.info(f" 止损价: {position_info['stopLoss']:.4f}")
|
||
logger.info(f" 止盈价: {position_info['takeProfit']:.4f}")
|
||
|
||
# 2. 检查WebSocket监控状态
|
||
if symbol in self._monitor_tasks:
|
||
task = self._monitor_tasks[symbol]
|
||
if task.done():
|
||
logger.warning(f"{symbol} [诊断] ⚠ WebSocket监控任务已结束")
|
||
try:
|
||
await task # 获取异常信息
|
||
except Exception as e:
|
||
logger.warning(f" 任务异常: {e}")
|
||
else:
|
||
logger.info(f"{symbol} [诊断] ✓ WebSocket监控任务运行中")
|
||
else:
|
||
logger.warning(f"{symbol} [诊断] ❌ 没有WebSocket监控任务")
|
||
logger.warning(f" 可能原因: 监控未启动或已停止")
|
||
|
||
# 3. 获取币安实际持仓
|
||
positions = await self.client.get_open_positions()
|
||
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
|
||
|
||
if not binance_position:
|
||
logger.warning(f"{symbol} [诊断] ❌ 币安账户中没有持仓")
|
||
return
|
||
|
||
logger.info(f"{symbol} [诊断] ✓ 币安账户中有持仓")
|
||
entry_price_binance = binance_position.get('entryPrice', 0)
|
||
mark_price = binance_position.get('markPrice', 0)
|
||
unrealized_pnl = binance_position.get('unRealizedProfit', 0)
|
||
|
||
logger.info(f" 币安入场价: {entry_price_binance:.4f}")
|
||
logger.info(f" 标记价格: {mark_price:.4f}")
|
||
logger.info(f" 未实现盈亏: {unrealized_pnl:.2f} USDT")
|
||
|
||
# 4. 计算实际盈亏
|
||
if symbol in self.active_positions:
|
||
position_info = self.active_positions[symbol]
|
||
entry_price = float(position_info['entryPrice'])
|
||
take_profit = float(position_info['takeProfit'])
|
||
|
||
if position_info['side'] == 'BUY':
|
||
pnl_percent = ((mark_price - entry_price) / entry_price) * 100
|
||
take_profit_pct = ((take_profit - entry_price) / entry_price) * 100
|
||
should_trigger = mark_price >= take_profit
|
||
else: # SELL
|
||
pnl_percent = ((entry_price - mark_price) / entry_price) * 100
|
||
take_profit_pct = ((entry_price - take_profit) / entry_price) * 100
|
||
should_trigger = mark_price <= take_profit
|
||
|
||
logger.info(f"{symbol} [诊断] 盈亏分析:")
|
||
logger.info(f" 当前盈亏: {pnl_percent:.2f}%")
|
||
logger.info(f" 止盈目标: {take_profit_pct:.2f}%")
|
||
logger.info(f" 当前价: {mark_price:.4f}")
|
||
logger.info(f" 止盈价: {take_profit:.4f}")
|
||
logger.info(f" 价格差: {abs(mark_price - take_profit):.4f}")
|
||
logger.info(f" 应该触发: {should_trigger}")
|
||
|
||
if pnl_percent > take_profit_pct and not should_trigger:
|
||
logger.error(f"{symbol} [诊断] ❌ 异常: 盈亏{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!")
|
||
logger.error(f" 可能原因: 浮点数精度问题或止盈价格计算错误")
|
||
|
||
logger.info(f"{symbol} [诊断] 诊断完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [诊断] 诊断失败: {e}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}") |