auto_trade_sys/backend/api/routes/stats.py
薇薇安 0a4bbd3132 a
2026-02-01 22:30:53 +08:00

345 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统计分析API
"""
from fastapi import APIRouter, Query, Header, Depends
import sys
from pathlib import Path
from datetime import datetime, timedelta
import logging
# Put the directory four levels above this file, and its `backend`
# sub-directory, at the front of sys.path. This must run before the
# absolute imports that follow (presumably so `database.*` and `api.*`
# resolve without installing the project as a package — confirm).
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
from database.models import AccountSnapshot, Trade, MarketScan, TradingSignal, Account
from fastapi import HTTPException
from api.auth_deps import get_account_id, get_admin_user
from typing import Dict, Any
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the statistics endpoints in this module are registered.
router = APIRouter()
@router.get("/admin/dashboard")
async def get_admin_dashboard_stats(user: Dict[str, Any] = Depends(get_admin_user)):
    """Return admin dashboard data: per-account stats plus global totals.

    Every account returned by ``Account.list_all()`` is listed with values
    taken from its most recent snapshot (zeros when it has no snapshot).

    Args:
        user: Result of the ``get_admin_user`` dependency. It is not read in
            the body; it is declared so the dependency runs for this route.

    Returns:
        A dict with:
        - ``summary``: ``total_accounts``, ``active_accounts``,
          ``total_assets_usdt`` and ``total_pnl_usdt`` (both floats).
        - ``accounts``: one dict per account with ``id``, ``name``,
          ``status``, ``total_balance``, ``total_pnl``, ``open_positions``.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """

    def _or_zero(value):
        # A snapshot field can be present but None (e.g. a NULL column —
        # assumption); float(None) would otherwise abort the whole response.
        return 0 if value is None else value

    try:
        accounts = Account.list_all() or []
        stats = []
        # Start as floats so the summary always serialises with one type.
        total_assets = 0.0
        total_pnl = 0.0
        active_accounts = 0

        for acc in accounts:
            aid = acc['id']
            acc_stat = {
                "id": aid,
                "name": acc['name'],
                "status": acc['status'],
                "total_balance": 0,
                "total_pnl": 0,
                "open_positions": 0
            }

            # Latest snapshot only (first argument 1).
            snapshots = AccountSnapshot.get_recent(1, account_id=aid)
            if snapshots:
                snap = snapshots[0]
                acc_stat["total_balance"] = _or_zero(snap.get('total_balance', 0))
                acc_stat["total_pnl"] = _or_zero(snap.get('total_pnl', 0))
                acc_stat["open_positions"] = _or_zero(snap.get('open_positions', 0))

            total_assets += float(acc_stat["total_balance"])
            total_pnl += float(acc_stat["total_pnl"])

            if acc['status'] == 'active':
                active_accounts += 1
            stats.append(acc_stat)

        return {
            "summary": {
                "total_accounts": len(accounts),
                "active_accounts": active_accounts,
                "total_assets_usdt": total_assets,
                "total_pnl_usdt": total_pnl
            },
            "accounts": stats
        }
    except Exception as e:
        logger.error(f"获取管理员仪表板数据失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/performance")
async def get_performance_stats(
    days: int = Query(7, ge=1, le=365),
    account_id: int = Depends(get_account_id),
):
    """Return account snapshots and trades for the last ``days`` days.

    Args:
        days: Look-back window, validated to 1..365 (default 7).
        account_id: Account to report on, resolved by ``get_account_id``.

    Returns:
        A dict with ``snapshots``, ``trades`` and a human-readable
        ``period`` label.

    Raises:
        HTTPException: 500 with the error text when a lookup fails.
    """
    try:
        # NOTE(review): elsewhere in this file get_recent(1, ...) is used to
        # fetch "the latest snapshot", so the first argument may be a row
        # limit rather than a number of days — confirm against the model.
        snapshots = AccountSnapshot.get_recent(days, account_id=account_id)

        # Trades from the start date (local time, date precision) onwards.
        start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        trades = Trade.get_all(start_date=start_date, account_id=account_id)

        return {
            "snapshots": snapshots,
            "trades": trades,
            "period": f"Last {days} days"
        }
    except Exception as e:
        # Log before converting to a 500 so the failure is traceable, in
        # line with the other handlers in this module.
        logger.error(f"获取性能统计失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
def _account_from_latest_snapshot(account_id):
    """Build account data from the latest DB snapshot; None if unavailable.

    Best-effort fallback used when real-time account data cannot be fetched.
    Never raises: lookup failures are logged and reported as None.
    """
    try:
        snapshots = AccountSnapshot.get_recent(1, account_id=account_id)
        if not snapshots:
            logger.warning("数据库中没有账户快照数据")
            return None
        latest = snapshots[0]
        account_data = {
            "total_balance": latest.get('total_balance', 0),
            "available_balance": latest.get('available_balance', 0),
            "total_position_value": latest.get('total_position_value', 0),
            "total_pnl": latest.get('total_pnl', 0),
            "open_positions": latest.get('open_positions', 0)
        }
        logger.info("使用数据库快照作为账户数据")
        return account_data
    except Exception as db_error:
        logger.error(f"从数据库获取账户快照失败: {db_error}")
        return None


def _format_db_trade(trade):
    """Return a DB trade row extended with the fields the dashboard expects.

    Adds ``entry_value_usdt``, ``mark_price``, ``pnl`` and a recomputed
    ``pnl_percent``. Missing/None numeric fields are treated as 0
    (leverage as 1) instead of raising.
    """
    entry_value_usdt = float(trade.get('quantity') or 0) * float(trade.get('entry_price') or 0)
    leverage = float(trade.get('leverage') or 1)
    pnl = float(trade.get('pnl') or 0)
    # The stored pnl_percent is the price change; the dashboard wants the
    # return on margin: pnl / margin.
    margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt
    pnl_percent = (pnl / margin * 100) if margin > 0 else 0
    return {
        **trade,
        'entry_value_usdt': entry_value_usdt,
        # The DB has no mark price, so the entry price stands in for it.
        'mark_price': trade.get('entry_price', 0),
        'pnl': pnl,
        'pnl_percent': pnl_percent
    }


def _open_trades_from_db(account_id):
    """Load up to 10 open trades from the DB; None if the query fails.

    Best-effort fallback used when real-time positions cannot be fetched.
    A row that cannot be formatted is skipped (with a warning) so one bad
    record does not discard the rest. Never raises.
    """
    try:
        db_trades = Trade.get_all(status='open', account_id=account_id)[:10]
    except Exception as db_error:
        logger.error(f"从数据库获取持仓记录失败: {db_error}")
        return None

    formatted_trades = []
    for trade in db_trades:
        try:
            formatted_trades.append(_format_db_trade(trade))
        except Exception as row_error:
            logger.warning(f"从数据库获取持仓记录失败: {row_error}")
    logger.info(f"使用数据库记录作为持仓数据: {len(formatted_trades)} 个持仓")
    return formatted_trades


def _build_position_stats(account_data, open_trades, account_id):
    """Compute margin-based and notional position usage for the dashboard.

    May raise (bad numeric values, config lookup failure); the caller is
    expected to treat a failure as "no position stats".
    """
    from database.models import TradingConfig

    total_balance = float(account_data.get('total_balance', 0))
    max_total_position_percent = float(
        TradingConfig.get_value('MAX_TOTAL_POSITION_PERCENT', 0.30, account_id=account_id)
    )

    # Notional value and margin usage are two different measures:
    # - notional can exceed 100% of the balance (normal with high leverage)
    # - MAX_TOTAL_POSITION_PERCENT is, in the current system semantics,
    #   a margin-usage ratio
    total_notional_value = float(account_data.get('total_position_value', 0))

    # Prefer total_margin_value from account_data; otherwise sum it up from
    # the open trades.
    total_margin_value = account_data.get('total_margin_value', None)
    try:
        total_margin_value = float(total_margin_value) if total_margin_value is not None else None
    except Exception:
        total_margin_value = None

    if total_margin_value is None:
        total_margin_value = 0.0
        for t in open_trades or []:
            try:
                mv = t.get("margin_usdt", None)
                if mv is None:
                    # Fallback: notional / leverage.
                    nv = float(t.get("notional_usdt", 0) or 0)
                    lv = float(t.get("leverage", 0) or 0)
                    if lv <= 0:
                        lv = 1.0
                    mv = nv / lv
                total_margin_value += float(mv or 0)
            except Exception:
                # Skip positions whose values cannot be interpreted.
                continue

    # Current usage on a margin basis (comparable to MAX_TOTAL_POSITION_PERCENT).
    current_margin_percent = (total_margin_value / total_balance * 100) if total_balance > 0 else 0
    # Notional usage, for reference only.
    current_notional_percent = (total_notional_value / total_balance * 100) if total_balance > 0 else 0
    # Maximum allowed margin in USDT.
    max_margin_value = total_balance * max_total_position_percent

    return {
        # Legacy field: current_position_percent now means the margin ratio.
        "current_position_percent": round(current_margin_percent, 2),
        "current_position_percent_type": "margin",
        "current_notional_percent": round(current_notional_percent, 2),
        "max_position_percent": round(max_total_position_percent * 100, 2),
        # Legacy fields: max_position_value / total_position_value now hold
        # margin amounts (USDT).
        "max_position_value": round(max_margin_value, 2),
        "total_balance": round(total_balance, 2),
        "total_position_value": round(total_margin_value, 2),
        # Extra: notional value (USDT), explains why the notional ratio can
        # exceed 100%.
        "total_notional_value": round(total_notional_value, 2),
        "total_margin_value": round(total_margin_value, 2),
    }


@router.get("/dashboard")
async def get_dashboard_data(account_id: int = Depends(get_account_id)):
    """Return everything the dashboard needs for one account.

    Account data and open positions are fetched in real time first; when
    that fails the handler falls back to the database (latest snapshot /
    open trade rows) and reports the original error under ``warnings``
    instead of failing the request.

    Args:
        account_id: Account to report on, resolved by ``get_account_id``.

    Returns:
        A dict with ``account``, ``open_trades``, ``recent_scans``,
        ``recent_signals``, ``position_stats`` (None when unavailable),
        ``trading_config``, ``_debug`` and, only when a real-time call
        failed, ``warnings`` (keys ``account`` and/or ``positions``).

    Raises:
        HTTPException: 500 when an unexpected error escapes the
            per-section handling.
    """
    logger.info("=" * 60)
    logger.info(f"获取仪表板数据 - account_id={account_id}")
    logger.info("=" * 60)

    try:
        account_data = None
        account_error = None

        # Account data: real-time first, DB snapshot as fallback.
        try:
            # Imported here rather than at module level (presumably to avoid
            # a circular import between route modules — confirm).
            from api.routes.account import get_realtime_account_data
            logger.info(f"调用 get_realtime_account_data(account_id={account_id})")
            account_data = await get_realtime_account_data(account_id=account_id)
            logger.info(f"成功获取实时账户数据,返回的 total_balance={account_data.get('total_balance', 'N/A') if account_data else 'N/A'}")
        except Exception as e:
            if isinstance(e, HTTPException):
                # Surface the HTTP error detail as-is.
                account_error = e.detail
                logger.warning(f"获取实时账户数据失败 (HTTP {e.status_code}): {account_error}")
            else:
                account_error = str(e)
                logger.warning(f"获取实时账户数据失败: {account_error}", exc_info=True)
            snapshot_data = _account_from_latest_snapshot(account_id)
            if snapshot_data is not None:
                account_data = snapshot_data

        # Positions: real-time first, DB trade rows as fallback.
        open_trades = []
        positions_error = None
        try:
            from api.routes.account import get_realtime_positions
            logger.info(f"调用 get_realtime_positions(account_id={account_id})")
            positions = await get_realtime_positions(account_id=account_id)
            # Real-time positions are passed to the frontend unchanged.
            open_trades = positions
            logger.info(f"成功获取实时持仓数据: {len(open_trades)} 个持仓 (account_id={account_id})")
        except Exception as e:
            if isinstance(e, HTTPException):
                positions_error = e.detail
                logger.warning(f"获取实时持仓失败 (HTTP {e.status_code}): {positions_error}")
            else:
                positions_error = str(e)
                logger.warning(f"获取实时持仓失败: {positions_error}", exc_info=True)
            db_open_trades = _open_trades_from_db(account_id)
            if db_open_trades is not None:
                open_trades = db_open_trades

        # Most recent market scans (best effort).
        recent_scans = []
        try:
            recent_scans = MarketScan.get_recent(10)
        except Exception as e:
            logger.error(f"获取扫描记录失败: {e}")

        # Most recent trading signals (best effort).
        recent_signals = []
        try:
            recent_signals = TradingSignal.get_recent(20)
        except Exception as e:
            logger.error(f"获取交易信号失败: {e}")

        # Position usage figures; left as None when they cannot be computed.
        position_stats = None
        if account_data:
            try:
                position_stats = _build_position_stats(account_data, open_trades, account_id)
            except Exception as e:
                logger.warning(f"计算仓位占比信息失败: {e}")

        # Trading configuration shown by the frontend (stop loss, take
        # profit, ...); best effort.
        trading_config = {}
        try:
            from database.models import TradingConfig
            config_keys = ['STOP_LOSS_PERCENT', 'TAKE_PROFIT_PERCENT', 'LEVERAGE', 'MAX_POSITION_PERCENT']
            for key in config_keys:
                config = TradingConfig.get(key, account_id=account_id)
                if config:
                    trading_config[key] = {
                        'value': TradingConfig._convert_value(config['config_value'], config['config_type']),
                        'type': config['config_type']
                    }
        except Exception as e:
            logger.debug(f"获取交易配置失败: {e}")

        result = {
            "account": account_data,
            "open_trades": open_trades,
            "recent_scans": recent_scans,
            "recent_signals": recent_signals,
            "position_stats": position_stats,
            "trading_config": trading_config,
            "_debug": {
                "account_id": account_id,
                "account_data_total_balance": account_data.get('total_balance', 'N/A') if account_data else 'N/A',
                "open_trades_count": len(open_trades),
            }
        }

        # Report real-time failures without failing the request.
        if account_error or positions_error:
            result["warnings"] = {}
            if account_error:
                result["warnings"]["account"] = account_error
            if positions_error:
                result["warnings"]["positions"] = positions_error

        logger.info("返回仪表板数据:")
        logger.info(f" - account_id: {account_id}")
        logger.info(f" - total_balance: {account_data.get('total_balance', 'N/A') if account_data else 'N/A'}")
        logger.info(f" - available_balance: {account_data.get('available_balance', 'N/A') if account_data else 'N/A'}")
        logger.info(f" - open_trades count: {len(open_trades)}")
        if open_trades:
            logger.info(f" - 第一个持仓: {open_trades[0].get('symbol', 'N/A')}")
        logger.info("=" * 60)

        return result
    except Exception as e:
        logger.error(f"获取仪表板数据失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"获取仪表板数据失败: {str(e)}")