auto_trade_sys/backend/api/routes/account.py
薇薇安 1fcd692368 a
2026-01-23 19:31:20 +08:00

1649 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
账户实时数据API - 从币安API获取实时账户和订单数据
"""
from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi import Query
import sys
from pathlib import Path
import logging
import time
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
sys.path.insert(0, str(project_root / 'trading_system'))
from database.models import TradingConfig, Account
from api.auth_deps import get_account_id
logger = logging.getLogger(__name__)
router = APIRouter()
async def _ensure_exchange_sltp_for_symbol(symbol: str, account_id: int = 1):
"""
在币安侧补挂该 symbol 的止损/止盈保护单STOP_MARKET + TAKE_PROFIT_MARKET
该接口用于“手动补挂”,不依赖 trading_system 的监控任务。
"""
# 从 accounts 表读取账号私有API密钥
account_id_int = int(account_id or 1)
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id_int)
if (not api_key or not api_secret) and status == "active":
logger.error(f"[account_id={account_id_int}] API密钥未配置")
raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id_int}")
# 导入交易系统的BinanceClient复用其精度/持仓模式处理)
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
await client.connect()
try:
# 1) 获取当前持仓(需要知道方向)
raw_positions = await client.client.futures_position_information(symbol=symbol)
nonzero = []
for p in raw_positions or []:
try:
amt = float(p.get("positionAmt", 0) or 0)
if amt != 0:
nonzero.append((amt, p))
except Exception:
continue
if not nonzero:
raise HTTPException(status_code=400, detail=f"{symbol} 当前无持仓,无法补挂止盈止损")
# 2) 获取持仓模式
dual_side = None
try:
mode_res = await client.client.futures_get_position_mode()
if isinstance(mode_res, dict):
dual_side = bool(mode_res.get("dualSidePosition"))
except Exception:
dual_side = None
# 3) 取净持仓(单向)或第一条非零腿(对冲/兜底)
amt, p0 = nonzero[0]
net_amt = sum([a for a, _ in nonzero])
if dual_side is False:
amt = net_amt
side = "BUY" if amt > 0 else "SELL"
mark_price = None
try:
mark_price = float(p0.get("markPrice", 0) or 0) or None
except Exception:
mark_price = None
# 4) 从数据库 open trade 取止损/止盈价(优先取最近一条)
from database.models import Trade
open_trades = Trade.get_by_symbol(symbol, status='open') or []
if not open_trades:
raise HTTPException(status_code=400, detail=f"{symbol} 数据库无 open 交易记录,无法确定止损止盈价")
# 尽量取最新一条 open trade避免同一symbol多条 open 时取到旧记录)
try:
open_trades.sort(key=lambda x: int(x.get("id", 0) or 0), reverse=True)
except Exception:
pass
trade = open_trades[0]
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_2") or trade.get("take_profit_price") or trade.get("take_profit_1")
try:
sl = float(sl) if sl is not None else None
except Exception:
sl = None
try:
tp = float(tp) if tp is not None else None
except Exception:
tp = None
# 兼容旧数据库:如果 trades 表还没迁移 stop_loss_price / take_profit_price 字段,
# 则回退用 entry_price/quantity/leverage + 配置的 STOP_LOSS_PERCENT/TAKE_PROFIT_PERCENT 计算。
if not sl or not tp:
try:
entry_price = float(trade.get("entry_price") or 0)
qty = float(trade.get("quantity") or 0)
lv = float(trade.get("leverage") or 0) or float(p0.get("leverage") or 0) or 10.0
if entry_price <= 0 or qty <= 0 or lv <= 0:
raise ValueError("entry_price/quantity/leverage invalid")
def _ratio(v, default):
try:
x = float(v)
# 兼容:若误存成 5表示5%),则转为 0.05
if x > 1:
x = x / 100.0
if x < 0:
x = default
return x
except Exception:
return default
sl_pct = _ratio(TradingConfig.get_value("STOP_LOSS_PERCENT", 0.05), 0.05)
tp_pct = _ratio(TradingConfig.get_value("TAKE_PROFIT_PERCENT", 0.15), 0.15)
notional = entry_price * qty
margin = notional / lv
sl_amount = margin * sl_pct
tp_amount = margin * tp_pct
if side == "BUY":
sl = entry_price - (sl_amount / qty)
tp = entry_price + (tp_amount / qty)
else:
sl = entry_price + (sl_amount / qty)
tp = entry_price - (tp_amount / qty)
if not sl or not tp or sl <= 0 or tp <= 0:
raise ValueError("computed sl/tp invalid")
except Exception:
raise HTTPException(status_code=400, detail=f"{symbol} 数据库缺少止损/止盈价,且无法回退计算,无法补挂")
# 5) 取消旧的保护单Algo 条件单),避免重复
try:
await client.cancel_open_algo_orders_by_order_types(
symbol, {"STOP_MARKET", "TAKE_PROFIT_MARKET", "TRAILING_STOP_MARKET"}
)
except Exception:
pass
# 6) 下保护单closePosition=True
symbol_info = await client.get_symbol_info(symbol)
# 使用 trading_system/binance_client 的格式化方法(如果不存在则回退简单格式)
try:
fmt_price = BinanceClient._format_price_str_with_rounding # type: ignore[attr-defined]
except Exception:
fmt_price = None
def _fmt(price: float, rounding_mode: str) -> str:
if fmt_price:
return fmt_price(price, symbol_info, rounding_mode) # type: ignore[misc]
return str(round(float(price), int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8))
# 触发价避免“立即触发”
cp = float(mark_price) if mark_price else None
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
min_step = tick if tick and tick > 0 else (10 ** (-pp) if pp and pp > 0 else 1e-8)
sl_price = float(sl)
tp_price = float(tp)
if cp and cp > 0:
# stop
if side == "BUY" and sl_price >= cp:
sl_price = max(0.0, cp - min_step)
if side == "SELL" and sl_price <= cp:
sl_price = cp + min_step
# tp
if side == "BUY" and tp_price <= cp:
tp_price = cp + min_step
if side == "SELL" and tp_price >= cp:
tp_price = max(0.0, cp - min_step)
# rounding止损 long 用 UPshort 用 DOWN止盈 long 用 DOWNshort 用 UP
sl_round = "UP" if side == "BUY" else "DOWN"
tp_round = "DOWN" if side == "BUY" else "UP"
close_side = "SELL" if side == "BUY" else "BUY"
# Algo 条件单使用 triggerPrice不是 stopPrice
sl_params = {
"algoType": "CONDITIONAL",
"symbol": symbol,
"side": close_side,
"type": "STOP_MARKET",
"triggerPrice": _fmt(sl_price, sl_round),
"closePosition": True,
"workingType": "MARK_PRICE",
}
tp_params = {
"algoType": "CONDITIONAL",
"symbol": symbol,
"side": close_side,
"type": "TAKE_PROFIT_MARKET",
"triggerPrice": _fmt(tp_price, tp_round),
"closePosition": True,
"workingType": "MARK_PRICE",
}
if dual_side is True:
sl_params["positionSide"] = "LONG" if side == "BUY" else "SHORT"
tp_params["positionSide"] = "LONG" if side == "BUY" else "SHORT"
sl_order = await client.futures_create_algo_order(sl_params)
tp_order = await client.futures_create_algo_order(tp_params)
# 再查一次未成交委托,确认是否真的挂上(并用于前端展示/排查)
open_orders = []
try:
oo = await client.futures_get_open_algo_orders(symbol=symbol, algo_type="CONDITIONAL")
if isinstance(oo, list):
for o in oo:
try:
if not isinstance(o, dict):
continue
otype2 = str(o.get("orderType") or o.get("type") or "").upper()
if otype2 in {"STOP_MARKET", "TAKE_PROFIT_MARKET", "TRAILING_STOP_MARKET", "STOP", "TAKE_PROFIT"}:
open_orders.append(
{
"algoId": o.get("algoId"),
"orderType": otype2,
"side": o.get("side"),
"triggerPrice": o.get("triggerPrice"),
"workingType": o.get("workingType"),
"positionSide": o.get("positionSide"),
"closePosition": o.get("closePosition"),
"algoStatus": o.get("algoStatus"),
"updateTime": o.get("updateTime"),
}
)
except Exception:
continue
except Exception:
open_orders = []
return {
"symbol": symbol,
"position_side": side,
"dual_side_position": dual_side,
"stop_loss_price": sl_price,
"take_profit_price": tp_price,
"orders": {
"stop_market": sl_order,
"take_profit_market": tp_order,
},
"open_protection_orders": open_orders,
"ui_hint": "在币安【U本位合约】里这类 STOP/TP 通常显示在【条件单/止盈止损/计划委托】而不一定在普通【当前委托(限价)】列表。",
}
finally:
await client.disconnect()
@router.post("/positions/{symbol}/sltp/ensure")
async def ensure_position_sltp(symbol: str, account_id: int = Depends(get_account_id)):
"""
手动补挂该 symbol 的止盈止损保护单(币安侧可见)。
"""
try:
return await _ensure_exchange_sltp_for_symbol(symbol, account_id=int(account_id))
except HTTPException:
raise
except Exception as e:
msg = str(e) or repr(e) or "unknown error"
logger.error(f"{symbol} 补挂止盈止损失败: {msg}", exc_info=True)
raise HTTPException(status_code=500, detail=f"补挂止盈止损失败: {msg}")
@router.post("/positions/sltp/ensure-all")
async def ensure_all_positions_sltp(
limit: int = Query(50, ge=1, le=200, description="最多处理多少个持仓symbol"),
account_id: int = Depends(get_account_id),
):
"""
批量补挂当前所有持仓的止盈止损保护单。
"""
# 先拿当前持仓symbol列表
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
logger.error(f"[account_id={account_id}] API密钥未配置")
raise HTTPException(status_code=400, detail=f"API密钥未配置account_id={account_id}")
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
await client.connect()
try:
positions = await client.get_open_positions()
symbols = [p["symbol"] for p in (positions or []) if float(p.get("positionAmt", 0) or 0) != 0]
finally:
await client.disconnect()
symbols = symbols[: int(limit or 50)]
results = []
errors = []
for sym in symbols:
try:
res = await _ensure_exchange_sltp_for_symbol(sym, account_id=account_id)
results.append(
{
"symbol": sym,
"ok": True,
"orders": res.get("orders"),
"open_protection_orders": res.get("open_protection_orders"),
}
)
except HTTPException as he:
errors.append(
{
"symbol": sym,
"ok": False,
"status_code": getattr(he, "status_code", None),
"detail": getattr(he, "detail", None),
}
)
except Exception as e:
msg = str(e) or repr(e) or "unknown error"
errors.append({"symbol": sym, "ok": False, "error": msg})
return {
"total": len(symbols),
"ok": len([r for r in results if r.get("ok")]),
"failed": len(errors),
"results": results,
"errors": errors,
}
async def get_realtime_account_data(account_id: int = 1):
"""从币安API实时获取账户数据"""
logger.info("=" * 60)
logger.info("开始获取实时账户数据")
logger.info("=" * 60)
try:
# 从 accounts 表读取账号私有API密钥
logger.info(f"步骤1: 从accounts读取API配置... (account_id={account_id})")
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
logger.info(f" - API密钥存在: {bool(api_key)}")
if api_key:
logger.info(f" - API密钥长度: {len(api_key)} 字符")
else:
logger.warning(" - API密钥为空!")
logger.info(f" - API密钥Secret存在: {bool(api_secret)}")
if api_secret:
logger.info(f" - API密钥Secret长度: {len(api_secret)} 字符")
else:
logger.warning(" - API密钥Secret为空!")
logger.info(f" - 使用测试网: {use_testnet}")
if not api_key or not api_secret:
error_msg = f"API密钥未配置account_id={account_id}请在配置界面设置该账号的BINANCE_API_KEY和BINANCE_API_SECRET"
logger.error(f"[account_id={account_id}] API密钥未配置")
logger.error(f"{error_msg}")
raise HTTPException(
status_code=400,
detail=error_msg
)
# 导入交易系统的BinanceClient
logger.info("步骤2: 导入BinanceClient...")
try:
from binance_client import BinanceClient
logger.info(" ✓ 从当前路径导入BinanceClient成功")
except ImportError as e:
logger.warning(f" - 从当前路径导入失败: {e}")
# 如果直接导入失败尝试从trading_system导入
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
logger.info(f" - 添加路径到sys.path: {trading_system_path}")
try:
from binance_client import BinanceClient
logger.info(" ✓ 从trading_system路径导入BinanceClient成功")
except ImportError as e2:
logger.error(f" ✗ 导入BinanceClient失败: {e2}")
raise
# 创建客户端
logger.info("步骤3: 创建BinanceClient实例...")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info(f" ✓ 客户端创建成功 (testnet={use_testnet})")
# 连接币安API
logger.info("步骤4: 连接币安API...")
try:
await client.connect()
logger.info(" ✓ 币安API连接成功")
except Exception as e:
logger.error(f" ✗ 币安API连接失败: {e}", exc_info=True)
raise
# 读取持仓模式(单向/对冲),用于前端“重点说明/自检”
dual_side_position = None
position_mode = None
try:
mode_res = await client.client.futures_get_position_mode()
if isinstance(mode_res, dict):
dual_side_position = bool(mode_res.get("dualSidePosition"))
position_mode = "hedge" if dual_side_position else "one_way"
except Exception as e:
logger.warning(f"读取持仓模式失败(将显示为未知): {e}")
# 获取账户余额
logger.info("步骤5: 获取账户余额...")
try:
balance = await client.get_account_balance()
logger.info(" ✓ 账户余额获取成功")
logger.info(f" - 返回数据类型: {type(balance)}")
logger.info(f" - 返回数据内容: {balance}")
if balance:
logger.info(f" - 总余额: {balance.get('total', 'N/A')} USDT")
logger.info(f" - 可用余额: {balance.get('available', 'N/A')} USDT")
logger.info(f" - 保证金: {balance.get('margin', 'N/A')} USDT")
if balance.get('total', 0) == 0:
logger.warning(" ⚠ 账户余额为0可能是API权限问题或账户确实无余额")
else:
logger.warning(" ⚠ 返回的余额数据为空")
except Exception as e:
logger.error(f" ✗ 获取账户余额失败: {e}", exc_info=True)
raise
# 获取持仓
logger.info("步骤6: 获取持仓信息...")
try:
positions = await client.get_open_positions()
logger.info(" ✓ 持仓信息获取成功")
logger.info(f" - 返回数据类型: {type(positions)}")
logger.info(f" - 持仓数量: {len(positions)}")
if positions:
logger.info(" - 持仓详情:")
for i, pos in enumerate(positions[:5], 1): # 只显示前5个
logger.info(f" {i}. {pos.get('symbol', 'N/A')}: "
f"数量={pos.get('positionAmt', 0)}, "
f"入场价={pos.get('entryPrice', 0)}, "
f"盈亏={pos.get('unRealizedProfit', 0)}")
if len(positions) > 5:
logger.info(f" ... 还有 {len(positions) - 5} 个持仓")
else:
logger.info(" - 当前无持仓")
except Exception as e:
logger.error(f" ✗ 获取持仓信息失败: {e}", exc_info=True)
raise
# 计算总仓位价值和总盈亏
logger.info("步骤7: 计算仓位统计...")
# total_position_value历史上这里代表“名义仓位价值(notional)”(按标记价)
total_position_value = 0
# total_margin_value更贴近风控配置语义保证金占用
total_margin_value = 0
total_pnl = 0
open_positions_count = 0
for pos in positions:
position_amt = float(pos.get('positionAmt', 0))
if position_amt == 0:
continue
entry_price = float(pos.get('entryPrice', 0))
mark_price = float(pos.get('markPrice', 0))
unrealized_pnl = float(pos.get('unRealizedProfit', 0))
if mark_price == 0:
# 如果没有标记价格,使用入场价
mark_price = entry_price
position_value = abs(position_amt * mark_price)
total_position_value += position_value
# 保证金占用(粗略口径):名义/杠杆(币安页面的展示会更复杂,但这个口径与 MAX_TOTAL_POSITION_PERCENT 对齐)
try:
lv = float(pos.get('leverage', 0) or 0)
if lv <= 0:
lv = 1.0
except Exception:
lv = 1.0
total_margin_value += (position_value / lv)
total_pnl += unrealized_pnl
open_positions_count += 1
logger.debug(f" - {pos.get('symbol')}: 价值={position_value:.2f}, 盈亏={unrealized_pnl:.2f}")
logger.info(" ✓ 仓位统计计算完成")
logger.info(f" - 总名义仓位: {total_position_value:.2f} USDT")
logger.info(f" - 总保证金占用(估算): {total_margin_value:.2f} USDT")
logger.info(f" - 总盈亏: {total_pnl:.2f} USDT")
logger.info(f" - 持仓数量: {open_positions_count}")
# 断开连接
logger.info("步骤8: 断开币安API连接...")
try:
await client.disconnect()
logger.info(" ✓ 连接已断开")
except Exception as e:
logger.warning(f" ⚠ 断开连接时出错: {e}")
# 构建返回结果
result = {
"total_balance": balance.get('total', 0) if balance else 0,
"available_balance": balance.get('available', 0) if balance else 0,
# 名义仓位(按标记价汇总)
"total_position_value": total_position_value,
# 保证金占用(名义/杠杆汇总)
"total_margin_value": total_margin_value,
"total_pnl": total_pnl,
"open_positions": open_positions_count,
# 账户持仓模式(重点:建议使用 one_way
"position_mode": position_mode,
"dual_side_position": dual_side_position,
}
logger.info("=" * 60)
logger.info("账户数据获取成功!")
logger.info(f"最终结果: {result}")
logger.info("=" * 60)
return result
except HTTPException as e:
logger.error("=" * 60)
logger.error(f"HTTP异常: {e.status_code} - {e.detail}")
logger.error("=" * 60)
raise
except Exception as e:
error_msg = f"获取账户数据失败: {str(e)}"
logger.error("=" * 60)
logger.error(f"异常类型: {type(e).__name__}")
logger.error(f"错误信息: {error_msg}")
logger.error("=" * 60, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
@router.get("/realtime")
async def get_realtime_account(account_id: int = Depends(get_account_id)):
"""获取实时账户数据"""
return await get_realtime_account_data(account_id=account_id)
@router.get("/positions")
async def get_realtime_positions(account_id: int = Depends(get_account_id)):
"""获取实时持仓数据"""
client = None
try:
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
logger.info(f"尝试获取实时持仓数据 (testnet={use_testnet}, account_id={account_id})")
if not api_key or not api_secret:
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(
status_code=400,
detail=error_msg
)
# 导入BinanceClient
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info("连接币安API获取持仓...")
await client.connect()
positions = await client.get_open_positions()
logger.info(f"获取到 {len(positions)} 个持仓")
# 格式化持仓数据
formatted_positions = []
for pos in positions:
position_amt = float(pos.get('positionAmt', 0))
if position_amt == 0:
continue
entry_price = float(pos.get('entryPrice', 0))
mark_price = float(pos.get('markPrice', 0))
unrealized_pnl = float(pos.get('unRealizedProfit', 0))
if mark_price == 0:
mark_price = entry_price
# === 名义/保证金口径说明(与币安展示更接近)===
# - 币安的名义价值/仓位价值通常随标记价(markPrice)变动
# - DB 中的 notional_usdt/margin_usdt 通常是“开仓时”写入,用于复盘/统计
# - 若发生部分止盈/减仓:币安 positionAmt 会变小,但 DB 里的 notional/margin 可能仍是“原始开仓量”
# → 会出现:数量=6.8,但名义/保证金像是 13.6 的两倍(与你反馈一致)
#
# 因此:实时持仓展示统一使用“当前数量×标记价”的实时名义/保证金,
# 并额外返回 original_* 字段保留 DB 开仓口径,避免混用导致误解。
# 兼容旧字段entry_value_usdt 仍保留(但它是按入场价计算的名义)
entry_value_usdt = abs(position_amt) * entry_price
leverage = float(pos.get('leverage', 1))
if leverage <= 0:
leverage = 1.0
# 当前持仓名义价值USDT按标记价
notional_usdt_live = abs(position_amt) * mark_price
# 当前持仓保证金USDT名义/杠杆
margin_usdt_live = notional_usdt_live / leverage
pnl_percent = 0
if margin_usdt_live > 0:
pnl_percent = (unrealized_pnl / margin_usdt_live) * 100
# 尝试从数据库获取开仓时间、止损止盈价格(以及交易规模字段)
entry_time = None
stop_loss_price = None
take_profit_price = None
take_profit_1 = None
take_profit_2 = None
atr_value = None
db_margin_usdt = None
db_notional_usdt = None
entry_order_id = None
entry_order_type = None
try:
from database.models import Trade
db_trades = Trade.get_by_symbol(pos.get('symbol'), status='open')
if db_trades:
# 找到匹配的交易记录(优先通过 entry_price 近似匹配;否则取最新一条 open 记录兜底)
matched = None
for db_trade in db_trades:
try:
if abs(float(db_trade.get('entry_price', 0)) - entry_price) < 0.01:
matched = db_trade
break
except Exception:
continue
if matched is None:
matched = db_trades[0]
entry_time = matched.get('entry_time')
stop_loss_price = matched.get('stop_loss_price')
take_profit_price = matched.get('take_profit_price')
take_profit_1 = matched.get('take_profit_1')
take_profit_2 = matched.get('take_profit_2')
atr_value = matched.get('atr')
db_margin_usdt = matched.get('margin_usdt')
db_notional_usdt = matched.get('notional_usdt')
entry_order_id = matched.get('entry_order_id')
except Exception as e:
logger.debug(f"获取数据库信息失败: {e}")
# 如果数据库中有 entry_order_id尝试从币安查询订单类型LIMIT/MARKET
if entry_order_id:
try:
info = await client.client.futures_get_order(symbol=pos.get('symbol'), orderId=int(entry_order_id))
if isinstance(info, dict):
entry_order_type = info.get("type")
except Exception:
entry_order_type = None
# 如果没有从数据库获取到止损止盈价格,前端会自己计算
# 注意:数据库可能没有存储止损止盈价格,这是正常的
formatted_positions.append({
"symbol": pos.get('symbol'),
"side": "BUY" if position_amt > 0 else "SELL",
"quantity": abs(position_amt),
"entry_price": entry_price,
# 兼容旧字段entry_value_usdt 仍保留(前端已有使用)
"entry_value_usdt": entry_value_usdt,
# 实时展示字段:与币安更一致(按当前数量×标记价)
"notional_usdt": notional_usdt_live,
"margin_usdt": margin_usdt_live,
# 额外返回“开仓记录口径”(用于排查部分止盈/减仓导致的不一致)
"original_notional_usdt": db_notional_usdt,
"original_margin_usdt": db_margin_usdt,
"mark_price": mark_price,
"pnl": unrealized_pnl,
"pnl_percent": pnl_percent, # 基于保证金的盈亏百分比
"leverage": int(pos.get('leverage', 1)),
"entry_time": entry_time, # 开仓时间
"stop_loss_price": stop_loss_price, # 止损价格(如果可用)
"take_profit_price": take_profit_price, # 止盈价格(如果可用)
"take_profit_1": take_profit_1,
"take_profit_2": take_profit_2,
"atr": atr_value,
"entry_order_id": entry_order_id,
"entry_order_type": entry_order_type, # LIMIT / MARKET用于仪表板展示“限价/市价”)
})
logger.info(f"格式化后 {len(formatted_positions)} 个有效持仓")
return formatted_positions
except HTTPException:
raise
except Exception as e:
error_msg = f"获取持仓数据失败: {str(e)}"
logger.error(error_msg, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
finally:
# 确保断开连接(避免连接泄漏)
try:
if client is not None:
await client.disconnect()
except Exception:
pass
@router.post("/positions/{symbol}/close")
async def close_position(symbol: str, account_id: int = Depends(get_account_id)):
"""手动平仓指定交易对的持仓"""
try:
logger.info(f"=" * 60)
logger.info(f"收到平仓请求: {symbol}")
logger.info(f"=" * 60)
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 导入必要的模块
try:
from binance_client import BinanceClient
logger.info("✓ 成功导入交易系统模块")
except ImportError as import_error:
logger.warning(f"首次导入失败: {import_error}尝试从trading_system路径导入")
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
logger.info("✓ 从trading_system路径导入成功")
# 导入数据库模型
from database.models import Trade
# 创建客户端
logger.info(f"创建BinanceClient (testnet={use_testnet})...")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info("连接币安API...")
await client.connect()
logger.info("✓ 币安API连接成功")
try:
# 检查币安是否有持仓(使用原始 position_information确保能拿到 positionSide 以处理 -4061
logger.info(f"检查 {symbol} 在币安的持仓状态...")
# 读取持仓模式dualSidePosition=True => 对冲模式(必须传 positionSide=LONG/SHORT
dual_side = None
try:
mode_res = await client.client.futures_get_position_mode()
if isinstance(mode_res, dict):
dual_side = bool(mode_res.get("dualSidePosition"))
except Exception as e:
logger.warning(f"读取持仓模式失败(将按单向模式兜底): {e}")
dual_side = None
raw_positions = await client.client.futures_position_information(symbol=symbol)
nonzero_positions = []
for p in raw_positions or []:
try:
amt = float(p.get("positionAmt", 0))
except Exception:
continue
if abs(amt) > 0:
nonzero_positions.append((amt, p))
# 兼容旧逻辑:如果原始接口异常,回退到封装方法
if not nonzero_positions:
try:
positions = await client.get_open_positions()
position = next((p for p in positions if p['symbol'] == symbol and float(p['positionAmt']) != 0), None)
if position:
nonzero_positions = [(float(position["positionAmt"]), {"positionAmt": position["positionAmt"]})]
except Exception:
nonzero_positions = []
if not nonzero_positions:
logger.warning(f"{symbol} 币安账户中没有持仓,可能已被平仓")
# 检查数据库中是否有未平仓的记录,如果有则更新
open_trades = Trade.get_by_symbol(symbol, status='open')
if open_trades:
trade = open_trades[0]
# 获取当前价格作为平仓价格
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(trade['entry_price'])
# 计算盈亏
entry_price = float(trade['entry_price'])
quantity = float(trade['quantity'])
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
# 更新数据库
Trade.update_exit(
trade_id=trade['id'],
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=None
)
logger.info(f"✓ 已更新数据库记录(币安无持仓但数据库有记录)")
return {
"message": f"{symbol} 平仓操作完成(币安账户中没有持仓,可能已被平仓)",
"symbol": symbol,
"status": "closed"
}
# 获取交易对精度信息,调整数量精度(平仓不要向上补 minQty避免超过持仓数量
symbol_info = None
try:
symbol_info = await client.get_symbol_info(symbol)
except Exception:
symbol_info = None
def _adjust_close_qty(qty: float) -> float:
if qty is None:
return 0.0
q = float(qty)
if not symbol_info:
return q
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = float(symbol_info.get('stepSize', 0) or 0)
if step_size and step_size > 0:
# 向下取整,避免超过持仓
q = float(int(q / step_size)) * step_size
else:
q = round(q, quantity_precision)
q = round(q, quantity_precision)
return q
# 组装平仓订单(对冲模式可能同币种有 LONG/SHORT 两个仓位,这里一并平掉)
orders = []
order_ids = []
# 如果 dual_side 无法读取,按 raw_positions 是否包含 positionSide 来推断
if dual_side is None:
if any(isinstance(p, dict) and (p.get("positionSide") in ("LONG", "SHORT")) for _, p in nonzero_positions):
dual_side = True
else:
dual_side = False
logger.info(f"{symbol} 持仓模式: {'HEDGE(对冲)' if dual_side else 'ONE-WAY(单向)'}")
# 构造待平仓列表:[(positionSide, amt)]
to_close = []
if dual_side:
for amt, p in nonzero_positions:
ps = (p.get("positionSide") or "").upper()
if ps not in ("LONG", "SHORT"):
ps = "LONG" if amt > 0 else "SHORT"
to_close.append((ps, amt))
else:
# 单向模式只应存在一个净仓位;如果有多个,按合计处理
net_amt = sum([amt for amt, _ in nonzero_positions])
if abs(net_amt) > 0:
to_close.append(("BOTH", net_amt))
logger.info(f"✓ 币安账户中 {symbol} 待平仓: {to_close}")
for ps, amt in to_close:
side = 'SELL' if float(amt) > 0 else 'BUY'
quantity = abs(float(amt))
quantity = _adjust_close_qty(quantity)
if quantity <= 0:
logger.warning(f"{symbol} 平仓数量调整后为0跳过该仓位: positionSide={ps}, amt={amt}")
continue
order_params = {
"symbol": symbol,
"side": side,
"type": "MARKET",
"quantity": quantity,
}
# 对冲模式必须传 positionSide=LONG/SHORT并且某些账户会 -1106因此这里不再传 reduceOnly
if dual_side and ps in ("LONG", "SHORT"):
order_params["positionSide"] = ps
else:
# 单向模式用 reduceOnly 防止反向开仓
order_params["reduceOnly"] = True
logger.info(
f"开始执行平仓下单: {symbol} side={side} qty={quantity} "
f"positionSide={order_params.get('positionSide')} reduceOnly={order_params.get('reduceOnly')}"
)
try:
order = await client.client.futures_create_order(**order_params)
if not order:
raise RuntimeError("币安API返回 None")
orders.append(order)
oid = order.get("orderId")
if oid:
order_ids.append(oid)
except Exception as order_error:
error_msg = f"{symbol} 平仓失败:下单异常 - {str(order_error)}"
logger.error(error_msg)
logger.error(f" 错误类型: {type(order_error).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=error_msg)
if not orders:
raise HTTPException(status_code=400, detail=f"{symbol} 无可平仓的有效仓位数量调整后为0或无持仓")
logger.info(f"{symbol} 平仓订单已提交: {order_ids}")
# 等待订单成交,获取实际成交价格
import asyncio
await asyncio.sleep(1)
# 获取订单详情(可能多个订单,按订单号分别取价)
exit_prices = {}
for oid in order_ids:
try:
order_info = await client.client.futures_get_order(symbol=symbol, orderId=oid)
if order_info:
p = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0))
if p <= 0 and order_info.get('fills'):
total_qty = 0
total_value = 0
for fill in order_info.get('fills', []):
qty = float(fill.get('qty', 0))
price = float(fill.get('price', 0))
total_qty += qty
total_value += qty * price
if total_qty > 0:
p = total_value / total_qty
if p > 0:
exit_prices[oid] = p
except Exception as e:
logger.warning(f"获取订单详情失败 (orderId={oid}): {e}")
# 兜底:如果无法获取订单价格,使用当前价格
fallback_exit_price = None
try:
ticker = await client.get_ticker_24h(symbol)
fallback_exit_price = float(ticker['price']) if ticker else None
except Exception:
fallback_exit_price = None
# 更新数据库记录
open_trades = Trade.get_by_symbol(symbol, status='open', account_id=account_id)
if open_trades:
# 对冲模式可能有多条 tradeBUY/LONG 和 SELL/SHORT尽量按方向匹配订单更新
used_order_ids = set()
for trade in open_trades:
try:
entry_price = float(trade['entry_price'])
trade_quantity = float(trade['quantity'])
except Exception:
continue
# 选择一个未使用的 orderId如果只有一个就复用
chosen_oid = None
for oid in order_ids:
if oid not in used_order_ids:
chosen_oid = oid
break
if chosen_oid is None and order_ids:
chosen_oid = order_ids[0]
if chosen_oid:
used_order_ids.add(chosen_oid)
exit_price = exit_prices.get(chosen_oid) if chosen_oid else None
if not exit_price:
exit_price = fallback_exit_price or entry_price
# 计算盈亏(数据库侧依旧按名义盈亏;收益率展示用保证金口径在前端/统计里另算)
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * trade_quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * trade_quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade['id'],
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=chosen_oid
)
logger.info(f"✓ 已更新数据库记录 trade_id={trade['id']} order_id={chosen_oid} (盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)")
logger.info(f"{symbol} 平仓成功")
return {
"message": f"{symbol} 平仓成功",
"symbol": symbol,
"status": "closed"
}
finally:
logger.info("断开币安API连接...")
await client.disconnect()
logger.info("✓ 已断开连接")
except HTTPException:
raise
except Exception as e:
error_msg = f"平仓失败: {str(e)}"
logger.error("=" * 60)
logger.error(f"平仓操作异常: {error_msg}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error("=" * 60, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
@router.post("/positions/close-all")
async def close_all_positions(account_id: int = Depends(get_account_id)):
"""一键全平:平仓所有持仓"""
try:
logger.info("=" * 60)
logger.info("收到一键全平请求")
logger.info("=" * 60)
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 导入必要的模块
try:
from binance_client import BinanceClient
logger.info("✓ 成功导入交易系统模块")
except ImportError as import_error:
logger.warning(f"首次导入失败: {import_error}尝试从trading_system路径导入")
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
logger.info("✓ 从trading_system路径导入成功")
# 导入数据库模型
from database.models import Trade
# 创建客户端
logger.info(f"创建BinanceClient (testnet={use_testnet})...")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info("连接币安API...")
await client.connect()
logger.info("✓ 币安API连接成功")
try:
# 获取所有持仓
positions = await client.get_open_positions()
if not positions:
logger.info("当前没有持仓")
return {
"message": "当前没有持仓",
"closed": 0,
"failed": 0,
"results": []
}
logger.info(f"发现 {len(positions)} 个持仓,开始逐一平仓...")
results = []
closed_count = 0
failed_count = 0
for position in positions:
symbol = position.get('symbol')
position_amt = float(position.get('positionAmt', 0))
if abs(position_amt) <= 0:
continue
try:
logger.info(f"开始平仓 {symbol} (数量: {position_amt})...")
# 确定平仓方向
side = 'SELL' if position_amt > 0 else 'BUY'
# 使用市价单平仓
order = await client.place_order(
symbol=symbol,
side=side,
order_type='MARKET',
quantity=abs(position_amt),
reduce_only=True
)
if order and order.get('orderId'):
logger.info(f"{symbol} 平仓订单已提交: {order.get('orderId')}")
# 获取成交价格
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
if not exit_price:
# 如果订单中没有价格,获取当前价格
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else 0
# 更新数据库记录
open_trades = Trade.get_by_symbol(symbol, status='open')
for trade in open_trades:
entry_price = float(trade['entry_price'])
quantity = float(trade['quantity'])
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade['id'],
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=order.get('orderId')
)
logger.info(f"✓ 已更新数据库记录 trade_id={trade['id']} (盈亏: {pnl:.2f} USDT)")
closed_count += 1
results.append({
"symbol": symbol,
"status": "success",
"order_id": order.get('orderId'),
"message": f"{symbol} 平仓成功"
})
else:
logger.warning(f"{symbol} 平仓订单提交失败")
failed_count += 1
results.append({
"symbol": symbol,
"status": "failed",
"message": f"{symbol} 平仓失败: 订单未提交"
})
except Exception as e:
logger.error(f"{symbol} 平仓失败: {e}")
failed_count += 1
results.append({
"symbol": symbol,
"status": "failed",
"message": f"{symbol} 平仓失败: {str(e)}"
})
logger.info(f"一键全平完成: 成功 {closed_count} / 失败 {failed_count}")
return {
"message": f"一键全平完成: 成功 {closed_count} / 失败 {failed_count}",
"closed": closed_count,
"failed": failed_count,
"results": results
}
finally:
logger.info("断开币安API连接...")
await client.disconnect()
logger.info("✓ 已断开连接")
except HTTPException:
raise
except Exception as e:
error_msg = f"一键全平失败: {str(e)}"
logger.error("=" * 60)
logger.error(f"一键全平操作异常: {error_msg}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error("=" * 60, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
@router.post("/positions/{symbol}/open")
async def open_position_from_recommendation(
symbol: str,
entry_price: float = Query(..., description="入场价格"),
stop_loss_price: float = Query(..., description="止损价格"),
direction: str = Query(..., description="交易方向: BUY 或 SELL"),
notional_usdt: float = Query(..., description="下单名义价值USDT"),
leverage: int = Query(10, description="杠杆倍数"),
account_id: int = Depends(get_account_id)
):
"""根据推荐信息手动开仓"""
try:
logger.info("=" * 60)
logger.info(f"收到手动开仓请求: {symbol}")
logger.info(f" 入场价: {entry_price}, 止损价: {stop_loss_price}")
logger.info(f" 方向: {direction}, 名义价值: {notional_usdt} USDT, 杠杆: {leverage}x")
logger.info("=" * 60)
if direction not in ('BUY', 'SELL'):
raise HTTPException(status_code=400, detail="交易方向必须是 BUY 或 SELL")
if notional_usdt <= 0:
raise HTTPException(status_code=400, detail="下单名义价值必须大于0")
if entry_price <= 0 or stop_loss_price <= 0:
raise HTTPException(status_code=400, detail="入场价和止损价必须大于0")
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 导入必要的模块
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
# 导入数据库模型
from database.models import Trade
# 创建客户端
logger.info(f"创建BinanceClient (testnet={use_testnet})...")
client = BinanceClient(
api_key=api_key,
api_secret=api_secret,
testnet=use_testnet
)
logger.info("连接币安API...")
await client.connect()
logger.info("✓ 币安API连接成功")
try:
# 设置杠杆
await client.set_leverage(symbol, leverage)
logger.info(f"✓ 已设置杠杆: {leverage}x")
# 获取交易对信息
symbol_info = await client.get_symbol_info(symbol)
if not symbol_info:
raise HTTPException(status_code=400, detail=f"无法获取 {symbol} 的交易对信息")
# 计算下单数量:数量 = 名义价值 / 入场价
quantity = notional_usdt / entry_price
logger.info(f"计算下单数量: {quantity:.8f} (名义价值: {notional_usdt} USDT / 入场价: {entry_price})")
# 调整数量精度
adjusted_quantity = client._adjust_quantity_precision(quantity, symbol_info)
if adjusted_quantity <= 0:
raise HTTPException(status_code=400, detail=f"调整后的数量无效: {adjusted_quantity}")
logger.info(f"调整后的数量: {adjusted_quantity:.8f}")
# 检查最小名义价值
min_notional = symbol_info.get('minNotional', 5.0)
actual_notional = adjusted_quantity * entry_price
if actual_notional < min_notional:
raise HTTPException(
status_code=400,
detail=f"订单名义价值不足: {actual_notional:.2f} USDT < 最小要求: {min_notional:.2f} USDT"
)
# 下 limit 订单
logger.info(f"开始下 limit 订单: {symbol} {direction} {adjusted_quantity} @ {entry_price}")
order = await client.place_order(
symbol=symbol,
side=direction,
quantity=adjusted_quantity,
order_type='LIMIT',
price=entry_price,
reduce_only=False
)
if not order:
raise HTTPException(status_code=500, detail="下单失败币安API返回None")
order_id = order.get('orderId')
logger.info(f"✓ 订单已提交: orderId={order_id}")
# 等待订单成交最多等待30秒
import asyncio
filled_order = None
for i in range(30):
await asyncio.sleep(1)
try:
order_status = await client.client.futures_get_order(symbol=symbol, orderId=order_id)
if order_status.get('status') == 'FILLED':
filled_order = order_status
logger.info(f"✓ 订单已成交: orderId={order_id}")
break
elif order_status.get('status') in ('CANCELED', 'EXPIRED', 'REJECTED'):
raise HTTPException(status_code=400, detail=f"订单未成交,状态: {order_status.get('status')}")
except Exception as e:
if i == 29: # 最后一次尝试
logger.warning(f"订单状态查询失败或未成交: {e}")
continue
if not filled_order:
logger.warning(f"订单 {order_id} 在30秒内未成交但订单已提交")
return {
"message": f"{symbol} 订单已提交但未成交(请稍后检查)",
"symbol": symbol,
"order_id": order_id,
"status": "pending"
}
# 订单已成交,保存到数据库
avg_price = float(filled_order.get('avgPrice', entry_price))
executed_qty = float(filled_order.get('executedQty', adjusted_quantity))
# 计算实际使用的名义价值和保证金
actual_notional = executed_qty * avg_price
actual_margin = actual_notional / leverage
# 保存交易记录
# trade_id = Trade.create(
# account_id=account_id,
# symbol=symbol,
# side=direction,
# quantity=executed_qty,
# entry_price=avg_price,
# leverage=leverage,
# entry_order_id=order_id,
# entry_reason='manual_from_recommendation',
# notional_usdt=actual_notional,
# margin_usdt=actual_margin,
# stop_loss_price=stop_loss_price,
# # 如果有推荐中的止盈价,也可以传入,这里先不传
# )
# logger.info(f"✓ 交易记录已保存: trade_id={trade_id}")
# 尝试挂止损/止盈保护单(如果系统支持)
# try:
# # 这里可以调用 _ensure_exchange_sltp_for_symbol 来挂保护单
# # 但需要先获取持仓信息来确定方向
# positions = await client.get_open_positions()
# position = next((p for p in positions if p['symbol'] == symbol), None)
# if position:
# # 可以在这里挂止损单,但需要知道 take_profit_price
# # 暂时只记录止损价到数据库,由系统自动监控
# logger.info(f"止损价已记录到数据库: {stop_loss_price}")
# except Exception as e:
# logger.warning(f"挂保护单失败(不影响开仓): {e}")
return {
"message": f"{symbol} 开仓成功",
"symbol": symbol,
"order_id": order_id,
"trade_id": None,
"quantity": executed_qty,
"entry_price": avg_price,
"notional_usdt": actual_notional,
"margin_usdt": actual_margin,
"status": "filled"
}
finally:
logger.info("断开币安API连接...")
await client.disconnect()
logger.info("✓ 已断开连接")
except HTTPException:
raise
except Exception as e:
error_msg = f"开仓失败: {str(e)}"
logger.error("=" * 60)
logger.error(f"开仓操作异常: {error_msg}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error("=" * 60, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)
@router.post("/positions/sync")
async def sync_positions(account_id: int = Depends(get_account_id)):
"""同步币安实际持仓状态与数据库状态"""
try:
logger.info("=" * 60)
logger.info("收到持仓状态同步请求")
logger.info("=" * 60)
api_key, api_secret, use_testnet, status = Account.get_credentials(account_id)
if (not api_key or not api_secret) and status == "active":
error_msg = f"API密钥未配置account_id={account_id}"
logger.warning(f"[account_id={account_id}] {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 导入必要的模块
try:
from binance_client import BinanceClient
except ImportError:
trading_system_path = project_root / 'trading_system'
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
# 导入数据库模型
from database.models import Trade
# 创建客户端
client = BinanceClient(api_key=api_key, api_secret=api_secret, testnet=use_testnet)
logger.info("连接币安API...")
await client.connect()
try:
# 1. 获取币安实际持仓
binance_positions = await client.get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions if float(p.get('positionAmt', 0)) != 0}
logger.info(f"币安实际持仓: {len(binance_symbols)}")
if binance_symbols:
logger.info(f" 持仓列表: {', '.join(binance_symbols)}")
# 2. 获取数据库中状态为open的交易记录
db_open_trades = Trade.get_all(status='open', account_id=account_id)
db_open_symbols = {t['symbol'] for t in db_open_trades}
logger.info(f"数据库open状态: {len(db_open_symbols)}")
if db_open_symbols:
logger.info(f" 持仓列表: {', '.join(db_open_symbols)}")
# 3. 找出在数据库中open但在币安已不存在的持仓需要更新为closed
missing_in_binance = db_open_symbols - binance_symbols
updated_count = 0
if missing_in_binance:
logger.info(f"发现 {len(missing_in_binance)} 个持仓在数据库中是open但币安已不存在: {', '.join(missing_in_binance)}")
for symbol in missing_in_binance:
try:
# 尝试从币安历史订单获取“真实平仓信息”(价格/时间/原因/订单号)
latest_close_order = None
try:
end_time_ms = int(time.time() * 1000)
start_time_ms = end_time_ms - (7 * 24 * 60 * 60 * 1000)
orders = await client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time_ms,
endTime=end_time_ms,
)
if isinstance(orders, list) and orders:
close_orders = [
o for o in orders
if isinstance(o, dict)
and o.get("reduceOnly") is True
and o.get("status") == "FILLED"
]
if close_orders:
close_orders.sort(key=lambda x: x.get("updateTime", 0), reverse=True)
latest_close_order = close_orders[0]
except Exception:
latest_close_order = None
# 获取该交易对的所有open记录
open_trades = Trade.get_by_symbol(symbol, status='open')
for trade in open_trades:
trade_id = trade['id']
entry_price = float(trade['entry_price'])
quantity = float(trade['quantity'])
# 获取当前价格作为平仓价格
exit_price = None
exit_order_id = None
exit_time_ts = None
exit_reason = "sync"
otype = ""
if latest_close_order and isinstance(latest_close_order, dict):
try:
exit_price = float(latest_close_order.get("avgPrice", 0) or 0) or None
except Exception:
exit_price = None
exit_order_id = latest_close_order.get("orderId") or None
otype = str(
latest_close_order.get("type")
or latest_close_order.get("origType")
or ""
).upper()
try:
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
# 检查订单的 reduceOnly 字段:如果是 true说明是自动平仓不应该标记为 manual
is_reduce_only = latest_close_order.get("reduceOnly", False) if latest_close_order else False
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
# 如果是 reduceOnly 订单,说明是自动平仓(可能是保护单触发的),先标记为 sync后续用价格判断
if is_reduce_only:
exit_reason = "sync" # 临时标记,后续用价格判断
else:
exit_reason = "manual" # 非 reduceOnly 的 MARKET/LIMIT 订单才是真正的手动平仓
# 价格兜底:如果能明显命中止损/止盈价,则覆盖 exit_reason
# 这对于保护单触发的 MARKET 订单特别重要
if exit_reason == "sync" or exit_reason == "manual":
try:
def _close_to(a: float, b: float, max_pct: float = 0.02) -> bool:
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(exit_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
# 优先检查止损
if sl is not None and _close_to(ep, float(sl), max_pct=0.02):
exit_reason = "stop_loss"
# 然后检查止盈
elif tp is not None and _close_to(ep, float(tp), max_pct=0.02):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.02):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.02):
exit_reason = "take_profit"
# 如果价格接近入场价,可能是移动止损触发的
elif is_reduce_only:
entry_price_val = float(trade.get("entry_price", 0) or 0)
if entry_price_val > 0 and _close_to(ep, entry_price_val, max_pct=0.01):
exit_reason = "trailing_stop"
except Exception:
pass
if not exit_price or exit_price <= 0:
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else entry_price
# 计算盈亏
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
else:
pnl = (entry_price - exit_price) * quantity
# 计算基于保证金的盈亏百分比
leverage = float(trade.get('leverage', 10))
entry_value = entry_price * quantity
margin = entry_value / leverage if leverage > 0 else entry_value
pnl_percent_margin = (pnl / margin * 100) if margin > 0 else 0
# 更新数据库记录
duration_minutes = None
try:
et = trade.get("entry_time")
if et is not None and exit_time_ts is not None:
et_i = int(et)
if exit_time_ts >= et_i:
duration_minutes = int((exit_time_ts - et_i) / 60)
except Exception:
duration_minutes = None
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent_margin, # 使用基于保证金的盈亏百分比
exit_order_id=exit_order_id,
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
updated_count += 1
logger.info(
f"{symbol} 已更新为closed (ID: {trade_id}, "
f"盈亏: {pnl:.2f} USDT, {pnl_percent_margin:.2f}% of margin, "
f"原因: {exit_reason}, 类型: {otype or '-'}"
f")"
)
except Exception as e:
logger.error(f"{symbol} 更新失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
else:
logger.info("✓ 数据库与币安状态一致,无需更新")
# 4. 检查币安有但数据库没有记录的持仓
missing_in_db = binance_symbols - db_open_symbols
if missing_in_db:
logger.info(f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: {', '.join(missing_in_db)}")
logger.info(" 这些持仓可能是手动开仓的,建议手动处理")
result = {
"message": "持仓状态同步完成",
"binance_positions": len(binance_symbols),
"db_open_positions": len(db_open_symbols),
"updated_to_closed": updated_count,
"missing_in_binance": list(missing_in_binance),
"missing_in_db": list(missing_in_db)
}
logger.info("=" * 60)
logger.info("持仓状态同步完成!")
logger.info(f"结果: {result}")
logger.info("=" * 60)
return result
finally:
await client.disconnect()
logger.info("✓ 已断开币安API连接")
except HTTPException:
raise
except Exception as e:
error_msg = f"同步持仓状态失败: {str(e)}"
logger.error(error_msg, exc_info=True)
raise HTTPException(status_code=500, detail=error_msg)