auto_trade_sys/backend/api/routes/recommendations.py
薇薇安 2e672d1f25 a
2026-01-15 16:58:05 +08:00

390 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对API路由
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from datetime import datetime, timedelta
from database.models import TradeRecommendation
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/recommendations", tags=["recommendations"])
@router.get("")
async def get_recommendations(
status: Optional[str] = Query(None, description="状态过滤: active, executed, expired, cancelled"),
direction: Optional[str] = Query(None, description="方向过滤: BUY, SELL"),
limit: int = Query(50, ge=1, le=200, description="返回数量限制"),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD)")
):
"""
获取推荐交易对列表
Args:
status: 状态过滤
direction: 方向过滤
limit: 返回数量限制
start_date: 开始日期
end_date: 结束日期
"""
try:
# 转换日期字符串
start_dt = None
end_dt = None
if start_date:
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
except ValueError:
raise HTTPException(status_code=400, detail="开始日期格式错误,应为 YYYY-MM-DD")
if end_date:
try:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# 设置为当天的23:59:59
end_dt = end_dt.replace(hour=23, minute=59, second=59)
except ValueError:
raise HTTPException(status_code=400, detail="结束日期格式错误,应为 YYYY-MM-DD")
recommendations = TradeRecommendation.get_all(
status=status,
direction=direction,
limit=limit,
start_date=start_dt,
end_date=end_dt
)
# 如果是获取有效推荐,尝试更新实时价格
if status == 'active' or (status is None and len(recommendations) > 0):
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if trading_system_path.exists():
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
import config
# 创建客户端实例(不需要连接,只需要访问价格缓存)
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
# 更新推荐中的实时价格和涨跌幅
for rec in recommendations:
symbol = rec.get('symbol')
if symbol:
# 尝试从WebSocket缓存获取实时价格
realtime_price = client.get_realtime_price(symbol)
if realtime_price is not None:
# 更新价格
old_price = rec.get('current_price', 0)
rec['current_price'] = realtime_price
# 计算新的涨跌幅
if old_price > 0:
change_percent = ((realtime_price - old_price) / old_price) * 100
rec['change_percent'] = round(change_percent, 4)
rec['price_updated'] = True # 标记价格已更新
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
except Exception as price_update_error:
# 如果价格更新失败,不影响返回推荐列表
logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}")
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取推荐列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}")
@router.get("/active")
async def get_active_recommendations():
"""
获取当前有效的推荐(未过期、未执行、未取消)
使用WebSocket实时价格更新推荐中的价格信息
同一交易对只返回最新的推荐(已去重)
"""
try:
recommendations = TradeRecommendation.get_active()
# 确保时间格式正确转换为ISO格式字符串包含时区信息
from datetime import datetime
for rec in recommendations:
if rec.get('recommendation_time'):
if isinstance(rec['recommendation_time'], datetime):
# 如果是datetime对象转换为ISO格式字符串UTC+8
rec['recommendation_time'] = rec['recommendation_time'].isoformat()
elif isinstance(rec['recommendation_time'], str):
# 如果已经是字符串,确保格式正确
pass
# 尝试从WebSocket缓存获取实时价格更新推荐中的价格
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if trading_system_path.exists():
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
import config
# 创建客户端实例(不需要连接,只需要访问价格缓存)
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
# 更新推荐中的实时价格和涨跌幅
for rec in recommendations:
symbol = rec.get('symbol')
if symbol:
# 尝试从WebSocket缓存获取实时价格
realtime_price = client.get_realtime_price(symbol)
if realtime_price is not None:
# 更新价格
old_price = rec.get('current_price', 0)
rec['current_price'] = realtime_price
# 计算新的涨跌幅
if old_price > 0:
change_percent = ((realtime_price - old_price) / old_price) * 100
rec['change_percent'] = round(change_percent, 4)
rec['price_updated'] = True # 标记价格已更新
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
except Exception as price_update_error:
# 如果价格更新失败,不影响返回推荐列表
logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}")
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取有效推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取有效推荐失败: {str(e)}")
@router.get("/{recommendation_id}")
async def get_recommendation(recommendation_id: int):
"""
根据ID获取推荐详情
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
return {
"success": True,
"data": recommendation
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取推荐详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐详情失败: {str(e)}")
@router.get("/symbol/{symbol}")
async def get_recommendations_by_symbol(
symbol: str,
limit: int = Query(10, ge=1, le=50, description="返回数量限制")
):
"""
根据交易对获取推荐记录
"""
try:
recommendations = TradeRecommendation.get_by_symbol(symbol, limit=limit)
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取交易对推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取交易对推荐失败: {str(e)}")
@router.post("/{recommendation_id}/execute")
async def mark_recommendation_executed(
recommendation_id: int,
trade_id: Optional[int] = None
):
"""
标记推荐已执行
Args:
recommendation_id: 推荐ID
trade_id: 关联的交易记录ID可选
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法标记为已执行"
)
TradeRecommendation.mark_executed(recommendation_id, trade_id)
return {
"success": True,
"message": "推荐已标记为已执行"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"标记推荐已执行失败: {e}")
raise HTTPException(status_code=500, detail=f"标记推荐已执行失败: {str(e)}")
@router.post("/{recommendation_id}/cancel")
async def cancel_recommendation(
recommendation_id: int,
notes: Optional[str] = None
):
"""
取消推荐
Args:
recommendation_id: 推荐ID
notes: 取消原因备注
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法取消"
)
TradeRecommendation.mark_cancelled(recommendation_id, notes)
return {
"success": True,
"message": "推荐已取消"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"取消推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"取消推荐失败: {str(e)}")
@router.post("/generate")
async def generate_recommendations(
min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度"),
max_recommendations: int = Query(20, ge=1, le=50, description="最大推荐数量")
):
"""
生成新的交易推荐
Args:
min_signal_strength: 最小信号强度
max_recommendations: 最大推荐数量
"""
try:
# 导入推荐器
import sys
from pathlib import Path
# 从 backend/api/routes/recommendations.py 向上找到项目根目录
# backend/api/routes -> backend/api -> backend -> 项目根目录
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent # backend目录
project_root = backend_path.parent # 项目根目录
trading_system_path = project_root / 'trading_system'
logger.info(f"查找交易系统模块: {trading_system_path}")
logger.info(f"路径存在: {trading_system_path.exists()}")
if not trading_system_path.exists():
# 尝试其他可能的路径
alternative_path = backend_path / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
logger.info(f"使用备用路径: {trading_system_path}")
else:
raise HTTPException(
status_code=500,
detail=f"交易系统模块不存在。查找路径: {trading_system_path}, 备用路径: {alternative_path}"
)
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from trade_recommender import TradeRecommender
import config
# 初始化组件
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try:
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
# 生成推荐
recommendations = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations
)
return {
"success": True,
"count": len(recommendations),
"message": f"成功生成 {len(recommendations)} 个推荐",
"data": recommendations
}
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"生成推荐失败: {e}")
import traceback
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}")