This commit is contained in:
薇薇安 2026-01-17 10:07:31 +08:00
parent be44cd02a7
commit fccc9b2717
3 changed files with 102 additions and 28 deletions

View File

@ -100,11 +100,16 @@ class Database:
def execute_update(self, query, params=None):
"""执行更新,返回影响行数"""
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
affected = cursor.execute(query, params)
conn.commit()
return affected
except Exception as e:
# 重新抛出异常,让调用者处理(如 update_exit 中的异常处理)
# 不要在这里记录为"数据库连接错误",因为这可能是业务逻辑错误(如唯一约束冲突)
raise
def execute_many(self, query, params_list):
"""批量执行"""

View File

@ -129,7 +129,12 @@ class Trade:
if exit_order_id is not None:
try:
existing_trade = Trade.get_by_exit_order_id(exit_order_id)
if existing_trade and existing_trade['id'] != trade_id:
if existing_trade:
if existing_trade['id'] == trade_id:
# 如果 exit_order_id 属于当前交易记录,说明已经更新过了,直接返回
logger.debug(f"交易记录 {trade_id} 的 exit_order_id {exit_order_id} 已存在,跳过更新")
return
else:
# 如果 exit_order_id 已被其他交易记录使用,记录警告但不更新 exit_order_id
logger.warning(
f"交易记录 {trade_id} 的 exit_order_id {exit_order_id} 已被交易记录 {existing_trade['id']} 使用,"

View File

@ -344,6 +344,41 @@ class PositionManager:
f"(涨跌幅: {change_percent:.2f}%)"
)
# 验证持仓是否真的在币安存在
try:
await asyncio.sleep(0.5) # 等待一小段时间让币安更新持仓
positions = await self.client.get_open_positions()
binance_position = next(
(p for p in positions if p['symbol'] == symbol and float(p.get('positionAmt', 0)) != 0),
None
)
if binance_position:
logger.info(
f"{symbol} [开仓验证] ✓ 币安持仓确认: "
f"数量={float(binance_position.get('positionAmt', 0)):.4f}, "
f"入场价={float(binance_position.get('entryPrice', 0)):.4f}"
)
else:
logger.warning(
f"{symbol} [开仓验证] ⚠️ 币安账户中没有持仓,可能订单未成交或被立即平仓"
)
# 清理本地记录
if symbol in self.active_positions:
del self.active_positions[symbol]
# 如果数据库已保存,标记为取消
if trade_id and DB_AVAILABLE and Trade:
try:
db.execute_update(
"UPDATE trades SET status = 'cancelled' WHERE id = %s",
(trade_id,)
)
logger.info(f"{symbol} [开仓验证] 已更新数据库状态为 cancelled (ID: {trade_id})")
except Exception as e:
logger.warning(f"{symbol} [开仓验证] 更新数据库状态失败: {e}")
return None
except Exception as verify_error:
logger.warning(f"{symbol} [开仓验证] 验证持仓时出错: {verify_error},继续使用本地记录")
# 启动WebSocket实时监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
@ -1027,6 +1062,8 @@ class PositionManager:
if exit_order_id:
logger.info(f"{symbol} [状态同步] 找到平仓订单号: {exit_order_id}")
# 使用 try-except 包裹,确保异常被正确处理
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
@ -1035,6 +1072,33 @@ class PositionManager:
pnl_percent=pnl_percent,
exit_order_id=exit_order_id # 保存币安平仓订单号
)
except Exception as update_error:
# update_exit 内部已经有异常处理,但如果仍然失败,记录错误但不中断同步流程
error_str = str(update_error)
if "Duplicate entry" in error_str and "exit_order_id" in error_str:
logger.warning(
f"{symbol} [状态同步] ⚠️ exit_order_id {exit_order_id} 唯一约束冲突,"
f"update_exit 内部处理失败,尝试不更新 exit_order_id"
)
# 再次尝试,不更新 exit_order_id
try:
from database.connection import db
from database.models import get_beijing_time
exit_time = get_beijing_time()
db.execute_update(
"""UPDATE trades
SET exit_price = %s, exit_time = %s,
exit_reason = %s, pnl = %s, pnl_percent = %s, status = 'closed'
WHERE id = %s""",
(exit_price, exit_time, 'sync', pnl, pnl_percent, trade_id)
)
logger.info(f"{symbol} [状态同步] ✓ 已更新(跳过 exit_order_id")
except Exception as retry_error:
logger.error(f"{symbol} [状态同步] ❌ 重试更新也失败: {retry_error}")
raise
else:
# 其他错误,重新抛出
raise
logger.info(
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "