""" 交易记录API """ from fastapi import APIRouter, Query, HTTPException from typing import Optional from datetime import datetime, timedelta import sys from pathlib import Path import logging import asyncio project_root = Path(__file__).parent.parent.parent.parent sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root / 'backend')) from database.models import Trade router = APIRouter() # 在模块级别创建logger(与其他路由文件保持一致) logger = logging.getLogger(__name__) def get_date_range(period: Optional[str] = None): """ 根据时间段参数返回开始和结束日期 Args: period: 时间段 ('1d', '7d', '30d', 'custom') Returns: (start_date, end_date) 元组,格式为 'YYYY-MM-DD HH:MM:SS' """ end_date = datetime.now() if period == '1d': start_date = end_date - timedelta(days=1) elif period == '7d': start_date = end_date - timedelta(days=7) elif period == '30d': start_date = end_date - timedelta(days=30) else: return None, None return start_date.strftime('%Y-%m-%d 00:00:00'), end_date.strftime('%Y-%m-%d %H:%M:%S') @router.get("") @router.get("/") async def get_trades( start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天)"), symbol: Optional[str] = Query(None, description="交易对筛选"), trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"), exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"), status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"), limit: int = Query(100, ge=1, le=1000, description="返回记录数限制") ): """ 获取交易记录 支持两种筛选方式: 1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d') 2. 自定义时间段筛选:使用 start_date 和 end_date 参数 如果同时提供了 period 和 start_date/end_date,period 优先级更高 """ try: logger.info(f"获取交易记录请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}, status={status}, limit={limit}, trade_type={trade_type}, exit_reason={exit_reason}") # 如果提供了 period,使用快速时间段筛选 if period: period_start, period_end = get_date_range(period) if period_start and period_end: start_date = period_start end_date = period_end logger.info(f"使用快速时间段筛选: {period} -> {start_date} 至 {end_date}") # 格式化日期(如果只提供了日期,添加时间部分) if start_date and len(start_date) == 10: # YYYY-MM-DD start_date = f"{start_date} 00:00:00" if end_date and len(end_date) == 10: # YYYY-MM-DD end_date = f"{end_date} 23:59:59" trades = Trade.get_all(start_date, end_date, symbol, status, trade_type, exit_reason) logger.info(f"查询到 {len(trades)} 条交易记录") # 格式化交易记录,添加平仓类型的中文显示 formatted_trades = [] for trade in trades[:limit]: formatted_trade = dict(trade) # 将 exit_reason 转换为中文显示 exit_reason = trade.get('exit_reason', '') if exit_reason: exit_reason_map = { 'manual': '手动平仓', 'stop_loss': '自动平仓(止损)', 'take_profit': '自动平仓(止盈)', 'trailing_stop': '自动平仓(移动止损)', 'sync': '同步平仓' } formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason) else: formatted_trade['exit_reason_display'] = '' formatted_trades.append(formatted_trade) result = { "total": len(trades), "trades": formatted_trades, "filters": { "start_date": start_date, "end_date": end_date, "period": period, "symbol": symbol, "status": status } } logger.debug(f"返回交易记录: {len(result['trades'])} 条 (限制: {limit})") return result except Exception as e: logger.error(f"获取交易记录失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.get("/stats") async def get_trade_stats( start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d'"), symbol: Optional[str] = Query(None, description="交易对筛选") ): """获取交易统计""" try: logger.info(f"获取交易统计请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}") # 如果提供了 period,使用快速时间段筛选 if period: period_start, period_end = get_date_range(period) if period_start and period_end: start_date = period_start end_date = period_end # 格式化日期 if start_date and len(start_date) == 10: start_date = f"{start_date} 00:00:00" if end_date and len(end_date) == 10: end_date = f"{end_date} 23:59:59" trades = Trade.get_all(start_date, end_date, symbol, None) closed_trades = [t for t in trades if t['status'] == 'closed'] win_trades = [t for t in closed_trades if float(t['pnl']) > 0] stats = { "total_trades": len(trades), "closed_trades": len(closed_trades), "open_trades": len(trades) - len(closed_trades), "win_trades": len(win_trades), "loss_trades": len(closed_trades) - len(win_trades), "win_rate": len(win_trades) / len(closed_trades) * 100 if closed_trades else 0, "total_pnl": sum(float(t['pnl']) for t in closed_trades), "avg_pnl": sum(float(t['pnl']) for t in closed_trades) / len(closed_trades) if closed_trades else 0, "filters": { "start_date": start_date, "end_date": end_date, "period": period, "symbol": symbol } } logger.info(f"交易统计: 总交易数={stats['total_trades']}, 已平仓={stats['closed_trades']}, 胜率={stats['win_rate']:.2f}%, 总盈亏={stats['total_pnl']:.2f} USDT") return stats except Exception as e: logger.error(f"获取交易统计失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.post("/sync-binance") async def sync_trades_from_binance( days: int = Query(7, ge=1, le=30, description="同步最近N天的订单") ): """ 从币安同步历史订单,确保数据库与币安一致 Args: days: 同步最近N天的订单(默认7天) """ try: logger.info(f"开始从币安同步历史订单(最近{days}天)...") # 导入必要的模块 trading_system_path = project_root / 'trading_system' if not trading_system_path.exists(): alternative_path = project_root / 'backend' / 'trading_system' if alternative_path.exists(): trading_system_path = alternative_path else: raise HTTPException(status_code=500, detail="交易系统模块不存在") sys.path.insert(0, str(trading_system_path)) sys.path.insert(0, str(project_root)) from binance_client import BinanceClient import config # 初始化客户端 client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) await client.connect() try: import time from datetime import datetime, timedelta # 计算时间范围 end_time = int(time.time() * 1000) # 当前时间(毫秒) start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000) # 获取所有已成交的订单(包括开仓和平仓) all_orders = [] try: # 获取所有交易对的订单 # 注意:币安API可能需要分交易对查询,这里先获取所有交易对 symbols = await client.client.futures_exchange_info() symbol_list = [s['symbol'] for s in symbols.get('symbols', []) if s.get('contractType') == 'PERPETUAL'] logger.info(f"开始同步 {len(symbol_list)} 个交易对的订单...") for symbol in symbol_list: try: # 获取该交易对的历史订单 orders = await client.client.futures_get_all_orders( symbol=symbol, startTime=start_time, endTime=end_time ) # 只保留已成交的订单 filled_orders = [o for o in orders if o.get('status') == 'FILLED'] all_orders.extend(filled_orders) # 避免请求过快 await asyncio.sleep(0.1) except Exception as e: logger.debug(f"获取 {symbol} 订单失败: {e}") continue logger.info(f"从币安获取到 {len(all_orders)} 个已成交订单") except Exception as e: logger.error(f"获取币安订单失败: {e}") raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}") # 同步订单到数据库 synced_count = 0 updated_count = 0 # 按时间排序,从旧到新 all_orders.sort(key=lambda x: x.get('time', 0)) for order in all_orders: symbol = order.get('symbol') order_id = order.get('orderId') side = order.get('side') quantity = float(order.get('executedQty', 0)) avg_price = float(order.get('avgPrice', 0)) order_time = datetime.fromtimestamp(order.get('time', 0) / 1000) reduce_only = order.get('reduceOnly', False) if quantity <= 0 or avg_price <= 0: continue try: if reduce_only: # 这是平仓订单 # 首先检查是否已经通过订单号同步过(避免重复) existing_trade = Trade.get_by_exit_order_id(order_id) if existing_trade: logger.debug(f"订单 {order_id} 已同步过,跳过") continue # 查找数据库中该交易对的open状态记录 open_trades = Trade.get_by_symbol(symbol, status='open') if open_trades: # 找到匹配的交易记录(通过symbol匹配,如果有多个则取最近的) trade = open_trades[0] # 取第一个 trade_id = trade['id'] # 计算盈亏 entry_price = float(trade['entry_price']) entry_quantity = float(trade['quantity']) # 使用实际成交数量(可能部分平仓) actual_quantity = min(quantity, entry_quantity) if trade['side'] == 'BUY': pnl = (avg_price - entry_price) * actual_quantity pnl_percent = ((avg_price - entry_price) / entry_price) * 100 else: # SELL pnl = (entry_price - avg_price) * actual_quantity pnl_percent = ((entry_price - avg_price) / entry_price) * 100 # 更新数据库(包含订单号) Trade.update_exit( trade_id=trade_id, exit_price=avg_price, exit_reason='sync', pnl=pnl, pnl_percent=pnl_percent, exit_order_id=order_id # 保存订单号,确保唯一性 ) updated_count += 1 logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})") else: # 这是开仓订单,检查数据库中是否已存在(通过订单号) existing_trade = Trade.get_by_entry_order_id(order_id) if not existing_trade: # 如果不存在,可以创建新记录(但需要更多信息,暂时跳过) logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建") else: logger.debug(f"开仓订单 {order_id} 已存在,跳过") except Exception as e: logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}") continue result = { "success": True, "message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)", "total_orders": len(all_orders), "updated_trades": updated_count, "close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]), "open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)]) } logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录") return result finally: await client.disconnect() except HTTPException: raise except Exception as e: logger.error(f"同步币安订单失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")