auto_trade_sys/backend/api/routes/public.py
薇薇安 4d26777845 a
2026-01-21 11:48:41 +08:00

184 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
公开只读状态接口(非管理员也可访问)
用途:
- 普通用户能看到后端是否在线、启动时间、推荐是否在更新(snapshot 时间)
- recommendations-viewer 也可复用该接口展示“服务状态”
安全原则:
- 不返回任何敏感信息(不返回密钥、密码、完整 Redis URL 等)
"""
from __future__ import annotations
import json
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple
from fastapi import APIRouter
try:
import redis.asyncio as redis_async
except Exception: # pragma: no cover
redis_async = None
# Router for the public read-only endpoints; every route is mounted under /api/public.
router = APIRouter(prefix="/api/public", tags=["public"])
# Epoch milliseconds captured once at import time; reported as the backend start time.
_STARTED_AT_MS = int(time.time() * 1000)
# Redis key from which the status endpoint reads the recommendations snapshot (JSON).
REDIS_KEY_RECOMMENDATIONS_SNAPSHOT = "recommendations:snapshot"
def _beijing_time_str(ts_ms: Optional[int] = None) -> str:
    """Format a moment as ``YYYY-MM-DD HH:MM:SS`` in a fixed UTC+8 offset.

    Args:
        ts_ms: Epoch timestamp in milliseconds. ``None`` means "now".

    Returns:
        The formatted wall-clock string in UTC+8.
    """
    utc_plus_8 = timezone(timedelta(hours=8))
    if ts_ms is not None:
        moment = datetime.fromtimestamp(ts_ms / 1000, tz=utc_plus_8)
    else:
        moment = datetime.now(tz=utc_plus_8)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def _mask_redis_url(redis_url: str) -> str:
    """Return ``redis_url`` with secrets removed so it is safe to show publicly.

    Masking rules:
    - Everything before the last ``@`` (the ``user:password`` part) is
      replaced by ``***``, whether or not the value carries a ``scheme://``
      prefix: ``rediss://user:pass@host:6379/0 -> rediss://***@host:6379/0``.
    - Any ``?query`` suffix is dropped, because connection options (including
      a password) may be supplied there as well.
    - If a ``?`` appears before the last ``@`` it is impossible to tell a
      password containing ``?`` from a query value containing ``@``; in that
      case everything after the scheme is hidden.

    Args:
        redis_url: Raw connection URL; ``None`` or blank is tolerated.

    Returns:
        The masked URL, or ``""`` for blank input.
    """
    s = (redis_url or "").strip()
    if not s:
        return ""

    scheme, sep, rest = s.partition("://")
    if sep:
        prefix = f"{scheme}://"
    else:
        # No scheme at all (e.g. "user:pass@host:6379"): mask it the same way.
        prefix, rest = "", s

    # Split on the LAST "@" so passwords that themselves contain "@" stay hidden.
    userinfo, at, location = rest.rpartition("@")
    if at and userinfo:
        if "?" in userinfo:
            # Ambiguous input: be conservative and reveal nothing but the scheme.
            return f"{prefix}***"
        rest = f"***@{location}"

    # Query-string options may carry secrets too; never echo them back.
    return prefix + rest.partition("?")[0]
def _redis_connection_kwargs() -> Tuple[str, Dict[str, Any]]:
    """Build the Redis URL and client keyword arguments from the environment.

    Environment variables read:
    - ``REDIS_URL``: connection URL, default ``redis://localhost:6379``.
    - ``REDIS_USERNAME`` / ``REDIS_PASSWORD``: added only when non-empty.
    - ``REDIS_SELECT``: database index; 0 when unset or not an integer.
    - ``REDIS_USE_TLS``: ``"true"`` (case-insensitive) forces TLS.
    - ``REDIS_SSL_CERT_REQS``: default ``"required"``.
    - ``REDIS_SSL_CA_CERTS``: optional CA bundle path.

    Returns:
        A ``(url, kwargs)`` pair. When TLS is in effect the URL is rewritten to
        the ``rediss://`` scheme and the ``ssl_*`` options are included.
    """
    env = os.getenv
    plain_scheme, tls_scheme = "redis://", "rediss://"

    url = (env("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
    cert_reqs = (env("REDIS_SSL_CERT_REQS", "required") or "required").strip()
    ca_bundle = env("REDIS_SSL_CA_CERTS", None)

    raw_db = env("REDIS_SELECT", None)
    try:
        db_index = 0 if raw_db is None else int(raw_db)
    except Exception:
        db_index = 0

    options: Dict[str, Any] = {"decode_responses": True}
    for option_name, env_name in (("username", "REDIS_USERNAME"), ("password", "REDIS_PASSWORD")):
        value = env(env_name, None)
        if value:
            options[option_name] = value
    options["db"] = db_index

    wants_tls = url.startswith(tls_scheme) or env("REDIS_USE_TLS", "False").lower() == "true"
    if not wants_tls:
        return url, options

    # TLS requested through the flag only: upgrade the scheme (or add one).
    if not url.startswith(tls_scheme):
        if url.startswith(plain_scheme):
            url = tls_scheme + url[len(plain_scheme):]
        else:
            url = f"rediss://{url}"

    options["ssl_cert_reqs"] = cert_reqs
    if ca_bundle:
        options["ssl_ca_certs"] = ca_bundle
    # Hostname verification only makes sense when certificates are required.
    options["ssl_check_hostname"] = cert_reqs == "required"
    return url, options
async def _get_redis():
    """Open a Redis client and verify the connection with ``PING``.

    Returns:
        A ready-to-use ``redis.asyncio`` client, or ``None`` when the library
        is not installed, the client cannot be created, or the server does not
        answer. The caller owns the returned client and must close it.
    """
    if redis_async is None:
        return None
    redis_url, kwargs = _redis_connection_kwargs()
    client = None
    try:
        client = redis_async.from_url(redis_url, **kwargs)
        await client.ping()
        return client
    except Exception:
        # Best-effort probe: any failure simply means "Redis unavailable".
        # A client that was created but failed PING still owns a connection
        # pool, so release it instead of leaking one pool per failed call.
        if client is not None:
            # Prefer aclose() where the client provides it; fall back to close().
            closer = getattr(client, "aclose", None) or getattr(client, "close", None)
            try:
                if closer is not None:
                    await closer()
            except Exception:
                pass  # Cleanup of an already-broken client must never raise.
        return None
async def _get_cached_json(client, key: str) -> Optional[Any]:
    """Fetch ``key`` from Redis and decode its value as JSON.

    Args:
        client: Object exposing an awaitable ``get(key)``.
        key: Redis key to read.

    Returns:
        The decoded JSON value, or ``None`` when the key is missing or empty,
        the payload is not valid JSON, or the read fails for any reason.
    """
    try:
        payload = await client.get(key)
        return json.loads(payload) if payload else None
    except Exception:
        # Deliberately best-effort: a cache miss and a cache error look the same.
        return None
def _coerce_int(value: Any) -> Optional[int]:
    """Return ``value`` as an int, or ``None`` when it is missing or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _summarize_snapshot(snap: Dict[str, Any], now_ms: int) -> Dict[str, Any]:
    """Build the public ``recommendations`` section from a decoded snapshot dict."""
    gen_ms = _coerce_int(snap.get("generated_at_ms"))
    age_sec = None
    generated_at_beijing = None
    if gen_ms:
        # Integer arithmetic avoids float overflow on absurdly large values;
        # a timestamp in the future is clamped to an age of 0.
        age_sec = max(0, (now_ms - gen_ms) // 1000)
        try:
            generated_at_beijing = _beijing_time_str(gen_ms)
        except (OverflowError, OSError, ValueError):
            # A timestamp outside the platform's supported range must not
            # turn the public status endpoint into a 500.
            generated_at_beijing = None
    return {
        "snapshot_ok": True,
        "generated_at_ms": gen_ms,
        "generated_at": snap.get("generated_at"),
        "generated_at_beijing": generated_at_beijing,
        "age_sec": age_sec,
        "count": _coerce_int(snap.get("count")),
        "ttl_sec": snap.get("ttl_sec"),
    }


@router.get("/status")
async def public_status():
    """
    Public status report:
    - backend: running flag, start time and current time
    - redis: availability, db index and the masked URL (never the password)
    - recommendations: generation time, age and size of the latest snapshot
      read from Redis
    - auth: whether authentication is enabled (``ATS_AUTH_ENABLED``)
    """
    now_ms = int(time.time() * 1000)

    # A malformed REDIS_SELECT falls back to 0, matching _redis_connection_kwargs.
    db_index = _coerce_int(os.getenv("REDIS_SELECT"))
    redis_meta: Dict[str, Any] = {
        "ok": False,
        "db": db_index if db_index is not None else 0,
        "url": _mask_redis_url(os.getenv("REDIS_URL", "")),
    }
    reco: Dict[str, Any] = {"snapshot_ok": False}

    rds = await _get_redis()
    if rds is not None:
        redis_meta["ok"] = True
        try:
            snap = await _get_cached_json(rds, REDIS_KEY_RECOMMENDATIONS_SNAPSHOT)
            if isinstance(snap, dict):
                reco = _summarize_snapshot(snap, now_ms)
        finally:
            # Always release the per-request client, even if building the
            # summary fails. Prefer aclose() where the client provides it.
            closer = getattr(rds, "aclose", None) or getattr(rds, "close", None)
            try:
                if closer is not None:
                    await closer()
            except Exception:
                pass  # Closing is best-effort; the response is already computed.

    return {
        "backend": {
            "running": True,
            "started_at_ms": _STARTED_AT_MS,
            "started_at": _beijing_time_str(_STARTED_AT_MS),
            "now_ms": now_ms,
            "now": _beijing_time_str(now_ms),
        },
        "redis": redis_meta,
        "recommendations": reco,
        "auth": {
            "enabled": (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower() not in {"0", "false", "no"},
        },
    }