auto_trade_sys/backend/api/routes/recommendations.py
薇薇安 30cf5d539f a
2026-01-20 10:48:32 +08:00

906 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对API路由
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from datetime import datetime, timedelta
from database.models import TradeRecommendation
import logging
import os
import json
from typing import Any, Dict, Tuple
import aiohttp
try:
import redis.asyncio as redis_async
except Exception: # pragma: no cover
redis_async = None
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/recommendations", tags=["recommendations"])
REDIS_KEY_RECOMMENDATIONS_SNAPSHOT = "recommendations:snapshot"
REDIS_KEY_RECOMMENDATIONS_HASH = "recommendations:realtime"
REDIS_KEY_MARK_PRICE_ALL = "market:mark_price:all"
REDIS_KEY_MARK_PRICE_ALL_LOCK = "lock:market:mark_price:all"
def _beijing_time_str() -> str:
from datetime import timezone
return datetime.now(tz=timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S")
def _redis_connection_kwargs() -> Tuple[str, Dict[str, Any]]:
"""
从环境变量构建 Redis 连接参数(兼容 Valkey Serverless + TLS
"""
redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
username = os.getenv("REDIS_USERNAME", None)
password = os.getenv("REDIS_PASSWORD", None)
ssl_cert_reqs = (os.getenv("REDIS_SSL_CERT_REQS", "required") or "required").strip()
ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)
kwargs: Dict[str, Any] = {"decode_responses": True}
if username:
kwargs["username"] = username
if password:
kwargs["password"] = password
use_tls = redis_url.startswith("rediss://") or (os.getenv("REDIS_USE_TLS", "False").lower() == "true")
if use_tls and not redis_url.startswith("rediss://"):
if redis_url.startswith("redis://"):
redis_url = redis_url.replace("redis://", "rediss://", 1)
else:
redis_url = f"rediss://{redis_url}"
if use_tls or redis_url.startswith("rediss://"):
kwargs["ssl_cert_reqs"] = ssl_cert_reqs
if ssl_ca_certs:
kwargs["ssl_ca_certs"] = ssl_ca_certs
# redis-py: ssl_check_hostname 用于控制主机名校验
if ssl_cert_reqs == "required":
kwargs["ssl_check_hostname"] = True
else:
kwargs["ssl_check_hostname"] = False
return redis_url, kwargs
async def _get_redis():
if redis_async is None:
return None
redis_url, kwargs = _redis_connection_kwargs()
try:
client = redis_async.from_url(redis_url, **kwargs)
await client.ping()
return client
except Exception as e:
logger.warning(f"Redis 不可用recommendations: {e}")
return None
async def _get_cached_json(client, key: str) -> Optional[Any]:
try:
raw = await client.get(key)
if not raw:
return None
return json.loads(raw)
except Exception:
return None
async def _set_cached_json(client, key: str, value: Any, ttl_sec: int) -> None:
await client.setex(key, ttl_sec, json.dumps(value, ensure_ascii=False))
async def _refresh_mark_price_all_if_needed(client, min_refresh_sec: int = 10, ttl_sec: int = 30) -> Dict[str, Any]:
"""
获取并缓存 Binance U本位合约 Mark PricepremiumIndex
- 使用单 key 全量缓存,适合给“推荐页面当前价格”用
- 用 Redis NX lock 防止并发刷新风暴
"""
now_ms = int(__import__("time").time() * 1000)
cached = await _get_cached_json(client, REDIS_KEY_MARK_PRICE_ALL)
if isinstance(cached, dict):
updated_at_ms = cached.get("updated_at_ms")
try:
updated_at_ms = int(updated_at_ms) if updated_at_ms is not None else None
except Exception:
updated_at_ms = None
if updated_at_ms and (now_ms - updated_at_ms) < (min_refresh_sec * 1000):
return cached
# 尝试抢锁刷新(锁很短,避免慢请求时阻塞)
try:
locked = await client.set(REDIS_KEY_MARK_PRICE_ALL_LOCK, str(now_ms), nx=True, ex=min_refresh_sec)
except Exception:
locked = False
if not locked:
# 没拿到锁就直接返回旧值(如果有)
return cached if isinstance(cached, dict) else {"updated_at_ms": None, "updated_at": None, "items": {}}
# 真的去 Binance 拉一次
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=6)) as session:
async with session.get(url) as resp:
if resp.status != 200:
raise RuntimeError(f"binance premiumIndex http={resp.status}")
payload = await resp.json()
except Exception as e:
logger.warning(f"刷新 mark price 失败: {e}")
# 刷新失败,返回旧值
return cached if isinstance(cached, dict) else {"updated_at_ms": None, "updated_at": None, "items": {}}
items: Dict[str, Dict[str, Any]] = {}
if isinstance(payload, list):
for row in payload:
if not isinstance(row, dict):
continue
sym = row.get("symbol")
if not sym or not isinstance(sym, str):
continue
# 只保留 USDT 永续相关(避免无关数据撑爆)
if not sym.endswith("USDT"):
continue
try:
mp = float(row.get("markPrice", 0) or 0)
except Exception:
mp = 0.0
t = row.get("time") # 毫秒
try:
t = int(t) if t is not None else None
except Exception:
t = None
items[sym] = {"markPrice": mp, "time": t}
snapshot = {
"updated_at_ms": now_ms,
"updated_at": _beijing_time_str(),
"items": items,
"source": "binance_premiumIndex",
}
try:
await _set_cached_json(client, REDIS_KEY_MARK_PRICE_ALL, snapshot, ttl_sec=ttl_sec)
except Exception as e:
logger.warning(f"写入 {REDIS_KEY_MARK_PRICE_ALL} 失败(不影响返回): {e}")
return snapshot
@router.get("")
async def get_recommendations(
type: Optional[str] = Query('realtime', description="类型: realtime(实时推荐), bookmarked(已标记的推荐)"),
status: Optional[str] = Query(None, description="状态过滤: active, executed, expired, cancelled (仅用于bookmarked类型)"),
direction: Optional[str] = Query(None, description="方向过滤: BUY, SELL"),
limit: int = Query(50, ge=1, le=200, description="返回数量限制"),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD, 仅用于bookmarked类型)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD, 仅用于bookmarked类型)"),
min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度 (仅用于realtime类型)")
):
"""
获取推荐交易对列表
Args:
type: 类型 - realtime(实时推荐,基于当前行情), bookmarked(已标记的推荐,从数据库查询)
status: 状态过滤仅用于bookmarked类型
direction: 方向过滤
limit: 返回数量限制
start_date: 开始日期仅用于bookmarked类型
end_date: 结束日期仅用于bookmarked类型
min_signal_strength: 最小信号强度仅用于realtime类型
"""
try:
if type == 'realtime':
# 实时推荐:统一只读 Redis全局一份 snapshot不在请求里实时生成避免“页面刷新=触发扫描”)
recommendations = []
cache_available = False
snapshot_meta: Dict[str, Any] = {}
rds = await _get_redis()
if rds is None:
raise HTTPException(status_code=503, detail="Redis 不可用,无法读取推荐缓存")
# 1) 优先读取 snapshot单 key
try:
snapshot = await _get_cached_json(rds, REDIS_KEY_RECOMMENDATIONS_SNAPSHOT)
if isinstance(snapshot, dict):
items = snapshot.get("items", [])
if isinstance(items, list) and items:
recommendations = items
cache_available = True
snapshot_meta = {
"generated_at_ms": snapshot.get("generated_at_ms"),
"generated_at": snapshot.get("generated_at"),
"ttl_sec": snapshot.get("ttl_sec"),
}
except Exception as e:
logger.warning(f"读取 {REDIS_KEY_RECOMMENDATIONS_SNAPSHOT} 失败: {e}")
# 2) 兼容旧缓存Hash
if not recommendations:
try:
import time as _time
cached_data = await rds.hgetall(REDIS_KEY_RECOMMENDATIONS_HASH)
if cached_data:
cache_available = True
current_time = _time.time()
max_age = 3600 * 2 # 推荐最大保留时间2小时
for raw in cached_data.values():
try:
rec = json.loads(raw) if isinstance(raw, str) else raw
except Exception:
rec = None
if isinstance(rec, dict):
rec_timestamp = rec.get('timestamp', 0)
if current_time - float(rec_timestamp or 0) > max_age:
continue
recommendations.append(rec)
recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
logger.info(f"从Redis(Hash)读取到 {len(recommendations)} 个有效推荐(已过滤过期)")
except Exception as e:
logger.warning(f"从Redis(Hash)读取推荐失败: {e}")
# 3) 合并“标记价”(mark price)作为真正的“当前价格”,并携带时间戳用于前端展示
mark_snapshot = {}
try:
mark_snapshot = await _refresh_mark_price_all_if_needed(rds)
except Exception as e:
logger.debug(f"刷新/读取 mark price 失败(不影响返回): {e}")
mark_items = mark_snapshot.get("items", {}) if isinstance(mark_snapshot, dict) else {}
mark_updated_at = mark_snapshot.get("updated_at")
mark_updated_at_ms = mark_snapshot.get("updated_at_ms")
if isinstance(mark_items, dict) and mark_items:
for rec in recommendations:
if not isinstance(rec, dict):
continue
sym = rec.get("symbol")
if not sym:
continue
mp = mark_items.get(sym)
if isinstance(mp, dict) and mp.get("markPrice"):
rec["current_price"] = float(mp.get("markPrice") or rec.get("current_price") or 0)
rec["current_price_source"] = "mark_price"
rec["current_price_time_ms"] = mp.get("time") or mark_updated_at_ms
rec["price_updated"] = True
else:
# 保留推荐生成时的 current_price但给出来源/时间(用于提示“可能过时”)
rec.setdefault("current_price_source", rec.get("current_price_source") or "snapshot")
rec.setdefault("price_updated", False)
# 4) 过滤“过期/不再适用”的推荐(短期可借鉴)
# 规则:
# - 时间过期:超过 max_age_sec 直接丢弃
# - 价格偏离:当前价偏离 planned_entry_price / suggested_limit_price 过大丢弃
# - 合法性校验BUY 需要 SL < entry < TPSELL 需要 SL > entry > TP
import time as _time
now_s = _time.time()
try:
max_age_sec = int(os.getenv("RECOMMENDATIONS_MAX_AGE_SEC", "1800")) # 默认30分钟
except Exception:
max_age_sec = 1800
try:
max_price_drift_pct = float(os.getenv("RECOMMENDATIONS_MAX_PRICE_DRIFT_PCT", "1.5")) # 默认1.5%
except Exception:
max_price_drift_pct = 1.5
dropped_age = 0
dropped_drift = 0
dropped_invalid = 0
def _f(v):
try:
return float(v)
except Exception:
return None
filtered_recs: List[Dict[str, Any]] = []
for rec in recommendations:
if not isinstance(rec, dict):
continue
# 4.1 时间过期
ts = rec.get("timestamp", 0)
try:
ts = float(ts) if ts is not None else 0.0
except Exception:
ts = 0.0
if ts and max_age_sec > 0 and (now_s - ts) > max_age_sec:
dropped_age += 1
continue
direction_u = str(rec.get("direction", "") or "").upper()
cur = _f(rec.get("current_price"))
# 基准入场价:优先 planned_entry_price新逻辑再 suggested_limit_price
entry_base = _f(rec.get("planned_entry_price"))
if entry_base is None:
entry_base = _f(rec.get("suggested_limit_price"))
if entry_base is None:
entry_base = _f(rec.get("analysis_price")) or cur
# 4.2 价格偏离过大(挂单参考已失效)
if max_price_drift_pct > 0 and cur and entry_base and entry_base > 0:
drift = abs((cur - entry_base) / entry_base) * 100
if drift > max_price_drift_pct:
dropped_drift += 1
continue
# 4.3 合法性校验:止损/止盈相对关系
sl = _f(rec.get("suggested_stop_loss"))
tp1 = _f(rec.get("suggested_take_profit_1"))
tp2 = _f(rec.get("suggested_take_profit_2"))
if direction_u == "BUY":
if entry_base and sl and sl >= entry_base:
dropped_invalid += 1
continue
if entry_base and tp1 and tp1 <= entry_base:
dropped_invalid += 1
continue
if entry_base and tp2 and tp2 <= entry_base:
dropped_invalid += 1
continue
elif direction_u == "SELL":
if entry_base and sl and sl <= entry_base:
dropped_invalid += 1
continue
if entry_base and tp1 and tp1 >= entry_base:
dropped_invalid += 1
continue
if entry_base and tp2 and tp2 >= entry_base:
dropped_invalid += 1
continue
filtered_recs.append(rec)
recommendations = filtered_recs
# 方向过滤
if direction:
recommendations = [r for r in recommendations if r.get('direction') == direction]
# 限制返回数量
recommendations = recommendations[:limit]
return {
"success": True,
"count": len(recommendations),
"type": "realtime",
"from_cache": cache_available,
"meta": {
**snapshot_meta,
"price_source": "mark_price" if mark_items else None,
"price_updated_at": mark_updated_at,
"price_updated_at_ms": mark_updated_at_ms,
"recommendations_max_age_sec": max_age_sec,
"recommendations_max_price_drift_pct": max_price_drift_pct,
"dropped": {
"age": dropped_age,
"price_drift": dropped_drift,
"invalid": dropped_invalid,
},
},
"data": recommendations
}
elif type == 'bookmarked':
# 从数据库查询已标记的推荐
# 转换日期字符串
start_dt = None
end_dt = None
if start_date:
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
except ValueError:
raise HTTPException(status_code=400, detail="开始日期格式错误,应为 YYYY-MM-DD")
if end_date:
try:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
end_dt = end_dt.replace(hour=23, minute=59, second=59)
except ValueError:
raise HTTPException(status_code=400, detail="结束日期格式错误,应为 YYYY-MM-DD")
recommendations = TradeRecommendation.get_all(
status=status,
direction=direction,
limit=limit,
start_date=start_dt,
end_date=end_dt
)
# 更新实时价格
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if trading_system_path.exists():
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
import config
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
# 连接Redis如果还没有连接
try:
await client.redis_cache.connect()
except:
pass
# 更新推荐中的实时价格和涨跌幅
for rec in recommendations:
symbol = rec.get('symbol')
if symbol:
# 先尝试从内存缓存获取实时价格(同步)
realtime_price = client.get_realtime_price(symbol)
# 如果内存缓存没有尝试从Valkey异步读取
if realtime_price is None:
try:
realtime_price = await client.get_realtime_price_async(symbol)
except:
pass
# 如果WebSocket/Valkey都没有尝试从REST API获取
if realtime_price is None:
try:
ticker = await client.get_ticker_24h(symbol)
if ticker:
realtime_price = float(ticker.get('lastPrice', 0))
except:
pass
if realtime_price is not None and realtime_price > 0:
old_price = rec.get('current_price', 0)
rec['current_price'] = realtime_price
if old_price > 0:
change_percent = ((realtime_price - old_price) / old_price) * 100
rec['change_percent'] = round(change_percent, 4)
rec['price_updated'] = True
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
except Exception as price_update_error:
logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}")
return {
"success": True,
"count": len(recommendations),
"type": "bookmarked",
"data": recommendations
}
else:
raise HTTPException(status_code=400, detail="type参数必须是 'realtime''bookmarked'")
except HTTPException:
raise
except Exception as e:
logger.error(f"获取推荐列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐列表失败: {str(e)}")
@router.get("/active")
async def get_active_recommendations():
"""
获取当前有效的推荐(已标记的推荐中未过期、未执行、未取消的)
使用WebSocket实时价格更新推荐中的价格信息
同一交易对只返回最新的推荐(已去重)
"""
try:
recommendations = TradeRecommendation.get_active()
# 确保时间格式正确转换为ISO格式字符串包含时区信息
from datetime import datetime
for rec in recommendations:
if rec.get('recommendation_time'):
if isinstance(rec['recommendation_time'], datetime):
rec['recommendation_time'] = rec['recommendation_time'].isoformat()
# 尝试从WebSocket缓存获取实时价格更新推荐中的价格
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if trading_system_path.exists():
sys.path.insert(0, str(trading_system_path))
from binance_client import BinanceClient
import config
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
# 连接Redis如果还没有连接
try:
await client.redis_cache.connect()
except:
pass
# 更新推荐中的实时价格和涨跌幅
for rec in recommendations:
symbol = rec.get('symbol')
if symbol:
# 先尝试从内存缓存获取实时价格(同步)
realtime_price = client.get_realtime_price(symbol)
# 如果内存缓存没有尝试从Valkey异步读取
if realtime_price is None:
try:
realtime_price = await client.get_realtime_price_async(symbol)
except:
pass
# 如果WebSocket/Valkey都没有尝试从REST API获取
if realtime_price is None:
try:
ticker = await client.get_ticker_24h(symbol)
if ticker:
realtime_price = float(ticker.get('lastPrice', 0))
except:
pass
if realtime_price is not None and realtime_price > 0:
old_price = rec.get('current_price', 0)
rec['current_price'] = realtime_price
if old_price > 0:
change_percent = ((realtime_price - old_price) / old_price) * 100
rec['change_percent'] = round(change_percent, 4)
rec['price_updated'] = True
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
else:
rec['price_updated'] = False
except Exception as price_update_error:
logger.debug(f"更新推荐实时价格失败(不影响返回): {price_update_error}")
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取有效推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取有效推荐失败: {str(e)}")
@router.get("/{recommendation_id}")
async def get_recommendation(recommendation_id: int):
"""
根据ID获取推荐详情
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
return {
"success": True,
"data": recommendation
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取推荐详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取推荐详情失败: {str(e)}")
@router.get("/symbol/{symbol}")
async def get_recommendations_by_symbol(
symbol: str,
limit: int = Query(10, ge=1, le=50, description="返回数量限制")
):
"""
根据交易对获取推荐记录
"""
try:
recommendations = TradeRecommendation.get_by_symbol(symbol, limit=limit)
return {
"success": True,
"count": len(recommendations),
"data": recommendations
}
except Exception as e:
logger.error(f"获取交易对推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"获取交易对推荐失败: {str(e)}")
@router.post("/{recommendation_id}/execute")
async def mark_recommendation_executed(
recommendation_id: int,
trade_id: Optional[int] = None
):
"""
标记推荐已执行
Args:
recommendation_id: 推荐ID
trade_id: 关联的交易记录ID可选
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法标记为已执行"
)
TradeRecommendation.mark_executed(recommendation_id, trade_id)
return {
"success": True,
"message": "推荐已标记为已执行"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"标记推荐已执行失败: {e}")
raise HTTPException(status_code=500, detail=f"标记推荐已执行失败: {str(e)}")
@router.post("/{recommendation_id}/cancel")
async def cancel_recommendation(
recommendation_id: int,
notes: Optional[str] = None
):
"""
取消推荐
Args:
recommendation_id: 推荐ID
notes: 取消原因备注
"""
try:
recommendation = TradeRecommendation.get_by_id(recommendation_id)
if not recommendation:
raise HTTPException(status_code=404, detail="推荐不存在")
if recommendation['status'] != 'active':
raise HTTPException(
status_code=400,
detail=f"推荐状态为 {recommendation['status']},无法取消"
)
TradeRecommendation.mark_cancelled(recommendation_id, notes)
return {
"success": True,
"message": "推荐已取消"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"取消推荐失败: {e}")
raise HTTPException(status_code=500, detail=f"取消推荐失败: {str(e)}")
@router.post("/generate")
async def generate_recommendations(
min_signal_strength: int = Query(5, ge=0, le=10, description="最小信号强度"),
max_recommendations: int = Query(20, ge=1, le=50, description="最大推荐数量")
):
"""
生成新的交易推荐
Args:
min_signal_strength: 最小信号强度
max_recommendations: 最大推荐数量
"""
try:
# 导入推荐器
import sys
from pathlib import Path
# 从 backend/api/routes/recommendations.py 向上找到项目根目录
# backend/api/routes -> backend/api -> backend -> 项目根目录
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent # backend目录
project_root = backend_path.parent # 项目根目录
trading_system_path = project_root / 'trading_system'
logger.info(f"查找交易系统模块: {trading_system_path}")
logger.info(f"路径存在: {trading_system_path.exists()}")
if not trading_system_path.exists():
# 尝试其他可能的路径
alternative_path = backend_path / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
logger.info(f"使用备用路径: {trading_system_path}")
else:
raise HTTPException(
status_code=500,
detail=f"交易系统模块不存在。查找路径: {trading_system_path}, 备用路径: {alternative_path}"
)
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from trade_recommender import TradeRecommender
import config
# 初始化组件
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
await client.connect()
try:
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
# 生成推荐增量添加到Redis缓存
# 降低信号强度阈值以获取更多推荐(推荐系统可以更宽松)
recommendations = await recommender.generate_recommendations(
min_signal_strength=max(2, min_signal_strength - 3), # 降低3个等级以获取更多推荐最低2
max_recommendations=max(max_recommendations, 50), # 至少生成50个推荐
add_to_cache=True, # 添加到Redis缓存
min_quality_score=0.0, # 不过滤,保留所有推荐
scan_cache_namespace="api", # 避免覆盖交易/推荐进程的扫描缓存
)
return {
"success": True,
"count": len(recommendations),
"message": f"成功生成 {len(recommendations)} 个推荐",
"data": recommendations
}
finally:
await client.disconnect()
except HTTPException:
raise
except Exception as e:
logger.error(f"生成推荐失败: {e}")
import traceback
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}")
@router.post("/clear-cache")
async def clear_recommendations_cache():
"""
清理Redis中的推荐缓存
"""
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
alternative_path = backend_path / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
import config
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
try:
await client.redis_cache.connect()
cache_key = "recommendations:realtime"
await client.redis_cache.delete(cache_key)
return {
"success": True,
"message": "推荐缓存已清理"
}
finally:
await client.disconnect()
except Exception as e:
logger.error(f"清理推荐缓存失败: {e}")
raise HTTPException(status_code=500, detail=f"清理推荐缓存失败: {str(e)}")
@router.post("/bookmark")
async def bookmark_recommendation(recommendation_data: dict):
"""
标记推荐到数据库(用于复盘)
Args:
recommendation_data: 推荐数据(包含所有推荐字段)
"""
try:
# 提取必需字段
required_fields = ['symbol', 'direction', 'current_price', 'change_percent',
'recommendation_reason', 'signal_strength']
for field in required_fields:
if field not in recommendation_data:
raise HTTPException(status_code=400, detail=f"缺少必需字段: {field}")
# 保存到数据库
recommendation_id = TradeRecommendation.create(
symbol=recommendation_data.get('symbol'),
direction=recommendation_data.get('direction'),
current_price=recommendation_data.get('current_price'),
change_percent=recommendation_data.get('change_percent'),
recommendation_reason=recommendation_data.get('recommendation_reason'),
signal_strength=recommendation_data.get('signal_strength'),
market_regime=recommendation_data.get('market_regime'),
trend_4h=recommendation_data.get('trend_4h'),
rsi=recommendation_data.get('rsi'),
macd_histogram=recommendation_data.get('macd_histogram'),
bollinger_upper=recommendation_data.get('bollinger_upper'),
bollinger_middle=recommendation_data.get('bollinger_middle'),
bollinger_lower=recommendation_data.get('bollinger_lower'),
ema20=recommendation_data.get('ema20'),
ema50=recommendation_data.get('ema50'),
ema20_4h=recommendation_data.get('ema20_4h'),
atr=recommendation_data.get('atr'),
suggested_stop_loss=recommendation_data.get('suggested_stop_loss'),
suggested_take_profit_1=recommendation_data.get('suggested_take_profit_1'),
suggested_take_profit_2=recommendation_data.get('suggested_take_profit_2'),
suggested_position_percent=recommendation_data.get('suggested_position_percent'),
suggested_leverage=recommendation_data.get('suggested_leverage', 10),
order_type=recommendation_data.get('order_type', 'LIMIT'),
suggested_limit_price=recommendation_data.get('suggested_limit_price'),
volume_24h=recommendation_data.get('volume_24h'),
volatility=recommendation_data.get('volatility'),
notes=recommendation_data.get('notes', '用户标记用于复盘'),
user_guide=recommendation_data.get('user_guide'),
recommendation_category=recommendation_data.get('recommendation_category'),
risk_level=recommendation_data.get('risk_level'),
expected_hold_time=recommendation_data.get('expected_hold_time'),
trading_tutorial=recommendation_data.get('trading_tutorial'),
max_hold_days=recommendation_data.get('max_hold_days', 3)
)
logger.info(f"✓ 推荐已标记到数据库: {recommendation_data.get('symbol')} {recommendation_data.get('direction')} (ID: {recommendation_id})")
return {
"success": True,
"message": "推荐已标记到数据库",
"recommendation_id": recommendation_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"标记推荐失败: {e}")
import traceback
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"标记推荐失败: {str(e)}")