auto_trade_sys/backend/api/routes/trades.py
薇薇安 1d0f445ed7 a
2026-01-17 12:26:02 +08:00

360 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易记录API
"""
from fastapi import APIRouter, Query, HTTPException
from typing import Optional
from datetime import datetime, timedelta
import sys
from pathlib import Path
import logging
import asyncio
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
from database.models import Trade
router = APIRouter()
# 在模块级别创建logger与其他路由文件保持一致
logger = logging.getLogger(__name__)
def get_date_range(period: Optional[str] = None):
"""
根据时间段参数返回开始和结束日期
Args:
period: 时间段 ('1d', '7d', '30d', 'custom')
Returns:
(start_date, end_date) 元组,格式为 'YYYY-MM-DD HH:MM:SS'
"""
# 使用当前时间作为结束时间,确保包含最新的单子
end_date = datetime.now()
if period == '1d':
# 最近1天从今天00:00:00到现在确保包含今天的所有单子
# 这样即使查询时已经是晚上,也能查到今天早上开的单子
start_date = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
elif period == '7d':
# 最近7天从7天前00:00:00到现在
start_date = (end_date - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
elif period == '30d':
# 最近30天从30天前00:00:00到现在
start_date = (end_date - timedelta(days=30)).replace(hour=0, minute=0, second=0, microsecond=0)
else:
return None, None
# 开始时间使用计算出的开始日期的00:00:00
start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
# 结束时间:使用当前时间,确保包含最新单子
end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S')
return start_date_str, end_date_str
@router.get("")
@router.get("/")
async def get_trades(
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天)"),
symbol: Optional[str] = Query(None, description="交易对筛选"),
trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"),
exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"),
status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"),
limit: int = Query(100, ge=1, le=1000, description="返回记录数限制")
):
"""
获取交易记录
支持两种筛选方式:
1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d')
2. 自定义时间段筛选:使用 start_date 和 end_date 参数
如果同时提供了 period 和 start_date/end_dateperiod 优先级更高
"""
try:
logger.info(f"获取交易记录请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}, status={status}, limit={limit}, trade_type={trade_type}, exit_reason={exit_reason}")
# 如果提供了 period使用快速时间段筛选
if period:
period_start, period_end = get_date_range(period)
if period_start and period_end:
start_date = period_start
end_date = period_end
logger.info(f"使用快速时间段筛选: {period} -> {start_date}{end_date}")
# 格式化日期(如果只提供了日期,添加时间部分)
if start_date and len(start_date) == 10: # YYYY-MM-DD
start_date = f"{start_date} 00:00:00"
if end_date and len(end_date) == 10: # YYYY-MM-DD
end_date = f"{end_date} 23:59:59"
trades = Trade.get_all(start_date, end_date, symbol, status, trade_type, exit_reason)
logger.info(f"查询到 {len(trades)} 条交易记录")
# 格式化交易记录,添加平仓类型的中文显示
formatted_trades = []
for trade in trades[:limit]:
formatted_trade = dict(trade)
# 将 exit_reason 转换为中文显示
exit_reason = trade.get('exit_reason', '')
if exit_reason:
exit_reason_map = {
'manual': '手动平仓',
'stop_loss': '自动平仓(止损)',
'take_profit': '自动平仓(止盈)',
'trailing_stop': '自动平仓(移动止损)',
'sync': '同步平仓'
}
formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason)
else:
formatted_trade['exit_reason_display'] = ''
formatted_trades.append(formatted_trade)
result = {
"total": len(trades),
"trades": formatted_trades,
"filters": {
"start_date": start_date,
"end_date": end_date,
"period": period,
"symbol": symbol,
"status": status
}
}
logger.debug(f"返回交易记录: {len(result['trades'])} 条 (限制: {limit})")
return result
except Exception as e:
logger.error(f"获取交易记录失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/stats")
async def get_trade_stats(
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d'"),
symbol: Optional[str] = Query(None, description="交易对筛选")
):
"""获取交易统计"""
try:
logger.info(f"获取交易统计请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}")
# 如果提供了 period使用快速时间段筛选
if period:
period_start, period_end = get_date_range(period)
if period_start and period_end:
start_date = period_start
end_date = period_end
# 格式化日期
if start_date and len(start_date) == 10:
start_date = f"{start_date} 00:00:00"
if end_date and len(end_date) == 10:
end_date = f"{end_date} 23:59:59"
trades = Trade.get_all(start_date, end_date, symbol, None)
closed_trades = [t for t in trades if t['status'] == 'closed']
win_trades = [t for t in closed_trades if float(t['pnl']) > 0]
stats = {
"total_trades": len(trades),
"closed_trades": len(closed_trades),
"open_trades": len(trades) - len(closed_trades),
"win_trades": len(win_trades),
"loss_trades": len(closed_trades) - len(win_trades),
"win_rate": len(win_trades) / len(closed_trades) * 100 if closed_trades else 0,
"total_pnl": sum(float(t['pnl']) for t in closed_trades),
"avg_pnl": sum(float(t['pnl']) for t in closed_trades) / len(closed_trades) if closed_trades else 0,
"filters": {
"start_date": start_date,
"end_date": end_date,
"period": period,
"symbol": symbol
}
}
logger.info(f"交易统计: 总交易数={stats['total_trades']}, 已平仓={stats['closed_trades']}, 胜率={stats['win_rate']:.2f}%, 总盈亏={stats['total_pnl']:.2f} USDT")
return stats
except Exception as e:
logger.error(f"获取交易统计失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/sync-binance")
async def sync_trades_from_binance(
days: int = Query(7, ge=1, le=30, description="同步最近N天的订单")
):
"""
从币安同步历史订单,确保数据库与币安一致
Args:
days: 同步最近N天的订单默认7天
"""
try:
logger.info(f"开始从币安同步历史订单(最近{days}天)...")
# 导入必要的模块
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
alternative_path = project_root / 'backend' / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
else:
raise HTTPException(status_code=500, detail="交易系统模块不存在")
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
import config
# 初始化客户端
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try:
import time
from datetime import datetime, timedelta
# 计算时间范围
end_time = int(time.time() * 1000) # 当前时间(毫秒)
start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
# 获取所有已成交的订单(包括开仓和平仓)
all_orders = []
try:
# 获取所有交易对的订单
# 注意币安API可能需要分交易对查询这里先获取所有交易对
symbols = await client.client.futures_exchange_info()
symbol_list = [s['symbol'] for s in symbols.get('symbols', []) if s.get('contractType') == 'PERPETUAL']
logger.info(f"开始同步 {len(symbol_list)} 个交易对的订单...")
for symbol in symbol_list:
try:
# 获取该交易对的历史订单
orders = await client.client.futures_get_all_orders(
symbol=symbol,
startTime=start_time,
endTime=end_time
)
# 只保留已成交的订单
filled_orders = [o for o in orders if o.get('status') == 'FILLED']
all_orders.extend(filled_orders)
# 避免请求过快
await asyncio.sleep(0.1)
except Exception as e:
logger.debug(f"获取 {symbol} 订单失败: {e}")
continue
logger.info(f"从币安获取到 {len(all_orders)} 个已成交订单")
except Exception as e:
logger.error(f"获取币安订单失败: {e}")
raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}")
# 同步订单到数据库
synced_count = 0
updated_count = 0
# 按时间排序,从旧到新
all_orders.sort(key=lambda x: x.get('time', 0))
for order in all_orders:
symbol = order.get('symbol')
order_id = order.get('orderId')
side = order.get('side')
quantity = float(order.get('executedQty', 0))
avg_price = float(order.get('avgPrice', 0))
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
reduce_only = order.get('reduceOnly', False)
if quantity <= 0 or avg_price <= 0:
continue
try:
if reduce_only:
# 这是平仓订单
# 首先检查是否已经通过订单号同步过(避免重复)
existing_trade = Trade.get_by_exit_order_id(order_id)
if existing_trade:
logger.debug(f"订单 {order_id} 已同步过,跳过")
continue
# 查找数据库中该交易对的open状态记录
open_trades = Trade.get_by_symbol(symbol, status='open')
if open_trades:
# 找到匹配的交易记录通过symbol匹配如果有多个则取最近的
trade = open_trades[0] # 取第一个
trade_id = trade['id']
# 计算盈亏
entry_price = float(trade['entry_price'])
entry_quantity = float(trade['quantity'])
# 使用实际成交数量(可能部分平仓)
actual_quantity = min(quantity, entry_quantity)
if trade['side'] == 'BUY':
pnl = (avg_price - entry_price) * actual_quantity
pnl_percent = ((avg_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - avg_price) * actual_quantity
pnl_percent = ((entry_price - avg_price) / entry_price) * 100
# 更新数据库(包含订单号)
Trade.update_exit(
trade_id=trade_id,
exit_price=avg_price,
exit_reason='sync',
pnl=pnl,
pnl_percent=pnl_percent,
exit_order_id=order_id # 保存订单号,确保唯一性
)
updated_count += 1
logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})")
else:
# 这是开仓订单,检查数据库中是否已存在(通过订单号)
existing_trade = Trade.get_by_entry_order_id(order_id)
if not existing_trade:
# 如果不存在,可以创建新记录(但需要更多信息,暂时跳过)
logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建")
else:
logger.debug(f"开仓订单 {order_id} 已存在,跳过")
except Exception as e:
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
continue
result = {
"success": True,
"message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)",
"total_orders": len(all_orders),
"updated_trades": updated_count,
"close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]),
"open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)])
}
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录")
return result
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"同步币安订单失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")