auto_trade_sys/backend/api/routes/trades.py
薇薇安 e3ecaf1232 a
2026-01-18 19:44:24 +08:00

432 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易记录API
"""
from fastapi import APIRouter, Query, HTTPException
from typing import Optional
from datetime import datetime, timedelta
import sys
from pathlib import Path
import logging
import asyncio
import time
from datetime import timezone, timedelta
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
from database.models import Trade
router = APIRouter()
# 在模块级别创建logger与其他路由文件保持一致
logger = logging.getLogger(__name__)
def get_timestamp_range(period: Optional[str] = None):
"""
根据时间段参数返回开始和结束时间戳Unix时间戳秒数
Args:
period: 时间段 ('1d', '7d', '30d', 'today', 'week', 'month', 'custom')
Returns:
(start_timestamp, end_timestamp) 元组Unix时间戳秒数
"""
# 使用当前时间作为结束时间Unix时间戳秒数
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
end_timestamp = int(now.timestamp())
if period == '1d':
# 最近1天当前时间减去24小时
start_timestamp = end_timestamp - 24 * 3600
elif period == '7d':
# 最近7天当前时间减去7*24小时
start_timestamp = end_timestamp - 7 * 24 * 3600
elif period == '30d':
# 最近30天当前时间减去30*24小时
start_timestamp = end_timestamp - 30 * 24 * 3600
elif period == 'today':
# 今天从今天00:00:00到现在
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(today_start.timestamp())
elif period == 'week':
# 本周从本周一00:00:00到现在
days_since_monday = now.weekday() # 0=Monday, 6=Sunday
week_start = (now - timedelta(days=days_since_monday)).replace(hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(week_start.timestamp())
elif period == 'month':
# 本月从本月1日00:00:00到现在
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(month_start.timestamp())
else:
return None, None
return start_timestamp, end_timestamp
@router.get("")
@router.get("/")
async def get_trades(
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天), 'today'(今天), 'week'(本周), 'month'(本月)"),
symbol: Optional[str] = Query(None, description="交易对筛选"),
trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"),
exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"),
status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"),
limit: int = Query(100, ge=1, le=1000, description="返回记录数限制")
):
"""
获取交易记录
支持两种筛选方式:
1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d', 'today', 'week', 'month')
2. 自定义时间段筛选:使用 start_date 和 end_date 参数会转换为Unix时间戳
如果同时提供了 period 和 start_date/end_dateperiod 优先级更高
"""
try:
logger.info(f"获取交易记录请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}, status={status}, limit={limit}, trade_type={trade_type}, exit_reason={exit_reason}")
start_timestamp = None
end_timestamp = None
# 如果提供了 period使用快速时间段筛选
if period:
period_start, period_end = get_timestamp_range(period)
if period_start is not None and period_end is not None:
start_timestamp = period_start
end_timestamp = period_end
logger.info(f"使用快速时间段筛选: {period} -> {start_timestamp} ({datetime.fromtimestamp(start_timestamp)}) 至 {end_timestamp} ({datetime.fromtimestamp(end_timestamp)})")
elif start_date or end_date:
# 自定义时间段将日期字符串转换为Unix时间戳
beijing_tz = timezone(timedelta(hours=8))
if start_date:
if len(start_date) == 10: # YYYY-MM-DD
start_date = f"{start_date} 00:00:00"
try:
dt = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
start_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的开始日期格式: {start_date}")
if end_date:
if len(end_date) == 10: # YYYY-MM-DD
end_date = f"{end_date} 23:59:59"
try:
dt = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
end_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的结束日期格式: {end_date}")
trades = Trade.get_all(start_timestamp, end_timestamp, symbol, status, trade_type, exit_reason)
logger.info(f"查询到 {len(trades)} 条交易记录")
# 格式化交易记录,添加平仓类型的中文显示
formatted_trades = []
for trade in trades[:limit]:
formatted_trade = dict(trade)
# 将 exit_reason 转换为中文显示
exit_reason = trade.get('exit_reason', '')
if exit_reason:
exit_reason_map = {
'manual': '手动平仓',
'stop_loss': '自动平仓(止损)',
'take_profit': '自动平仓(止盈)',
'trailing_stop': '自动平仓(移动止损)',
'sync': '同步平仓'
}
formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason)
else:
formatted_trade['exit_reason_display'] = ''
formatted_trades.append(formatted_trade)
result = {
"total": len(trades),
"trades": formatted_trades,
"filters": {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp,
"start_date": datetime.fromtimestamp(start_timestamp).strftime('%Y-%m-%d %H:%M:%S') if start_timestamp else None,
"end_date": datetime.fromtimestamp(end_timestamp).strftime('%Y-%m-%d %H:%M:%S') if end_timestamp else None,
"period": period,
"symbol": symbol,
"status": status
}
}
logger.debug(f"返回交易记录: {len(result['trades'])} 条 (限制: {limit})")
return result
except Exception as e:
logger.error(f"获取交易记录失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/stats")
async def get_trade_stats(
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d', 'today', 'week', 'month'"),
symbol: Optional[str] = Query(None, description="交易对筛选")
):
"""获取交易统计"""
try:
logger.info(f"获取交易统计请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}")
start_timestamp = None
end_timestamp = None
# 如果提供了 period使用快速时间段筛选
if period:
period_start, period_end = get_timestamp_range(period)
if period_start is not None and period_end is not None:
start_timestamp = period_start
end_timestamp = period_end
elif start_date or end_date:
# 自定义时间段将日期字符串转换为Unix时间戳
beijing_tz = timezone(timedelta(hours=8))
if start_date:
if len(start_date) == 10: # YYYY-MM-DD
start_date = f"{start_date} 00:00:00"
try:
dt = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
start_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的开始日期格式: {start_date}")
if end_date:
if len(end_date) == 10: # YYYY-MM-DD
end_date = f"{end_date} 23:59:59"
try:
dt = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=beijing_tz)
end_timestamp = int(dt.timestamp())
except ValueError:
logger.warning(f"无效的结束日期格式: {end_date}")
trades = Trade.get_all(start_timestamp, end_timestamp, symbol, None)
closed_trades = [t for t in trades if t['status'] == 'closed']
# 排除0盈亏的订单abs(pnl) < 0.01 USDT视为0盈亏这些订单不应该影响胜率统计
ZERO_PNL_THRESHOLD = 0.01 # 0.01 USDT的阈值小于此值视为0盈亏
meaningful_trades = [t for t in closed_trades if abs(float(t['pnl'])) >= ZERO_PNL_THRESHOLD]
zero_pnl_trades = [t for t in closed_trades if abs(float(t['pnl'])) < ZERO_PNL_THRESHOLD]
# 只统计有意义的交易排除0盈亏的胜率
win_trades = [t for t in meaningful_trades if float(t['pnl']) > 0]
loss_trades = [t for t in meaningful_trades if float(t['pnl']) < 0]
stats = {
"total_trades": len(trades),
"closed_trades": len(closed_trades),
"open_trades": len(trades) - len(closed_trades),
"meaningful_trades": len(meaningful_trades), # 有意义的交易数排除0盈亏
"zero_pnl_trades": len(zero_pnl_trades), # 0盈亏交易数
"win_trades": len(win_trades),
"loss_trades": len(loss_trades),
"win_rate": len(win_trades) / len(meaningful_trades) * 100 if meaningful_trades else 0, # 基于有意义的交易计算胜率
"total_pnl": sum(float(t['pnl']) for t in closed_trades),
"avg_pnl": sum(float(t['pnl']) for t in closed_trades) / len(closed_trades) if closed_trades else 0,
# 总交易量(名义下单量口径):优先使用 notional_usdt新字段否则回退 entry_price * quantity
"total_notional_usdt": sum(
float(t.get('notional_usdt') or (float(t.get('entry_price', 0)) * float(t.get('quantity', 0))))
for t in trades
),
"filters": {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp,
"start_date": datetime.fromtimestamp(start_timestamp).strftime('%Y-%m-%d %H:%M:%S') if start_timestamp else None,
"end_date": datetime.fromtimestamp(end_timestamp).strftime('%Y-%m-%d %H:%M:%S') if end_timestamp else None,
"period": period,
"symbol": symbol
}
}
logger.info(
f"交易统计: 总交易数={stats['total_trades']}, 已平仓={stats['closed_trades']}, "
f"有意义交易={stats['meaningful_trades']}, 0盈亏交易={stats['zero_pnl_trades']}, "
f"胜率={stats['win_rate']:.2f}%, 总盈亏={stats['total_pnl']:.2f} USDT"
)
return stats
except Exception as e:
logger.error(f"获取交易统计失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/sync-binance")
async def sync_trades_from_binance(
days: int = Query(7, ge=1, le=30, description="同步最近N天的订单")
):
"""
从币安同步历史订单,确保数据库与币安一致
Args:
days: 同步最近N天的订单默认7天
"""
try:
logger.info(f"开始从币安同步历史订单(最近{days}天)...")
# 导入必要的模块
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
alternative_path = project_root / 'backend' / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
else:
raise HTTPException(status_code=500, detail="交易系统模块不存在")
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
import config
# 初始化客户端
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try:
import time
from datetime import datetime, timedelta
# 计算时间范围
end_time = int(time.time() * 1000) # 当前时间(毫秒)
start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
# 获取所有已成交的订单(包括开仓和平仓)
all_orders = []
try:
# 获取所有交易对的订单
# 注意币安API可能需要分交易对查询这里先获取所有交易对
symbols = await client.client.futures_exchange_info()
symbol_list = [s['symbol'] for s in symbols.get('symbols', []) if s.get('contractType') == 'PERPETUAL']
logger.info(f"开始同步 {len(symbol_list)} 个交易对的订单...")
for symbol in symbol_list:
try:
# 获取该交易对的历史订单
orders = await client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time,
endTime=end_time
)
# 只保留已成交的订单
filled_orders = [o for o in orders if o.get('status') == 'FILLED']
all_orders.extend(filled_orders)
# 避免请求过快
await asyncio.sleep(0.1)
except Exception as e:
logger.debug(f"获取 {symbol} 订单失败: {e}")
continue
logger.info(f"从币安获取到 {len(all_orders)} 个已成交订单")
except Exception as e:
logger.error(f"获取币安订单失败: {e}")
raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}")
# 同步订单到数据库
synced_count = 0
updated_count = 0
# 按时间排序,从旧到新
all_orders.sort(key=lambda x: x.get('time', 0))
for order in all_orders:
symbol = order.get('symbol')
order_id = order.get('orderId')
side = order.get('side')
quantity = float(order.get('executedQty', 0))
avg_price = float(order.get('avgPrice', 0))
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
reduce_only = order.get('reduceOnly', False)
if quantity <= 0 or avg_price <= 0:
continue
try:
if reduce_only:
# 这是平仓订单
# 首先检查是否已经通过订单号同步过(避免重复)
existing_trade = Trade.get_by_exit_order_id(order_id)
if existing_trade:
logger.debug(f"订单 {order_id} 已同步过,跳过")
continue
# 查找数据库中该交易对的open状态记录
open_trades = Trade.get_by_symbol(symbol, status='open')
if open_trades:
# 找到匹配的交易记录通过symbol匹配如果有多个则取最近的
trade = open_trades[0] # 取第一个
trade_id = trade['id']
# 计算盈亏
entry_price = float(trade['entry_price'])
entry_quantity = float(trade['quantity'])
# 使用实际成交数量(可能部分平仓)
actual_quantity = min(quantity, entry_quantity)
if trade['side'] == 'BUY':
pnl = (avg_price - entry_price) * actual_quantity
pnl_percent = ((avg_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - avg_price) * actual_quantity
pnl_percent = ((entry_price - avg_price) / entry_price) * 100
# 更新数据库(包含订单号)
Trade.update_exit(
trade_id=trade_id,
exit_price=avg_price,
exit_reason='sync',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=order_id # 保存订单号,确保唯一性
)
updated_count += 1
logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})")
else:
# 这是开仓订单,检查数据库中是否已存在(通过订单号)
existing_trade = Trade.get_by_entry_order_id(order_id)
if not existing_trade:
# 如果不存在,可以创建新记录(但需要更多信息,暂时跳过)
logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建")
else:
logger.debug(f"开仓订单 {order_id} 已存在,跳过")
except Exception as e:
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
continue
result = {
"success": True,
"message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)",
"total_orders": len(all_orders),
"updated_trades": updated_count,
"close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]),
"open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)])
}
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录")
return result
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"同步币安订单失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")