""" 推荐交易对API路由 """ from fastapi import APIRouter, HTTPException, Query from typing import Optional, List from datetime import datetime, timedelta from database.models import TradeRecommendation import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/recommendations", tags=["recommendations"]) @router.get("") async def get_recommendations( type: Optional[str] = Query('realtime', description="类型: realtime(实时推荐), bookmarked(已标记的推荐)"), status: Optional[str] = Query(None, description="状态过滤: active, executed, expired, cancelled (仅用于bookmarked类型)"), direction: Optional[str] = Query(None, description="方向过滤: BUY, SELL"), limit: int = Query(50, ge=1, le=200, description="返回数量限制"), start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD, 仅用于bookmarked类型)"), end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD, 仅用于bookmarked类型)"), min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度 (仅用于realtime类型)") ): """ 获取推荐交易对列表 Args: type: 类型 - realtime(实时推荐,基于当前行情), bookmarked(已标记的推荐,从数据库查询) status: 状态过滤(仅用于bookmarked类型) direction: 方向过滤 limit: 返回数量限制 start_date: 开始日期(仅用于bookmarked类型) end_date: 结束日期(仅用于bookmarked类型) min_signal_strength: 最小信号强度(仅用于realtime类型) """ try: if type == 'realtime': # 从Redis缓存读取推荐(如果存在) import sys from pathlib import Path current_file = Path(__file__) backend_path = current_file.parent.parent.parent project_root = backend_path.parent trading_system_path = project_root / 'trading_system' if not trading_system_path.exists(): alternative_path = backend_path / 'trading_system' if alternative_path.exists(): trading_system_path = alternative_path else: raise HTTPException( status_code=500, detail=f"交易系统模块不存在" ) sys.path.insert(0, str(trading_system_path)) sys.path.insert(0, str(project_root)) from binance_client import BinanceClient import config # 创建客户端(用于访问Redis) client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) # 连接Redis(如果还没有连接) try: await client.redis_cache.connect() except: pass # 从Redis读取推荐 recommendations = [] cache_available = False try: import time cache_key = "recommendations:realtime" cached_data = await client.redis_cache.hgetall(cache_key) if cached_data: cache_available = True current_time = time.time() max_age = 3600 * 2 # 推荐最大保留时间:2小时 # 过滤过期推荐 for rec in cached_data.values(): if isinstance(rec, dict): rec_timestamp = rec.get('timestamp', 0) # 如果推荐时间超过2小时,跳过 if current_time - rec_timestamp > max_age: continue recommendations.append(rec) # 按时间戳降序排序(最新的在前) recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True) logger.info(f"从Redis读取到 {len(recommendations)} 个有效推荐(已过滤过期)") except Exception as e: logger.warning(f"从Redis读取推荐失败: {e}") # 如果Redis中没有推荐,实时生成 if not recommendations: logger.info("Redis中没有推荐,实时生成推荐...") try: from market_scanner import MarketScanner from risk_manager import RiskManager from trade_recommender import TradeRecommender await client.connect() try: scanner = MarketScanner(client) risk_manager = RiskManager(client) recommender = TradeRecommender(client, scanner, risk_manager) # 生成推荐(会自动保存到Redis) recommendations = await recommender.generate_recommendations( min_signal_strength=min_signal_strength, max_recommendations=limit, add_to_cache=True, min_quality_score=0.0 ) logger.info(f"实时生成了 {len(recommendations)} 个推荐") finally: await client.disconnect() except Exception as e: logger.error(f"实时生成推荐失败: {e}") import traceback logger.error(traceback.format_exc()) # 方向过滤 if direction: recommendations = [r for r in recommendations if r.get('direction') == direction] # 限制返回数量 recommendations = recommendations[:limit] return { "success": True, "count": len(recommendations), "type": "realtime", "from_cache": cache_available, "data": recommendations } elif type == 'bookmarked': # 从数据库查询已标记的推荐 # 转换日期字符串 start_dt = None end_dt = None if start_date: try: start_dt = datetime.strptime(start_date, "%Y-%m-%d") except ValueError: raise HTTPException(status_code=400, detail="开始日期格式错误,应为 YYYY-MM-DD") if end_date: try: end_dt = datetime.strptime(end_date, "%Y-%m-%d") end_dt = end_dt.replace(hour=23, minute=59, second=59) except ValueError: raise HTTPException(status_code=400, detail="结束日期格式错误,应为 YYYY-MM-DD") recommendations = TradeRecommendation.get_all( status=status, direction=direction, limit=limit, start_date=start_dt, end_date=end_dt ) # 更新实时价格 try: import sys from pathlib import Path current_file = Path(__file__) backend_path = current_file.parent.parent.parent project_root = backend_path.parent trading_system_path = project_root / 'trading_system' if trading_system_path.exists(): sys.path.insert(0, str(trading_system_path)) from binance_client import BinanceClient import config client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) # 连接Redis(如果还没有连接) try: await client.redis_cache.connect() except: pass # 更新推荐中的实时价格和涨跌幅 for rec in recommendations: symbol = rec.get('symbol') if symbol: # 先尝试从内存缓存获取实时价格(同步) realtime_price = client.get_realtime_price(symbol) # 如果内存缓存没有,尝试从Valkey异步读取 if realtime_price is None: try: realtime_price = await client.get_realtime_price_async(symbol) except: pass # 如果WebSocket/Valkey都没有,尝试从REST API获取 if realtime_price is None: try: ticker = await client.get_ticker_24h(symbol) if ticker: realtime_price = float(ticker.get('lastPrice', 0)) except: pass if realtime_price is not None and realtime_price > 0: old_price = rec.get('current_price', 0) rec['current_price'] = realtime_price if old_price > 0: change_percent = ((realtime_price - old_price) / old_price) * 100 rec['change_percent'] = round(change_percent, 4) rec['price_updated'] = True else: rec['price_updated'] = False else: rec['price_updated'] = False else: rec['price_updated'] = False except Exception as price_update_error: logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}") return { "success": True, "count": len(recommendations), "type": "bookmarked", "data": recommendations } else: raise HTTPException(status_code=400, detail="type参数必须是 'realtime' 或 'bookmarked'") except HTTPException: raise except Exception as e: logger.error(f"获取推荐列表失败: {e}") raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}") @router.get("/active") async def get_active_recommendations(): """ 获取当前有效的推荐(已标记的推荐中未过期、未执行、未取消的) 使用WebSocket实时价格更新推荐中的价格信息 同一交易对只返回最新的推荐(已去重) """ try: recommendations = TradeRecommendation.get_active() # 确保时间格式正确(转换为ISO格式字符串,包含时区信息) from datetime import datetime for rec in recommendations: if rec.get('recommendation_time'): if isinstance(rec['recommendation_time'], datetime): rec['recommendation_time'] = rec['recommendation_time'].isoformat() # 尝试从WebSocket缓存获取实时价格更新推荐中的价格 try: import sys from pathlib import Path current_file = Path(__file__) backend_path = current_file.parent.parent.parent project_root = backend_path.parent trading_system_path = project_root / 'trading_system' if trading_system_path.exists(): sys.path.insert(0, str(trading_system_path)) from binance_client import BinanceClient import config client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) # 连接Redis(如果还没有连接) try: await client.redis_cache.connect() except: pass # 更新推荐中的实时价格和涨跌幅 for rec in recommendations: symbol = rec.get('symbol') if symbol: # 先尝试从内存缓存获取实时价格(同步) realtime_price = client.get_realtime_price(symbol) # 如果内存缓存没有,尝试从Valkey异步读取 if realtime_price is None: try: realtime_price = await client.get_realtime_price_async(symbol) except: pass # 如果WebSocket/Valkey都没有,尝试从REST API获取 if realtime_price is None: try: ticker = await client.get_ticker_24h(symbol) if ticker: realtime_price = float(ticker.get('lastPrice', 0)) except: pass if realtime_price is not None and realtime_price > 0: old_price = rec.get('current_price', 0) rec['current_price'] = realtime_price if old_price > 0: change_percent = ((realtime_price - old_price) / old_price) * 100 rec['change_percent'] = round(change_percent, 4) rec['price_updated'] = True else: rec['price_updated'] = False else: rec['price_updated'] = False else: rec['price_updated'] = False except Exception as price_update_error: logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}") return { "success": True, "count": len(recommendations), "data": recommendations } except Exception as e: logger.error(f"获取有效推荐失败: {e}") raise HTTPException(status_code=500, detail=f"获取有效推荐失败: {str(e)}") @router.get("/{recommendation_id}") async def get_recommendation(recommendation_id: int): """ 根据ID获取推荐详情 """ try: recommendation = TradeRecommendation.get_by_id(recommendation_id) if not recommendation: raise HTTPException(status_code=404, detail="推荐不存在") return { "success": True, "data": recommendation } except HTTPException: raise except Exception as e: logger.error(f"获取推荐详情失败: {e}") raise HTTPException(status_code=500, detail=f"获取推荐详情失败: {str(e)}") @router.get("/symbol/{symbol}") async def get_recommendations_by_symbol( symbol: str, limit: int = Query(10, ge=1, le=50, description="返回数量限制") ): """ 根据交易对获取推荐记录 """ try: recommendations = TradeRecommendation.get_by_symbol(symbol, limit=limit) return { "success": True, "count": len(recommendations), "data": recommendations } except Exception as e: logger.error(f"获取交易对推荐失败: {e}") raise HTTPException(status_code=500, detail=f"获取交易对推荐失败: {str(e)}") @router.post("/{recommendation_id}/execute") async def mark_recommendation_executed( recommendation_id: int, trade_id: Optional[int] = None ): """ 标记推荐已执行 Args: recommendation_id: 推荐ID trade_id: 关联的交易记录ID(可选) """ try: recommendation = TradeRecommendation.get_by_id(recommendation_id) if not recommendation: raise HTTPException(status_code=404, detail="推荐不存在") if recommendation['status'] != 'active': raise HTTPException( status_code=400, detail=f"推荐状态为 {recommendation['status']},无法标记为已执行" ) TradeRecommendation.mark_executed(recommendation_id, trade_id) return { "success": True, "message": "推荐已标记为已执行" } except HTTPException: raise except Exception as e: logger.error(f"标记推荐已执行失败: {e}") raise HTTPException(status_code=500, detail=f"标记推荐已执行失败: {str(e)}") @router.post("/{recommendation_id}/cancel") async def cancel_recommendation( recommendation_id: int, notes: Optional[str] = None ): """ 取消推荐 Args: recommendation_id: 推荐ID notes: 取消原因备注 """ try: recommendation = TradeRecommendation.get_by_id(recommendation_id) if not recommendation: raise HTTPException(status_code=404, detail="推荐不存在") if recommendation['status'] != 'active': raise HTTPException( status_code=400, detail=f"推荐状态为 {recommendation['status']},无法取消" ) TradeRecommendation.mark_cancelled(recommendation_id, notes) return { "success": True, "message": "推荐已取消" } except HTTPException: raise except Exception as e: logger.error(f"取消推荐失败: {e}") raise HTTPException(status_code=500, detail=f"取消推荐失败: {str(e)}") @router.post("/generate") async def generate_recommendations( min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度"), max_recommendations: int = Query(20, ge=1, le=50, description="最大推荐数量") ): """ 生成新的交易推荐 Args: min_signal_strength: 最小信号强度 max_recommendations: 最大推荐数量 """ try: # 导入推荐器 import sys from pathlib import Path # 从 backend/api/routes/recommendations.py 向上找到项目根目录 # backend/api/routes -> backend/api -> backend -> 项目根目录 current_file = Path(__file__) backend_path = current_file.parent.parent.parent # backend目录 project_root = backend_path.parent # 项目根目录 trading_system_path = project_root / 'trading_system' logger.info(f"查找交易系统模块: {trading_system_path}") logger.info(f"路径存在: {trading_system_path.exists()}") if not trading_system_path.exists(): # 尝试其他可能的路径 alternative_path = backend_path / 'trading_system' if alternative_path.exists(): trading_system_path = alternative_path logger.info(f"使用备用路径: {trading_system_path}") else: raise HTTPException( status_code=500, detail=f"交易系统模块不存在。查找路径: {trading_system_path}, 备用路径: {alternative_path}" ) sys.path.insert(0, str(trading_system_path)) sys.path.insert(0, str(project_root)) from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from trade_recommender import TradeRecommender import config # 初始化组件 client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) await client.connect() try: scanner = MarketScanner(client) risk_manager = RiskManager(client) recommender = TradeRecommender(client, scanner, risk_manager) # 生成推荐(增量添加到Redis缓存) recommendations = await recommender.generate_recommendations( min_signal_strength=min_signal_strength, max_recommendations=max_recommendations, add_to_cache=True, # 添加到Redis缓存 min_quality_score=0.0 # 不过滤,保留所有推荐 ) return { "success": True, "count": len(recommendations), "message": f"成功生成 {len(recommendations)} 个推荐", "data": recommendations } finally: await client.disconnect() except HTTPException: raise except Exception as e: logger.error(f"生成推荐失败: {e}") import traceback logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}") @router.post("/clear-cache") async def clear_recommendations_cache(): """ 清理Redis中的推荐缓存 """ try: import sys from pathlib import Path current_file = Path(__file__) backend_path = current_file.parent.parent.parent project_root = backend_path.parent trading_system_path = project_root / 'trading_system' if not trading_system_path.exists(): alternative_path = backend_path / 'trading_system' if alternative_path.exists(): trading_system_path = alternative_path sys.path.insert(0, str(trading_system_path)) sys.path.insert(0, str(project_root)) from binance_client import BinanceClient import config client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) try: await client.redis_cache.connect() cache_key = "recommendations:realtime" await client.redis_cache.delete(cache_key) return { "success": True, "message": "推荐缓存已清理" } finally: await client.disconnect() except Exception as e: logger.error(f"清理推荐缓存失败: {e}") raise HTTPException(status_code=500, detail=f"清理推荐缓存失败: {str(e)}") @router.post("/bookmark") async def bookmark_recommendation(recommendation_data: dict): """ 标记推荐到数据库(用于复盘) Args: recommendation_data: 推荐数据(包含所有推荐字段) """ try: # 提取必需字段 required_fields = ['symbol', 'direction', 'current_price', 'change_percent', 'recommendation_reason', 'signal_strength'] for field in required_fields: if field not in recommendation_data: raise HTTPException(status_code=400, detail=f"缺少必需字段: {field}") # 保存到数据库 recommendation_id = TradeRecommendation.create( symbol=recommendation_data.get('symbol'), direction=recommendation_data.get('direction'), current_price=recommendation_data.get('current_price'), change_percent=recommendation_data.get('change_percent'), recommendation_reason=recommendation_data.get('recommendation_reason'), signal_strength=recommendation_data.get('signal_strength'), market_regime=recommendation_data.get('market_regime'), trend_4h=recommendation_data.get('trend_4h'), rsi=recommendation_data.get('rsi'), macd_histogram=recommendation_data.get('macd_histogram'), bollinger_upper=recommendation_data.get('bollinger_upper'), bollinger_middle=recommendation_data.get('bollinger_middle'), bollinger_lower=recommendation_data.get('bollinger_lower'), ema20=recommendation_data.get('ema20'), ema50=recommendation_data.get('ema50'), ema20_4h=recommendation_data.get('ema20_4h'), atr=recommendation_data.get('atr'), suggested_stop_loss=recommendation_data.get('suggested_stop_loss'), suggested_take_profit_1=recommendation_data.get('suggested_take_profit_1'), suggested_take_profit_2=recommendation_data.get('suggested_take_profit_2'), suggested_position_percent=recommendation_data.get('suggested_position_percent'), suggested_leverage=recommendation_data.get('suggested_leverage', 10), order_type=recommendation_data.get('order_type', 'LIMIT'), suggested_limit_price=recommendation_data.get('suggested_limit_price'), volume_24h=recommendation_data.get('volume_24h'), volatility=recommendation_data.get('volatility'), notes=recommendation_data.get('notes', '用户标记用于复盘') ) logger.info(f"✓ 推荐已标记到数据库: {recommendation_data.get('symbol')} {recommendation_data.get('direction')} (ID: {recommendation_id})") return { "success": True, "message": "推荐已标记到数据库", "recommendation_id": recommendation_id } except HTTPException: raise except Exception as e: logger.error(f"标记推荐失败: {e}") import traceback logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=f"标记推荐失败: {str(e)}")