364 lines
16 KiB
Python
364 lines
16 KiB
Python
"""
|
||
交易记录API
|
||
"""
|
||
from fastapi import APIRouter, Query, HTTPException
|
||
from typing import Optional
|
||
from datetime import datetime, timedelta
|
||
import sys
|
||
from pathlib import Path
|
||
import logging
|
||
import asyncio
|
||
|
||
project_root = Path(__file__).parent.parent.parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
sys.path.insert(0, str(project_root / 'backend'))
|
||
|
||
from database.models import Trade
|
||
|
||
router = APIRouter()
|
||
# 在模块级别创建logger(与其他路由文件保持一致)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def get_date_range(period: Optional[str] = None):
|
||
"""
|
||
根据时间段参数返回开始和结束日期
|
||
|
||
Args:
|
||
period: 时间段 ('1d', '7d', '30d', 'custom')
|
||
|
||
Returns:
|
||
(start_date, end_date) 元组,格式为 'YYYY-MM-DD HH:MM:SS'
|
||
"""
|
||
# 使用当前时间作为结束时间,确保包含最新的单子
|
||
end_date = datetime.now()
|
||
|
||
if period == '1d':
|
||
# 最近1天:从今天00:00:00到现在(确保包含今天的所有单子)
|
||
# 这样即使查询时已经是晚上,也能查到今天早上开的单子
|
||
start_date = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
elif period == '7d':
|
||
# 最近7天:从7天前00:00:00到现在
|
||
start_date = (end_date - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
elif period == '30d':
|
||
# 最近30天:从30天前00:00:00到现在
|
||
start_date = (end_date - timedelta(days=30)).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
else:
|
||
return None, None
|
||
|
||
# 开始时间:使用计算出的开始日期的00:00:00
|
||
start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
|
||
# 结束时间:使用当前时间,确保包含最新单子,北京时间
|
||
# 使用北京时间(东八区)
|
||
from datetime import timezone, timedelta
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
end_date = datetime.now(beijing_tz)
|
||
end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
return start_date_str, end_date_str
|
||
|
||
|
||
@router.get("")
|
||
@router.get("/")
|
||
async def get_trades(
|
||
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
|
||
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
|
||
period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天)"),
|
||
symbol: Optional[str] = Query(None, description="交易对筛选"),
|
||
trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"),
|
||
exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"),
|
||
status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"),
|
||
limit: int = Query(100, ge=1, le=1000, description="返回记录数限制")
|
||
):
|
||
"""
|
||
获取交易记录
|
||
|
||
支持两种筛选方式:
|
||
1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d')
|
||
2. 自定义时间段筛选:使用 start_date 和 end_date 参数
|
||
|
||
如果同时提供了 period 和 start_date/end_date,period 优先级更高
|
||
"""
|
||
try:
|
||
logger.info(f"获取交易记录请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}, status={status}, limit={limit}, trade_type={trade_type}, exit_reason={exit_reason}")
|
||
# 如果提供了 period,使用快速时间段筛选
|
||
if period:
|
||
period_start, period_end = get_date_range(period)
|
||
if period_start and period_end:
|
||
start_date = period_start
|
||
end_date = period_end
|
||
logger.info(f"使用快速时间段筛选: {period} -> {start_date} 至 {end_date}")
|
||
|
||
# 格式化日期(如果只提供了日期,添加时间部分)
|
||
if start_date and len(start_date) == 10: # YYYY-MM-DD
|
||
start_date = f"{start_date} 00:00:00"
|
||
if end_date and len(end_date) == 10: # YYYY-MM-DD
|
||
end_date = f"{end_date} 23:59:59"
|
||
|
||
trades = Trade.get_all(start_date, end_date, symbol, status, trade_type, exit_reason)
|
||
logger.info(f"查询到 {len(trades)} 条交易记录")
|
||
|
||
# 格式化交易记录,添加平仓类型的中文显示
|
||
formatted_trades = []
|
||
for trade in trades[:limit]:
|
||
formatted_trade = dict(trade)
|
||
|
||
# 将 exit_reason 转换为中文显示
|
||
exit_reason = trade.get('exit_reason', '')
|
||
if exit_reason:
|
||
exit_reason_map = {
|
||
'manual': '手动平仓',
|
||
'stop_loss': '自动平仓(止损)',
|
||
'take_profit': '自动平仓(止盈)',
|
||
'trailing_stop': '自动平仓(移动止损)',
|
||
'sync': '同步平仓'
|
||
}
|
||
formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason)
|
||
else:
|
||
formatted_trade['exit_reason_display'] = ''
|
||
|
||
formatted_trades.append(formatted_trade)
|
||
|
||
result = {
|
||
"total": len(trades),
|
||
"trades": formatted_trades,
|
||
"filters": {
|
||
"start_date": start_date,
|
||
"end_date": end_date,
|
||
"period": period,
|
||
"symbol": symbol,
|
||
"status": status
|
||
}
|
||
}
|
||
|
||
logger.debug(f"返回交易记录: {len(result['trades'])} 条 (限制: {limit})")
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取交易记录失败: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/stats")
|
||
async def get_trade_stats(
|
||
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
|
||
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"),
|
||
period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d'"),
|
||
symbol: Optional[str] = Query(None, description="交易对筛选")
|
||
):
|
||
"""获取交易统计"""
|
||
try:
|
||
logger.info(f"获取交易统计请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}")
|
||
# 如果提供了 period,使用快速时间段筛选
|
||
if period:
|
||
period_start, period_end = get_date_range(period)
|
||
if period_start and period_end:
|
||
start_date = period_start
|
||
end_date = period_end
|
||
|
||
# 格式化日期
|
||
if start_date and len(start_date) == 10:
|
||
start_date = f"{start_date} 00:00:00"
|
||
if end_date and len(end_date) == 10:
|
||
end_date = f"{end_date} 23:59:59"
|
||
|
||
trades = Trade.get_all(start_date, end_date, symbol, None)
|
||
closed_trades = [t for t in trades if t['status'] == 'closed']
|
||
win_trades = [t for t in closed_trades if float(t['pnl']) > 0]
|
||
|
||
stats = {
|
||
"total_trades": len(trades),
|
||
"closed_trades": len(closed_trades),
|
||
"open_trades": len(trades) - len(closed_trades),
|
||
"win_trades": len(win_trades),
|
||
"loss_trades": len(closed_trades) - len(win_trades),
|
||
"win_rate": len(win_trades) / len(closed_trades) * 100 if closed_trades else 0,
|
||
"total_pnl": sum(float(t['pnl']) for t in closed_trades),
|
||
"avg_pnl": sum(float(t['pnl']) for t in closed_trades) / len(closed_trades) if closed_trades else 0,
|
||
"filters": {
|
||
"start_date": start_date,
|
||
"end_date": end_date,
|
||
"period": period,
|
||
"symbol": symbol
|
||
}
|
||
}
|
||
|
||
logger.info(f"交易统计: 总交易数={stats['total_trades']}, 已平仓={stats['closed_trades']}, 胜率={stats['win_rate']:.2f}%, 总盈亏={stats['total_pnl']:.2f} USDT")
|
||
|
||
return stats
|
||
except Exception as e:
|
||
logger.error(f"获取交易统计失败: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.post("/sync-binance")
|
||
async def sync_trades_from_binance(
|
||
days: int = Query(7, ge=1, le=30, description="同步最近N天的订单")
|
||
):
|
||
"""
|
||
从币安同步历史订单,确保数据库与币安一致
|
||
|
||
Args:
|
||
days: 同步最近N天的订单(默认7天)
|
||
"""
|
||
try:
|
||
logger.info(f"开始从币安同步历史订单(最近{days}天)...")
|
||
|
||
# 导入必要的模块
|
||
trading_system_path = project_root / 'trading_system'
|
||
if not trading_system_path.exists():
|
||
alternative_path = project_root / 'backend' / 'trading_system'
|
||
if alternative_path.exists():
|
||
trading_system_path = alternative_path
|
||
else:
|
||
raise HTTPException(status_code=500, detail="交易系统模块不存在")
|
||
|
||
sys.path.insert(0, str(trading_system_path))
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from binance_client import BinanceClient
|
||
import config
|
||
|
||
# 初始化客户端
|
||
client = BinanceClient(
|
||
api_key=config.BINANCE_API_KEY,
|
||
api_secret=config.BINANCE_API_SECRET,
|
||
testnet=config.USE_TESTNET
|
||
)
|
||
|
||
await client.connect()
|
||
|
||
try:
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
|
||
# 计算时间范围
|
||
end_time = int(time.time() * 1000) # 当前时间(毫秒)
|
||
start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
|
||
|
||
# 获取所有已成交的订单(包括开仓和平仓)
|
||
all_orders = []
|
||
try:
|
||
# 获取所有交易对的订单
|
||
# 注意:币安API可能需要分交易对查询,这里先获取所有交易对
|
||
symbols = await client.client.futures_exchange_info()
|
||
symbol_list = [s['symbol'] for s in symbols.get('symbols', []) if s.get('contractType') == 'PERPETUAL']
|
||
|
||
logger.info(f"开始同步 {len(symbol_list)} 个交易对的订单...")
|
||
|
||
for symbol in symbol_list:
|
||
try:
|
||
# 获取该交易对的历史订单
|
||
orders = await client.client.futures_get_all_orders(
|
||
symbol=symbol,
|
||
startTime=start_time,
|
||
endTime=end_time
|
||
)
|
||
|
||
# 只保留已成交的订单
|
||
filled_orders = [o for o in orders if o.get('status') == 'FILLED']
|
||
all_orders.extend(filled_orders)
|
||
|
||
# 避免请求过快
|
||
await asyncio.sleep(0.1)
|
||
except Exception as e:
|
||
logger.debug(f"获取 {symbol} 订单失败: {e}")
|
||
continue
|
||
|
||
logger.info(f"从币安获取到 {len(all_orders)} 个已成交订单")
|
||
except Exception as e:
|
||
logger.error(f"获取币安订单失败: {e}")
|
||
raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}")
|
||
|
||
# 同步订单到数据库
|
||
synced_count = 0
|
||
updated_count = 0
|
||
|
||
# 按时间排序,从旧到新
|
||
all_orders.sort(key=lambda x: x.get('time', 0))
|
||
|
||
for order in all_orders:
|
||
symbol = order.get('symbol')
|
||
order_id = order.get('orderId')
|
||
side = order.get('side')
|
||
quantity = float(order.get('executedQty', 0))
|
||
avg_price = float(order.get('avgPrice', 0))
|
||
order_time = datetime.fromtimestamp(order.get('time', 0) / 1000)
|
||
reduce_only = order.get('reduceOnly', False)
|
||
|
||
if quantity <= 0 or avg_price <= 0:
|
||
continue
|
||
|
||
try:
|
||
if reduce_only:
|
||
# 这是平仓订单
|
||
# 首先检查是否已经通过订单号同步过(避免重复)
|
||
existing_trade = Trade.get_by_exit_order_id(order_id)
|
||
if existing_trade:
|
||
logger.debug(f"订单 {order_id} 已同步过,跳过")
|
||
continue
|
||
|
||
# 查找数据库中该交易对的open状态记录
|
||
open_trades = Trade.get_by_symbol(symbol, status='open')
|
||
if open_trades:
|
||
# 找到匹配的交易记录(通过symbol匹配,如果有多个则取最近的)
|
||
trade = open_trades[0] # 取第一个
|
||
trade_id = trade['id']
|
||
|
||
# 计算盈亏
|
||
entry_price = float(trade['entry_price'])
|
||
entry_quantity = float(trade['quantity'])
|
||
|
||
# 使用实际成交数量(可能部分平仓)
|
||
actual_quantity = min(quantity, entry_quantity)
|
||
|
||
if trade['side'] == 'BUY':
|
||
pnl = (avg_price - entry_price) * actual_quantity
|
||
pnl_percent = ((avg_price - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl = (entry_price - avg_price) * actual_quantity
|
||
pnl_percent = ((entry_price - avg_price) / entry_price) * 100
|
||
|
||
# 更新数据库(包含订单号)
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=avg_price,
|
||
exit_reason='sync',
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent,
|
||
exit_order_id=order_id # 保存订单号,确保唯一性
|
||
)
|
||
updated_count += 1
|
||
logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})")
|
||
else:
|
||
# 这是开仓订单,检查数据库中是否已存在(通过订单号)
|
||
existing_trade = Trade.get_by_entry_order_id(order_id)
|
||
if not existing_trade:
|
||
# 如果不存在,可以创建新记录(但需要更多信息,暂时跳过)
|
||
logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建")
|
||
else:
|
||
logger.debug(f"开仓订单 {order_id} 已存在,跳过")
|
||
except Exception as e:
|
||
logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}")
|
||
continue
|
||
|
||
result = {
|
||
"success": True,
|
||
"message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)",
|
||
"total_orders": len(all_orders),
|
||
"updated_trades": updated_count,
|
||
"close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]),
|
||
"open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)])
|
||
}
|
||
|
||
logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录")
|
||
return result
|
||
|
||
finally:
|
||
await client.disconnect()
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"同步币安订单失败: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")
|