diff --git a/backend/api/routes/account.py b/backend/api/routes/account.py index 0ef1fa1..17b91c6 100644 --- a/backend/api/routes/account.py +++ b/backend/api/routes/account.py @@ -262,18 +262,29 @@ async def get_realtime_positions(): if mark_price == 0: mark_price = entry_price - position_value = abs(position_amt * mark_price) - - # 计算开仓时的USDT数量(名义价值) + # === 名义/保证金口径说明(与币安展示更接近)=== + # - 币安的名义价值/仓位价值通常随标记价(markPrice)变动 + # - DB 中的 notional_usdt/margin_usdt 通常是“开仓时”写入,用于复盘/统计 + # - 若发生部分止盈/减仓:币安 positionAmt 会变小,但 DB 里的 notional/margin 可能仍是“原始开仓量” + # → 会出现:数量=6.8,但名义/保证金像是 13.6 的两倍(与你反馈一致) + # + # 因此:实时持仓展示统一使用“当前数量×标记价”的实时名义/保证金, + # 并额外返回 original_* 字段保留 DB 开仓口径,避免混用导致误解。 + + # 兼容旧字段:entry_value_usdt 仍保留(但它是按入场价计算的名义) entry_value_usdt = abs(position_amt) * entry_price - - # 计算收益率:盈亏 / 保证金(与币安一致) - # 保证金 = 名义价值 / 杠杆 + leverage = float(pos.get('leverage', 1)) - margin = entry_value_usdt / leverage if leverage > 0 else entry_value_usdt + if leverage <= 0: + leverage = 1.0 + + # 当前持仓名义价值(USDT):按标记价 + notional_usdt_live = abs(position_amt) * mark_price + # 当前持仓保证金(USDT):名义/杠杆 + margin_usdt_live = notional_usdt_live / leverage pnl_percent = 0 - if margin > 0: - pnl_percent = (unrealized_pnl / margin) * 100 + if margin_usdt_live > 0: + pnl_percent = (unrealized_pnl / margin_usdt_live) * 100 # 尝试从数据库获取开仓时间、止损止盈价格(以及交易规模字段) entry_time = None @@ -314,9 +325,12 @@ async def get_realtime_positions(): "entry_price": entry_price, # 兼容旧字段:entry_value_usdt 仍保留(前端已有使用) "entry_value_usdt": entry_value_usdt, - # 新字段:名义/保证金(若DB有则优先使用DB;否则使用实时计算) - "notional_usdt": db_notional_usdt if db_notional_usdt is not None else entry_value_usdt, - "margin_usdt": db_margin_usdt if db_margin_usdt is not None else margin, + # 实时展示字段:与币安更一致(按当前数量×标记价) + "notional_usdt": notional_usdt_live, + "margin_usdt": margin_usdt_live, + # 额外返回“开仓记录口径”(用于排查部分止盈/减仓导致的不一致) + "original_notional_usdt": db_notional_usdt, + "original_margin_usdt": db_margin_usdt, "mark_price": mark_price, "pnl": unrealized_pnl, "pnl_percent": pnl_percent, # 基于保证金的盈亏百分比 @@ -384,12 +398,40 @@ async def close_position(symbol: str): logger.info("✓ 币安API连接成功") try: - # 检查币安是否有持仓 + # 检查币安是否有持仓(使用原始 position_information,确保能拿到 positionSide 以处理 -4061) logger.info(f"检查 {symbol} 在币安的持仓状态...") - positions = await client.get_open_positions() - position = next((p for p in positions if p['symbol'] == symbol and float(p['positionAmt']) != 0), None) + + # 读取持仓模式:dualSidePosition=True => 对冲模式(必须传 positionSide=LONG/SHORT) + dual_side = None + try: + mode_res = await client.client.futures_get_position_mode() + if isinstance(mode_res, dict): + dual_side = bool(mode_res.get("dualSidePosition")) + except Exception as e: + logger.warning(f"读取持仓模式失败(将按单向模式兜底): {e}") + dual_side = None + + raw_positions = await client.client.futures_position_information(symbol=symbol) + nonzero_positions = [] + for p in raw_positions or []: + try: + amt = float(p.get("positionAmt", 0)) + except Exception: + continue + if abs(amt) > 0: + nonzero_positions.append((amt, p)) + + # 兼容旧逻辑:如果原始接口异常,回退到封装方法 + if not nonzero_positions: + try: + positions = await client.get_open_positions() + position = next((p for p in positions if p['symbol'] == symbol and float(p['positionAmt']) != 0), None) + if position: + nonzero_positions = [(float(position["positionAmt"]), {"positionAmt": position["positionAmt"]})] + except Exception: + nonzero_positions = [] - if not position: + if not nonzero_positions: logger.warning(f"⚠ {symbol} 币安账户中没有持仓,可能已被平仓") # 检查数据库中是否有未平仓的记录,如果有则更新 open_trades = Trade.get_by_symbol(symbol, status='open') @@ -425,121 +467,183 @@ async def close_position(symbol: str): "symbol": symbol, "status": "closed" } - - # 获取持仓信息 - position_amt = float(position['positionAmt']) - logger.info(f"✓ 币安账户中有 {symbol} 持仓: {position_amt:.4f}") - - # 确定平仓方向(与持仓相反) - side = 'SELL' if position_amt > 0 else 'BUY' - quantity = abs(position_amt) - - logger.info(f"开始执行平仓操作: {symbol} {side} {quantity:.4f} @ MARKET (reduceOnly=true)...") - - # 直接调用币安 API 平仓,绕过 BinanceClient 的检查逻辑 - # 对于平仓操作,应该直接使用币安 API,不需要名义价值和保证金检查 + + # 获取交易对精度信息,调整数量精度(平仓不要向上补 minQty,避免超过持仓数量) + symbol_info = None try: - # 获取交易对精度信息,调整数量精度 symbol_info = await client.get_symbol_info(symbol) - if symbol_info: - quantity_precision = symbol_info.get('quantityPrecision', 8) - step_size = symbol_info.get('stepSize', 0) - min_qty = symbol_info.get('minQty', 0) - - # 调整数量精度 - if step_size > 0: - adjusted_quantity = float(int(quantity / step_size)) * step_size - else: - adjusted_quantity = round(quantity, quantity_precision) - - # 确保不小于最小数量 - if min_qty > 0 and adjusted_quantity < min_qty: - adjusted_quantity = min_qty - - adjusted_quantity = round(adjusted_quantity, quantity_precision) - if adjusted_quantity != quantity: - logger.info(f"数量精度调整: {quantity} -> {adjusted_quantity}") - quantity = adjusted_quantity - - # 直接调用币安 API 下单(使用 reduceOnly="true") - order = await client.client.futures_create_order( - symbol=symbol, - side=side, - type='MARKET', - quantity=quantity, - reduceOnly="true" # 使用字符串格式,符合币安API要求 + except Exception: + symbol_info = None + + def _adjust_close_qty(qty: float) -> float: + if qty is None: + return 0.0 + q = float(qty) + if not symbol_info: + return q + quantity_precision = symbol_info.get('quantityPrecision', 8) + step_size = float(symbol_info.get('stepSize', 0) or 0) + if step_size and step_size > 0: + # 向下取整,避免超过持仓 + q = float(int(q / step_size)) * step_size + else: + q = round(q, quantity_precision) + q = round(q, quantity_precision) + return q + + # 组装平仓订单(对冲模式可能同币种有 LONG/SHORT 两个仓位,这里一并平掉) + orders = [] + order_ids = [] + + # 如果 dual_side 无法读取,按 raw_positions 是否包含 positionSide 来推断 + if dual_side is None: + if any(isinstance(p, dict) and (p.get("positionSide") in ("LONG", "SHORT")) for _, p in nonzero_positions): + dual_side = True + else: + dual_side = False + + logger.info(f"{symbol} 持仓模式: {'HEDGE(对冲)' if dual_side else 'ONE-WAY(单向)'}") + + # 构造待平仓列表:[(positionSide, amt)] + to_close = [] + if dual_side: + for amt, p in nonzero_positions: + ps = (p.get("positionSide") or "").upper() + if ps not in ("LONG", "SHORT"): + ps = "LONG" if amt > 0 else "SHORT" + to_close.append((ps, amt)) + else: + # 单向模式只应存在一个净仓位;如果有多个,按合计处理 + net_amt = sum([amt for amt, _ in nonzero_positions]) + if abs(net_amt) > 0: + to_close.append(("BOTH", net_amt)) + + logger.info(f"✓ 币安账户中 {symbol} 待平仓: {to_close}") + + for ps, amt in to_close: + side = 'SELL' if float(amt) > 0 else 'BUY' + quantity = abs(float(amt)) + quantity = _adjust_close_qty(quantity) + if quantity <= 0: + logger.warning(f"{symbol} 平仓数量调整后为0,跳过该仓位: positionSide={ps}, amt={amt}") + continue + + order_params = { + "symbol": symbol, + "side": side, + "type": "MARKET", + "quantity": quantity, + } + # 对冲模式必须传 positionSide=LONG/SHORT;并且某些账户会 -1106,因此这里不再传 reduceOnly + if dual_side and ps in ("LONG", "SHORT"): + order_params["positionSide"] = ps + else: + # 单向模式用 reduceOnly 防止反向开仓 + order_params["reduceOnly"] = True + + logger.info( + f"开始执行平仓下单: {symbol} side={side} qty={quantity} " + f"positionSide={order_params.get('positionSide')} reduceOnly={order_params.get('reduceOnly')}" ) - - if not order: - error_msg = f"{symbol} 平仓失败:币安API返回 None" + try: + order = await client.client.futures_create_order(**order_params) + if not order: + raise RuntimeError("币安API返回 None") + orders.append(order) + oid = order.get("orderId") + if oid: + order_ids.append(oid) + except Exception as order_error: + error_msg = f"{symbol} 平仓失败:下单异常 - {str(order_error)}" logger.error(error_msg) + logger.error(f" 错误类型: {type(order_error).__name__}") + import traceback + logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}") raise HTTPException(status_code=500, detail=error_msg) - - except Exception as order_error: - error_msg = f"{symbol} 平仓失败:下单异常 - {str(order_error)}" - logger.error(error_msg) - logger.error(f" 错误类型: {type(order_error).__name__}") - import traceback - logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}") - raise HTTPException(status_code=500, detail=error_msg) - - order_id = order.get('orderId') - logger.info(f"✓ {symbol} 平仓订单已提交 (订单ID: {order_id})") + + if not orders: + raise HTTPException(status_code=400, detail=f"{symbol} 无可平仓的有效仓位(数量调整后为0或无持仓)") + + logger.info(f"✓ {symbol} 平仓订单已提交: {order_ids}") # 等待订单成交,获取实际成交价格 import asyncio await asyncio.sleep(1) - # 获取订单详情 - exit_price = None + # 获取订单详情(可能多个订单,按订单号分别取价) + exit_prices = {} + for oid in order_ids: + try: + order_info = await client.client.futures_get_order(symbol=symbol, orderId=oid) + if order_info: + p = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0)) + if p <= 0 and order_info.get('fills'): + total_qty = 0 + total_value = 0 + for fill in order_info.get('fills', []): + qty = float(fill.get('qty', 0)) + price = float(fill.get('price', 0)) + total_qty += qty + total_value += qty * price + if total_qty > 0: + p = total_value / total_qty + if p > 0: + exit_prices[oid] = p + except Exception as e: + logger.warning(f"获取订单详情失败 (orderId={oid}): {e}") + + # 兜底:如果无法获取订单价格,使用当前价格 + fallback_exit_price = None try: - order_info = await client.client.futures_get_order(symbol=symbol, orderId=order_id) - if order_info: - exit_price = float(order_info.get('avgPrice', 0)) or float(order_info.get('price', 0)) - if exit_price <= 0 and order_info.get('fills'): - # 计算加权平均成交价格 - total_qty = 0 - total_value = 0 - for fill in order_info.get('fills', []): - qty = float(fill.get('qty', 0)) - price = float(fill.get('price', 0)) - total_qty += qty - total_value += qty * price - if total_qty > 0: - exit_price = total_value / total_qty - except Exception as e: - logger.warning(f"获取订单详情失败: {e},使用当前价格") - - # 如果无法获取订单价格,使用当前价格 - if not exit_price or exit_price <= 0: ticker = await client.get_ticker_24h(symbol) - exit_price = float(ticker['price']) if ticker else float(position.get('entryPrice', 0)) + fallback_exit_price = float(ticker['price']) if ticker else None + except Exception: + fallback_exit_price = None # 更新数据库记录 open_trades = Trade.get_by_symbol(symbol, status='open') if open_trades: - trade = open_trades[0] - entry_price = float(trade['entry_price']) - trade_quantity = float(trade['quantity']) - - # 计算盈亏 - if trade['side'] == 'BUY': - pnl = (exit_price - entry_price) * trade_quantity - pnl_percent = ((exit_price - entry_price) / entry_price) * 100 - else: - pnl = (entry_price - exit_price) * trade_quantity - pnl_percent = ((entry_price - exit_price) / entry_price) * 100 - - # 更新数据库 - Trade.update_exit( - trade_id=trade['id'], - exit_price=exit_price, - exit_reason='manual', - pnl=pnl, - pnl_percent=pnl_percent, - exit_order_id=order_id - ) - logger.info(f"✓ 已更新数据库记录 (盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)") + # 对冲模式可能有多条 trade(BUY/LONG 和 SELL/SHORT),尽量按方向匹配订单更新 + used_order_ids = set() + for trade in open_trades: + try: + entry_price = float(trade['entry_price']) + trade_quantity = float(trade['quantity']) + except Exception: + continue + + # 选择一个未使用的 orderId(如果只有一个,就复用) + chosen_oid = None + for oid in order_ids: + if oid not in used_order_ids: + chosen_oid = oid + break + if chosen_oid is None and order_ids: + chosen_oid = order_ids[0] + if chosen_oid: + used_order_ids.add(chosen_oid) + + exit_price = exit_prices.get(chosen_oid) if chosen_oid else None + if not exit_price: + exit_price = fallback_exit_price or entry_price + + # 计算盈亏(数据库侧依旧按名义盈亏;收益率展示用保证金口径在前端/统计里另算) + if trade['side'] == 'BUY': + pnl = (exit_price - entry_price) * trade_quantity + pnl_percent = ((exit_price - entry_price) / entry_price) * 100 + else: + pnl = (entry_price - exit_price) * trade_quantity + pnl_percent = ((entry_price - exit_price) / entry_price) * 100 + + Trade.update_exit( + trade_id=trade['id'], + exit_price=exit_price, + exit_reason='manual', + pnl=pnl, + pnl_percent=pnl_percent, + exit_order_id=chosen_oid + ) + logger.info(f"✓ 已更新数据库记录 trade_id={trade['id']} order_id={chosen_oid} (盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)") logger.info(f"✓ {symbol} 平仓成功") return { diff --git a/backend/api/routes/recommendations.py b/backend/api/routes/recommendations.py index 8ca10ec..31e5a1a 100644 --- a/backend/api/routes/recommendations.py +++ b/backend/api/routes/recommendations.py @@ -6,11 +6,171 @@ from typing import Optional, List from datetime import datetime, timedelta from database.models import TradeRecommendation import logging +import os +import json +from typing import Any, Dict, Tuple + +import aiohttp + +try: + import redis.asyncio as redis_async +except Exception: # pragma: no cover + redis_async = None logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/recommendations", tags=["recommendations"]) +REDIS_KEY_RECOMMENDATIONS_SNAPSHOT = "recommendations:snapshot" +REDIS_KEY_RECOMMENDATIONS_HASH = "recommendations:realtime" +REDIS_KEY_MARK_PRICE_ALL = "market:mark_price:all" +REDIS_KEY_MARK_PRICE_ALL_LOCK = "lock:market:mark_price:all" + + +def _beijing_time_str() -> str: + from datetime import timezone + + return datetime.now(tz=timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S") + + +def _redis_connection_kwargs() -> Tuple[str, Dict[str, Any]]: + """ + 从环境变量构建 Redis 连接参数(兼容 Valkey Serverless + TLS)。 + """ + redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379" + username = os.getenv("REDIS_USERNAME", None) + password = os.getenv("REDIS_PASSWORD", None) + ssl_cert_reqs = (os.getenv("REDIS_SSL_CERT_REQS", "required") or "required").strip() + ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) + + kwargs: Dict[str, Any] = {"decode_responses": True} + if username: + kwargs["username"] = username + if password: + kwargs["password"] = password + + use_tls = redis_url.startswith("rediss://") or (os.getenv("REDIS_USE_TLS", "False").lower() == "true") + if use_tls and not redis_url.startswith("rediss://"): + if redis_url.startswith("redis://"): + redis_url = redis_url.replace("redis://", "rediss://", 1) + else: + redis_url = f"rediss://{redis_url}" + + if use_tls or redis_url.startswith("rediss://"): + kwargs["ssl_cert_reqs"] = ssl_cert_reqs + if ssl_ca_certs: + kwargs["ssl_ca_certs"] = ssl_ca_certs + # redis-py: ssl_check_hostname 用于控制主机名校验 + if ssl_cert_reqs == "required": + kwargs["ssl_check_hostname"] = True + else: + kwargs["ssl_check_hostname"] = False + + return redis_url, kwargs + + +async def _get_redis(): + if redis_async is None: + return None + redis_url, kwargs = _redis_connection_kwargs() + try: + client = redis_async.from_url(redis_url, **kwargs) + await client.ping() + return client + except Exception as e: + logger.warning(f"Redis 不可用(recommendations): {e}") + return None + + +async def _get_cached_json(client, key: str) -> Optional[Any]: + try: + raw = await client.get(key) + if not raw: + return None + return json.loads(raw) + except Exception: + return None + + +async def _set_cached_json(client, key: str, value: Any, ttl_sec: int) -> None: + await client.setex(key, ttl_sec, json.dumps(value, ensure_ascii=False)) + + +async def _refresh_mark_price_all_if_needed(client, min_refresh_sec: int = 10, ttl_sec: int = 30) -> Dict[str, Any]: + """ + 获取并缓存 Binance U本位合约 Mark Price(premiumIndex)。 + - 使用单 key 全量缓存,适合给“推荐页面当前价格”用 + - 用 Redis NX lock 防止并发刷新风暴 + """ + now_ms = int(__import__("time").time() * 1000) + + cached = await _get_cached_json(client, REDIS_KEY_MARK_PRICE_ALL) + if isinstance(cached, dict): + updated_at_ms = cached.get("updated_at_ms") + try: + updated_at_ms = int(updated_at_ms) if updated_at_ms is not None else None + except Exception: + updated_at_ms = None + if updated_at_ms and (now_ms - updated_at_ms) < (min_refresh_sec * 1000): + return cached + + # 尝试抢锁刷新(锁很短,避免慢请求时阻塞) + try: + locked = await client.set(REDIS_KEY_MARK_PRICE_ALL_LOCK, str(now_ms), nx=True, ex=min_refresh_sec) + except Exception: + locked = False + + if not locked: + # 没拿到锁就直接返回旧值(如果有) + return cached if isinstance(cached, dict) else {"updated_at_ms": None, "updated_at": None, "items": {}} + + # 真的去 Binance 拉一次 + url = "https://fapi.binance.com/fapi/v1/premiumIndex" + try: + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=6)) as session: + async with session.get(url) as resp: + if resp.status != 200: + raise RuntimeError(f"binance premiumIndex http={resp.status}") + payload = await resp.json() + except Exception as e: + logger.warning(f"刷新 mark price 失败: {e}") + # 刷新失败,返回旧值 + return cached if isinstance(cached, dict) else {"updated_at_ms": None, "updated_at": None, "items": {}} + + items: Dict[str, Dict[str, Any]] = {} + if isinstance(payload, list): + for row in payload: + if not isinstance(row, dict): + continue + sym = row.get("symbol") + if not sym or not isinstance(sym, str): + continue + # 只保留 USDT 永续相关(避免无关数据撑爆) + if not sym.endswith("USDT"): + continue + try: + mp = float(row.get("markPrice", 0) or 0) + except Exception: + mp = 0.0 + t = row.get("time") # 毫秒 + try: + t = int(t) if t is not None else None + except Exception: + t = None + items[sym] = {"markPrice": mp, "time": t} + + snapshot = { + "updated_at_ms": now_ms, + "updated_at": _beijing_time_str(), + "items": items, + "source": "binance_premiumIndex", + } + try: + await _set_cached_json(client, REDIS_KEY_MARK_PRICE_ALL, snapshot, ttl_sec=ttl_sec) + except Exception as e: + logger.warning(f"写入 {REDIS_KEY_MARK_PRICE_ALL} 失败(不影响返回): {e}") + return snapshot + @router.get("") async def get_recommendations( @@ -36,100 +196,83 @@ async def get_recommendations( """ try: if type == 'realtime': - # 从Redis缓存读取推荐(如果存在) - import sys - from pathlib import Path - current_file = Path(__file__) - backend_path = current_file.parent.parent.parent - project_root = backend_path.parent - trading_system_path = project_root / 'trading_system' - - if not trading_system_path.exists(): - alternative_path = backend_path / 'trading_system' - if alternative_path.exists(): - trading_system_path = alternative_path - else: - raise HTTPException( - status_code=500, - detail=f"交易系统模块不存在" - ) - - sys.path.insert(0, str(trading_system_path)) - sys.path.insert(0, str(project_root)) - - from binance_client import BinanceClient - import config - - # 创建客户端(用于访问Redis) - client = BinanceClient( - api_key=config.BINANCE_API_KEY, - api_secret=config.BINANCE_API_SECRET, - testnet=config.USE_TESTNET - ) - - # 连接Redis(如果还没有连接) - try: - await client.redis_cache.connect() - except: - pass - - # 从Redis读取推荐 + # 实时推荐:统一只读 Redis(全局一份 snapshot,不在请求里实时生成,避免“页面刷新=触发扫描”) recommendations = [] cache_available = False + snapshot_meta: Dict[str, Any] = {} + + rds = await _get_redis() + if rds is None: + raise HTTPException(status_code=503, detail="Redis 不可用,无法读取推荐缓存") + + # 1) 优先读取 snapshot(单 key) try: - import time - cache_key = "recommendations:realtime" - cached_data = await client.redis_cache.hgetall(cache_key) - if cached_data: - cache_available = True - current_time = time.time() - max_age = 3600 * 2 # 推荐最大保留时间:2小时 - - # 过滤过期推荐 - for rec in cached_data.values(): - if isinstance(rec, dict): - rec_timestamp = rec.get('timestamp', 0) - # 如果推荐时间超过2小时,跳过 - if current_time - rec_timestamp > max_age: - continue - recommendations.append(rec) - - # 按时间戳降序排序(最新的在前) - recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True) - logger.info(f"从Redis读取到 {len(recommendations)} 个有效推荐(已过滤过期)") + snapshot = await _get_cached_json(rds, REDIS_KEY_RECOMMENDATIONS_SNAPSHOT) + if isinstance(snapshot, dict): + items = snapshot.get("items", []) + if isinstance(items, list) and items: + recommendations = items + cache_available = True + snapshot_meta = { + "generated_at_ms": snapshot.get("generated_at_ms"), + "generated_at": snapshot.get("generated_at"), + "ttl_sec": snapshot.get("ttl_sec"), + } except Exception as e: - logger.warning(f"从Redis读取推荐失败: {e}") - - # 如果Redis中没有推荐,实时生成 + logger.warning(f"读取 {REDIS_KEY_RECOMMENDATIONS_SNAPSHOT} 失败: {e}") + + # 2) 兼容旧缓存(Hash) if not recommendations: - logger.info("Redis中没有推荐,实时生成推荐...") try: - from market_scanner import MarketScanner - from risk_manager import RiskManager - from trade_recommender import TradeRecommender - - await client.connect() - - try: - scanner = MarketScanner(client) - risk_manager = RiskManager(client) - recommender = TradeRecommender(client, scanner, risk_manager) - - # 生成推荐(会自动保存到Redis) - # 降低信号强度阈值以获取更多推荐(推荐系统可以更宽松) - recommendations = await recommender.generate_recommendations( - min_signal_strength=max(2, min_signal_strength - 3), # 降低3个等级以获取更多推荐(最低2) - max_recommendations=max(limit, 50), # 至少生成50个推荐 - add_to_cache=True, - min_quality_score=0.0 - ) - logger.info(f"实时生成了 {len(recommendations)} 个推荐") - finally: - await client.disconnect() + import time as _time + cached_data = await rds.hgetall(REDIS_KEY_RECOMMENDATIONS_HASH) + if cached_data: + cache_available = True + current_time = _time.time() + max_age = 3600 * 2 # 推荐最大保留时间:2小时 + for raw in cached_data.values(): + try: + rec = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + rec = None + if isinstance(rec, dict): + rec_timestamp = rec.get('timestamp', 0) + if current_time - float(rec_timestamp or 0) > max_age: + continue + recommendations.append(rec) + recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True) + logger.info(f"从Redis(Hash)读取到 {len(recommendations)} 个有效推荐(已过滤过期)") except Exception as e: - logger.error(f"实时生成推荐失败: {e}") - import traceback - logger.error(traceback.format_exc()) + logger.warning(f"从Redis(Hash)读取推荐失败: {e}") + + # 3) 合并“标记价”(mark price)作为真正的“当前价格”,并携带时间戳用于前端展示 + mark_snapshot = {} + try: + mark_snapshot = await _refresh_mark_price_all_if_needed(rds) + except Exception as e: + logger.debug(f"刷新/读取 mark price 失败(不影响返回): {e}") + + mark_items = mark_snapshot.get("items", {}) if isinstance(mark_snapshot, dict) else {} + mark_updated_at = mark_snapshot.get("updated_at") + mark_updated_at_ms = mark_snapshot.get("updated_at_ms") + + if isinstance(mark_items, dict) and mark_items: + for rec in recommendations: + if not isinstance(rec, dict): + continue + sym = rec.get("symbol") + if not sym: + continue + mp = mark_items.get(sym) + if isinstance(mp, dict) and mp.get("markPrice"): + rec["current_price"] = float(mp.get("markPrice") or rec.get("current_price") or 0) + rec["current_price_source"] = "mark_price" + rec["current_price_time_ms"] = mp.get("time") or mark_updated_at_ms + rec["price_updated"] = True + else: + # 保留推荐生成时的 current_price,但给出来源/时间(用于提示“可能过时”) + rec.setdefault("current_price_source", rec.get("current_price_source") or "snapshot") + rec.setdefault("price_updated", False) # 方向过滤 if direction: @@ -143,6 +286,12 @@ async def get_recommendations( "count": len(recommendations), "type": "realtime", "from_cache": cache_available, + "meta": { + **snapshot_meta, + "price_source": "mark_price" if mark_items else None, + "price_updated_at": mark_updated_at, + "price_updated_at_ms": mark_updated_at_ms, + }, "data": recommendations } diff --git a/frontend/src/components/Recommendations.css b/frontend/src/components/Recommendations.css index f16b6e4..06d5edf 100644 --- a/frontend/src/components/Recommendations.css +++ b/frontend/src/components/Recommendations.css @@ -230,6 +230,17 @@ gap: 8px; } +.price-item.price-meta { + font-size: 12px; + color: #666; +} + +.price-updated-badge { + margin-left: 6px; + font-size: 12px; + vertical-align: middle; +} + .price-item label { font-weight: bold; color: #666; diff --git a/frontend/src/components/Recommendations.jsx b/frontend/src/components/Recommendations.jsx index a46fb29..25724c5 100644 --- a/frontend/src/components/Recommendations.jsx +++ b/frontend/src/components/Recommendations.jsx @@ -204,6 +204,25 @@ function Recommendations() { } } + const formatTimeMs = (ms) => { + if (!ms && ms !== 0) return '-' + try { + const date = new Date(Number(ms)) + if (isNaN(date.getTime())) return '-' + return date.toLocaleString('zh-CN', { + year: 'numeric', + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + timeZone: 'Asia/Shanghai' + }) + } catch (e) { + return '-' + } + } + const getStatusBadge = (status) => { const statusMap = { active: { text: '有效', class: 'status-active' }, @@ -338,11 +357,25 @@ function Recommendations() { {parseFloat(rec.current_price || 0).toFixed(4)} USDT - {rec.price_updated && ( - 🟢 + {rec.current_price_source === 'mark_price' && ( + + 🟢 + )} + {(rec.current_price_source || rec.current_price_time_ms) && ( +
+ + + {rec.current_price_source === 'mark_price' ? '标记价' : (rec.current_price_source || 'snapshot')} + {rec.current_price_time_ms ? ` · ${formatTimeMs(rec.current_price_time_ms)}` : ''} + +
+ )} {rec.change_percent !== undefined && rec.change_percent !== null && (
diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 1a075df..7eb15d2 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -511,6 +511,7 @@ class BinanceClient: ) result = {} + now_ms = int(__import__("time").time() * 1000) for ticker in tickers: symbol = ticker['symbol'] if symbol.endswith('USDT'): @@ -518,7 +519,9 @@ class BinanceClient: 'symbol': symbol, 'price': float(ticker.get('lastPrice', 0)), 'volume': float(ticker.get('quoteVolume', 0)), - 'changePercent': float(ticker.get('priceChangePercent', 0)) + 'changePercent': float(ticker.get('priceChangePercent', 0)), + # 用于前端展示“当前价格更新时间”(以及后端合并时判断新鲜度) + 'ts': now_ms } # 写入 Redis 缓存(TTL: 30秒) diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 16b6441..2bab2c2 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -183,6 +183,19 @@ class MarketScanner: if not ticker: return None + # 24h ticker 的“最新价”(更接近用户理解的“当前价格”) + # 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳 + ticker_price = ticker.get('price') + try: + ticker_price = float(ticker_price) if ticker_price is not None else None + except Exception: + ticker_price = None + ticker_ts = ticker.get('ts') + try: + ticker_ts = int(ticker_ts) if ticker_ts is not None else None + except Exception: + ticker_ts = None + # 获取更多K线数据用于技术指标计算(使用配置的主周期) primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') klines = await self.client.get_klines( @@ -270,9 +283,17 @@ class MarketScanner: return { 'symbol': symbol, + # 技术分析使用的价格(K线收盘价) 'price': current_price, + 'kline_close_price': current_price, + # 展示用“当前价”(24h ticker 最新价,通常更贴近用户的直觉) + 'ticker_price': ticker_price, + 'ticker_ts': ticker_ts, 'prevPrice': prev_price, - 'changePercent': change_percent, + # 1) 主周期涨跌幅(用于内部信号) + 'kline_change_percent': change_percent, + # 2) 24h涨跌幅(用于前端展示更符合“24h涨跌”的文案) + 'changePercent': float(ticker.get('changePercent', 0) or 0), 'volume24h': ticker.get('volume', 0), 'direction': 'UP' if change_percent > 0 else 'DOWN', 'rsi': rsi, diff --git a/trading_system/trade_recommender.py b/trading_system/trade_recommender.py index a98fb1d..278c047 100644 --- a/trading_system/trade_recommender.py +++ b/trading_system/trade_recommender.py @@ -194,6 +194,7 @@ class TradeRecommender: if add_to_cache: try: cache_key = "recommendations:realtime" + snapshot_key = "recommendations:snapshot" # 增量更新:只更新或添加新的推荐,保留其他推荐 updated_count = 0 @@ -210,6 +211,20 @@ class TradeRecommender: except: pass + # 写入“全量快照”(单 key,一份数据给所有用户读) + try: + now_ms = int(__import__("time").time() * 1000) + snapshot = { + "generated_at_ms": now_ms, + "generated_at": datetime.now().isoformat(), + "ttl_sec": 7200, + "count": len(final_recommendations), + "items": final_recommendations, + } + await self.client.redis_cache.set(snapshot_key, snapshot, ttl=7200) # 2小时有效 + except Exception as e: + logger.warning(f"写入 recommendations:snapshot 失败(不影响返回): {e}") + logger.info(f"已更新 {updated_count} 个推荐到Redis缓存(总计 {len(final_recommendations)} 个)") except Exception as e: logger.warning(f"保存推荐到Redis失败: {e}") @@ -355,7 +370,30 @@ class TradeRecommender: """ try: symbol = symbol_info['symbol'] - current_price = symbol_info['price'] + # 技术分析价(K线收盘价):用于指标/信号一致性 + analysis_price = symbol_info.get('kline_close_price', symbol_info.get('price')) + try: + analysis_price = float(analysis_price) if analysis_price is not None else None + except Exception: + analysis_price = None + + # 展示用“当前价”:优先用 ticker_price(更贴近用户理解的“当前价格”) + current_price = symbol_info.get('ticker_price', None) + try: + current_price = float(current_price) if current_price is not None else None + except Exception: + current_price = None + + # 回退:如果拿不到 ticker_price,就用分析价兜底 + if current_price is None: + current_price = analysis_price + + price_ts_ms = symbol_info.get("ticker_ts", None) + try: + price_ts_ms = int(price_ts_ms) if price_ts_ms is not None else None + except Exception: + price_ts_ms = None + direction = trade_signal['direction'] # 计算建议的止损止盈(基于保证金) @@ -473,11 +511,16 @@ class TradeRecommender: 'symbol': symbol, 'direction': direction, 'current_price': current_price, + # 价格元信息:前端用于展示“这是不是实时的” + 'current_price_source': 'ticker_24h' if symbol_info.get('ticker_price') is not None else 'kline_close', + 'current_price_time_ms': price_ts_ms, 'change_percent': symbol_info.get('changePercent', 0), 'recommendation_reason': trade_signal['reason'], 'signal_strength': signal_strength, 'market_regime': market_regime, 'trend_4h': trend_4h, + # 额外保留分析价(用于排查“为什么信号这样算”) + 'analysis_price': analysis_price, 'rsi': symbol_info.get('rsi'), 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, @@ -523,12 +566,112 @@ class TradeRecommender: f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%, " f"分类: {recommendation_category}, 风险: {risk_level})" ) - - return [limit_recommendation] + + recs = [limit_recommendation] + # 将“单条推荐”也写入 Redis(统一推荐缓存来源,避免只在手动 generate 时才更新) + await self._update_realtime_cache(recs) + return recs except Exception as e: logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}", exc_info=True) return None + + async def _update_realtime_cache(self, recs: List[Dict]) -> None: + """ + 将推荐写入 Redis: + - Hash: recommendations:realtime(增量更新) + - Snapshot: recommendations:snapshot(全量快照,给前端统一读取) + """ + if not recs: + return + try: + rc = getattr(self.client, "redis_cache", None) + if not rc or not getattr(rc, "redis", None) or not getattr(rc, "_connected", False): + return + + cache_key = "recommendations:realtime" + snapshot_key = "recommendations:snapshot" + lock_key = "lock:recommendations:snapshot:update" + + # 1) 增量写 Hash(每条推荐一个 field) + for rec in recs: + if not isinstance(rec, dict): + continue + sym = rec.get("symbol") + if not sym: + continue + rec_key = f"{sym}_{rec.get('order_type', 'LIMIT')}" + await rc.hset(cache_key, rec_key, rec, ttl=3600) + try: + await rc.redis.expire(cache_key, 3600) + except Exception: + pass + + # 2) 合并写 Snapshot(单 key,给所有用户读) + now_ms = int(__import__("time").time() * 1000) + try: + got_lock = await rc.redis.set(lock_key, str(now_ms), nx=True, ex=2) + except Exception: + got_lock = False + if not got_lock: + return + + existing = None + try: + existing = await rc.get(snapshot_key) + except Exception: + existing = None + + # 从 snapshot 取旧 items;如果没有,就从 Hash 兜底(避免首次写入为空) + items: List[Dict] = [] + if isinstance(existing, dict) and isinstance(existing.get("items"), list): + items = [x for x in existing.get("items", []) if isinstance(x, dict)] + + # 合并:按 rec_key 覆盖(同 symbol + order_type) + merged: Dict[str, Dict] = {} + for x in items: + sym = x.get("symbol") + if not sym: + continue + k = f"{sym}_{x.get('order_type', 'LIMIT')}" + merged[k] = x + for x in recs: + if not isinstance(x, dict): + continue + sym = x.get("symbol") + if not sym: + continue + k = f"{sym}_{x.get('order_type', 'LIMIT')}" + merged[k] = x + + # 过滤过期(2小时) + max_age_sec = 3600 * 2 + now_s = __import__("time").time() + filtered: List[Dict] = [] + for x in merged.values(): + ts = x.get("timestamp", 0) + try: + ts = float(ts) if ts is not None else 0.0 + except Exception: + ts = 0.0 + if ts and (now_s - ts) > max_age_sec: + continue + filtered.append(x) + + filtered.sort(key=lambda x: x.get("timestamp", 0) or 0, reverse=True) + # 防止快照无限长(前端默认 limit=50,这里留一点冗余) + filtered = filtered[:200] + + snapshot = { + "generated_at_ms": now_ms, + "generated_at": datetime.now().isoformat(), + "ttl_sec": 7200, + "count": len(filtered), + "items": filtered, + } + await rc.set(snapshot_key, snapshot, ttl=7200) + except Exception as e: + logger.debug(f"更新推荐 Redis 缓存失败(可忽略): {e}") def _classify_recommendation( self,