This commit is contained in:
薇薇安 2026-01-17 10:14:52 +08:00
parent fccc9b2717
commit 5d0b0a9974
3 changed files with 208 additions and 30 deletions

View File

@ -55,10 +55,22 @@ def setup_logging():
# 清除现有的处理器 # 清除现有的处理器
root_logger.handlers.clear() root_logger.handlers.clear()
# 创建格式器 # 设置日志时间格式为北京时间UTC+8
formatter = logging.Formatter( from datetime import datetime, timezone, timedelta
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S' class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
# 转换为北京时间UTC+8
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
# 创建格式器(使用北京时间)
formatter = BeijingTimeFormatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
) )
# 文件处理器(带轮转) # 文件处理器(带轮转)

View File

@ -31,13 +31,36 @@ if not Path(log_file).is_absolute():
project_root = Path(__file__).parent.parent project_root = Path(__file__).parent.parent
log_file = project_root / log_file log_file = project_root / log_file
# 设置日志时间格式为北京时间UTC+8
import time
from datetime import timezone, timedelta
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
# 转换为北京时间UTC+8
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
from datetime import datetime
# 创建格式化器
formatter = BeijingTimeFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 创建处理器
file_handler = logging.FileHandler(str(log_file), encoding='utf-8')
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 配置日志
logging.basicConfig( logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL), level=getattr(logging, config.LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[file_handler, console_handler]
handlers=[
logging.FileHandler(str(log_file), encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -999,20 +999,50 @@ class PositionManager:
# 4. 更新这些持仓的状态 # 4. 更新这些持仓的状态
for symbol in missing_in_binance: for symbol in missing_in_binance:
trades = Trade.get_by_symbol(symbol, status='open') try:
trades = Trade.get_by_symbol(symbol, status='open')
if not trades:
logger.warning(f"{symbol} [状态同步] ⚠️ 数据库中没有找到open状态的交易记录跳过")
continue
logger.info(f"{symbol} [状态同步] 找到 {len(trades)} 条open状态的交易记录开始更新...")
except Exception as get_trades_error:
logger.error(
f"{symbol} [状态同步] ❌ 获取交易记录失败: "
f"错误类型={type(get_trades_error).__name__}, 错误消息={str(get_trades_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
continue
for trade in trades: for trade in trades:
trade_id = trade['id'] trade_id = trade.get('id')
if not trade_id:
logger.warning(f"{symbol} [状态同步] ⚠️ 交易记录缺少ID字段跳过: {trade}")
continue
try: try:
logger.info(f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})...") logger.info(
f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})... | "
f"入场价: {trade.get('entry_price', 'N/A')}, "
f"数量: {trade.get('quantity', 'N/A')}, "
f"方向: {trade.get('side', 'N/A')}"
)
# 尝试从币安历史订单获取实际平仓价格 # 尝试从币安历史订单获取实际平仓价格
exit_price = None exit_price = None
close_orders = []
try: try:
# 获取最近的平仓订单reduceOnly=True的订单 # 获取最近的平仓订单reduceOnly=True的订单
import time import time
end_time = int(time.time() * 1000) # 当前时间(毫秒) end_time = int(time.time() * 1000) # 当前时间(毫秒)
start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天 start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 最近7天
logger.debug(
f"{symbol} [状态同步] 获取历史订单: "
f"symbol={symbol}, startTime={start_time}, endTime={end_time}"
)
# 获取历史订单 # 获取历史订单
orders = await self.client.client.futures_get_all_orders( orders = await self.client.client.futures_get_all_orders(
symbol=symbol, symbol=symbol,
@ -1020,12 +1050,32 @@ class PositionManager:
endTime=end_time endTime=end_time
) )
# 验证 orders 的类型
if not isinstance(orders, list):
logger.warning(
f"{symbol} [状态同步] ⚠️ futures_get_all_orders 返回的不是列表: "
f"类型={type(orders)}, 值={orders}"
)
orders = [] # 设置为空列表,避免后续错误
if not orders:
logger.debug(f"{symbol} [状态同步] 未找到历史订单")
else:
logger.debug(f"{symbol} [状态同步] 找到 {len(orders)} 个历史订单")
# 查找最近的平仓订单reduceOnly=True且已成交 # 查找最近的平仓订单reduceOnly=True且已成交
close_orders = [ close_orders = []
o for o in orders for o in orders:
if o.get('reduceOnly') == True try:
and o.get('status') == 'FILLED' if isinstance(o, dict) and o.get('reduceOnly') == True and o.get('status') == 'FILLED':
] close_orders.append(o)
except (AttributeError, TypeError) as e:
logger.warning(
f"{symbol} [状态同步] ⚠️ 处理订单数据时出错: "
f"错误类型={type(e).__name__}, 错误消息={str(e)}, "
f"订单数据类型={type(o)}, 订单数据={o}"
)
continue
if close_orders: if close_orders:
# 按时间倒序排序,取最近的 # 按时间倒序排序,取最近的
@ -1036,24 +1086,93 @@ class PositionManager:
exit_price = float(latest_order.get('avgPrice', 0)) exit_price = float(latest_order.get('avgPrice', 0))
if exit_price > 0: if exit_price > 0:
logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT") logger.info(f"{symbol} [状态同步] 从币安历史订单获取平仓价格: {exit_price:.4f} USDT")
else:
logger.warning(
f"{symbol} [状态同步] 历史订单中没有有效的avgPrice: {latest_order}"
)
except KeyError as key_error:
# KeyError 可能是访问 orders[symbol] 或其他字典访问错误
logger.error(
f"{symbol} [状态同步] ❌ 获取历史订单时KeyError: "
f"错误key={key_error}, 错误类型={type(key_error).__name__}"
)
import traceback
logger.debug(f"{symbol} [状态同步] KeyError详情:\n{traceback.format_exc()}")
except Exception as order_error: except Exception as order_error:
logger.debug(f"{symbol} [状态同步] 获取历史订单失败: {order_error}") logger.warning(
f"{symbol} [状态同步] 获取历史订单失败: "
f"错误类型={type(order_error).__name__}, 错误消息={str(order_error)}"
)
import traceback
logger.debug(f"{symbol} [状态同步] 错误详情:\n{traceback.format_exc()}")
# 如果无法从订单获取,使用当前价格 # 如果无法从订单获取,使用当前价格
if not exit_price or exit_price <= 0: if not exit_price or exit_price <= 0:
ticker = await self.client.get_ticker_24h(symbol) try:
exit_price = float(ticker['price']) if ticker else float(trade['entry_price']) ticker = await self.client.get_ticker_24h(symbol)
logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT") if ticker and ticker.get('price'):
exit_price = float(ticker['price'])
logger.warning(f"{symbol} [状态同步] 使用当前价格作为平仓价格: {exit_price:.4f} USDT")
else:
exit_price = float(trade.get('entry_price', 0))
logger.warning(
f"{symbol} [状态同步] 无法获取当前价格ticker={ticker}"
f"使用入场价: {exit_price:.4f} USDT"
)
except KeyError as key_error:
# KeyError 可能是访问 ticker['price'] 时出错
logger.error(
f"{symbol} [状态同步] ❌ 获取当前价格时KeyError: {key_error}, "
f"ticker数据: {ticker if 'ticker' in locals() else 'N/A'}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
except Exception as ticker_error:
logger.warning(
f"{symbol} [状态同步] 获取当前价格失败: "
f"错误类型={type(ticker_error).__name__}, 错误消息={str(ticker_error)}"
f"使用入场价: {trade.get('entry_price', 'N/A')}"
)
exit_price = float(trade.get('entry_price', 0))
if exit_price <= 0:
logger.error(f"{symbol} [状态同步] ❌ 无法获取有效的平仓价格,跳过更新")
continue
# 计算盈亏确保所有值都是float类型避免Decimal类型问题 # 计算盈亏确保所有值都是float类型避免Decimal类型问题
entry_price = float(trade['entry_price']) try:
quantity = float(trade['quantity']) entry_price = float(trade.get('entry_price', 0))
if trade['side'] == 'BUY': quantity = float(trade.get('quantity', 0))
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100 if entry_price <= 0 or quantity <= 0:
else: # SELL logger.error(
pnl = (entry_price - exit_price) * quantity f"{symbol} [状态同步] ❌ 交易记录数据无效: "
pnl_percent = ((entry_price - exit_price) / entry_price) * 100 f"入场价={entry_price}, 数量={quantity}, 跳过更新"
)
continue
if trade.get('side') == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
logger.debug(
f"{symbol} [状态同步] 盈亏计算: "
f"入场价={entry_price:.4f}, 平仓价={exit_price:.4f}, "
f"数量={quantity:.4f}, 方向={trade.get('side', 'N/A')}, "
f"盈亏={pnl:.2f} USDT ({pnl_percent:.2f}%)"
)
except (ValueError, TypeError, KeyError) as calc_error:
logger.error(
f"{symbol} [状态同步] ❌ 计算盈亏失败 (ID: {trade_id}): "
f"错误类型={type(calc_error).__name__}, 错误消息={str(calc_error)}, "
f"交易记录数据: entry_price={trade.get('entry_price', 'N/A')}, "
f"quantity={trade.get('quantity', 'N/A')}, side={trade.get('side', 'N/A')}"
)
continue
# 从历史订单中获取平仓订单号 # 从历史订单中获取平仓订单号
exit_order_id = None exit_order_id = None
@ -1112,7 +1231,31 @@ class PositionManager:
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录") logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
except Exception as e: except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): {e}") # 详细记录错误信息,包括异常类型、错误消息和堆栈跟踪
import traceback
error_type = type(e).__name__
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(
f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): "
f"错误类型={error_type}, 错误消息={error_msg}"
)
logger.debug(
f"{symbol} [状态同步] 错误详情:\n{error_traceback}"
)
# 如果是数据库相关错误,记录更多信息
if "Duplicate entry" in error_msg or "1062" in error_msg:
logger.error(
f"{symbol} [状态同步] 数据库唯一约束冲突,"
f"可能原因: exit_order_id={exit_order_id} 已被其他交易记录使用"
)
elif "database" in error_msg.lower() or "connection" in error_msg.lower():
logger.error(
f"{symbol} [状态同步] 数据库连接或执行错误,"
f"请检查数据库连接状态"
)
else: else:
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致") logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")