""" 交易记录API """ from fastapi import APIRouter, Query, HTTPException from typing import Optional from datetime import datetime, timedelta import sys from pathlib import Path import logging import asyncio import time from datetime import timezone, timedelta project_root = Path(__file__).parent.parent.parent.parent sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root / 'backend')) from database.models import Trade router = APIRouter() # 在模块级别创建logger(与其他路由文件保持一致) logger = logging.getLogger(__name__) def get_date_range(period: Optional[str] = None): """ 根据时间段参数返回开始和结束日期 Args: period: 时间段 ('1d', '7d', '30d', 'custom') Returns: (start_date, end_date) 元组,格式为 'YYYY-MM-DD HH:MM:SS' """ # 使用当前时间作为结束时间,确保包含最新的单子 end_date = datetime.now() if period == '1d': # 最近1天:从今天00:00:00到现在(确保包含今天的所有单子) # 这样即使查询时已经是晚上,也能查到今天早上开的单子 start_date = end_date.replace(hour=0, minute=0, second=0, microsecond=0) elif period == '7d': # 最近7天:从7天前00:00:00到现在 start_date = (end_date - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0) elif period == '30d': # 最近30天:从30天前00:00:00到现在 start_date = (end_date - timedelta(days=30)).replace(hour=0, minute=0, second=0, microsecond=0) else: return None, None # 开始时间:使用计算出的开始日期的00:00:00 start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S') # 结束时间:使用当前时间,确保包含最新单子,北京时间 # 使用北京时间(东八区) beijing_tz = timezone(timedelta(hours=8)) end_date = datetime.now(beijing_tz) end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S') return start_date_str, end_date_str @router.get("") @router.get("/") async def get_trades( start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), period: Optional[str] = Query(None, description="快速时间段筛选: '1d'(最近1天), '7d'(最近7天), '30d'(最近30天)"), symbol: Optional[str] = Query(None, description="交易对筛选"), trade_type: Optional[str] = Query(None, description="交易类型筛选: 'buy', 'sell'"), exit_reason: Optional[str] = Query(None, description="平仓原因筛选: 'stop_loss', 'take_profit', 'trailing_stop', 'manual', 'sync'"), status: Optional[str] = Query(None, description="状态筛选: 'open', 'closed', 'cancelled'"), limit: int = Query(100, ge=1, le=1000, description="返回记录数限制") ): """ 获取交易记录 支持两种筛选方式: 1. 快速时间段筛选:使用 period 参数 ('1d', '7d', '30d') 2. 自定义时间段筛选:使用 start_date 和 end_date 参数 如果同时提供了 period 和 start_date/end_date,period 优先级更高 """ try: logger.info(f"获取交易记录请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}, status={status}, limit={limit}, trade_type={trade_type}, exit_reason={exit_reason}") # 如果提供了 period,使用快速时间段筛选 if period: period_start, period_end = get_date_range(period) if period_start and period_end: start_date = period_start end_date = period_end logger.info(f"使用快速时间段筛选: {period} -> {start_date} 至 {end_date}") # 格式化日期(如果只提供了日期,添加时间部分) if start_date and len(start_date) == 10: # YYYY-MM-DD start_date = f"{start_date} 00:00:00" if end_date and len(end_date) == 10: # YYYY-MM-DD end_date = f"{end_date} 23:59:59" trades = Trade.get_all(start_date, end_date, symbol, status, trade_type, exit_reason) logger.info(f"查询到 {len(trades)} 条交易记录") # 格式化交易记录,添加平仓类型的中文显示 formatted_trades = [] for trade in trades[:limit]: formatted_trade = dict(trade) # 将 exit_reason 转换为中文显示 exit_reason = trade.get('exit_reason', '') if exit_reason: exit_reason_map = { 'manual': '手动平仓', 'stop_loss': '自动平仓(止损)', 'take_profit': '自动平仓(止盈)', 'trailing_stop': '自动平仓(移动止损)', 'sync': '同步平仓' } formatted_trade['exit_reason_display'] = exit_reason_map.get(exit_reason, exit_reason) else: formatted_trade['exit_reason_display'] = '' formatted_trades.append(formatted_trade) result = { "total": len(trades), "trades": formatted_trades, "filters": { "start_date": start_date, "end_date": end_date, "period": period, "symbol": symbol, "status": status } } logger.debug(f"返回交易记录: {len(result['trades'])} 条 (限制: {limit})") return result except Exception as e: logger.error(f"获取交易记录失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.get("/stats") async def get_trade_stats( start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"), period: Optional[str] = Query(None, description="快速时间段筛选: '1d', '7d', '30d'"), symbol: Optional[str] = Query(None, description="交易对筛选") ): """获取交易统计""" try: logger.info(f"获取交易统计请求: start_date={start_date}, end_date={end_date}, period={period}, symbol={symbol}") # 如果提供了 period,使用快速时间段筛选 if period: period_start, period_end = get_date_range(period) if period_start and period_end: start_date = period_start end_date = period_end # 格式化日期 if start_date and len(start_date) == 10: start_date = f"{start_date} 00:00:00" if end_date and len(end_date) == 10: end_date = f"{end_date} 23:59:59" trades = Trade.get_all(start_date, end_date, symbol, None) closed_trades = [t for t in trades if t['status'] == 'closed'] win_trades = [t for t in closed_trades if float(t['pnl']) > 0] stats = { "total_trades": len(trades), "closed_trades": len(closed_trades), "open_trades": len(trades) - len(closed_trades), "win_trades": len(win_trades), "loss_trades": len(closed_trades) - len(win_trades), "win_rate": len(win_trades) / len(closed_trades) * 100 if closed_trades else 0, "total_pnl": sum(float(t['pnl']) for t in closed_trades), "avg_pnl": sum(float(t['pnl']) for t in closed_trades) / len(closed_trades) if closed_trades else 0, "filters": { "start_date": start_date, "end_date": end_date, "period": period, "symbol": symbol } } logger.info(f"交易统计: 总交易数={stats['total_trades']}, 已平仓={stats['closed_trades']}, 胜率={stats['win_rate']:.2f}%, 总盈亏={stats['total_pnl']:.2f} USDT") return stats except Exception as e: logger.error(f"获取交易统计失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.post("/sync-binance") async def sync_trades_from_binance( days: int = Query(7, ge=1, le=30, description="同步最近N天的订单") ): """ 从币安同步历史订单,确保数据库与币安一致 Args: days: 同步最近N天的订单(默认7天) """ try: logger.info(f"开始从币安同步历史订单(最近{days}天)...") # 导入必要的模块 trading_system_path = project_root / 'trading_system' if not trading_system_path.exists(): alternative_path = project_root / 'backend' / 'trading_system' if alternative_path.exists(): trading_system_path = alternative_path else: raise HTTPException(status_code=500, detail="交易系统模块不存在") sys.path.insert(0, str(trading_system_path)) sys.path.insert(0, str(project_root)) from binance_client import BinanceClient import config # 初始化客户端 client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) await client.connect() try: import time from datetime import datetime, timedelta # 计算时间范围 end_time = int(time.time() * 1000) # 当前时间(毫秒) start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000) # 获取所有已成交的订单(包括开仓和平仓) all_orders = [] try: # 获取所有交易对的订单 # 注意:币安API可能需要分交易对查询,这里先获取所有交易对 symbols = await client.client.futures_exchange_info() symbol_list = [s['symbol'] for s in symbols.get('symbols', []) if s.get('contractType') == 'PERPETUAL'] logger.info(f"开始同步 {len(symbol_list)} 个交易对的订单...") for symbol in symbol_list: try: # 获取该交易对的历史订单 orders = await client.client.futures_get_all_orders( symbol=symbol, startTime=start_time, endTime=end_time ) # 只保留已成交的订单 filled_orders = [o for o in orders if o.get('status') == 'FILLED'] all_orders.extend(filled_orders) # 避免请求过快 await asyncio.sleep(0.1) except Exception as e: logger.debug(f"获取 {symbol} 订单失败: {e}") continue logger.info(f"从币安获取到 {len(all_orders)} 个已成交订单") except Exception as e: logger.error(f"获取币安订单失败: {e}") raise HTTPException(status_code=500, detail=f"获取币安订单失败: {str(e)}") # 同步订单到数据库 synced_count = 0 updated_count = 0 # 按时间排序,从旧到新 all_orders.sort(key=lambda x: x.get('time', 0)) for order in all_orders: symbol = order.get('symbol') order_id = order.get('orderId') side = order.get('side') quantity = float(order.get('executedQty', 0)) avg_price = float(order.get('avgPrice', 0)) order_time = datetime.fromtimestamp(order.get('time', 0) / 1000) reduce_only = order.get('reduceOnly', False) if quantity <= 0 or avg_price <= 0: continue try: if reduce_only: # 这是平仓订单 # 首先检查是否已经通过订单号同步过(避免重复) existing_trade = Trade.get_by_exit_order_id(order_id) if existing_trade: logger.debug(f"订单 {order_id} 已同步过,跳过") continue # 查找数据库中该交易对的open状态记录 open_trades = Trade.get_by_symbol(symbol, status='open') if open_trades: # 找到匹配的交易记录(通过symbol匹配,如果有多个则取最近的) trade = open_trades[0] # 取第一个 trade_id = trade['id'] # 计算盈亏 entry_price = float(trade['entry_price']) entry_quantity = float(trade['quantity']) # 使用实际成交数量(可能部分平仓) actual_quantity = min(quantity, entry_quantity) if trade['side'] == 'BUY': pnl = (avg_price - entry_price) * actual_quantity pnl_percent = ((avg_price - entry_price) / entry_price) * 100 else: # SELL pnl = (entry_price - avg_price) * actual_quantity pnl_percent = ((entry_price - avg_price) / entry_price) * 100 # 更新数据库(包含订单号) Trade.update_exit( trade_id=trade_id, exit_price=avg_price, exit_reason='sync', pnl=pnl, pnl_percent=pnl_percent, exit_order_id=order_id # 保存订单号,确保唯一性 ) updated_count += 1 logger.debug(f"✓ 更新平仓记录: {symbol} (ID: {trade_id}, 订单号: {order_id}, 成交价: {avg_price:.4f})") else: # 这是开仓订单,检查数据库中是否已存在(通过订单号) existing_trade = Trade.get_by_entry_order_id(order_id) if not existing_trade: # 如果不存在,可以创建新记录(但需要更多信息,暂时跳过) logger.debug(f"发现新的开仓订单 {order_id},但缺少必要信息,跳过创建") else: logger.debug(f"开仓订单 {order_id} 已存在,跳过") except Exception as e: logger.warning(f"同步订单失败 {symbol} (订单ID: {order_id}): {e}") continue result = { "success": True, "message": f"同步完成:更新了 {updated_count} 条平仓记录(基于订单号匹配,确保唯一性)", "total_orders": len(all_orders), "updated_trades": updated_count, "close_orders": len([o for o in all_orders if o.get('reduceOnly', False)]), "open_orders": len([o for o in all_orders if not o.get('reduceOnly', False)]) } logger.info(f"✓ 同步完成:处理了 {len(all_orders)} 个订单,更新了 {updated_count} 条记录") return result finally: await client.disconnect() except HTTPException: raise except Exception as e: logger.error(f"同步币安订单失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"同步币安订单失败: {str(e)}")