auto_trade_sys/trading_system/position_manager.py
薇薇安 4023f7807e a
2026-01-19 20:30:57 +08:00

2450 lines
137 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓位管理模块 - 管理持仓和订单
"""
import asyncio
import logging
import json
import aiohttp
import time
from typing import Dict, List, Optional
from datetime import datetime
try:
from .binance_client import BinanceClient
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型(如果可用)
DB_AVAILABLE = False
Trade = None
get_beijing_time = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade, get_beijing_time
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
# 如果没有导入get_beijing_time创建一个本地版本
if get_beijing_time is None:
from datetime import datetime, timezone, timedelta
BEIJING_TZ = timezone(timedelta(hours=8))
def get_beijing_time():
"""获取当前北京时间UTC+8"""
return datetime.now(BEIJING_TZ).replace(tzinfo=None)
class PositionManager:
"""仓位管理类"""
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
"""
初始化仓位管理器
Args:
client: 币安客户端
risk_manager: 风险管理器
"""
self.client = client
self.risk_manager = risk_manager
self.active_positions: Dict[str, Dict] = {}
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
self._monitoring_enabled = True # 是否启用实时监控
self._pending_entry_orders: Dict[str, Dict] = {} # 未成交的入场订单(避免重复挂单)
self._last_entry_attempt_ms: Dict[str, int] = {} # 每个symbol的最近一次尝试冷却/去抖)
@staticmethod
def _pct_like_to_ratio(v: float) -> float:
"""
将“看起来像百分比”的值转换为比例0~1
兼容两种来源:
- 前端/后端按“比例”存储0.006 表示 0.6%
- 历史/默认值按“百分比数值”存储0.6 表示 0.6%
经验规则:
- v > 1: 认为是 60/100 这种百分数除以100
- 0.05 < v <= 1: 也更可能是“0.6% 这种写法”除以100
- v <= 0.05: 更可能已经是比例(<=5%
"""
try:
x = float(v or 0.0)
except Exception:
x = 0.0
if x <= 0:
return 0.0
if x > 1.0:
return x / 100.0
if x > 0.05:
return x / 100.0
return x
def _calc_limit_entry_price(self, current_price: float, side: str, offset_ratio: float) -> float:
"""根据当前价与偏移比例计算限价入场价BUY: 下方回调SELL: 上方回调)"""
try:
cp = float(current_price)
except Exception:
cp = 0.0
try:
off = float(offset_ratio or 0.0)
except Exception:
off = 0.0
if cp <= 0:
return 0.0
if (side or "").upper() == "BUY":
return cp * (1 - off)
return cp * (1 + off)
async def _wait_for_order_filled(
self,
symbol: str,
order_id: int,
timeout_sec: int = 30,
poll_sec: float = 1.0,
) -> Dict:
"""
等待订单成交FILLED返回
{ ok, status, avg_price, executed_qty, raw }
"""
deadline = time.time() + max(1, int(timeout_sec or 1))
last_status = None
while time.time() < deadline:
try:
info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
status = info.get("status")
last_status = status
if status == "FILLED":
avg_price = float(info.get("avgPrice", 0) or 0) or float(info.get("price", 0) or 0)
executed_qty = float(info.get("executedQty", 0) or 0)
return {"ok": True, "status": status, "avg_price": avg_price, "executed_qty": executed_qty, "raw": info}
if status in ("CANCELED", "REJECTED", "EXPIRED"):
return {"ok": False, "status": status, "avg_price": 0, "executed_qty": float(info.get("executedQty", 0) or 0), "raw": info}
except Exception:
# 忽略单次失败,继续轮询
pass
await asyncio.sleep(max(0.2, float(poll_sec or 1.0)))
return {"ok": False, "status": last_status or "TIMEOUT", "avg_price": 0, "executed_qty": 0, "raw": None}
async def open_position(
self,
symbol: str,
change_percent: float,
leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
signal_strength: Optional[int] = None,
market_regime: Optional[str] = None,
trend_4h: Optional[str] = None,
atr: Optional[float] = None,
klines: Optional[List] = None,
bollinger: Optional[Dict] = None
) -> Optional[Dict]:
"""
开仓
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
leverage: 杠杆倍数
Returns:
订单信息失败返回None
"""
try:
# 0) 防止同一 symbol 重复挂入场单/快速重复尝试(去抖 + 冷却)
now_ms = int(time.time() * 1000)
cooldown_sec = int(config.TRADING_CONFIG.get("ENTRY_SYMBOL_COOLDOWN_SEC", 120) or 0)
last_ms = self._last_entry_attempt_ms.get(symbol)
if last_ms and cooldown_sec > 0 and now_ms - last_ms < cooldown_sec * 1000:
logger.info(f"{symbol} [入场] 冷却中({cooldown_sec}s跳过本次自动开仓")
return None
pending = self._pending_entry_orders.get(symbol)
if pending and pending.get("order_id"):
logger.info(f"{symbol} [入场] 已有未完成入场订单(orderId={pending.get('order_id')}),跳过重复开仓")
return None
# 标记本次尝试(无论最终是否成交,都避免短时间内反复开仓/挂单)
self._last_entry_attempt_ms[symbol] = now_ms
# 判断是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
return None
# 设置杠杆
await self.client.set_leverage(symbol, leverage)
# 计算仓位大小(传入实际使用的杠杆)
logger.info(f"开始为 {symbol} 计算仓位大小...")
quantity = await self.risk_manager.calculate_position_size(
symbol, change_percent, leverage=leverage
)
if quantity is None:
logger.warning(f"{symbol} 仓位计算失败,跳过交易")
logger.warning(f" 可能原因:")
logger.warning(f" 1. 账户余额不足")
logger.warning(f" 2. 单笔仓位超过限制")
logger.warning(f" 3. 总仓位超过限制")
logger.warning(f" 4. 无法获取价格数据")
logger.warning(f" 5. 保证金不足最小要求MIN_MARGIN_USDT")
logger.warning(f" 6. 名义价值小于0.2 USDT避免无意义的小单子")
return None
logger.info(f"{symbol} 仓位计算成功: {quantity:.4f}")
# 确定交易方向(优先使用技术指标信号)
if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
entry_price = ticker['price']
# 获取K线数据用于动态止损计算从symbol_info中获取如果可用
klines = None
bollinger = None
if 'klines' in locals() or 'symbol_info' in locals():
# 尝试从外部传入的symbol_info获取
pass
# 计算基于支撑/阻力的动态止损
# 优先使用技术结构(支撑/阻力位、布林带)
# 如果无法获取K线数据回退到ATR或固定止损
if not klines:
# 如果没有传入K线数据尝试获取
try:
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
klines_data = await self.client.get_klines(
symbol=symbol,
interval=primary_interval,
limit=20 # 获取最近20根K线用于计算支撑/阻力
)
klines = klines_data if len(klines_data) >= 10 else None
except Exception as e:
logger.debug(f"获取K线数据失败使用固定止损: {e}")
klines = None
# 在开仓前从Redis重新加载配置确保使用最新配置包括ATR参数
# 从Redis读取最新配置轻量级即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
logger.debug(f"{symbol} 开仓前已从Redis重新加载配置")
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
# ===== 智能入场方案C趋势强更少错过震荡更保守=====
smart_entry_enabled = bool(config.TRADING_CONFIG.get("SMART_ENTRY_ENABLED", True))
# LIMIT_ORDER_OFFSET_PCT兼容“比例/百分比”两种存储方式
limit_offset_ratio = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("LIMIT_ORDER_OFFSET_PCT", 0.5) or 0.0))
# 规则:趋势(trending) 或 4H共振明显 + 强信号 -> 允许“超时后市价兜底(有追价上限)”
mr = (market_regime or "").strip().lower() if market_regime else ""
t4 = (trend_4h or "").strip().lower() if trend_4h else ""
ss = int(signal_strength) if signal_strength is not None else 0
allow_market_fallback = False
if mr == "trending":
allow_market_fallback = True
if ss >= int(config.TRADING_CONFIG.get("SMART_ENTRY_STRONG_SIGNAL", 8) or 8) and t4 in ("up", "down"):
allow_market_fallback = True
# 追价上限:超过就不追,宁愿错过(避免回到“无脑追价高频打损”)
drift_ratio_trending = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_TRENDING", 0.6) or 0.6))
drift_ratio_ranging = self._pct_like_to_ratio(float(config.TRADING_CONFIG.get("ENTRY_MAX_DRIFT_PCT_RANGING", 0.3) or 0.3))
max_drift_ratio = drift_ratio_trending if allow_market_fallback else drift_ratio_ranging
# 总等待/追价参数
timeout_sec = int(config.TRADING_CONFIG.get("ENTRY_TIMEOUT_SEC", 180) or 180)
step_wait_sec = int(config.TRADING_CONFIG.get("ENTRY_STEP_WAIT_SEC", 15) or 15)
chase_steps = int(config.TRADING_CONFIG.get("ENTRY_CHASE_MAX_STEPS", 4) or 4)
market_fallback_after_sec = int(config.TRADING_CONFIG.get("ENTRY_MARKET_FALLBACK_AFTER_SEC", 45) or 45)
# 初始限价(回调入场)
current_px = float(entry_price)
initial_limit = self._calc_limit_entry_price(current_px, side, limit_offset_ratio)
if initial_limit <= 0:
return None
order = None
entry_order_id = None
order_status = None
actual_entry_price = None
filled_quantity = 0.0
entry_mode_used = "limit-only" if not smart_entry_enabled else ("limit+fallback" if allow_market_fallback else "limit-chase")
if not smart_entry_enabled:
# 根治方案:关闭智能入场后,回归“纯限价单模式”
# - 不追价
# - 不市价兜底
# - 未在确认时间内成交则撤单并跳过(属于策略未触发入场,不是系统错误)
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
logger.info(
f"{symbol} [纯限价入场] side={side} | 限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
f"确认超时={confirm_timeout}s未成交将撤单跳过"
)
order = await self.client.place_order(
symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit
)
if not order:
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
else:
# 1) 先挂限价单
logger.info(
f"{symbol} [智能入场] 模式={entry_mode_used} | side={side} | "
f"marketRegime={market_regime} trend_4h={trend_4h} strength={ss}/10 | "
f"初始限价={initial_limit:.6f} (offset={limit_offset_ratio*100:.2f}%) | "
f"追价上限={max_drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=initial_limit)
if not order:
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
start_ts = time.time()
# 2) 分步等待 + 追价(逐步减少 offset并在趋势强时允许市价兜底有追价上限
for step in range(max(1, chase_steps)):
# 先等待一段时间看是否成交
wait_res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=step_wait_sec, poll_sec=1.0)
order_status = wait_res.get("status")
if wait_res.get("ok"):
actual_entry_price = float(wait_res.get("avg_price") or 0)
filled_quantity = float(wait_res.get("executed_qty") or 0)
break
# 未成交:如果超时太久且允许市价兜底,检查追价上限后转市价
elapsed = time.time() - start_ts
ticker2 = await self.client.get_ticker_24h(symbol)
cur2 = float(ticker2.get("price")) if ticker2 else current_px
drift_ratio = 0.0
try:
base = float(initial_limit) if float(initial_limit) > 0 else cur2
drift_ratio = abs((cur2 - base) / base)
except Exception:
drift_ratio = 0.0
if allow_market_fallback and elapsed >= market_fallback_after_sec:
if drift_ratio <= max_drift_ratio:
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
logger.info(f"{symbol} [智能入场] 限价超时,且偏离{drift_ratio*100:.2f}%≤{max_drift_ratio*100:.2f}%,转市价兜底")
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="MARKET")
# 关键:转市价后必须更新 entry_order_id否则后续会继续查询“已取消的旧限价单”导致误判 CANCELED
try:
entry_order_id = order.get("orderId") if isinstance(order, dict) else None
except Exception:
entry_order_id = None
break
else:
logger.info(f"{symbol} [智能入场] 限价超时,但偏离{drift_ratio*100:.2f}%>{max_drift_ratio*100:.2f}%,取消并放弃本次交易")
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
# 震荡/不允许市价兜底:尝试追价(减小 offset -> 更靠近当前价),但不突破追价上限
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
# offset 逐步减少到 0越追越接近当前价
step_ratio = (step + 1) / max(1, chase_steps)
cur_offset_ratio = max(0.0, limit_offset_ratio * (1.0 - step_ratio))
desired = self._calc_limit_entry_price(cur2, side, cur_offset_ratio)
if side == "BUY":
cap = initial_limit * (1 + max_drift_ratio)
desired = min(desired, cap)
else:
cap = initial_limit * (1 - max_drift_ratio)
desired = max(desired, cap)
if desired <= 0:
self._pending_entry_orders.pop(symbol, None)
return None
logger.info(
f"{symbol} [智能入场] 追价 step={step+1}/{chase_steps} | 当前价={cur2:.6f} | "
f"offset={cur_offset_ratio*100:.3f}% -> 限价={desired:.6f} | 偏离={drift_ratio*100:.2f}%"
)
order = await self.client.place_order(symbol=symbol, side=side, quantity=quantity, order_type="LIMIT", price=desired)
if not order:
self._pending_entry_orders.pop(symbol, None)
return None
entry_order_id = order.get("orderId")
if entry_order_id:
self._pending_entry_orders[symbol] = {"order_id": entry_order_id, "created_at_ms": int(time.time() * 1000)}
# 如果是市价兜底或最终限价成交,这里统一继续后续流程(下面会再查实际成交)
# ===== 统一处理:确认订单成交并获取实际成交价/数量 =====
if order:
if not entry_order_id:
entry_order_id = order.get("orderId")
if entry_order_id:
logger.info(f"{symbol} [开仓] 币安订单号: {entry_order_id}")
# 等待订单成交,检查订单状态并获取实际成交价格
# 只有在订单真正成交FILLED后才保存到数据库
if entry_order_id:
# 智能入场的限价订单可能需要更长等待,这里给一个总等待兜底(默认 30s
confirm_timeout = int(config.TRADING_CONFIG.get("ENTRY_CONFIRM_TIMEOUT_SEC", 30) or 30)
res = await self._wait_for_order_filled(symbol, int(entry_order_id), timeout_sec=confirm_timeout, poll_sec=1.0)
order_status = res.get("status")
if res.get("ok"):
actual_entry_price = float(res.get("avg_price") or 0)
filled_quantity = float(res.get("executed_qty") or 0)
else:
# 未成交NEW/超时/CANCELED 等)属于“策略未触发入场”或“挂单没成交”
# 这不应当当作系统错误同时需要撤单best-effort避免留下悬挂委托造成后续混乱。
logger.warning(f"{symbol} [开仓] 未成交,状态: {order_status},跳过本次开仓并撤销挂单")
try:
await self.client.cancel_order(symbol, int(entry_order_id))
except Exception:
pass
self._pending_entry_orders.pop(symbol, None)
return None
if not actual_entry_price or actual_entry_price <= 0:
logger.error(f"{symbol} [开仓] ❌ 无法获取实际成交价格,不保存到数据库")
self._pending_entry_orders.pop(symbol, None)
return None
if filled_quantity <= 0:
logger.error(f"{symbol} [开仓] ❌ 成交数量为0不保存到数据库")
self._pending_entry_orders.pop(symbol, None)
return None
# 使用实际成交价格和数量
original_entry_price = entry_price
entry_price = actual_entry_price
quantity = filled_quantity # 使用实际成交数量
logger.info(f"{symbol} [开仓] ✓ 使用实际成交价格: {entry_price:.4f} USDT (下单时价格: {original_entry_price:.4f}), 成交数量: {quantity:.4f}")
# 成交后清理 pending
self._pending_entry_orders.pop(symbol, None)
# ===== 成交后基于“实际成交价/数量”重新计算止损止盈(修复限价/滑点导致的偏差)=====
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.03)
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin,
klines=klines,
bollinger=bollinger,
atr=atr
)
stop_distance_for_tp = None
if side == 'BUY':
stop_distance_for_tp = entry_price - stop_loss_price
else:
stop_distance_for_tp = stop_loss_price - entry_price
if atr is not None and atr > 0 and entry_price > 0:
atr_percent = atr / entry_price
atr_multiplier = config.TRADING_CONFIG.get('ATR_STOP_LOSS_MULTIPLIER', 1.8)
stop_distance_for_tp = entry_price * atr_percent * atr_multiplier
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.30)
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = float(stop_loss_pct_margin or 0) * 3.0
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin,
atr=atr,
stop_distance=stop_distance_for_tp
)
# 分步止盈(基于“实际成交价 + 已计算的止损/止盈”)
if side == 'BUY':
take_profit_1 = entry_price + (entry_price - stop_loss_price) # 盈亏比1:1
else:
take_profit_1 = entry_price - (stop_loss_price - entry_price) # 盈亏比1:1
take_profit_2 = take_profit_price
# 交易规模:名义/保证金用于统计总交易量与UI展示
try:
notional_usdt = float(entry_price) * float(quantity)
except Exception:
notional_usdt = None
try:
margin_usdt = (float(notional_usdt) / float(leverage)) if notional_usdt is not None and float(leverage) > 0 else notional_usdt
except Exception:
margin_usdt = None
# 记录到数据库(只有在订单真正成交后才保存)
trade_id = None
if DB_AVAILABLE and Trade:
try:
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity, # 使用实际成交数量
entry_price=entry_price, # 使用实际成交价格
leverage=leverage,
entry_reason=entry_reason,
entry_order_id=entry_order_id, # 保存币安订单号
stop_loss_price=stop_loss_price,
take_profit_price=take_profit_price,
take_profit_1=take_profit_1,
take_profit_2=take_profit_2,
atr=atr,
notional_usdt=notional_usdt,
margin_usdt=margin_usdt,
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id}, 订单号: {entry_order_id}, 成交价: {entry_price:.4f}, 成交数量: {quantity:.4f})")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
return None
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法保存 {symbol} 交易记录")
# 记录持仓信息(包含动态止损止盈和分步止盈)
from datetime import datetime
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': change_percent,
'orderId': order.get('orderId'),
'tradeId': trade_id, # 数据库交易ID
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price, # 保留原始止盈价(第二目标)
'takeProfit1': take_profit_1, # 第一目标盈亏比1:1了结50%
'takeProfit2': take_profit_2, # 第二目标原始止盈价剩余50%
'partialProfitTaken': False, # 是否已部分止盈
'remainingQuantity': quantity, # 剩余仓位数量
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'entryTime': get_beijing_time(), # 记录入场时间(使用北京时间,用于计算持仓持续时间)
'strategyType': 'trend_following', # 策略类型(简化后只有趋势跟踪)
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
}
self.active_positions[symbol] = position_info
logger.info(
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
f"(涨跌幅: {change_percent:.2f}%)"
)
# 验证持仓是否真的在币安存在
try:
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
positions = await self.client.get_open_positions()
binance_position = next(
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
None
)
if binance_position:
logger.info(
f"{symbol} [开仓验证] ✓ 币安持仓确认: "
f"数量={float(binance_position.get('positionAmt', 0)):.4f}, "
f"入场价={float(binance_position.get('entryPrice', 0)):.4f}"
)
else:
logger.warning(
f"{symbol} [开仓验证] ⚠️ 币安账户中没有持仓,可能订单未成交或被立即平仓"
)
# 清理本地记录
if symbol in self.active_positions:
del self.active_positions[symbol]
# 如果数据库已保存,标记为取消
if trade_id and DB_AVAILABLE and Trade:
try:
from database.connection import db
db.execute_update(
"UPDATE trades SET status = 'cancelled' WHERE id = %s",
(trade_id,)
)
logger.info(f"{symbol} [开仓验证] 已更新数据库状态为 cancelled (ID: {trade_id})")
except Exception as e:
logger.warning(f"{symbol} [开仓验证] 更新数据库状态失败: {e}")
return None
except Exception as verify_error:
logger.warning(f"{symbol} [开仓验证] 验证持仓时出错: {verify_error},继续使用本地记录")
# 启动WebSocket实时监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
return position_info
return None
except Exception as e:
logger.error(f"开仓失败 {symbol}: {e}")
return None
async def close_position(self, symbol: str, reason: str = 'manual') -> bool:
"""
平仓
Args:
symbol: 交易对
reason: 平仓原因manual, stop_loss, take_profit, trailing_stop, sync
Returns:
是否成功
"""
try:
logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})")
# 获取当前持仓
positions = await self.client.get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if not position:
logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓")
# 即使币安没有持仓,也要更新数据库状态
updated = False
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...")
# 获取当前价格作为平仓价格
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice'])
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
if position_info['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
# 同步平仓时没有订单号设为None
# 计算持仓持续时间和策略类型
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None, # 同步平仓时没有订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
updated = True
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}")
# 清理本地记录
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
# 如果更新了数据库,返回成功;否则返回失败
return updated
# 确定平仓方向(与开仓相反)
position_amt = position['positionAmt']
side = 'SELL' if position_amt > 0 else 'BUY'
quantity = abs(position_amt)
position_side = 'LONG' if position_amt > 0 else 'SHORT'
# 二次校验:用币安实时持仓数量兜底,避免 reduceOnly 被拒绝(-2022
live_amt = await self._get_live_position_amt(symbol, position_side=position_side)
if live_amt is None or abs(live_amt) <= 0:
logger.warning(f"{symbol} [平仓] 实时查询到持仓已为0跳过下单并按已平仓处理")
# 复用“币安无持仓”的处理逻辑:走上面的分支
position = None
# 触发上方逻辑:直接返回 updated/清理
# 这里简单调用同步函数路径
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(position.get('entryPrice', 0) if position else 0)
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(f"{symbol} [平仓] 已清理本地记录(币安无持仓)")
return True
# 以币安实时持仓数量为准(并向下截断到不超过持仓)
quantity = min(quantity, abs(live_amt))
quantity = await self._adjust_close_quantity(symbol, quantity)
if quantity <= 0:
logger.warning(f"{symbol} [平仓] 数量调整后为0跳过下单并清理本地记录")
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
return True
logger.info(
f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET "
f"(持仓数量: {position_amt:.4f})"
)
# 平仓(使用 reduceOnly=True 确保只减少持仓,不增加反向持仓)
try:
logger.debug(f"{symbol} [平仓] 调用 place_order: {side} {quantity:.4f} @ MARKET (reduceOnly=True)")
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET',
reduce_only=True, # 平仓时使用 reduceOnly=True
position_side=position_side, # 兼容对冲模式Hedge必须指定 LONG/SHORT
)
logger.debug(f"{symbol} [平仓] place_order 返回: {order}")
except Exception as order_error:
logger.error(f"{symbol} [平仓] ❌ 下单失败: {order_error}")
logger.error(f" 下单参数: symbol={symbol}, side={side}, quantity={quantity:.4f}, order_type=MARKET")
logger.error(f" 错误类型: {type(order_error).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
raise # 重新抛出异常,让外层捕获
if order:
order_id = order.get('orderId')
logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order_id})")
# 等待订单成交,然后从币安获取实际成交价格
exit_price = None
try:
# 等待一小段时间让订单成交
await asyncio.sleep(1)
# 从币安获取订单详情,获取实际成交价格
try:
order_info = await self.client.client.futures_get_order(symbol=symbol, orderId=order_id)
if order_info:
# 优先使用平均成交价格avgPrice如果没有则使用价格字段
exit_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0))
if exit_price > 0:
logger.info(f"{symbol} [平仓] 从币安订单获取实际成交价格: {exit_price:.4f} USDT")
else:
# 如果订单还没有完全成交,尝试从成交记录获取
if order_info.get('status') == 'FILLED' and order_info.get('fills'):
# 计算加权平均成交价格
total_qty = 0
total_value = 0
for fill in order_info.get('fills', []):
qty = float(fill.get('qty', 0))
price = float(fill.get('price', 0))
total_qty += qty
total_value += qty * price
if total_qty > 0:
exit_price = total_value / total_qty
logger.info(f"{symbol} [平仓] 从成交记录计算平均成交价格: {exit_price:.4f} USDT")
except Exception as order_error:
logger.warning(f"{symbol} [平仓] 获取订单详情失败: {order_error},使用备用方法")
# 如果无法从订单获取价格,使用当前价格作为备用
if not exit_price or exit_price <= 0:
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
exit_price = float(ticker['price'])
logger.warning(f"{symbol} [平仓] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
else:
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [平仓] 无法获取平仓价格,使用订单价格字段")
exit_price = float(order.get('price', 0))
except Exception as price_error:
logger.warning(f"{symbol} [平仓] 获取成交价格时出错: {price_error},使用当前价格")
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(order.get('price', 0))
# 更新数据库记录
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
# 计算盈亏确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity_float = float(quantity)
exit_price_float = float(exit_price)
if position_info['side'] == 'BUY':
pnl = (exit_price_float - entry_price) * quantity_float
pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price_float) * quantity_float
pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100
# 获取平仓订单号
exit_order_id = order.get('orderId')
if exit_order_id:
logger.info(f"{symbol} [平仓] 币安订单号: {exit_order_id}")
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型(从开仓原因或持仓信息中获取)
strategy_type = position_info.get('strategyType', 'trend_following') # 默认趋势跟踪
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price_float,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=exit_order_id, # 保存币安平仓订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
logger.info(
f"{symbol} [平仓] ✓ 数据库记录已更新 "
f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})"
)
except Exception as e:
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
else:
logger.warning(f"{symbol} 没有关联的数据库交易ID无法更新平仓记录")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法更新 {symbol} 平仓记录")
# 停止WebSocket监控
await self._stop_position_monitoring(symbol)
# 移除持仓记录
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(
f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} "
f"(原因: {reason})"
)
return True
else:
# place_order 返回 None可能是 -2022ReduceOnly rejected等竞态场景
# 兜底再查一次实时持仓如果已经为0则当作“已平仓”处理避免刷屏与误判失败
try:
live2 = await self._get_live_position_amt(symbol, position_side=position_side)
except Exception:
live2 = None
if live2 is None or abs(live2) <= 0:
logger.warning(f"{symbol} [平仓] 下单返回None但实时持仓已为0按已平仓处理可能竞态/手动平仓)")
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
return True
logger.error(f"{symbol} [平仓] ❌ 下单返回 None实时持仓仍存在: {live2}),可能的原因:")
logger.error(f" 1. ReduceOnly 被拒绝(-2022但持仓未同步")
logger.error(f" 2. 数量精度调整后为 0 或负数")
logger.error(f" 3. 无法获取价格信息")
logger.error(f" 4. 其他下单错误(已在 place_order 中记录)")
logger.error(f" 持仓信息: {side} {quantity:.4f} @ MARKET")
# 尝试获取更多诊断信息
try:
symbol_info = await self.client.get_symbol_info(symbol)
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
notional_value = quantity * current_price
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
logger.error(f" 当前价格: {current_price:.4f} USDT")
logger.error(f" 订单名义价值: {notional_value:.2f} USDT")
logger.error(f" 最小名义价值: {min_notional:.2f} USDT")
if notional_value < min_notional:
logger.error(f" ⚠ 订单名义价值不足,无法平仓")
except Exception as diag_error:
logger.warning(f" 无法获取诊断信息: {diag_error}")
return False
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
# 尝试清理本地记录(即使平仓失败)
try:
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(f"{symbol} [平仓] 已清理本地持仓记录")
except Exception as cleanup_error:
logger.warning(f"{symbol} [平仓] 清理本地记录时出错: {cleanup_error}")
return False
async def _get_live_position_amt(self, symbol: str, position_side: Optional[str] = None) -> Optional[float]:
"""
从币安原始接口读取持仓数量(避免本地状态/缓存不一致导致 reduceOnly 被拒绝)。
- 单向模式:通常只有一个 net 持仓
- 对冲模式:可能同时有 LONG/SHORT两条腿用 positionSide 区分
"""
try:
if not getattr(self.client, "client", None):
return None
res = await self.client.client.futures_position_information(symbol=symbol)
if not isinstance(res, list):
return None
ps = (position_side or "").upper()
nonzero = []
for p in res:
if not isinstance(p, dict):
continue
try:
amt = float(p.get("positionAmt", 0))
except Exception:
continue
if abs(amt) <= 0:
continue
nonzero.append((amt, p))
if not nonzero:
return 0.0
if ps in ("LONG", "SHORT"):
for amt, p in nonzero:
pps = (p.get("positionSide") or "").upper()
if pps == ps:
return amt
# 如果没匹配到 positionSide退化为按符号推断
if ps == "LONG":
cand = next((amt for amt, _ in nonzero if amt > 0), None)
return cand if cand is not None else 0.0
if ps == "SHORT":
cand = next((amt for amt, _ in nonzero if amt < 0), None)
return cand if cand is not None else 0.0
# 没提供 position_side返回净持仓单向模式
return sum([amt for amt, _ in nonzero])
except Exception as e:
logger.debug(f"{symbol} 读取实时持仓失败: {e}")
return None
async def _adjust_close_quantity(self, symbol: str, quantity: float) -> float:
"""
平仓数量调整:只允许向下取整(避免超过持仓导致 reduceOnly 被拒绝)。
"""
try:
symbol_info = await self.client.get_symbol_info(symbol)
except Exception:
symbol_info = None
try:
q = float(quantity)
except Exception:
return 0.0
if not symbol_info:
return max(0.0, q)
try:
step_size = float(symbol_info.get("stepSize", 0) or 0)
except Exception:
step_size = 0.0
qty_precision = int(symbol_info.get("quantityPrecision", 8) or 8)
if step_size and step_size > 0:
q = float(int(q / step_size)) * step_size
else:
q = round(q, qty_precision)
q = round(q, qty_precision)
return max(0.0, q)
async def check_stop_loss_take_profit(self) -> List[str]:
"""
检查止损止盈
Returns:
需要平仓的交易对列表
"""
closed_positions = []
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
if symbol not in position_dict:
# 持仓已不存在,移除记录
del self.active_positions[symbol]
continue
current_position = position_dict[symbol]
entry_price = position_info['entryPrice']
quantity = position_info['quantity'] # 修复获取quantity
# 获取当前标记价格
current_price = current_position.get('markPrice', 0)
if current_price == 0:
# 如果标记价格为0尝试从ticker获取
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
else:
current_price = entry_price
# 计算当前盈亏(基于保证金)
leverage = position_info.get('leverage', 10)
position_value = entry_price * quantity
margin = position_value / leverage if leverage > 0 else position_value
# 计算盈亏金额
if position_info['side'] == 'BUY':
pnl_amount = (current_price - entry_price) * quantity
else:
pnl_amount = (entry_price - current_price) * quantity
# 计算盈亏百分比(相对于保证金,与币安一致)
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
# 也计算价格百分比(用于显示和移动止损)
if position_info['side'] == 'BUY':
pnl_percent_price = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent_price = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利(基于保证金)
if pnl_percent_margin > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent_margin
# 移动止损逻辑(盈利后保护利润,基于保证金)
# 每次检查时从Redis重新加载配置确保配置修改能即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后(相对于保证金),激活移动止损
if pnl_percent_margin > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
)
else:
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
# 如果已经部分止盈,使用剩余仓位计算
if position_info.get('partialProfitTaken', False):
remaining_quantity = position_info.get('remainingQuantity', quantity)
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
protect_amount = remaining_margin * trailing_protect
# 计算剩余仓位的盈亏
if position_info['side'] == 'BUY':
remaining_pnl = (current_price - entry_price) * remaining_quantity
else:
remaining_pnl = (entry_price - current_price) * remaining_quantity
# 计算新的止损价(基于剩余仓位)
if position_info['side'] == 'BUY':
new_stop_loss = entry_price + (remaining_pnl - protect_amount) / remaining_quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
f"剩余数量: {remaining_quantity:.4f})"
)
else:
new_stop_loss = entry_price - (remaining_pnl - protect_amount) / remaining_quantity
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新(剩余仓位): {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of remaining margin = {protect_amount:.4f} USDT, "
f"剩余数量: {remaining_quantity:.4f})"
)
else:
# 未部分止盈,使用原始仓位计算
protect_amount = margin * trailing_protect
# 计算对应的止损价
if position_info['side'] == 'BUY':
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
else:
# 做空:止损价 = 开仓价 - (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price - (pnl_amount - protect_amount) / quantity
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
# 检查止损(使用更新后的止损价,基于保证金收益比)
stop_loss = position_info.get('stopLoss')
if stop_loss is None:
logger.warning(f"{symbol} 止损价未设置,跳过止损检查")
elif stop_loss is not None:
# 计算止损对应的保证金百分比目标
if position_info['side'] == 'BUY':
stop_loss_amount = (entry_price - stop_loss) * quantity
else: # SELL
stop_loss_amount = (stop_loss - entry_price) * quantity
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与止损目标(基于保证金)
if pnl_percent_margin <= -stop_loss_pct_margin:
logger.warning(
f"{symbol} 触发止损(基于保证金): "
f"当前盈亏={pnl_percent_margin:.2f}% of margin <= 止损目标=-{stop_loss_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 止损价={stop_loss:.4f}"
)
# 确定平仓原因
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_amount,
pnl_percent=pnl_percent_margin,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol, reason=exit_reason):
closed_positions.append(symbol)
continue
# 检查分步止盈(基于保证金收益比)
take_profit_1 = position_info.get('takeProfit1') # 第一目标盈亏比1:1
take_profit_2 = position_info.get('takeProfit2', position_info.get('takeProfit')) # 第二目标
partial_profit_taken = position_info.get('partialProfitTaken', False)
remaining_quantity = position_info.get('remainingQuantity', quantity)
# 第一目标盈亏比1:1了结50%仓位
if not partial_profit_taken and take_profit_1 is not None:
# 计算第一目标对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_1_amount = (take_profit_1 - entry_price) * quantity
else: # SELL
take_profit_1_amount = (entry_price - take_profit_1) * quantity
take_profit_1_pct_margin = (take_profit_1_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与第一目标(基于保证金)
if pnl_percent_margin >= take_profit_1_pct_margin:
logger.info(
f"{symbol} 触发第一目标止盈盈亏比1:1基于保证金: "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_1_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 目标价={take_profit_1:.4f}"
)
# 部分平仓50%
partial_quantity = quantity * 0.5
try:
# 部分平仓
close_side = 'SELL' if position_info['side'] == 'BUY' else 'BUY'
close_position_side = 'LONG' if position_info['side'] == 'BUY' else 'SHORT'
# 二次校验并截断数量,避免 reduceOnly 被拒绝(-2022
live_amt = await self._get_live_position_amt(symbol, position_side=close_position_side)
if live_amt is None or abs(live_amt) <= 0:
logger.warning(f"{symbol} 部分止盈实时持仓已为0跳过部分平仓")
continue
partial_quantity = min(partial_quantity, abs(live_amt))
partial_quantity = await self._adjust_close_quantity(symbol, partial_quantity)
if partial_quantity <= 0:
logger.warning(f"{symbol} 部分止盈数量调整后为0跳过")
continue
partial_order = await self.client.place_order(
symbol=symbol,
side=close_side,
quantity=partial_quantity,
order_type='MARKET',
reduce_only=True, # 部分止盈必须 reduceOnly避免反向开仓
position_side=close_position_side, # 兼容对冲模式:指定要减少的持仓方向
)
if partial_order:
position_info['partialProfitTaken'] = True
position_info['remainingQuantity'] = remaining_quantity - partial_quantity
logger.info(
f"{symbol} 部分止盈成功: 平仓{partial_quantity:.4f},剩余{position_info['remainingQuantity']:.4f}"
)
# 分步止盈后的“保本”处理:
# - 若启用 USE_TRAILING_STOP允许把剩余仓位止损移至成本价并进入移动止损阶段
# - 若关闭 USE_TRAILING_STOP严格不自动移动止损避免你说的“仍然保本/仍然移动止损”)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
position_info['stopLoss'] = entry_price
position_info['trailingStopActivated'] = True
logger.info(f"{symbol} 剩余仓位止损移至成本价(保本),配合移动止损博取更大利润")
else:
logger.info(f"{symbol} 已部分止盈,但已关闭移动止损:不自动将止损移至成本价")
else:
# 兜底:可能遇到 -2022reduceOnly rejected等竞态重新查一次持仓
try:
live2 = await self._get_live_position_amt(symbol, position_side=close_position_side)
except Exception:
live2 = None
if live2 is None or abs(live2) <= 0:
logger.warning(f"{symbol} 部分止盈下单返回None但实时持仓已为0跳过")
continue
logger.warning(f"{symbol} 部分止盈下单返回None实时持仓仍存在: {live2}),稍后将继续由止损/止盈逻辑处理")
except Exception as e:
logger.error(f"{symbol} 部分止盈失败: {e}")
# 第二目标:原始止盈价,平掉剩余仓位(基于保证金收益比)
if partial_profit_taken and take_profit_2 is not None:
# 计算第二目标对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_2_amount = (take_profit_2 - entry_price) * remaining_quantity
else: # SELL
take_profit_2_amount = (entry_price - take_profit_2) * remaining_quantity
# 使用剩余仓位的保证金
remaining_margin = (entry_price * remaining_quantity) / leverage if leverage > 0 else (entry_price * remaining_quantity)
take_profit_2_pct_margin = (take_profit_2_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 计算剩余仓位的当前盈亏
if position_info['side'] == 'BUY':
remaining_pnl_amount = (current_price - entry_price) * remaining_quantity
else:
remaining_pnl_amount = (entry_price - current_price) * remaining_quantity
remaining_pnl_pct_margin = (remaining_pnl_amount / remaining_margin * 100) if remaining_margin > 0 else 0
# 直接比较剩余仓位盈亏百分比与第二目标(基于保证金)
if remaining_pnl_pct_margin >= take_profit_2_pct_margin:
logger.info(
f"{symbol} 触发第二目标止盈(基于保证金): "
f"剩余仓位盈亏={remaining_pnl_pct_margin:.2f}% of margin >= 目标={take_profit_2_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 目标价={take_profit_2:.4f}, "
f"剩余数量={remaining_quantity:.4f}"
)
exit_reason = 'take_profit'
# 计算总盈亏(原始仓位 + 部分止盈的盈亏)
# 部分止盈时的价格需要从数据库或记录中获取,这里简化处理
total_pnl_amount = remaining_pnl_amount # 简化:只计算剩余仓位盈亏
total_pnl_percent = (total_pnl_amount / margin * 100) if margin > 0 else 0
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
# 获取策略类型
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=total_pnl_amount,
pnl_percent=total_pnl_percent,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol, reason=exit_reason):
closed_positions.append(symbol)
continue
else:
# 如果未部分止盈,但达到止盈目标,直接全部平仓(基于保证金收益比)
take_profit = position_info.get('takeProfit')
if take_profit is not None:
# 计算止盈对应的保证金百分比
if position_info['side'] == 'BUY':
take_profit_amount = (take_profit - entry_price) * quantity
else: # SELL
take_profit_amount = (entry_price - take_profit) * quantity
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
if pnl_percent_margin >= take_profit_pct_margin:
logger.info(
f"{symbol} 触发止盈(基于保证金): "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 目标={take_profit_pct_margin:.2f}% of margin | "
f"当前价={current_price:.4f}, 止盈价={take_profit:.4f}"
)
exit_reason = 'take_profit'
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算持仓持续时间和策略类型
entry_time = position_info.get('entryTime')
duration_minutes = None
if entry_time:
try:
from datetime import datetime
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = position_info.get('strategyType', 'trend_following')
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_amount,
pnl_percent=pnl_percent_margin,
strategy_type=strategy_type,
duration_minutes=duration_minutes
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol, reason=exit_reason):
closed_positions.append(symbol)
continue
except Exception as e:
logger.error(f"检查止损止盈失败: {e}")
return closed_positions
async def get_position_summary(self) -> Dict:
"""
获取持仓摘要
Returns:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
return {
'totalPositions': len(positions),
'totalBalance': balance.get('total', 0),
'availableBalance': balance.get('available', 0),
'totalPnL': total_pnl,
'positions': [
{
'symbol': p['symbol'],
'positionAmt': p['positionAmt'],
'entryPrice': p['entryPrice'],
'pnl': p['unRealizedProfit']
}
for p in positions
]
}
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {}
async def sync_positions_with_binance(self):
"""
同步币安实际持仓状态与数据库状态
检查哪些持仓在数据库中还是open状态但在币安已经不存在了
"""
if not DB_AVAILABLE or not Trade:
logger.debug("数据库不可用,跳过持仓状态同步")
return
try:
logger.info("开始同步币安持仓状态与数据库...")
# 1. 获取币安实际持仓
binance_positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions}
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
# 2. 获取数据库中状态为open的交易记录
db_open_trades = Trade.get_all(status='open')
db_open_symbols = {t['symbol'] for t in db_open_trades}
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else ''})")
# 3. 找出在数据库中open但在币安已不存在的持仓
missing_in_binance = db_open_symbols - binance_symbols
if missing_in_binance:
logger.warning(
f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态但在币安已不存在: "
f"{', '.join(missing_in_binance)}"
)
# 4. 更新这些持仓的状态
for symbol in missing_in_binance:
try:
trades = Trade.get_by_symbol(symbol, status='open')
if not trades:
logger.warning(f"{symbol} [状态同步] ⚠️ 数据库中没有找到open状态的交易记录跳过")
continue
logger.info(f"{symbol} [状态同步] 找到 {len(trades)} 条open状态的交易记录开始更新...")
except Exception as get_trades_error:
logger.error(
f"{symbol} [状态同步] ❌ 获取交易记录失败: "
f"错误类型={type(get_trades_error).__name__}, 错误消息={str(get_trades_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
continue
for trade in trades:
trade_id = trade.get('id')
if not trade_id:
logger.warning(f"{symbol} [状态同步] ⚠️ 交易记录缺少ID字段跳过: {trade}")
continue
try:
logger.info(
f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})... | "
f"入场价: {trade.get('entry_price', 'N/A')}, "
f"数量: {trade.get('quantity', 'N/A')}, "
f"方向: {trade.get('side', 'N/A')}"
)
# 尝试从币安历史订单获取实际平仓价格
exit_price = None
close_orders = []
try:
# 获取最近的平仓订单reduceOnly=True的订单
import time
end_time = int(time.time() * 1000) # 当前时间(毫秒)
start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天
logger.debug(
f"{symbol} [状态同步] 获取历史订单: "
f"symbol={symbol}, startTime={start_time}, endTime={end_time}"
)
# 获取历史订单
orders = await self.client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time,
endTime=end_time
)
# 验证 orders 的类型
if not isinstance(orders, list):
logger.warning(
f"{symbol} [状态同步] ⚠️ futures_get_all_orders 返回的不是列表: "
f"类型={type(orders)}, 值={orders}"
)
orders = [] # 设置为空列表,避免后续错误
if not orders:
logger.debug(f"{symbol} [状态同步] 未找到历史订单")
else:
logger.debug(f"{symbol} [状态同步] 找到 {len(orders)} 个历史订单")
# 查找最近的平仓订单reduceOnly=True且已成交
close_orders = []
for o in orders:
try:
if isinstance(o, dict) and o.get('reduceOnly') == True and o.get('status') == 'FILLED':
close_orders.append(o)
except (AttributeError, TypeError) as e:
logger.warning(
f"{symbol} [状态同步] ⚠️ 处理订单数据时出错: "
f"错误类型={type(e).__name__}, 错误消息={str(e)}, "
f"订单数据类型={type(o)}, 订单数据={o}"
)
continue
if close_orders:
# 按时间倒序排序,取最近的
close_orders.sort(key=lambda x: x.get('updateTime', 0), reverse=True)
latest_order = close_orders[0]
# 获取平均成交价格
exit_price = float(latest_order.get('avgPrice', 0))
if exit_price > 0:
logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT")
else:
logger.warning(
f"{symbol} [状态同步] 历史订单中没有有效的avgPrice: {latest_order}"
)
except KeyError as key_error:
# KeyError 可能是访问 orders[symbol] 或其他字典访问错误
logger.error(
f"{symbol} [状态同步] ❌ 获取历史订单时KeyError: "
f"错误key={key_error}, 错误类型={type(key_error).__name__}"
)
import traceback
logger.debug(f"{symbol} [状态同步] KeyError详情:\n{traceback.format_exc()}")
except Exception as order_error:
logger.warning(
f"{symbol} [状态同步] 获取历史订单失败: "
f"错误类型={type(order_error).__name__}, 错误消息={str(order_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
# 如果无法从订单获取,使用当前价格
if not exit_price or exit_price <= 0:
try:
ticker = await self.client.get_ticker_24h(symbol)
if ticker and ticker.get('price'):
exit_price = float(ticker['price'])
logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
else:
exit_price = float(trade.get('entry_price', 0))
logger.warning(
f"{symbol} [状态同步] 无法获取当前价格ticker={ticker}"
f"使用入场价: {exit_price:.4f} USDT"
)
except KeyError as key_error:
# KeyError 可能是访问 ticker['price'] 时出错
logger.error(
f"{symbol} [状态同步] ❌ 获取当前价格时KeyError: {key_error}, "
f"ticker数据: {ticker if 'ticker' in locals() else 'N/A'}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
except Exception as ticker_error:
logger.warning(
f"{symbol} [状态同步] 获取当前价格失败: "
f"错误类型={type(ticker_error).__name__}, 错误消息={str(ticker_error)}"
f"使用入场价: {trade.get('entry_price', 'N/A')}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
# 计算盈亏确保所有值都是float类型避免Decimal类型问题
try:
entry_price = float(trade.get('entry_price', 0))
quantity = float(trade.get('quantity', 0))
if entry_price <= 0 or quantity <= 0:
logger.error(
f"{symbol} [状态同步] ❌ 交易记录数据无效: "
f"入场价={entry_price}, 数量={quantity}, 跳过更新"
)
continue
if trade.get('side') == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
logger.debug(
f"{symbol} [状态同步] 盈亏计算: "
f"入场价={entry_price:.4f}, 平仓价={exit_price:.4f}, "
f"数量={quantity:.4f}, 方向={trade.get('side', 'N/A')}, "
f"盈亏={pnl:.2f} USDT ({pnl_percent:.2f}%)"
)
except (ValueError, TypeError, KeyError) as calc_error:
logger.error(
f"{symbol} [状态同步] ❌ 计算盈亏失败 (ID: {trade_id}): "
f"错误类型={type(calc_error).__name__}, 错误消息={str(calc_error)}, "
f"交易记录数据: entry_price={trade.get('entry_price', 'N/A')}, "
f"quantity={trade.get('quantity', 'N/A')}, side={trade.get('side', 'N/A')}"
)
continue
# 从历史订单中获取平仓订单号
exit_order_id = None
latest_close_order = None
if close_orders:
exit_order_id = close_orders[0].get('orderId')
latest_close_order = close_orders[0]
if exit_order_id:
logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}")
# 使用 try-except 包裹,确保异常被正确处理
try:
# 计算持仓持续时间和策略类型
# exit_reason 细分:优先看币安平仓订单类型,其次用价格接近止损/止盈价做兜底
exit_reason = "sync"
exit_time_ts = None
try:
if latest_close_order and isinstance(latest_close_order, dict):
otype = str(
latest_close_order.get("type")
or latest_close_order.get("origType")
or ""
).upper()
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
# STOP / STOP_MARKET 通常对应止损触发
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
exit_reason = "manual"
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
try:
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
except Exception:
# 保持默认 sync
pass
# 价格兜底:如果能明显命中止损/止盈价,则覆盖 exit_reason
try:
def _close_to(a: float, b: float, max_pct: float = 0.01) -> bool:
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(exit_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
if sl is not None and _close_to(ep, float(sl), max_pct=0.01):
exit_reason = "stop_loss"
elif tp is not None and _close_to(ep, float(tp), max_pct=0.01):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.01):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.01):
exit_reason = "take_profit"
except Exception:
pass
# 持仓持续时间(分钟):优先用币安订单时间,否则用当前时间
entry_time = trade.get("entry_time")
duration_minutes = None
try:
et = None
if isinstance(entry_time, (int, float)):
et = int(entry_time)
elif isinstance(entry_time, str) and entry_time.strip():
# 兼容旧格式:字符串时间戳/日期字符串
s = entry_time.strip()
if s.isdigit():
et = int(s)
else:
from datetime import datetime
et = int(datetime.fromisoformat(s).timestamp())
xt = int(exit_time_ts) if exit_time_ts is not None else int(get_beijing_time())
if et is not None and xt >= et:
duration_minutes = int((xt - et) / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = 'trend_following' # 默认策略类型
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=exit_order_id, # 保存币安平仓订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
except Exception as update_error:
# update_exit 内部已经有异常处理,但如果仍然失败,记录错误但不中断同步流程
error_str = str(update_error)
if "Duplicate entry" in error_str and "exit_order_id" in error_str:
logger.warning(
f"{symbol} [状态同步] ⚠️ exit_order_id {exit_order_id} 唯一约束冲突,"
f"update_exit 内部处理失败,尝试不更新 exit_order_id"
)
# 再次尝试,不更新 exit_order_id
try:
from database.connection import db
from database.models import get_beijing_time
exit_time = int(exit_time_ts) if exit_time_ts is not None else get_beijing_time()
db.execute_update(
"""UPDATE trades
SET exit_price = %s, exit_time = %s,
exit_reason = %s, pnl = %s, pnl_percent = %s, status = 'closed'
WHERE id = %s""",
(exit_price, exit_time, exit_reason, pnl, pnl_percent, trade_id)
)
logger.info(f"{symbol} [状态同步] ✓ 已更新(跳过 exit_order_id")
except Exception as retry_error:
logger.error(f"{symbol} [状态同步] ❌ 重试更新也失败: {retry_error}")
raise
else:
# 其他错误,重新抛出
raise
logger.info(
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "
f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)"
)
# 清理本地记录
if symbol in self.active_positions:
await self._stop_position_monitoring(symbol)
del self.active_positions[symbol]
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
except Exception as e:
# 详细记录错误信息,包括异常类型、错误消息和堆栈跟踪
import traceback
error_type = type(e).__name__
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(
f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): "
f"错误类型={error_type}, 错误消息={error_msg}"
)
logger.debug(
f"{symbol} [状态同步] 错误详情:\n{error_traceback}"
)
# 如果是数据库相关错误,记录更多信息
if "Duplicate entry" in error_msg or "1062" in error_msg:
logger.error(
f"{symbol} [状态同步] 数据库唯一约束冲突,"
f"可能原因: exit_order_id={exit_order_id} 已被其他交易记录使用"
)
elif "database" in error_msg.lower() or "connection" in error_msg.lower():
logger.error(
f"{symbol} [状态同步] 数据库连接或执行错误,"
f"请检查数据库连接状态"
)
else:
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")
# 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的)
missing_in_db = binance_symbols - db_open_symbols
if missing_in_db:
logger.info(
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: "
f"{', '.join(missing_in_db)} (可能是手动开仓)"
)
# 为手动开仓的持仓创建数据库记录并启动监控
for symbol in missing_in_db:
try:
# 获取币安持仓详情
binance_position = next(
(p for p in binance_positions if p['symbol'] == symbol),
None
)
if not binance_position:
continue
position_amt = binance_position['positionAmt']
entry_price = binance_position['entryPrice']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
logger.info(
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
# 创建数据库记录
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=binance_position.get('leverage', 10),
entry_reason='manual_entry', # 标记为手动开仓
# 手动开仓无法拿到策略侧ATR/分步止盈,这里尽量补齐“规模字段”
notional_usdt=(float(entry_price) * float(quantity)) if entry_price and quantity else None,
margin_usdt=((float(entry_price) * float(quantity)) / float(binance_position.get('leverage', 10))) if entry_price and quantity and float(binance_position.get('leverage', 10) or 0) > 0 else None,
)
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
# 创建本地持仓记录(用于监控)
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# 计算止损止盈(基于保证金)
leverage = binance_position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
# 如果配置中没有设置止盈则使用止损的2倍作为默认
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0, # 手动开仓,无法计算涨跌幅
'orderId': None,
'tradeId': trade_id,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': leverage,
'entryReason': 'manual_entry',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
# 启动WebSocket监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控")
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
logger.info("持仓状态同步完成")
except Exception as e:
logger.error(f"同步持仓状态失败: {e}")
import traceback
logger.error(f"错误详情:\n{traceback.format_exc()}")
async def start_all_position_monitoring(self):
"""
启动所有持仓的WebSocket实时监控
"""
if not self._monitoring_enabled:
logger.info("实时监控已禁用,跳过启动")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning("客户端未初始化,无法启动实时监控")
return
# 获取当前所有持仓
positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in positions}
active_symbols = set(self.active_positions.keys())
logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else ''})")
# 为所有币安持仓启动监控即使不在active_positions中可能是手动开仓的
for position in positions:
symbol = position['symbol']
if symbol not in self._monitor_tasks:
# 如果不在active_positions中先创建记录
if symbol not in self.active_positions:
logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...")
# 这里会通过sync_positions_with_binance来处理但先启动监控
try:
entry_price = position.get('entryPrice', 0)
position_amt = position['positionAmt']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
# 创建临时记录用于监控
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# 计算止损止盈(基于保证金)
leverage = position.get('leverage', 10)
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
# 如果配置中没有设置止盈则使用止损的2倍作为默认
if take_profit_pct_margin is None or take_profit_pct_margin == 0:
take_profit_pct_margin = stop_loss_pct_margin * 2.0
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, quantity, leverage,
stop_loss_pct=stop_loss_pct_margin
)
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, quantity, leverage,
take_profit_pct=take_profit_pct_margin
)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0,
'orderId': None,
'tradeId': None,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': leverage,
'entryReason': 'manual_entry_temp',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
logger.info(f"{symbol} 已创建临时持仓记录用于监控")
except Exception as e:
logger.error(f"{symbol} 创建临时持仓记录失败: {e}")
await self._start_position_monitoring(symbol)
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
async def stop_all_position_monitoring(self):
"""
停止所有持仓的WebSocket监控
"""
symbols = list(self._monitor_tasks.keys())
for symbol in symbols:
await self._stop_position_monitoring(symbol)
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
async def _start_position_monitoring(self, symbol: str):
"""
启动单个持仓的WebSocket价格监控
Args:
symbol: 交易对
"""
if symbol in self._monitor_tasks:
logger.debug(f"{symbol} 监控任务已存在,跳过")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning(f"{symbol} 客户端未初始化,无法启动监控")
return
try:
task = asyncio.create_task(self._monitor_position_price(symbol))
self._monitor_tasks[symbol] = task
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
except Exception as e:
logger.error(f"启动 {symbol} 监控失败: {e}")
async def _stop_position_monitoring(self, symbol: str):
"""
停止单个持仓的WebSocket监控
Args:
symbol: 交易对
"""
# 幂等:可能会被多处/并发调用,先 pop 再处理,避免 KeyError
task = self._monitor_tasks.pop(symbol, None)
if task is None:
return
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
logger.debug(f"已停止 {symbol} 的WebSocket监控")
async def _monitor_position_price(self, symbol: str):
"""
监控单个持仓的价格WebSocket实时推送
Args:
symbol: 交易对
"""
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
# 使用WebSocket订阅价格流
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
logger.debug(f"{symbol} WebSocket连接已建立开始接收价格更新")
retry_count = 0 # 连接成功,重置重试计数
async for msg in ws:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
# 根据文档ticker 流包含 'c' 字段(最后价格)
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
current_price = float(data['c'])
# 立即检查止损止盈
await self._check_single_position(symbol, current_price)
elif 'data' in data:
# 兼容组合流格式(如果使用 /stream 端点)
if isinstance(data['data'], dict) and 'c' in data['data']:
current_price = float(data['data']['c'])
await self._check_single_position(symbol, current_price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"{symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"{symbol} WebSocket连接关闭")
break
except asyncio.CancelledError:
logger.info(f"{symbol} 监控任务已取消")
break
except Exception as e:
retry_count += 1
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
if retry_count < max_retries:
# 指数退避重试
wait_time = min(2 ** retry_count, 30)
logger.info(f"{symbol} {wait_time}秒后重试连接...")
await asyncio.sleep(wait_time)
else:
logger.error(f"{symbol} WebSocket监控失败已达到最大重试次数")
# 回退到定时检查模式
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
break
# 清理任务
if symbol in self._monitor_tasks:
del self._monitor_tasks[symbol]
async def _check_single_position(self, symbol: str, current_price: float):
"""
检查单个持仓的止损止盈(实时检查)
Args:
symbol: 交易对
current_price: 当前价格
"""
position_info = self.active_positions.get(symbol)
if not position_info:
return
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
current_price_float = float(current_price)
# 计算当前盈亏(基于保证金)
leverage = position_info.get('leverage', 10)
position_value = entry_price * quantity
margin = position_value / leverage if leverage > 0 else position_value
# 计算盈亏金额
if position_info['side'] == 'BUY':
pnl_amount = (current_price_float - entry_price) * quantity
else: # SELL
pnl_amount = (entry_price - current_price_float) * quantity
# 计算盈亏百分比(相对于保证金,与币安一致)
pnl_percent_margin = (pnl_amount / margin * 100) if margin > 0 else 0
# 也计算价格百分比(用于显示)
if position_info['side'] == 'BUY':
pnl_percent_price = ((current_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl_percent_price = ((entry_price - current_price_float) / entry_price) * 100
# 更新最大盈利(基于保证金)
if pnl_percent_margin > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent_margin
# 移动止损逻辑(盈利后保护利润,基于保证金)
# 每次检查时从Redis重新加载配置确保配置修改能即时生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception as e:
logger.debug(f"从Redis重新加载配置失败: {e}")
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01) # 相对于保证金
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01) # 相对于保证金
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后(相对于保证金),激活移动止损
if pnl_percent_margin > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent_margin:.2f}% of margin)"
)
else:
# 盈利超过阈值后,止损移至保护利润位(基于保证金)
# 计算需要保护的利润金额
protect_amount = margin * trailing_protect
# 计算对应的止损价
if position_info['side'] == 'BUY':
# 保护利润:当前盈亏 - 保护金额 = (止损价 - 开仓价) × 数量
# 所以:止损价 = 开仓价 + (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price + (pnl_amount - protect_amount) / quantity
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
else: # SELL
# 做空:止损价 = 开仓价 - (当前盈亏 - 保护金额) / 数量
new_stop_loss = entry_price - (pnl_amount - protect_amount) / quantity
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}% of margin = {protect_amount:.4f} USDT)"
)
# 检查止损(基于保证金收益比)
stop_loss = position_info.get('stopLoss')
should_close = False
exit_reason = None
if stop_loss is not None:
# 计算止损对应的保证金百分比目标
# 止损金额 = (开仓价 - 止损价) × 数量 或 (止损价 - 开仓价) × 数量
if position_info['side'] == 'BUY':
stop_loss_amount = (entry_price - stop_loss) * quantity
else: # SELL
stop_loss_amount = (stop_loss - entry_price) * quantity
stop_loss_pct_margin = (stop_loss_amount / margin * 100) if margin > 0 else 0
# 每5%亏损记录一次诊断日志(帮助排查问题)
if pnl_percent_margin <= -5.0:
should_log = (int(abs(pnl_percent_margin)) % 5 == 0) or (pnl_percent_margin <= -10.0 and pnl_percent_margin > -10.5)
if should_log:
trigger_condition = pnl_percent_margin <= -stop_loss_pct_margin
logger.warning(
f"{symbol} [实时监控] 诊断: 亏损{pnl_percent_margin:.2f}% of margin | "
f"当前价: {current_price_float:.4f} | "
f"入场价: {entry_price:.4f} | "
f"止损价: {stop_loss:.4f} (目标: -{stop_loss_pct_margin:.2f}% of margin) | "
f"方向: {position_info['side']} | "
f"是否触发: {trigger_condition} | "
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
)
# 直接比较当前盈亏百分比与止损目标(基于保证金)
if pnl_percent_margin <= -stop_loss_pct_margin:
should_close = True
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
logger.warning(
f"{symbol} [实时监控] ⚠⚠⚠ 触发止损(基于保证金): "
f"当前盈亏={pnl_percent_margin:.2f}% of margin <= 止损目标=-{stop_loss_pct_margin:.2f}% of margin | "
f"当前价={current_price_float:.4f}, 止损价={stop_loss:.4f} | "
f"保证金={margin:.4f} USDT, 亏损金额={pnl_amount:.4f} USDT"
)
# 检查止盈(基于保证金收益比)
if not should_close:
take_profit = position_info.get('takeProfit')
if take_profit is not None:
# 计算止盈对应的保证金百分比目标
# 止盈金额 = (止盈价 - 开仓价) × 数量 或 (开仓价 - 止盈价) × 数量
if position_info['side'] == 'BUY':
take_profit_amount = (take_profit - entry_price) * quantity
else: # SELL
take_profit_amount = (entry_price - take_profit) * quantity
take_profit_pct_margin = (take_profit_amount / margin * 100) if margin > 0 else 0
# 每5%盈利记录一次诊断日志(帮助排查问题)
if pnl_percent_margin >= 5.0:
should_log = (int(pnl_percent_margin) % 5 == 0) or (pnl_percent_margin >= 10.0 and pnl_percent_margin < 10.5)
if should_log:
trigger_condition = pnl_percent_margin >= take_profit_pct_margin
logger.warning(
f"{symbol} [实时监控] 诊断: 盈利{pnl_percent_margin:.2f}% of margin | "
f"当前价: {current_price_float:.4f} | "
f"入场价: {entry_price:.4f} | "
f"止盈价: {take_profit:.4f} (目标: {take_profit_pct_margin:.2f}% of margin) | "
f"方向: {position_info['side']} | "
f"是否触发: {trigger_condition} | "
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
)
# 直接比较当前盈亏百分比与止盈目标(基于保证金)
if pnl_percent_margin >= take_profit_pct_margin:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发止盈(基于保证金): "
f"当前盈亏={pnl_percent_margin:.2f}% of margin >= 止盈目标={take_profit_pct_margin:.2f}% of margin | "
f"当前价={current_price_float:.4f}, 止盈价={take_profit:.4f}"
)
# 如果触发止损止盈,执行平仓
if should_close:
logger.info(
f"{symbol} [自动平仓] 开始执行平仓操作 | "
f"原因: {exit_reason} | "
f"入场价: {entry_price:.4f} | "
f"当前价: {current_price_float:.4f} | "
f"盈亏: {pnl_percent_margin:.2f}% of margin ({pnl_amount:.4f} USDT) | "
f"数量: {quantity:.4f}"
)
# 执行平仓(让 close_position 统一处理数据库更新,避免重复更新和状态不一致)
logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...")
success = await self.close_position(symbol, reason=exit_reason)
if success:
logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成")
# 平仓成功后,立即触发一次状态同步,确保数据库状态与币安一致
try:
await asyncio.sleep(2) # 等待2秒让币安订单完全成交
await self.sync_positions_with_binance()
logger.debug(f"{symbol} [自动平仓] 已触发状态同步")
except Exception as sync_error:
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
else:
logger.error(f"{symbol} [自动平仓] ❌ 平仓失败")
# 即使平仓失败,也尝试同步状态(可能币安已经平仓了)
try:
await self.sync_positions_with_binance()
except Exception as sync_error:
logger.warning(f"{symbol} [自动平仓] 状态同步失败: {sync_error}")
async def diagnose_position(self, symbol: str):
"""
诊断持仓状态(用于排查为什么没有自动平仓)
Args:
symbol: 交易对
"""
try:
logger.info(f"{symbol} [诊断] 开始诊断持仓状态...")
# 1. 检查是否在active_positions中
if symbol not in self.active_positions:
logger.warning(f"{symbol} [诊断] ❌ 不在本地持仓记录中 (active_positions)")
logger.warning(f" 可能原因: 手动开仓或系统重启后未同步")
logger.warning(f" 解决方案: 等待下次状态同步或手动触发同步")
else:
position_info = self.active_positions[symbol]
logger.info(f"{symbol} [诊断] ✓ 在本地持仓记录中")
logger.info(f" 入场价: {position_info['entryPrice']:.4f}")
logger.info(f" 方向: {position_info['side']}")
logger.info(f" 数量: {position_info['quantity']:.4f}")
logger.info(f" 止损价: {position_info['stopLoss']:.4f}")
logger.info(f" 止盈价: {position_info['takeProfit']:.4f}")
# 2. 检查WebSocket监控状态
if symbol in self._monitor_tasks:
task = self._monitor_tasks[symbol]
if task.done():
logger.warning(f"{symbol} [诊断] ⚠ WebSocket监控任务已结束")
try:
await task # 获取异常信息
except Exception as e:
logger.warning(f" 任务异常: {e}")
else:
logger.info(f"{symbol} [诊断] ✓ WebSocket监控任务运行中")
else:
logger.warning(f"{symbol} [诊断] ❌ 没有WebSocket监控任务")
logger.warning(f" 可能原因: 监控未启动或已停止")
# 3. 获取币安实际持仓
positions = await self.client.get_open_positions()
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
if not binance_position:
logger.warning(f"{symbol} [诊断] ❌ 币安账户中没有持仓")
return
logger.info(f"{symbol} [诊断] ✓ 币安账户中有持仓")
entry_price_binance = binance_position.get('entryPrice', 0)
mark_price = binance_position.get('markPrice', 0)
unrealized_pnl = binance_position.get('unRealizedProfit', 0)
logger.info(f" 币安入场价: {entry_price_binance:.4f}")
logger.info(f" 标记价格: {mark_price:.4f}")
logger.info(f" 未实现盈亏: {unrealized_pnl:.2f} USDT")
# 4. 计算实际盈亏
if symbol in self.active_positions:
position_info = self.active_positions[symbol]
entry_price = float(position_info['entryPrice'])
take_profit = float(position_info['takeProfit'])
if position_info['side'] == 'BUY':
pnl_percent = ((mark_price - entry_price) / entry_price) * 100
take_profit_pct = ((take_profit - entry_price) / entry_price) * 100
should_trigger = mark_price >= take_profit
else: # SELL
pnl_percent = ((entry_price - mark_price) / entry_price) * 100
take_profit_pct = ((entry_price - take_profit) / entry_price) * 100
should_trigger = mark_price <= take_profit
logger.info(f"{symbol} [诊断] 盈亏分析:")
logger.info(f" 当前盈亏: {pnl_percent:.2f}%")
logger.info(f" 止盈目标: {take_profit_pct:.2f}%")
logger.info(f" 当前价: {mark_price:.4f}")
logger.info(f" 止盈价: {take_profit:.4f}")
logger.info(f" 价格差: {abs(mark_price - take_profit):.4f}")
logger.info(f" 应该触发: {should_trigger}")
if pnl_percent > take_profit_pct and not should_trigger:
logger.error(f"{symbol} [诊断] ❌ 异常: 盈亏{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!")
logger.error(f" 可能原因: 浮点数精度问题或止盈价格计算错误")
logger.info(f"{symbol} [诊断] 诊断完成")
except Exception as e:
logger.error(f"{symbol} [诊断] 诊断失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")