This commit is contained in:
薇薇安 2026-01-19 20:30:57 +08:00
parent e3b6dfb65d
commit 4023f7807e
4 changed files with 263 additions and 40 deletions

View File

@ -788,6 +788,29 @@ async def sync_positions():
for symbol in missing_in_binance:
try:
# 尝试从币安历史订单获取“真实平仓信息”(价格/时间/原因/订单号)
latest_close_order = None
try:
end_time_ms = int(time.time() * 1000)
start_time_ms = end_time_ms - (7 * 24 * 60 * 60 * 1000)
orders = await client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time_ms,
endTime=end_time_ms,
)
if isinstance(orders, list) and orders:
close_orders = [
o for o in orders
if isinstance(o, dict)
and o.get("reduceOnly") is True
and o.get("status") == "FILLED"
]
if close_orders:
close_orders.sort(key=lambda x: x.get("updateTime", 0), reverse=True)
latest_close_order = close_orders[0]
except Exception:
latest_close_order = None
# 获取该交易对的所有open记录
open_trades = Trade.get_by_symbol(symbol, status='open')
@ -797,8 +820,42 @@ async def sync_positions():
quantity = float(trade['quantity'])
# 获取当前价格作为平仓价格
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else entry_price
exit_price = None
exit_order_id = None
exit_time_ts = None
exit_reason = "sync"
otype = ""
if latest_close_order and isinstance(latest_close_order, dict):
try:
exit_price = float(latest_close_order.get("avgPrice", 0) or 0) or None
except Exception:
exit_price = None
exit_order_id = latest_close_order.get("orderId") or None
otype = str(
latest_close_order.get("type")
or latest_close_order.get("origType")
or ""
).upper()
try:
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
exit_reason = "manual"
if not exit_price or exit_price <= 0:
ticker = await client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else entry_price
# 计算盈亏
if trade['side'] == 'BUY':
@ -813,18 +870,32 @@ async def sync_positions():
pnl_percent_margin = (pnl / margin * 100) if margin > 0 else 0
# 更新数据库记录
duration_minutes = None
try:
et = trade.get("entry_time")
if et is not None and exit_time_ts is not None:
et_i = int(et)
if exit_time_ts >= et_i:
duration_minutes = int((exit_time_ts - et_i) / 60)
except Exception:
duration_minutes = None
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason='sync', # 标记为同步平仓
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent_margin, # 使用基于保证金的盈亏百分比
exit_order_id=None
exit_order_id=exit_order_id,
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
updated_count += 1
logger.info(
f"{symbol} 已更新为closed (ID: {trade_id}, "
f"盈亏: {pnl:.2f} USDT, {pnl_percent_margin:.2f}% of margin)"
f"盈亏: {pnl:.2f} USDT, {pnl_percent_margin:.2f}% of margin, "
f"原因: {exit_reason}, 类型: {otype or '-'}"
f")"
)
except Exception as e:
logger.error(f"{symbol} 更新失败: {e}")

View File

@ -231,16 +231,29 @@ async def get_trade_stats(
# 平仓原因分布(用来快速定位胜率低的主要来源:止损/止盈/同步等)
exit_reason_counts = Counter((t.get("exit_reason") or "unknown") for t in meaningful_trades)
# 平均持仓时长(分钟):优先使用 duration_minutes 字段(若为空则跳过)
# 平均持仓时长(分钟):
# - 优先使用 duration_minutes若历史没写入则用 exit_time-entry_time 实时计算)
durations = []
for t in meaningful_trades:
dm = t.get("duration_minutes")
try:
if dm is None:
if dm is not None:
try:
dm_f = float(dm)
if dm_f >= 0:
durations.append(dm_f)
continue
dm_f = float(dm)
if dm_f >= 0:
durations.append(dm_f)
except Exception:
pass
et = t.get("entry_time")
xt = t.get("exit_time")
try:
if et is None or xt is None:
continue
et_i = int(et)
xt_i = int(xt)
if xt_i >= et_i:
durations.append((xt_i - et_i) / 60.0)
except Exception:
continue
avg_duration_minutes = (sum(durations) / len(durations)) if durations else None
@ -384,6 +397,14 @@ async def sync_trades_from_binance(
avg_price = float(order.get('avgPrice', 0))
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
reduce_only = order.get('reduceOnly', False)
otype = str(order.get('type') or order.get('origType') or '').upper()
exit_time_ts = None
try:
ms = order.get('updateTime') or order.get('time')
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
if quantity <= 0 or avg_price <= 0:
continue
@ -393,15 +414,15 @@ async def sync_trades_from_binance(
# 这是平仓订单
# 首先检查是否已经通过订单号同步过(避免重复)
existing_trade = Trade.get_by_exit_order_id(order_id)
if existing_trade:
logger.debug(f"订单 {order_id} 已同步过,跳过")
if existing_trade and (existing_trade.get("exit_reason") not in (None, "", "sync")):
logger.debug(f"订单 {order_id} 已同步过且 exit_reason={existing_trade.get('exit_reason')},跳过")
continue
# 查找数据库中该交易对的open状态记录
open_trades = Trade.get_by_symbol(symbol, status='open')
if open_trades:
if existing_trade or open_trades:
# 找到匹配的交易记录通过symbol匹配如果有多个则取最近的
trade = open_trades[0] # 取第一个
trade = existing_trade or open_trades[0] # 取第一个
trade_id = trade['id']
# 计算盈亏
@ -418,17 +439,67 @@ async def sync_trades_from_binance(
pnl = (entry_price - avg_price) * actual_quantity
pnl_percent = ((entry_price - avg_price) / entry_price) * 100
# 细分 exit_reason优先使用币安订单类型其次用价格接近止损/止盈做兜底
exit_reason = "sync"
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
exit_reason = "manual"
try:
def _close_to(a: float, b: float, max_pct: float = 0.01) -> bool:
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(avg_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
if sl is not None and _close_to(ep, float(sl), max_pct=0.01):
exit_reason = "stop_loss"
elif tp is not None and _close_to(ep, float(tp), max_pct=0.01):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.01):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.01):
exit_reason = "take_profit"
except Exception:
pass
# 持仓持续时间(分钟)
duration_minutes = None
try:
et = trade.get("entry_time")
if et is not None and exit_time_ts is not None:
et_i = int(et)
if exit_time_ts >= et_i:
duration_minutes = int((exit_time_ts - et_i) / 60)
except Exception:
duration_minutes = None
# 更新数据库(包含订单号)
Trade.update_exit(
trade_id=trade_id,
exit_price=avg_price,
exit_reason='sync',
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=order_id # 保存订单号,确保唯一性
exit_order_id=order_id, # 保存订单号,确保唯一性
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
updated_count += 1
logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})")
logger.debug(
f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, "
f"类型: {otype or '-'}, 原因: {exit_reason}, 成交价: {avg_price:.4f})"
)
else:
# 这是开仓订单,检查数据库中是否已存在(通过订单号)
existing_trade = Trade.get_by_entry_order_id(order_id)

View File

@ -186,7 +186,17 @@ class Trade:
return db.execute_one("SELECT LAST_INSERT_ID() as id")['id']
@staticmethod
def update_exit(trade_id, exit_price, exit_reason, pnl, pnl_percent, exit_order_id=None, strategy_type=None, duration_minutes=None):
def update_exit(
trade_id,
exit_price,
exit_reason,
pnl,
pnl_percent,
exit_order_id=None,
strategy_type=None,
duration_minutes=None,
exit_time_ts=None,
):
"""更新平仓信息(使用北京时间)
Args:
@ -199,7 +209,11 @@ class Trade:
注意如果 exit_order_id 已存在且属于其他交易记录会跳过更新 exit_order_id 以避免唯一约束冲突
"""
exit_time = get_beijing_time()
# exit_time_ts: 允许外部传入“真实成交时间”Unix秒以便统计持仓时长更准确
try:
exit_time = int(exit_time_ts) if exit_time_ts is not None else get_beijing_time()
except Exception:
exit_time = get_beijing_time()
# 如果提供了 exit_order_id先检查是否已被其他交易记录使用
if exit_order_id is not None:
@ -207,9 +221,11 @@ class Trade:
existing_trade = Trade.get_by_exit_order_id(exit_order_id)
if existing_trade:
if existing_trade['id'] == trade_id:
# 如果 exit_order_id 属于当前交易记录,说明已经更新过了,直接返回
logger.debug(f"交易记录 {trade_id} 的 exit_order_id {exit_order_id} 已存在,跳过更新")
return
# exit_order_id 属于当前交易记录:允许继续更新(比如补写 exit_reason / exit_time / duration
# 不需要提前 return
logger.debug(
f"交易记录 {trade_id} 的 exit_order_id {exit_order_id} 已存在,将继续更新其他字段"
)
else:
# 如果 exit_order_id 已被其他交易记录使用,记录警告但不更新 exit_order_id
logger.warning(

View File

@ -1672,40 +1672,105 @@ class PositionManager:
# 从历史订单中获取平仓订单号
exit_order_id = None
latest_close_order = None
if close_orders:
exit_order_id = close_orders[0].get('orderId')
latest_close_order = close_orders[0]
if exit_order_id:
logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}")
# 使用 try-except 包裹,确保异常被正确处理
try:
# 计算持仓持续时间和策略类型
entry_time = trade.get('entry_time')
# exit_reason 细分:优先看币安平仓订单类型,其次用价格接近止损/止盈价做兜底
exit_reason = "sync"
exit_time_ts = None
try:
if latest_close_order and isinstance(latest_close_order, dict):
otype = str(
latest_close_order.get("type")
or latest_close_order.get("origType")
or ""
).upper()
if "TRAILING" in otype:
exit_reason = "trailing_stop"
elif "TAKE_PROFIT" in otype:
exit_reason = "take_profit"
elif "STOP" in otype:
# STOP / STOP_MARKET 通常对应止损触发
exit_reason = "stop_loss"
elif otype in ("MARKET", "LIMIT"):
exit_reason = "manual"
ms = latest_close_order.get("updateTime") or latest_close_order.get("time")
try:
if ms:
exit_time_ts = int(int(ms) / 1000)
except Exception:
exit_time_ts = None
except Exception:
# 保持默认 sync
pass
# 价格兜底:如果能明显命中止损/止盈价,则覆盖 exit_reason
try:
def _close_to(a: float, b: float, max_pct: float = 0.01) -> bool:
if a <= 0 or b <= 0:
return False
return abs((a - b) / b) <= max_pct
ep = float(exit_price or 0)
if ep > 0:
sl = trade.get("stop_loss_price")
tp = trade.get("take_profit_price")
tp1 = trade.get("take_profit_1")
tp2 = trade.get("take_profit_2")
if sl is not None and _close_to(ep, float(sl), max_pct=0.01):
exit_reason = "stop_loss"
elif tp is not None and _close_to(ep, float(tp), max_pct=0.01):
exit_reason = "take_profit"
elif tp1 is not None and _close_to(ep, float(tp1), max_pct=0.01):
exit_reason = "take_profit"
elif tp2 is not None and _close_to(ep, float(tp2), max_pct=0.01):
exit_reason = "take_profit"
except Exception:
pass
# 持仓持续时间(分钟):优先用币安订单时间,否则用当前时间
entry_time = trade.get("entry_time")
duration_minutes = None
if entry_time:
try:
from datetime import datetime
if isinstance(entry_time, str):
entry_dt = datetime.strptime(entry_time, '%Y-%m-%d %H:%M:%S')
try:
et = None
if isinstance(entry_time, (int, float)):
et = int(entry_time)
elif isinstance(entry_time, str) and entry_time.strip():
# 兼容旧格式:字符串时间戳/日期字符串
s = entry_time.strip()
if s.isdigit():
et = int(s)
else:
entry_dt = entry_time
exit_dt = get_beijing_time() # 使用北京时间计算持续时间
duration = exit_dt - entry_dt
duration_minutes = int(duration.total_seconds() / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
from datetime import datetime
et = int(datetime.fromisoformat(s).timestamp())
xt = int(exit_time_ts) if exit_time_ts is not None else int(get_beijing_time())
if et is not None and xt >= et:
duration_minutes = int((xt - et) / 60)
except Exception as e:
logger.debug(f"计算持仓持续时间失败: {e}")
strategy_type = 'trend_following' # 默认策略类型
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason='sync', # 标记为同步更新
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=exit_order_id, # 保存币安平仓订单号
strategy_type=strategy_type,
duration_minutes=duration_minutes
duration_minutes=duration_minutes,
exit_time_ts=exit_time_ts,
)
except Exception as update_error:
# update_exit 内部已经有异常处理,但如果仍然失败,记录错误但不中断同步流程
@ -1719,13 +1784,13 @@ class PositionManager:
try:
from database.connection import db
from database.models import get_beijing_time
exit_time = get_beijing_time()
exit_time = int(exit_time_ts) if exit_time_ts is not None else get_beijing_time()
db.execute_update(
"""UPDATE trades
SET exit_price = %s, exit_time = %s,
exit_reason = %s, pnl = %s, pnl_percent = %s, status = 'closed'
WHERE id = %s""",
(exit_price, exit_time, 'sync', pnl, pnl_percent, trade_id)
(exit_price, exit_time, exit_reason, pnl, pnl_percent, trade_id)
)
logger.info(f"{symbol} [状态同步] ✓ 已更新(跳过 exit_order_id")
except Exception as retry_error: