"""
API routes for recommended trading pairs.

Endpoints are mounted under ``/api/recommendations``.  Realtime recommendations
are read from Redis (a snapshot key first, a legacy hash second); bookmarked
recommendations are read through ``TradeRecommendation``.
"""
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
from fastapi import APIRouter, HTTPException, Query

from database.models import TradeRecommendation

try:
    import redis.asyncio as redis_async
except Exception:  # pragma: no cover
    redis_async = None

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/recommendations", tags=["recommendations"])

REDIS_KEY_RECOMMENDATIONS_SNAPSHOT = "recommendations:snapshot"
REDIS_KEY_RECOMMENDATIONS_HASH = "recommendations:realtime"
REDIS_KEY_MARK_PRICE_ALL = "market:mark_price:all"
REDIS_KEY_MARK_PRICE_ALL_LOCK = "lock:market:mark_price:all"

# Fixed UTC+8 offset used for human-readable timestamps.
_BEIJING_TZ = timezone(timedelta(hours=8))

# Records in the legacy hash older than this are treated as expired (2 hours).
_HASH_MAX_AGE_SEC = 3600 * 2


# ---------------------------------------------------------------------------
# Small generic helpers
# ---------------------------------------------------------------------------

def _beijing_time_str() -> str:
    """Return the current UTC+8 wall-clock time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(tz=_BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")


def _to_float(value: Any) -> Optional[float]:
    """Convert *value* to ``float``; return ``None`` when it cannot be converted."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _record_timestamp(rec: Dict[str, Any]) -> float:
    """Return ``rec['timestamp']`` as a float, or ``0.0`` if missing/invalid."""
    return _to_float(rec.get('timestamp') or 0) or 0.0


def _empty_mark_snapshot() -> Dict[str, Any]:
    """Return the placeholder used when no mark-price snapshot is available."""
    return {"updated_at_ms": None, "updated_at": None, "items": {}}


def _trading_system_dirs() -> Tuple[Path, Path, Path]:
    """
    Locate the ``trading_system`` package relative to this file.

    Returns:
        ``(project_root, primary_path, alternative_path)`` where the primary
        path is ``<project_root>/trading_system`` and the alternative is
        ``<backend>/trading_system``.  The backend directory is taken to be
        three levels above this file.
    """
    backend_path = Path(__file__).parent.parent.parent
    project_root = backend_path.parent
    return project_root, project_root / 'trading_system', backend_path / 'trading_system'


def _prepend_sys_path(path: Path) -> None:
    """
    Put *path* at the front of ``sys.path`` without creating duplicates.

    The entry keeps top priority (as a plain ``insert(0, ...)`` would give it),
    but repeated requests no longer make ``sys.path`` grow without bound.
    """
    entry = str(path)
    sys.path[:] = [entry] + [p for p in sys.path if p != entry]


# ---------------------------------------------------------------------------
# Redis helpers
# ---------------------------------------------------------------------------

def _redis_connection_kwargs() -> Tuple[str, Dict[str, Any]]:
    """
    Build Redis connection parameters from environment variables.

    Reads ``REDIS_URL``, ``REDIS_USERNAME``, ``REDIS_PASSWORD``,
    ``REDIS_USE_TLS``, ``REDIS_SSL_CERT_REQS`` and ``REDIS_SSL_CA_CERTS``.
    When TLS is requested the URL scheme is rewritten to ``rediss://``.

    Returns:
        ``(redis_url, kwargs)`` suitable for ``redis.asyncio.from_url``.
    """
    redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
    username = os.getenv("REDIS_USERNAME", None)
    password = os.getenv("REDIS_PASSWORD", None)
    ssl_cert_reqs = (os.getenv("REDIS_SSL_CERT_REQS", "required") or "required").strip()
    ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)

    kwargs: Dict[str, Any] = {"decode_responses": True}
    if username:
        kwargs["username"] = username
    if password:
        kwargs["password"] = password

    use_tls = redis_url.startswith("rediss://") or (
        os.getenv("REDIS_USE_TLS", "False").lower() == "true"
    )
    if use_tls and not redis_url.startswith("rediss://"):
        if redis_url.startswith("redis://"):
            redis_url = redis_url.replace("redis://", "rediss://", 1)
        else:
            redis_url = f"rediss://{redis_url}"

    if use_tls or redis_url.startswith("rediss://"):
        kwargs["ssl_cert_reqs"] = ssl_cert_reqs
        if ssl_ca_certs:
            kwargs["ssl_ca_certs"] = ssl_ca_certs
        # redis-py: ssl_check_hostname controls host name verification.
        kwargs["ssl_check_hostname"] = ssl_cert_reqs == "required"

    return redis_url, kwargs


async def _close_redis(client) -> None:
    """
    Close a Redis client created by ``_get_redis`` (best effort, never raises).

    Prefers ``aclose()`` and falls back to ``close()`` for older redis-py
    releases that do not provide it.
    """
    if client is None:
        return
    closer = getattr(client, "aclose", None) or getattr(client, "close", None)
    if closer is None:
        return
    try:
        await closer()
    except Exception as e:
        logger.debug("Closing Redis client failed (ignored): %s", e)


async def _get_redis():
    """
    Create a Redis client and verify it with ``PING``.

    Returns:
        A connected client, or ``None`` when redis-py is not installed or the
        server cannot be reached.  The caller owns the returned client and
        should release it with ``_close_redis``.
    """
    if redis_async is None:
        return None
    redis_url, kwargs = _redis_connection_kwargs()
    client = None
    try:
        client = redis_async.from_url(redis_url, **kwargs)
        await client.ping()
        return client
    except Exception as e:
        logger.warning(f"Redis 不可用(recommendations): {e}")
        # Do not leak the half-open client when the ping fails.
        await _close_redis(client)
        return None


async def _get_cached_json(client, key: str) -> Optional[Any]:
    """Return the JSON-decoded value stored at *key*, or ``None`` on miss/error."""
    try:
        raw = await client.get(key)
        if not raw:
            return None
        return json.loads(raw)
    except Exception as e:
        logger.debug("Reading cached JSON from %s failed: %s", key, e)
        return None


async def _set_cached_json(client, key: str, value: Any, ttl_sec: int) -> None:
    """Store *value* as JSON at *key* with an expiry of *ttl_sec* seconds."""
    await client.setex(key, ttl_sec, json.dumps(value, ensure_ascii=False))


async def _refresh_mark_price_all_if_needed(
    client, min_refresh_sec: int = 10, ttl_sec: int = 30
) -> Dict[str, Any]:
    """
    Fetch and cache Binance USDT-margined futures mark prices (premiumIndex).

    - All symbols are cached under a single key, which suits "current price"
      display on the recommendations page.
    - A Redis ``SET NX`` lock prevents concurrent requests from all refreshing
      at once.

    Args:
        client: Redis client.
        min_refresh_sec: Cached data younger than this is returned as is; also
            used as the lock expiry.
        ttl_sec: Expiry of the cached snapshot.

    Returns:
        A dict with ``updated_at_ms``, ``updated_at`` and ``items`` (symbol ->
        ``{"markPrice", "time"}``).  On lock contention or fetch failure the
        previous cached value (or an empty placeholder) is returned.
    """
    now_ms = int(time.time() * 1000)
    cached = await _get_cached_json(client, REDIS_KEY_MARK_PRICE_ALL)
    if isinstance(cached, dict):
        updated_at_ms = cached.get("updated_at_ms")
        try:
            updated_at_ms = int(updated_at_ms) if updated_at_ms is not None else None
        except Exception:
            updated_at_ms = None
        if updated_at_ms and (now_ms - updated_at_ms) < (min_refresh_sec * 1000):
            return cached

    fallback = cached if isinstance(cached, dict) else _empty_mark_snapshot()

    # Try to take the refresh lock.  The lock is short so a slow request does
    # not block refreshing for long.  Redis rejects a zero/negative expiry, so
    # clamp it to at least one second.
    try:
        locked = await client.set(
            REDIS_KEY_MARK_PRICE_ALL_LOCK,
            str(now_ms),
            nx=True,
            ex=max(1, int(min_refresh_sec)),
        )
    except Exception:
        locked = False
    if not locked:
        # Someone else is refreshing: return the old value (if any).
        return fallback

    # Actually fetch from Binance.
    url = "https://fapi.binance.com/fapi/v1/premiumIndex"
    try:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=6)) as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"binance premiumIndex http={resp.status}")
                payload = await resp.json()
    except Exception as e:
        logger.warning(f"刷新 mark price 失败: {e}")
        # Refresh failed: return the old value.
        return fallback

    items: Dict[str, Dict[str, Any]] = {}
    if isinstance(payload, list):
        for row in payload:
            if not isinstance(row, dict):
                continue
            sym = row.get("symbol")
            if not sym or not isinstance(sym, str):
                continue
            # Keep only USDT symbols so unrelated data does not bloat the cache.
            if not sym.endswith("USDT"):
                continue
            try:
                mp = float(row.get("markPrice", 0) or 0)
            except Exception:
                mp = 0.0
            t = row.get("time")  # milliseconds
            try:
                t = int(t) if t is not None else None
            except Exception:
                t = None
            items[sym] = {"markPrice": mp, "time": t}

    snapshot = {
        "updated_at_ms": now_ms,
        "updated_at": _beijing_time_str(),
        "items": items,
        "source": "binance_premiumIndex",
    }
    try:
        await _set_cached_json(client, REDIS_KEY_MARK_PRICE_ALL, snapshot, ttl_sec=ttl_sec)
    except Exception as e:
        logger.warning(f"写入 {REDIS_KEY_MARK_PRICE_ALL} 失败(不影响返回): {e}")
    return snapshot


# ---------------------------------------------------------------------------
# Realtime (Redis-backed) recommendation helpers
# ---------------------------------------------------------------------------

async def _load_realtime_recommendations(rds) -> Tuple[List[Any], bool, Dict[str, Any]]:
    """
    Read realtime recommendations from Redis.

    The snapshot key is preferred; the legacy hash is used only when the
    snapshot yields no items.

    Returns:
        ``(recommendations, cache_available, snapshot_meta)``.
    """
    recommendations: List[Any] = []
    cache_available = False
    snapshot_meta: Dict[str, Any] = {}

    # 1) Prefer the snapshot (single key).
    try:
        snapshot = await _get_cached_json(rds, REDIS_KEY_RECOMMENDATIONS_SNAPSHOT)
        if isinstance(snapshot, dict):
            items = snapshot.get("items", [])
            if isinstance(items, list) and items:
                recommendations = items
                cache_available = True
                snapshot_meta = {
                    "generated_at_ms": snapshot.get("generated_at_ms"),
                    "generated_at": snapshot.get("generated_at"),
                    "ttl_sec": snapshot.get("ttl_sec"),
                }
    except Exception as e:
        logger.warning(f"读取 {REDIS_KEY_RECOMMENDATIONS_SNAPSHOT} 失败: {e}")

    # 2) Fall back to the legacy cache (hash).
    if not recommendations:
        try:
            cached_data = await rds.hgetall(REDIS_KEY_RECOMMENDATIONS_HASH)
            if cached_data:
                cache_available = True
                current_time = time.time()
                for raw in cached_data.values():
                    try:
                        rec = json.loads(raw) if isinstance(raw, str) else raw
                    except (TypeError, ValueError):
                        rec = None
                    if not isinstance(rec, dict):
                        continue
                    # A missing/invalid timestamp counts as 0, i.e. expired.
                    if current_time - _record_timestamp(rec) > _HASH_MAX_AGE_SEC:
                        continue
                    recommendations.append(rec)
                recommendations.sort(key=_record_timestamp, reverse=True)
                logger.info(f"从Redis(Hash)读取到 {len(recommendations)} 个有效推荐(已过滤过期)")
        except Exception as e:
            logger.warning(f"从Redis(Hash)读取推荐失败: {e}")

    return recommendations, cache_available, snapshot_meta


def _merge_mark_prices(
    recommendations: List[Any],
    mark_items: Dict[str, Any],
    mark_updated_at_ms: Any,
) -> None:
    """
    Overwrite ``current_price`` of each recommendation with its mark price.

    Records are modified in place.  Records without a usable mark price keep
    their stored price and are tagged with source ``"snapshot"`` so the
    front end can hint that the value may be stale.  Nothing happens when
    *mark_items* is empty.
    """
    if not mark_items:
        return
    for rec in recommendations:
        if not isinstance(rec, dict):
            continue
        sym = rec.get("symbol")
        if not sym:
            continue
        mp = mark_items.get(sym)
        mark_price = _to_float(mp.get("markPrice")) if isinstance(mp, dict) else None
        if mark_price:
            rec["current_price"] = mark_price
            rec["current_price_source"] = "mark_price"
            rec["current_price_time_ms"] = mp.get("time") or mark_updated_at_ms
            rec["price_updated"] = True
        else:
            rec.setdefault("current_price_source", rec.get("current_price_source") or "snapshot")
            rec.setdefault("price_updated", False)


async def _build_realtime_response(rds, direction: Optional[str], limit: int) -> Dict[str, Any]:
    """Assemble the response body of the realtime listing from Redis data."""
    recommendations, cache_available, snapshot_meta = await _load_realtime_recommendations(rds)

    # 3) Merge the mark price as the real "current price", with timestamps for
    #    display on the front end.
    mark_snapshot: Any = {}
    try:
        mark_snapshot = await _refresh_mark_price_all_if_needed(rds)
    except Exception as e:
        logger.debug(f"刷新/读取 mark price 失败(不影响返回): {e}")
    if not isinstance(mark_snapshot, dict):
        mark_snapshot = {}
    mark_items = mark_snapshot.get("items", {})
    if not isinstance(mark_items, dict):
        mark_items = {}
    mark_updated_at = mark_snapshot.get("updated_at")
    mark_updated_at_ms = mark_snapshot.get("updated_at_ms")

    _merge_mark_prices(recommendations, mark_items, mark_updated_at_ms)

    # Direction filter.
    if direction:
        recommendations = [
            r for r in recommendations
            if isinstance(r, dict) and r.get('direction') == direction
        ]

    # Limit the number of returned items.
    recommendations = recommendations[:limit]

    return {
        "success": True,
        "count": len(recommendations),
        "type": "realtime",
        "from_cache": cache_available,
        "meta": {
            **snapshot_meta,
            "price_source": "mark_price" if mark_items else None,
            "price_updated_at": mark_updated_at,
            "price_updated_at_ms": mark_updated_at_ms,
        },
        "data": recommendations
    }


# ---------------------------------------------------------------------------
# Price refresh for database-backed recommendations
# ---------------------------------------------------------------------------

async def _lookup_realtime_price(client, symbol: str) -> Optional[float]:
    """
    Look up the latest price for *symbol* through a ``BinanceClient``.

    Tries, in order: the synchronous in-memory lookup, the asynchronous
    lookup, and finally the 24h ticker's ``lastPrice``.  Returns ``None`` when
    no source yields a numeric value.
    """
    price = None
    try:
        price = client.get_realtime_price(symbol)
    except Exception as e:
        logger.debug("get_realtime_price(%s) failed: %s", symbol, e)

    if price is None:
        try:
            price = await client.get_realtime_price_async(symbol)
        except Exception as e:
            logger.debug("get_realtime_price_async(%s) failed: %s", symbol, e)

    if price is None:
        try:
            ticker = await client.get_ticker_24h(symbol)
            if ticker:
                price = ticker.get('lastPrice', 0)
        except Exception as e:
            logger.debug("get_ticker_24h(%s) failed: %s", symbol, e)

    return _to_float(price)


async def _apply_realtime_price(client, rec: Dict[str, Any]) -> bool:
    """
    Update ``current_price`` / ``change_percent`` of one recommendation.

    Returns:
        ``True`` only when a positive realtime price was found *and* the
        previously stored price was positive, so a change percentage could be
        computed.
    """
    symbol = rec.get('symbol')
    if not symbol:
        return False

    realtime_price = await _lookup_realtime_price(client, symbol)
    if realtime_price is None or not realtime_price > 0:
        return False

    # The stored price may be None or a non-float numeric (e.g. Decimal);
    # normalise it so the arithmetic below cannot raise.
    old_price = _to_float(rec.get('current_price', 0)) or 0.0
    rec['current_price'] = realtime_price
    if old_price > 0:
        change_percent = ((realtime_price - old_price) / old_price) * 100
        rec['change_percent'] = round(change_percent, 4)
        return True
    return False


async def _refresh_realtime_prices(recommendations: List[Any]) -> None:
    """
    Refresh prices of database-backed recommendations in place (best effort).

    Does nothing when ``<project_root>/trading_system`` does not exist.  Every
    failure is logged at debug level and never propagates: price refresh must
    not break the listing endpoints.
    """
    try:
        _, trading_system_path, _ = _trading_system_dirs()
        if not trading_system_path.exists():
            return
        _prepend_sys_path(trading_system_path)
        from binance_client import BinanceClient
        import config

        client = BinanceClient(
            api_key=config.BINANCE_API_KEY,
            api_secret=config.BINANCE_API_SECRET,
            testnet=config.USE_TESTNET
        )
        try:
            # Connect Redis (if not connected yet); optional for the lookups.
            try:
                await client.redis_cache.connect()
            except Exception as e:
                logger.debug("redis_cache.connect() failed (ignored): %s", e)

            # Update the realtime price and change percentage of each record.
            for rec in recommendations:
                if not isinstance(rec, dict):
                    continue
                try:
                    rec['price_updated'] = await _apply_realtime_price(client, rec)
                except Exception as e:
                    # One bad record must not stop the remaining updates.
                    rec['price_updated'] = False
                    logger.debug("Price update failed for %s: %s", rec.get('symbol'), e)
        finally:
            # Release the client, as /generate and /clear-cache already do.
            try:
                await client.disconnect()
            except Exception as e:
                logger.debug("BinanceClient disconnect failed (ignored): %s", e)
    except Exception as price_update_error:
        logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}")


def _parse_date_param(value: Optional[str], error_detail: str) -> Optional[datetime]:
    """
    Parse a ``YYYY-MM-DD`` query value.

    Returns ``None`` for an empty value.

    Raises:
        HTTPException: 400 with *error_detail* when the format is invalid.
    """
    if not value:
        return None
    try:
        return datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise HTTPException(status_code=400, detail=error_detail) from None


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@router.get("")
async def get_recommendations(
    type: Optional[str] = Query('realtime', description="类型: realtime(实时推荐), bookmarked(已标记的推荐)"),
    status: Optional[str] = Query(None, description="状态过滤: active, executed, expired, cancelled (仅用于bookmarked类型)"),
    direction: Optional[str] = Query(None, description="方向过滤: BUY, SELL"),
    limit: int = Query(50, ge=1, le=200, description="返回数量限制"),
    start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD, 仅用于bookmarked类型)"),
    end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD, 仅用于bookmarked类型)"),
    min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度 (仅用于realtime类型)")
):
    """
    List recommended trading pairs.

    Args:
        type: ``realtime`` (read from the Redis cache) or ``bookmarked``
            (queried from the database).
        status: Status filter (bookmarked only).
        direction: Direction filter.
        limit: Maximum number of items returned.
        start_date: Start date (bookmarked only).
        end_date: End date, inclusive up to 23:59:59 (bookmarked only).
        min_signal_strength: Accepted for the realtime type.
            NOTE(review): the value is currently not applied to the result.

    Raises:
        HTTPException: 400 for an invalid ``type`` or date, 503 when Redis is
            unavailable for the realtime type, 500 for unexpected errors.
    """
    try:
        if type == 'realtime':
            # Realtime recommendations are only read from Redis (one global
            # snapshot); nothing is generated inside the request, so a page
            # refresh never triggers a market scan.
            rds = await _get_redis()
            if rds is None:
                raise HTTPException(status_code=503, detail="Redis 不可用,无法读取推荐缓存")
            try:
                return await _build_realtime_response(rds, direction, limit)
            finally:
                # The client is created per request, so release it here.
                await _close_redis(rds)

        elif type == 'bookmarked':
            # Query bookmarked recommendations from the database.
            start_dt = _parse_date_param(start_date, "开始日期格式错误,应为 YYYY-MM-DD")
            end_dt = _parse_date_param(end_date, "结束日期格式错误,应为 YYYY-MM-DD")
            if end_dt is not None:
                end_dt = end_dt.replace(hour=23, minute=59, second=59)

            recommendations = TradeRecommendation.get_all(
                status=status,
                direction=direction,
                limit=limit,
                start_date=start_dt,
                end_date=end_dt
            )

            # Refresh realtime prices (best effort).
            await _refresh_realtime_prices(recommendations)

            return {
                "success": True,
                "count": len(recommendations),
                "type": "bookmarked",
                "data": recommendations
            }
        else:
            raise HTTPException(status_code=400, detail="type参数必须是 'realtime' 或 'bookmarked'")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取推荐列表失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}")


@router.get("/active")
async def get_active_recommendations():
    """
    Return the currently active bookmarked recommendations.

    ``recommendation_time`` values that are ``datetime`` objects are converted
    to ISO-8601 strings, and prices are refreshed on a best-effort basis.

    Raises:
        HTTPException: 500 for unexpected errors.
    """
    try:
        recommendations = TradeRecommendation.get_active()

        # Make sure the time is serialised as an ISO-format string.
        for rec in recommendations:
            rec_time = rec.get('recommendation_time')
            if isinstance(rec_time, datetime):
                rec['recommendation_time'] = rec_time.isoformat()

        # Refresh prices from the realtime sources (best effort).
        await _refresh_realtime_prices(recommendations)

        return {
            "success": True,
            "count": len(recommendations),
            "data": recommendations
        }
    except Exception as e:
        logger.error(f"获取有效推荐失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取有效推荐失败: {str(e)}")


@router.get("/{recommendation_id}")
async def get_recommendation(recommendation_id: int):
    """
    Return a single recommendation by its ID.

    Raises:
        HTTPException: 404 when it does not exist, 500 for unexpected errors.
    """
    try:
        recommendation = TradeRecommendation.get_by_id(recommendation_id)
        if not recommendation:
            raise HTTPException(status_code=404, detail="推荐不存在")

        return {
            "success": True,
            "data": recommendation
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取推荐详情失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取推荐详情失败: {str(e)}")


@router.get("/symbol/{symbol}")
async def get_recommendations_by_symbol(
    symbol: str,
    limit: int = Query(10, ge=1, le=50, description="返回数量限制")
):
    """
    Return recommendation records for one trading pair.

    Raises:
        HTTPException: 500 for unexpected errors.
    """
    try:
        recommendations = TradeRecommendation.get_by_symbol(symbol, limit=limit)
        return {
            "success": True,
            "count": len(recommendations),
            "data": recommendations
        }
    except Exception as e:
        logger.error(f"获取交易对推荐失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取交易对推荐失败: {str(e)}")


@router.post("/{recommendation_id}/execute")
async def mark_recommendation_executed(
    recommendation_id: int,
    trade_id: Optional[int] = None
):
    """
    Mark a recommendation as executed.

    Args:
        recommendation_id: Recommendation ID.
        trade_id: ID of the associated trade record (optional).

    Raises:
        HTTPException: 404 when the recommendation does not exist, 400 when
            its status is not ``active``, 500 for unexpected errors.
    """
    try:
        recommendation = TradeRecommendation.get_by_id(recommendation_id)
        if not recommendation:
            raise HTTPException(status_code=404, detail="推荐不存在")

        if recommendation['status'] != 'active':
            raise HTTPException(
                status_code=400,
                detail=f"推荐状态为 {recommendation['status']},无法标记为已执行"
            )

        TradeRecommendation.mark_executed(recommendation_id, trade_id)

        return {
            "success": True,
            "message": "推荐已标记为已执行"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"标记推荐已执行失败: {e}")
        raise HTTPException(status_code=500, detail=f"标记推荐已执行失败: {str(e)}")


@router.post("/{recommendation_id}/cancel")
async def cancel_recommendation(
    recommendation_id: int,
    notes: Optional[str] = None
):
    """
    Cancel a recommendation.

    Args:
        recommendation_id: Recommendation ID.
        notes: Free-text reason for the cancellation.

    Raises:
        HTTPException: 404 when the recommendation does not exist, 400 when
            its status is not ``active``, 500 for unexpected errors.
    """
    try:
        recommendation = TradeRecommendation.get_by_id(recommendation_id)
        if not recommendation:
            raise HTTPException(status_code=404, detail="推荐不存在")

        if recommendation['status'] != 'active':
            raise HTTPException(
                status_code=400,
                detail=f"推荐状态为 {recommendation['status']},无法取消"
            )

        TradeRecommendation.mark_cancelled(recommendation_id, notes)

        return {
            "success": True,
            "message": "推荐已取消"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"取消推荐失败: {e}")
        raise HTTPException(status_code=500, detail=f"取消推荐失败: {str(e)}")


@router.post("/generate")
async def generate_recommendations(
    min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度"),
    max_recommendations: int = Query(20, ge=1, le=50, description="最大推荐数量")
):
    """
    Generate new trade recommendations.

    Args:
        min_signal_strength: Minimum signal strength.  The recommender is
            called with this value lowered by 3 (but not below 2).
        max_recommendations: Maximum number of recommendations.  The
            recommender is called with at least 50, so with the current
            ``le=50`` bound the effective value is always 50.

    Raises:
        HTTPException: 500 when the ``trading_system`` package cannot be found
            or generation fails.
    """
    try:
        # Locate the trading system package relative to this file.
        project_root, trading_system_path, alternative_path = _trading_system_dirs()

        logger.info(f"查找交易系统模块: {trading_system_path}")
        logger.info(f"路径存在: {trading_system_path.exists()}")

        if not trading_system_path.exists():
            # Try the other possible location.
            if alternative_path.exists():
                trading_system_path = alternative_path
                logger.info(f"使用备用路径: {trading_system_path}")
            else:
                raise HTTPException(
                    status_code=500,
                    detail=f"交易系统模块不存在。查找路径: {trading_system_path}, 备用路径: {alternative_path}"
                )

        _prepend_sys_path(trading_system_path)
        _prepend_sys_path(project_root)

        from binance_client import BinanceClient
        from market_scanner import MarketScanner
        from risk_manager import RiskManager
        from trade_recommender import TradeRecommender
        import config

        # Initialise the components.
        client = BinanceClient(
            api_key=config.BINANCE_API_KEY,
            api_secret=config.BINANCE_API_SECRET,
            testnet=config.USE_TESTNET
        )

        await client.connect()

        try:
            scanner = MarketScanner(client)
            risk_manager = RiskManager(client)
            recommender = TradeRecommender(client, scanner, risk_manager)

            # Generate recommendations (added incrementally to the Redis cache).
            # The thresholds are relaxed on purpose to obtain more candidates.
            recommendations = await recommender.generate_recommendations(
                min_signal_strength=max(2, min_signal_strength - 3),  # 3 levels lower, minimum 2
                max_recommendations=max(max_recommendations, 50),  # generate at least 50
                add_to_cache=True,  # add to the Redis cache
                min_quality_score=0.0  # no quality filtering; keep everything
            )

            return {
                "success": True,
                "count": len(recommendations),
                "message": f"成功生成 {len(recommendations)} 个推荐",
                "data": recommendations
            }
        finally:
            await client.disconnect()

    except HTTPException:
        raise
    except Exception as e:
        # logger.exception records the traceback together with the message.
        logger.exception(f"生成推荐失败: {e}")
        raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}")


@router.post("/clear-cache")
async def clear_recommendations_cache():
    """
    Delete the recommendation cache in Redis.

    NOTE(review): only the legacy hash key is deleted here; the snapshot key
    that the realtime listing reads first is left untouched — confirm this is
    intended.

    Raises:
        HTTPException: 500 when the cache cannot be cleared.
    """
    try:
        project_root, trading_system_path, alternative_path = _trading_system_dirs()
        if not trading_system_path.exists() and alternative_path.exists():
            trading_system_path = alternative_path

        _prepend_sys_path(trading_system_path)
        _prepend_sys_path(project_root)

        from binance_client import BinanceClient
        import config

        client = BinanceClient(
            api_key=config.BINANCE_API_KEY,
            api_secret=config.BINANCE_API_SECRET,
            testnet=config.USE_TESTNET
        )

        try:
            await client.redis_cache.connect()
            await client.redis_cache.delete(REDIS_KEY_RECOMMENDATIONS_HASH)

            return {
                "success": True,
                "message": "推荐缓存已清理"
            }
        finally:
            await client.disconnect()

    except Exception as e:
        logger.error(f"清理推荐缓存失败: {e}")
        raise HTTPException(status_code=500, detail=f"清理推荐缓存失败: {str(e)}")


@router.post("/bookmark")
async def bookmark_recommendation(recommendation_data: dict):
    """
    Save a recommendation to the database (for later review).

    Args:
        recommendation_data: Recommendation payload.  ``symbol``,
            ``direction``, ``current_price``, ``change_percent``,
            ``recommendation_reason`` and ``signal_strength`` are required;
            all other fields are optional.

    Raises:
        HTTPException: 400 when a required field is missing, 500 for
            unexpected errors.
    """
    try:
        # Check the required fields.
        required_fields = ['symbol', 'direction', 'current_price', 'change_percent',
                           'recommendation_reason', 'signal_strength']
        for field in required_fields:
            if field not in recommendation_data:
                raise HTTPException(status_code=400, detail=f"缺少必需字段: {field}")

        # Save to the database.
        recommendation_id = TradeRecommendation.create(
            symbol=recommendation_data.get('symbol'),
            direction=recommendation_data.get('direction'),
            current_price=recommendation_data.get('current_price'),
            change_percent=recommendation_data.get('change_percent'),
            recommendation_reason=recommendation_data.get('recommendation_reason'),
            signal_strength=recommendation_data.get('signal_strength'),
            market_regime=recommendation_data.get('market_regime'),
            trend_4h=recommendation_data.get('trend_4h'),
            rsi=recommendation_data.get('rsi'),
            macd_histogram=recommendation_data.get('macd_histogram'),
            bollinger_upper=recommendation_data.get('bollinger_upper'),
            bollinger_middle=recommendation_data.get('bollinger_middle'),
            bollinger_lower=recommendation_data.get('bollinger_lower'),
            ema20=recommendation_data.get('ema20'),
            ema50=recommendation_data.get('ema50'),
            ema20_4h=recommendation_data.get('ema20_4h'),
            atr=recommendation_data.get('atr'),
            suggested_stop_loss=recommendation_data.get('suggested_stop_loss'),
            suggested_take_profit_1=recommendation_data.get('suggested_take_profit_1'),
            suggested_take_profit_2=recommendation_data.get('suggested_take_profit_2'),
            suggested_position_percent=recommendation_data.get('suggested_position_percent'),
            suggested_leverage=recommendation_data.get('suggested_leverage', 10),
            order_type=recommendation_data.get('order_type', 'LIMIT'),
            suggested_limit_price=recommendation_data.get('suggested_limit_price'),
            volume_24h=recommendation_data.get('volume_24h'),
            volatility=recommendation_data.get('volatility'),
            notes=recommendation_data.get('notes', '用户标记用于复盘'),
            user_guide=recommendation_data.get('user_guide'),
            recommendation_category=recommendation_data.get('recommendation_category'),
            risk_level=recommendation_data.get('risk_level'),
            expected_hold_time=recommendation_data.get('expected_hold_time'),
            trading_tutorial=recommendation_data.get('trading_tutorial'),
            max_hold_days=recommendation_data.get('max_hold_days', 3)
        )

        logger.info(f"✓ 推荐已标记到数据库: {recommendation_data.get('symbol')} {recommendation_data.get('direction')} (ID: {recommendation_id})")

        return {
            "success": True,
            "message": "推荐已标记到数据库",
            "recommendation_id": recommendation_id
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception records the traceback together with the message.
        logger.exception(f"标记推荐失败: {e}")
        raise HTTPException(status_code=500, detail=f"标记推荐失败: {str(e)}")