184 lines
5.6 KiB
Python
184 lines
5.6 KiB
Python
"""
|
||
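Public read-only status API (also accessible to non-admin users).

Purpose:
- Lets regular users see whether the backend is online, when it started, and
  whether recommendations are still being refreshed (snapshot time).
- recommendations-viewer can reuse this endpoint to display "service status".

Security principles:
- Never return sensitive information (no secrets, no passwords, no full
  Redis URLs, etc.).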
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any, Dict, Optional, Tuple
|
||
|
||
from fastapi import APIRouter
|
||
|
||
try:
|
||
import redis.asyncio as redis_async
|
||
except Exception: # pragma: no cover
|
||
redis_async = None
|
||
|
||
|
||
# Router for the public endpoints; every route is served under /api/public.
router = APIRouter(prefix="/api/public", tags=["public"])

# Epoch milliseconds captured once at import time; the status endpoint
# reports this value as the backend start time.
_STARTED_AT_MS = int(time.time() * 1000)

# Redis key from which the status endpoint reads the recommendations snapshot.
REDIS_KEY_RECOMMENDATIONS_SNAPSHOT = "recommendations:snapshot"
|
||
|
||
|
||
def _beijing_time_str(ts_ms: Optional[int] = None) -> str:
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
if ts_ms is None:
|
||
return datetime.now(tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
|
||
return datetime.fromtimestamp(ts_ms / 1000, tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def _mask_redis_url(redis_url: str) -> str:
|
||
s = (redis_url or "").strip()
|
||
if not s:
|
||
return ""
|
||
# 简单脱敏:去掉 username/password(如果有)
|
||
# rediss://user:pass@host:6379/0 -> rediss://***@host:6379/0
|
||
if "://" in s and "@" in s:
|
||
scheme, rest = s.split("://", 1)
|
||
creds_and_host = rest
|
||
# 仅替换 @ 前面的内容
|
||
idx = creds_and_host.rfind("@")
|
||
if idx > 0:
|
||
return f"{scheme}://***@{creds_and_host[idx+1:]}"
|
||
return s
|
||
|
||
|
||
def _redis_connection_kwargs() -> Tuple[str, Dict[str, Any]]:
|
||
redis_url = (os.getenv("REDIS_URL", "") or "").strip() or "redis://localhost:6379"
|
||
username = os.getenv("REDIS_USERNAME", None)
|
||
password = os.getenv("REDIS_PASSWORD", None)
|
||
ssl_cert_reqs = (os.getenv("REDIS_SSL_CERT_REQS", "required") or "required").strip()
|
||
ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)
|
||
|
||
select = os.getenv("REDIS_SELECT", None)
|
||
try:
|
||
select_i = int(select) if select is not None else 0
|
||
except Exception:
|
||
select_i = 0
|
||
|
||
kwargs: Dict[str, Any] = {"decode_responses": True}
|
||
if username:
|
||
kwargs["username"] = username
|
||
if password:
|
||
kwargs["password"] = password
|
||
kwargs["db"] = select_i
|
||
|
||
use_tls = redis_url.startswith("rediss://") or (os.getenv("REDIS_USE_TLS", "False").lower() == "true")
|
||
if use_tls and not redis_url.startswith("rediss://"):
|
||
if redis_url.startswith("redis://"):
|
||
redis_url = redis_url.replace("redis://", "rediss://", 1)
|
||
else:
|
||
redis_url = f"rediss://{redis_url}"
|
||
|
||
if use_tls or redis_url.startswith("rediss://"):
|
||
kwargs["ssl_cert_reqs"] = ssl_cert_reqs
|
||
if ssl_ca_certs:
|
||
kwargs["ssl_ca_certs"] = ssl_ca_certs
|
||
kwargs["ssl_check_hostname"] = (ssl_cert_reqs == "required")
|
||
|
||
return redis_url, kwargs
|
||
|
||
|
||
async def _get_redis():
    """Create a Redis client and confirm it is reachable with ``PING``.

    Returns:
        A connected client, or ``None`` when the ``redis.asyncio`` import
        failed at module load or the connection/ping raised. The caller is
        responsible for closing the returned client.
    """
    if redis_async is None:
        return None

    redis_url, kwargs = _redis_connection_kwargs()
    client = None
    try:
        client = redis_async.from_url(redis_url, **kwargs)
        await client.ping()
    except Exception:
        # Best effort by design: any failure just means "Redis unavailable".
        # Release the half-initialised client so a failed ping does not leave
        # its connection pool/socket behind on every status request.
        if client is not None:
            try:
                # Newer redis-py exposes aclose(); fall back to close().
                closer = getattr(client, "aclose", None) or client.close
                await closer()
            except Exception:
                pass
        return None
    return client
|
||
|
||
|
||
async def _get_cached_json(client, key: str) -> Optional[Any]:
|
||
try:
|
||
raw = await client.get(key)
|
||
if not raw:
|
||
return None
|
||
return json.loads(raw)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _optional_int(value: Any) -> Optional[int]:
    """Convert ``value`` to ``int``; return ``None`` when absent or unconvertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


@router.get("/status")
async def public_status():
    """Report public, non-sensitive service status.

    The response contains:
    - backend: running flag, start time and current time
    - redis: availability, db index and a masked URL (never the password)
    - recommendations: metadata of the latest snapshot stored in Redis
      (generation time, age, count, ttl)
    - auth: whether authentication is enabled according to ATS_AUTH_ENABLED

    Redis problems never fail the request; they are reported as
    ``redis.ok = False`` / ``recommendations.snapshot_ok = False``.
    """
    now_ms = int(time.time() * 1000)

    # A non-numeric REDIS_SELECT must not turn this public endpoint into a
    # 500; fall back to db 0, as the connection setup does.
    try:
        db_index = int(os.getenv("REDIS_SELECT", "0") or 0)
    except ValueError:
        db_index = 0

    redis_meta: Dict[str, Any] = {
        "ok": False,
        "db": db_index,
        "url": _mask_redis_url(os.getenv("REDIS_URL", "")),
    }
    reco: Dict[str, Any] = {"snapshot_ok": False}

    rds = await _get_redis()
    if rds is not None:
        redis_meta["ok"] = True
        try:
            snap = await _get_cached_json(rds, REDIS_KEY_RECOMMENDATIONS_SNAPSHOT)
        finally:
            # Always release the per-request client, even on cancellation.
            try:
                # Newer redis-py exposes aclose(); fall back to close().
                closer = getattr(rds, "aclose", None) or rds.close
                await closer()
            except Exception:
                pass

        if isinstance(snap, dict):
            gen_ms = _optional_int(snap.get("generated_at_ms"))
            count = _optional_int(snap.get("count"))

            age_sec = None
            generated_at_beijing = None
            if gen_ms:
                # Integer division avoids float overflow on absurd values;
                # a timestamp in the future is clamped to age 0.
                age_sec = max(0, (now_ms - gen_ms) // 1000)
                try:
                    generated_at_beijing = _beijing_time_str(gen_ms)
                except (OverflowError, OSError, ValueError):
                    # Timestamp outside the representable date range.
                    generated_at_beijing = None

            reco = {
                "snapshot_ok": True,
                "generated_at_ms": gen_ms,
                "generated_at": snap.get("generated_at"),
                "generated_at_beijing": generated_at_beijing,
                "age_sec": age_sec,
                "count": count,
                "ttl_sec": snap.get("ttl_sec"),
            }

    # Auth counts as enabled unless explicitly switched off.
    auth_flag = (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower()

    return {
        "backend": {
            "running": True,
            "started_at_ms": _STARTED_AT_MS,
            "started_at": _beijing_time_str(_STARTED_AT_MS),
            "now_ms": now_ms,
            "now": _beijing_time_str(now_ms),
        },
        "redis": redis_meta,
        "recommendations": reco,
        "auth": {
            "enabled": auth_flag not in {"0", "false", "no"},
        },
    }
|
||
|