auto_trade_sys/backend/api/routes/recommendations.py
薇薇安 b08d97b442 a
2026-01-15 11:34:53 +08:00

264 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对API路由
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from datetime import datetime, timedelta
from database.models import TradeRecommendation
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/recommendations", tags=["recommendations"])
@router.get("")
async def get_recommendations(
status: Optional[str] = Query(None, description="状态过滤: active, executed, expired, cancelled"),
direction: Optional[str] = Query(None, description="方向过滤: BUY, SELL"),
limit: int = Query(50, ge=1, le=200, description="返回数量限制"),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD)")
):
"""
获取推荐交易对列表
Args:
status: 状态过滤
direction: 方向过滤
limit: 返回数量限制
start_date: 开始日期
end_date: 结束日期
"""
try:
# 转换日期字符串
start_dt = None
end_dt = None
if start_date:
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
except ValueError:
raise HTTPException(status_code=400, detail="开始日期格式错误,应为 YYYY-MM-DD")
if end_date:
try:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# 设置为当天的23:59:59
end_dt = end_dt.replace(hour=23, minute=59, second=59)
except ValueError:
raise HTTPException(status_code=400, detail="结束日期格式错误,应为 YYYY-MM-DD")
recommendations = TradeRecommendation.get_all(
status=status,
direction=direction,
limit=limit,
start_date=start_dt,
end_date=end_dt
)
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取推荐列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}")
@router.get("/active")
async def get_active_recommendations():
"""
获取当前有效的推荐(未过期、未执行、未取消)
"""
try:
recommendations = TradeRecommendation.get_active()
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取有效推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取有效推荐失败: {str(e)}")
@router.get("/{recommendation_id}")
async def get_recommendation(recommendation_id: int):
"""
根据ID获取推荐详情
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
return {
"success": True,
"data": recommendation
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取推荐详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐详情失败: {str(e)}")
@router.get("/symbol/{symbol}")
async def get_recommendations_by_symbol(
symbol: str,
limit: int = Query(10, ge=1, le=50, description="返回数量限制")
):
"""
根据交易对获取推荐记录
"""
try:
recommendations = TradeRecommendation.get_by_symbol(symbol, limit=limit)
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取交易对推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取交易对推荐失败: {str(e)}")
@router.post("/{recommendation_id}/execute")
async def mark_recommendation_executed(
recommendation_id: int,
trade_id: Optional[int] = None
):
"""
标记推荐已执行
Args:
recommendation_id: 推荐ID
trade_id: 关联的交易记录ID可选
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法标记为已执行"
)
TradeRecommendation.mark_executed(recommendation_id, trade_id)
return {
"success": True,
"message": "推荐已标记为已执行"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"标记推荐已执行失败: {e}")
raise HTTPException(status_code=500, detail=f"标记推荐已执行失败: {str(e)}")
@router.post("/{recommendation_id}/cancel")
async def cancel_recommendation(
recommendation_id: int,
notes: Optional[str] = None
):
"""
取消推荐
Args:
recommendation_id: 推荐ID
notes: 取消原因备注
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法取消"
)
TradeRecommendation.mark_cancelled(recommendation_id, notes)
return {
"success": True,
"message": "推荐已取消"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"取消推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"取消推荐失败: {str(e)}")
@router.post("/generate")
async def generate_recommendations(
min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度"),
max_recommendations: int = Query(20, ge=1, le=50, description="最大推荐数量")
):
"""
生成新的交易推荐
Args:
min_signal_strength: 最小信号强度
max_recommendations: 最大推荐数量
"""
try:
# 导入推荐器
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent.parent
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
raise HTTPException(status_code=500, detail="交易系统模块不存在")
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from trade_recommender import TradeRecommender
import config
# 初始化组件
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try:
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
# 生成推荐
recommendations = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations
)
return {
"success": True,
"count": len(recommendations),
"message": f"成功生成 {len(recommendations)} 个推荐",
"data": recommendations
}
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"生成推荐失败: {e}")
import traceback
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}")