957 lines
32 KiB
Python
957 lines
32 KiB
Python
import os
|
||
import re
|
||
import subprocess
|
||
import json
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional, Tuple
|
||
|
||
from fastapi import APIRouter, HTTPException, Header, Depends
|
||
from pydantic import BaseModel
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 路由统一挂在 /api/system 下,前端直接调用 /api/system/...
|
||
router = APIRouter(prefix="/api/system")
|
||
|
||
# 管理员鉴权(JWT;未启用登录时兼容 X-Admin-Token)
|
||
from api.auth_deps import require_system_admin # noqa: E402
|
||
|
||
LOG_GROUPS = ("error", "warning", "info")
|
||
|
||
# 后端服务启动时间(用于前端展示“运行多久/是否已重启”)
|
||
_BACKEND_STARTED_AT_MS: int = int(time.time() * 1000)
|
||
|
||
# 系统元信息存储(优先 Redis;用于记录重启时间等)
|
||
def _system_meta_prefix() -> str:
|
||
return (os.getenv("SYSTEM_META_PREFIX", "ats:system").strip() or "ats:system")
|
||
|
||
|
||
def _system_meta_key(name: str) -> str:
    """Build the fully-qualified Redis key for one metadata entry."""
    cleaned = (name or "").strip()
    return f"{_system_meta_prefix()}:{cleaned}"
|
||
|
||
|
||
def _system_meta_read(name: str) -> Optional[Dict[str, Any]]:
    """Fetch and JSON-decode one system metadata entry.

    Returns None when Redis is unavailable, the key is missing/empty,
    or the stored value is not valid JSON — this is a best-effort read.
    """
    conn = _get_redis_client_for_logs()
    if conn is None:
        return None
    try:
        payload = conn.get(_system_meta_key(name))
        if not payload:
            return None
        return json.loads(payload)
    except Exception:
        return None
|
||
|
||
|
||
def _system_meta_write(name: str, payload: Dict[str, Any], ttl_sec: int = 30 * 24 * 3600) -> None:
    """Best-effort JSON write of a metadata entry with a TTL (default 30 days).

    Silently does nothing when Redis is unavailable or the write fails —
    the metadata is advisory only (e.g. last-restart timestamps).
    """
    conn = _get_redis_client_for_logs()
    if conn is None:
        return
    try:
        serialized = json.dumps(payload, ensure_ascii=False)
        conn.setex(_system_meta_key(name), int(ttl_sec), serialized)
    except Exception:
        return
|
||
|
||
# Rate-limit marker: avoid flooding the log with repeated Redis errors
# (the frontend may auto-refresh the log endpoints continuously).
_last_logs_redis_err_ts: float = 0.0
|
||
|
||
|
||
def _logs_prefix() -> str:
|
||
return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs")
|
||
|
||
|
||
def _logs_key_for_group(group: str) -> str:
|
||
group = (group or "error").strip().lower()
|
||
# 兼容旧配置:REDIS_LOG_LIST_KEY 仅用于 error
|
||
if group == "error":
|
||
legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
|
||
if legacy:
|
||
return legacy
|
||
|
||
env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
|
||
if env_key:
|
||
return env_key
|
||
|
||
return f"{_logs_prefix()}:{group}"
|
||
|
||
|
||
def _logs_config_key() -> str:
|
||
return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config")
|
||
|
||
|
||
def _logs_stats_prefix() -> str:
|
||
return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added")
|
||
|
||
|
||
def _beijing_yyyymmdd() -> str:
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
return datetime.now(tz=beijing_tz).strftime("%Y%m%d")
|
||
|
||
|
||
def _default_logs_config() -> Dict[str, Any]:
    """Built-in logging configuration, used when Redis holds no overrides.

    Includes the resolved key names so callers (and the frontend) can see
    exactly which Redis keys are in play.
    """
    return {
        "max_len": {name: 2000 for name in LOG_GROUPS},
        "enabled": {name: True for name in LOG_GROUPS},
        "dedupe_consecutive": True,
        "include_debug_in_info": False,
        "keys": {name: _logs_key_for_group(name) for name in LOG_GROUPS},
        "config_key": _logs_config_key(),
        "stats_prefix": _logs_stats_prefix(),
    }
|
||
|
||
|
||
def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]:
    """Overlay values from the Redis config hash onto *defaults*.

    Mutates and returns *defaults*. Malformed values in the hash are
    ignored, leaving the corresponding default in place.
    """
    truthy = ("1", "true", "yes", "y", "on")
    merged = defaults

    for grp in LOG_GROUPS:
        raw_len = redis_hash.get(f"max_len:{grp}")
        if raw_len is not None:
            try:
                parsed = int(str(raw_len).strip())
                if parsed > 0:
                    merged["max_len"][grp] = parsed
            except Exception:
                pass

        raw_enabled = redis_hash.get(f"enabled:{grp}")
        if raw_enabled is not None:
            merged["enabled"][grp] = str(raw_enabled).strip().lower() in truthy

    for flag in ("dedupe_consecutive", "include_debug_in_info"):
        raw_flag = redis_hash.get(flag)
        if raw_flag is not None:
            merged[flag] = str(raw_flag).strip().lower() in truthy

    return merged
|
||
|
||
|
||
def _read_logs_config(client) -> Dict[str, Any]:
    """Load the logging config from Redis, falling back to defaults on error."""
    base = _default_logs_config()
    try:
        stored = client.hgetall(_logs_config_key()) or {}
        return _merge_logs_config(base, stored)
    except Exception:
        return base
|
||
|
||
|
||
def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Persist *cfg* to the Redis config hash and trim each list to its cap.

    Uses a non-transactional pipeline: on AWS Valkey/Redis in cluster mode,
    MULTI/EXEC cannot span slots, and this touches the config hash plus all
    three group lists in one batch.
    """
    fields: Dict[str, str] = {}
    for grp in LOG_GROUPS:
        fields[f"max_len:{grp}"] = str(int(cfg["max_len"][grp]))
        fields[f"enabled:{grp}"] = "1" if cfg["enabled"][grp] else "0"
    fields["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0"
    fields["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0"

    batch = client.pipeline(transaction=False)
    batch.hset(_logs_config_key(), mapping=fields)
    for grp in LOG_GROUPS:
        cap = int(cfg["max_len"][grp])
        if cap > 0:
            batch.ltrim(_logs_key_for_group(grp), 0, cap - 1)
    batch.execute()
    return cfg
|
||
|
||
|
||
class LogsConfigUpdate(BaseModel):
    """Partial update payload for PUT /api/system/logs/config.

    All fields are optional; only the supplied fields are applied by the
    endpoint, which clamps/normalizes values before persisting.
    """

    # Per-group list caps, e.g. {"error": 2000}; clamped to 100..20000 by the endpoint.
    max_len: Optional[Dict[str, int]] = None
    # Per-group on/off switches, e.g. {"info": False}.
    enabled: Optional[Dict[str, bool]] = None
    # Flag stored to Redis; presumably collapses consecutive duplicate
    # entries at write time — consumer not visible in this file.
    dedupe_consecutive: Optional[bool] = None
    # Flag stored to Redis; presumably folds DEBUG records into the "info"
    # group — consumer not visible in this file.
    include_debug_in_info: Optional[bool] = None
|
||
|
||
|
||
def _beijing_time_str() -> str:
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
return datetime.now(tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
@router.post("/logs/test-write")
async def logs_test_write(
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """Write one test log entry per group (error/warning/info) to Redis.

    Used to verify that the backend writes to the same Redis instance and
    the same set of list keys that the frontend reads from.

    Returns the resolved keys plus each list's LLEN after the write.
    Raises HTTP 503 when Redis is unavailable.
    """
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法写入测试日志")

    cfg = _read_logs_config(client)
    # FIX: `time` is already imported at module level — the previous
    # `int(__import__("time").time() * 1000)` was a needless dynamic import.
    now_ms = int(time.time() * 1000)
    time_str = _beijing_time_str()

    entries = {
        "error": {"level": "ERROR", "message": f"[TEST] backend 写入 error 测试日志 @ {time_str}"},
        "warning": {"level": "WARNING", "message": f"[TEST] backend 写入 warning 测试日志 @ {time_str}"},
        "info": {"level": "INFO", "message": f"[TEST] backend 写入 info 测试日志 @ {time_str}"},
    }

    day = _beijing_yyyymmdd()
    stats_prefix = _logs_stats_prefix()

    # Non-transactional pipeline: cluster mode forbids cross-slot MULTI/EXEC.
    pipe = client.pipeline(transaction=False)
    for g in LOG_GROUPS:
        key = _logs_key_for_group(g)
        max_len = int(cfg["max_len"][g])
        entry = {
            "ts": now_ms,
            "time": time_str,
            "service": "backend",
            "level": entries[g]["level"],
            "logger": "api.routes.system",
            "message": entries[g]["message"],
            "hostname": os.getenv("HOSTNAME", ""),
            "signature": f"backend|{entries[g]['level']}|test|{entries[g]['message']}",
            "count": 1,
        }
        pipe.lpush(key, json.dumps(entry, ensure_ascii=False))
        if max_len > 0:
            pipe.ltrim(key, 0, max_len - 1)
        # Daily "added" counters, retained for 14 days.
        pipe.incr(f"{stats_prefix}:{day}:{g}", 1)
        pipe.expire(f"{stats_prefix}:{day}:{g}", 14 * 24 * 3600)

    pipe.execute()

    # Report the post-write LLEN of each list so the caller can confirm.
    # Single-key LLEN queries don't need a transaction either.
    pipe2 = client.pipeline(transaction=False)
    for g in LOG_GROUPS:
        pipe2.llen(_logs_key_for_group(g))
    llens = pipe2.execute()

    return {
        "message": "ok",
        "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
        "llen": {g: int(llens[i] or 0) for i, g in enumerate(LOG_GROUPS)},
        "note": "如果你在前端仍看不到,说明前端请求的后端实例/Redis key/筛选条件不一致。",
    }
|
||
|
||
|
||
def _get_redis_client_for_logs():
    """Return a Redis client for the log subsystem, or None when unavailable.

    Prefers reusing config_manager's existing connection (verified via PING);
    otherwise builds a fresh client from REDIS_* environment variables.
    Connection failures are logged at most once every 30 seconds, since the
    frontend may poll the log endpoints continuously.

    Returns: redis.Redis or None.
    """
    # 1) Reuse config_manager's client (avoids opening a duplicate connection).
    try:
        import config_manager  # backend/config_manager.py (also loads .env)

        cm = getattr(config_manager, "config_manager", None)
        if cm is not None:
            redis_client = getattr(cm, "_redis_client", None)
            redis_connected = getattr(cm, "_redis_connected", False)
            if redis_client is not None and redis_connected:
                try:
                    # Verify the connection is still alive before handing it out.
                    redis_client.ping()
                    return redis_client
                except Exception:
                    pass
    except Exception:
        pass

    # 2) Build our own client from the environment.
    try:
        import redis  # type: ignore

        redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
        redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true"
        redis_username = os.getenv("REDIS_USERNAME", None)
        redis_password = os.getenv("REDIS_PASSWORD", None)
        ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required")
        ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)

        # Short timeouts: this is a best-effort side channel, never block requests.
        kwargs: Dict[str, Any] = {
            "decode_responses": True,
            "username": redis_username,
            "password": redis_password,
            "socket_connect_timeout": 1,
            "socket_timeout": 1,
        }
        if redis_url.startswith("rediss://") or redis_use_tls:
            kwargs["ssl_cert_reqs"] = ssl_cert_reqs
            if ssl_ca_certs:
                kwargs["ssl_ca_certs"] = ssl_ca_certs
            # Hostname verification only when full certificate validation is on.
            if ssl_cert_reqs == "none":
                kwargs["ssl_check_hostname"] = False
            elif ssl_cert_reqs == "required":
                kwargs["ssl_check_hostname"] = True
            else:
                kwargs["ssl_check_hostname"] = False

        client = redis.from_url(redis_url, **kwargs)
        client.ping()
        return client
    except Exception as e:
        # Surface the failure in api.log, rate-limited to one warning per 30s.
        global _last_logs_redis_err_ts
        import time as _t

        now = _t.time()
        if now - _last_logs_redis_err_ts > 30:
            _last_logs_redis_err_ts = now
            logger.warning(f"日志模块 Redis 连接失败。REDIS_URL={os.getenv('REDIS_URL', '')} err={e}")
        return None
|
||
|
||
|
||
@router.get("/logs")
async def get_logs(
    limit: int = 200,
    group: str = "error",
    start: int = 0,
    service: Optional[str] = None,
    level: Optional[str] = None,
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """Read the newest log entries from a Redis list (group=error -> ats:logs:error).

    Query params:
    - limit: number of entries to return (clamped to 1..20000)
    - group: log group (error / warning / info)
    - start: list offset to resume from (pass the returned next_start to page)
    - service: optional filter (backend / trading_system)
    - level: optional filter (ERROR / CRITICAL ...)

    Raises 400 for an unknown group, 503 when Redis is down, 500 on read errors.
    """
    if limit <= 0:
        limit = 200
    if limit > 20000:
        limit = 20000

    if start < 0:
        start = 0

    group = (group or "error").strip().lower()
    if group not in LOG_GROUPS:
        raise HTTPException(status_code=400, detail=f"非法 group:{group}(可选:{', '.join(LOG_GROUPS)})")

    list_key = _logs_key_for_group(group)

    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志")

    try:
        llen_total = int(client.llen(list_key) or 0)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}")

    if llen_total <= 0:
        return {
            "group": group,
            "key": list_key,
            "start": start,
            "limit": limit,
            "llen_total": 0,
            "next_start": start,
            "has_more": False,
            "count": 0,
            "items": [],
        }

    # Paged scan: service/level filtering may reject entries, so keep reading
    # until `limit` matches are collected or the end of the list is reached.
    # Guard: scan at most limit*10 entries so an overly strict filter cannot
    # degenerate into an unbounded scan.
    max_scan = min(llen_total, start + limit * 10)  # max_scan <= llen_total
    pos = start
    scanned = 0
    items: list[Dict[str, Any]] = []

    try:
        while len(items) < limit and pos < max_scan:
            chunk_size = min(500, limit, max_scan - pos)
            end = pos + chunk_size - 1
            raw_batch = client.lrange(list_key, pos, end) or []
            scanned += len(raw_batch)

            stop_idx: Optional[int] = None
            for idx, raw in enumerate(raw_batch):
                try:
                    obj = raw
                    if isinstance(raw, bytes):
                        obj = raw.decode("utf-8", errors="ignore")
                    if not isinstance(obj, str):
                        continue
                    parsed = json.loads(obj)
                    if not isinstance(parsed, dict):
                        continue
                    if service and str(parsed.get("service")) != service:
                        continue
                    if level and str(parsed.get("level")) != level:
                        continue
                    items.append(parsed)
                    if len(items) >= limit:
                        stop_idx = idx
                        break
                except Exception:
                    # Skip malformed entries silently.
                    continue

            if stop_idx is not None:
                # BUGFIX: next_start previously advanced to the end of the whole
                # chunk (end + 1) even when the limit was reached mid-chunk, so
                # the unexamined tail of the batch was silently skipped on the
                # next page. Resume right after the last entry actually examined.
                pos = pos + stop_idx + 1
                break
            pos = end + 1
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}")

    return {
        "group": group,
        "key": list_key,
        "start": start,
        "limit": limit,
        "llen_total": llen_total,
        "scanned": scanned,
        "next_start": pos,
        "has_more": pos < llen_total,
        "count": len(items),
        "items": items,
    }
|
||
|
||
|
||
@router.get("/logs/overview")
async def logs_overview(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Return the logging config plus per-group list lengths and today's counters.

    Raises HTTP 503 when Redis is unavailable, 500 on unexpected errors.
    """
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览")

    try:
        cfg = _read_logs_config(client)

        day = _beijing_yyyymmdd()
        stats_prefix = _logs_stats_prefix()

        # Non-transactional pipeline: cluster mode forbids cross-slot MULTI/EXEC.
        pipe = client.pipeline(transaction=False)
        for g in LOG_GROUPS:
            pipe.llen(_logs_key_for_group(g))
        for g in LOG_GROUPS:
            pipe.get(f"{stats_prefix}:{day}:{g}")
        res = pipe.execute()

        # Pipeline results arrive in queue order: first the LLENs, then the counters.
        llen_vals = res[: len(LOG_GROUPS)]
        added_vals = res[len(LOG_GROUPS) :]

        llen: Dict[str, int] = {}
        added_today: Dict[str, int] = {}
        for i, g in enumerate(LOG_GROUPS):
            # Missing keys / non-numeric values fall back to 0.
            try:
                llen[g] = int(llen_vals[i] or 0)
            except Exception:
                llen[g] = 0
            try:
                added_today[g] = int(added_vals[i] or 0)
            except Exception:
                added_today[g] = 0

        return {
            "config": cfg,
            "stats": {
                "day": day,
                "llen": llen,
                "added_today": added_today,
            },
            "meta": {
                "redis_url": os.getenv("REDIS_URL", ""),
                "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
            },
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"logs_overview 失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"logs_overview failed: {e}")
|
||
|
||
|
||
@router.put("/logs/config")
async def update_logs_config(
    payload: LogsConfigUpdate,
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """Apply a partial logging-config update, persist it, and trim the lists.

    Unknown group names and non-numeric max_len values are ignored;
    max_len values are clamped to 100..20000.
    Raises HTTP 503 when Redis is unavailable.
    """
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置")

    cfg = _read_logs_config(client)

    for raw_name, raw_value in (payload.max_len or {}).items():
        group_name = (raw_name or "").strip().lower()
        if group_name not in LOG_GROUPS:
            continue
        try:
            requested = int(raw_value)
        except Exception:
            continue
        cfg["max_len"][group_name] = min(max(requested, 100), 20000)

    for raw_name, flag in (payload.enabled or {}).items():
        group_name = (raw_name or "").strip().lower()
        if group_name in LOG_GROUPS:
            cfg["enabled"][group_name] = bool(flag)

    if payload.dedupe_consecutive is not None:
        cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive)

    if payload.include_debug_in_info is not None:
        cfg["include_debug_in_info"] = bool(payload.include_debug_in_info)

    cfg = _write_logs_config_and_trim(client, cfg)
    return {"message": "ok", "config": cfg}
|
||
|
||
|
||
def _require_admin(token: Optional[str], provided: Optional[str]) -> None:
|
||
"""
|
||
可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN,则要求请求携带 X-Admin-Token。
|
||
生产环境强烈建议通过 Nginx 额外做鉴权 / IP 白名单。
|
||
"""
|
||
if not token:
|
||
return
|
||
if not provided or provided != token:
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
|
||
|
||
#
# NOTE: require_system_admin now lives in api.auth_deps — importing it from
# there avoids the inconsistent imports that previously broke uvicorn startup.
|
||
|
||
|
||
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
|
||
supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl")
|
||
supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip()
|
||
use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").lower() == "true"
|
||
|
||
# 如果没显式配置 SUPERVISOR_CONF,就尝试自动探测常见路径(宝塔/系统)
|
||
if not supervisor_conf:
|
||
candidates = [
|
||
"/www/server/panel/plugin/supervisor/supervisord.conf",
|
||
"/www/server/panel/plugin/supervisor/supervisor.conf",
|
||
"/etc/supervisor/supervisord.conf",
|
||
"/etc/supervisord.conf",
|
||
]
|
||
for p in candidates:
|
||
try:
|
||
if Path(p).exists():
|
||
supervisor_conf = p
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
cmd: list[str] = []
|
||
if use_sudo:
|
||
# 需要你在 sudoers 配置 NOPASSWD(sudo -n 才不会卡住)
|
||
cmd += ["sudo", "-n"]
|
||
cmd += [supervisorctl_path]
|
||
if supervisor_conf:
|
||
cmd += ["-c", supervisor_conf]
|
||
cmd += args
|
||
return cmd
|
||
|
||
|
||
def _run_supervisorctl(args: list[str]) -> str:
    """Run supervisorctl with *args* and return its combined stdout/stderr.

    supervisorctl convention: `status` may exit with code 3 when some process
    is STOPPED/FATAL while the output is still valid, so 3 is tolerated for
    that subcommand only.

    Raises RuntimeError on timeout (10s) or an unexpected exit code.
    """
    try:
        proc = subprocess.run(
            _build_supervisorctl_cmd(args),
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("supervisorctl 超时(10s)")

    stdout = (proc.stdout or "").strip()
    stderr = (proc.stderr or "").strip()
    merged = "\n".join(part for part in (stdout, stderr) if part).strip()

    acceptable = {0, 3} if (args and args[0] == "status") else {0}
    if proc.returncode not in acceptable:
        raise RuntimeError(merged or f"supervisorctl failed (exit={proc.returncode})")
    return merged or stdout
|
||
|
||
|
||
def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
|
||
"""
|
||
典型输出:
|
||
- auto_sys RUNNING pid 1234, uptime 0:10:00
|
||
- auto_sys STOPPED Not started
|
||
"""
|
||
if "RUNNING" in raw:
|
||
m = re.search(r"\bpid\s+(\d+)\b", raw)
|
||
pid = int(m.group(1)) if m else None
|
||
return True, pid, "RUNNING"
|
||
for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]:
|
||
if state in raw:
|
||
return False, None, state
|
||
return False, None, "UNKNOWN"
|
||
|
||
|
||
def _get_program_name() -> str:
|
||
# 你给的宝塔配置是 [program:auto_sys]
|
||
return os.getenv("SUPERVISOR_TRADING_PROGRAM", "auto_sys").strip() or "auto_sys"
|
||
|
||
|
||
def _select_best_process_name(program: str, status_all_raw: str) -> Optional[str]:
|
||
"""
|
||
从 `supervisorctl status` 全量输出中,找到最匹配的真实进程名。
|
||
兼容 supervisor 的 group:process 格式,例如:auto_sys:auto_sys_00
|
||
"""
|
||
if not status_all_raw:
|
||
return None
|
||
|
||
lines = [ln.strip() for ln in status_all_raw.splitlines() if ln.strip()]
|
||
names: list[str] = []
|
||
for ln in lines:
|
||
name = ln.split(None, 1)[0].strip()
|
||
if name:
|
||
names.append(name)
|
||
|
||
# 精确优先:program / program_00 / program:program_00
|
||
preferred = [program, f"{program}_00", f"{program}:{program}_00"]
|
||
for cand in preferred:
|
||
if cand in names:
|
||
return cand
|
||
|
||
# 次优:任意以 program_ 开头
|
||
for name in names:
|
||
if name.startswith(program + "_"):
|
||
return name
|
||
|
||
# 次优:任意以 program: 开头
|
||
for name in names:
|
||
if name.startswith(program + ":"):
|
||
return name
|
||
|
||
return None
|
||
|
||
|
||
def _status_with_fallback(program: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Query supervisor status for *program*, resolving the real process name.

    Tries `status <program>` first; on "no such process", fetches the full
    status listing and retries with the best-matching name (e.g.
    auto_sys:auto_sys_00). Other errors propagate unchanged.

    Returns: (raw_status, resolved_name_or_None, full_listing_or_None).
    """
    try:
        return _run_supervisorctl(["status", program]), program, None
    except Exception as exc:
        if "no such process" not in str(exc).lower():
            raise

    listing = _run_supervisorctl(["status"])
    actual = _select_best_process_name(program, listing)
    if actual:
        try:
            return _run_supervisorctl(["status", actual]), actual, listing
        except Exception:
            # Fall through: at least return the full listing so the real
            # process name can be identified manually.
            pass
    return listing, None, listing
|
||
|
||
|
||
def _action_with_fallback(action: str, program: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Run start/stop/restart on *program*, resolving the real name if needed.

    On "no such process", looks the name up in the full status listing and
    retries; other errors propagate unchanged.

    Returns: (output, resolved_name_or_None, full_listing_or_None).
    Raises RuntimeError (including the full listing) when no match exists.
    """
    try:
        return _run_supervisorctl([action, program]), program, None
    except Exception as exc:
        if "no such process" not in str(exc).lower():
            raise

    listing = _run_supervisorctl(["status"])
    actual = _select_best_process_name(program, listing)
    if not actual:
        # Include the full listing so the real process name is easy to spot.
        raise RuntimeError(f"no such process: {program}. 当前 supervisor 进程列表:\n{listing}")

    return _run_supervisorctl([action, actual]), actual, listing
|
||
|
||
|
||
@router.post("/clear-cache")
async def clear_cache(
    _admin: Dict[str, Any] = Depends(require_system_admin),
    x_account_id: Optional[int] = Header(default=None, alias="X-Account-Id"),
) -> Dict[str, Any]:
    """Clear the config cache (Redis hash: trading_config) and refill it from the DB.

    Steps: wipe the in-process cache, delete the per-account (and legacy)
    Redis hash keys, then cm.reload() to push fresh DB values back to Redis.
    Raises HTTP 500 when config_manager is not initialized or on unexpected
    errors; individual cleanup failures are logged and tolerated.
    """
    try:
        import config_manager

        # Account scoping: defaults to account 1 when no X-Account-Id header.
        account_id = int(x_account_id or 1)
        cm = None
        if hasattr(config_manager, "ConfigManager") and hasattr(config_manager.ConfigManager, "for_account"):
            cm = config_manager.ConfigManager.for_account(account_id)
        else:
            # Older config_manager versions only expose a module-level singleton.
            cm = getattr(config_manager, "config_manager", None)
        if cm is None:
            raise HTTPException(status_code=500, detail="config_manager 未初始化")

        deleted_keys: list[str] = []

        # 1) Clear the backend's in-process cache.
        try:
            cm._cache = {}
        except Exception:
            pass

        # 2) Delete the Redis cache keys (hash: trading_config).
        try:
            redis_client = getattr(cm, "_redis_client", None)
            redis_connected = getattr(cm, "_redis_connected", False)
            if redis_client is not None and redis_connected:
                # Re-verify the connection before relying on it.
                try:
                    redis_client.ping()
                except Exception:
                    redis_connected = False

            if redis_client is not None and redis_connected:
                try:
                    key = getattr(cm, "_redis_hash_key", "trading_config")
                    redis_client.delete(key)
                    deleted_keys.append(str(key))
                    # Compatibility: legacy key (default account only).
                    legacy = getattr(cm, "_legacy_hash_key", None)
                    if legacy and legacy != key:
                        redis_client.delete(legacy)
                        deleted_keys.append(str(legacy))
                except Exception as e:
                    logger.warning(f"删除 Redis key 失败: {e}")

                # Optional: realtime recommendation cache (if present).
                try:
                    redis_client.delete("recommendations:realtime")
                    deleted_keys.append("recommendations:realtime")
                except Exception:
                    pass
        except Exception as e:
            logger.warning(f"清 Redis 缓存失败: {e}")

        # 3) Refill Redis from the DB immediately (so trading_system never reads empty).
        try:
            cm.reload()
        except Exception as e:
            logger.warning(f"回灌配置到 Redis 失败(仍可能使用DB/本地cache): {e}")

        return {
            "message": "缓存已清理并回灌",
            "deleted_keys": deleted_keys,
            "note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"清理缓存失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/trading/status")
async def trading_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Report the trading system's supervisor status plus last-restart metadata.

    Raises HTTP 500 with a configuration hint when supervisorctl fails.
    """
    program = _get_program_name()
    try:
        raw, resolved, listing = _status_with_fallback(program)
        running, pid, state = _parse_supervisor_status(raw)
        last_restart = _system_meta_read("trading:last_restart") or {}
        return {
            "mode": "supervisor",
            "program": program,
            "resolved_name": resolved,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": listing,
            "meta": last_restart,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO",
        )
|
||
|
||
|
||
@router.post("/trading/start")
async def trading_start(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Start the trading system via supervisor and report the resulting status."""
    program = _get_program_name()
    try:
        output, name_used, listing = _action_with_fallback("start", program)
        # Re-query status after the action so the response reflects reality.
        raw, name_after, listing_after = _status_with_fallback(name_used or program)
        running, pid, state = _parse_supervisor_status(raw)
        status_payload = {
            "mode": "supervisor",
            "program": program,
            "resolved_name": name_after or name_used,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": listing_after or listing,
        }
        return {
            "message": "交易系统已启动(supervisor)",
            "output": output,
            "status": status_payload,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}")
|
||
|
||
|
||
@router.post("/trading/stop")
async def trading_stop(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Stop the trading system via supervisor and report the resulting status."""
    program = _get_program_name()
    try:
        output, name_used, listing = _action_with_fallback("stop", program)
        # Re-query status after the action so the response reflects reality.
        raw, name_after, listing_after = _status_with_fallback(name_used or program)
        running, pid, state = _parse_supervisor_status(raw)
        status_payload = {
            "mode": "supervisor",
            "program": program,
            "resolved_name": name_after or name_used,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": listing_after or listing,
        }
        return {
            "message": "交易系统已停止(supervisor)",
            "output": output,
            "status": status_payload,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}")
|
||
|
||
|
||
@router.post("/trading/restart")
async def trading_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Restart the trading system via supervisor and record the restart time.

    The restart timestamp is persisted to Redis ("trading:last_restart",
    best-effort) so the frontend can show when the system was last restarted.
    Raises HTTP 500 when supervisorctl fails.
    """
    program = _get_program_name()
    try:
        requested_at = _beijing_time_str()
        requested_at_ms = int(time.time() * 1000)
        out, resolved_name, status_all = _action_with_fallback("restart", program)
        # Re-query status after the restart so the response reflects reality.
        raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
        running, pid, state = _parse_supervisor_status(raw)

        # Record the restart time (best effort; displayed by the frontend).
        _system_meta_write(
            "trading:last_restart",
            {
                "requested_at": requested_at,
                "requested_at_ms": requested_at_ms,
                "pid": pid,
                "program": resolved_name2 or resolved_name or program,
            },
        )
        return {
            "message": "交易系统已重启(supervisor)",
            "output": out,
            "status": {
                "mode": "supervisor",
                "program": program,
                "resolved_name": resolved_name2 or resolved_name,
                "running": running,
                "pid": pid,
                "state": state,
                "raw": raw,
                "status_all": status_all2 or status_all,
            },
            "meta": {
                "requested_at": requested_at,
                "requested_at_ms": requested_at_ms,
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}")
|
||
|
||
|
||
@router.get("/backend/status")
async def backend_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Report the backend (uvicorn) service status.

    Notes:
    - pid is os.getpid() of the current FastAPI process.
    - last_restart metadata is read from Redis when available.
    - BUGFIX: "started_at" previously returned the *current* time on every
      call (via _beijing_time_str()); it now renders the actual process start
      recorded in _BACKEND_STARTED_AT_MS, matching "started_at_ms".
    """
    from datetime import datetime, timedelta, timezone

    # Format the recorded start instant in Beijing time (UTC+8), same
    # "YYYY-MM-DD HH:MM:SS" shape the rest of this module uses.
    cn_tz = timezone(timedelta(hours=8))
    started_at = datetime.fromtimestamp(
        _BACKEND_STARTED_AT_MS / 1000, tz=cn_tz
    ).strftime("%Y-%m-%d %H:%M:%S")

    meta = _system_meta_read("backend:last_restart") or {}
    return {
        "running": True,
        "pid": os.getpid(),
        "started_at_ms": _BACKEND_STARTED_AT_MS,
        "started_at": started_at,
        "meta": meta,
    }
|
||
|
||
|
||
@router.post("/backend/restart")
async def backend_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Restart the backend (uvicorn) by launching backend/restart.sh detached.

    The backend is normally started with `nohup uvicorn ... &`; restart.sh
    1) greps for the `uvicorn api.main:app` process and kills it, then
    2) runs backend/start.sh to bring up a fresh process.

    The script is executed with a 1-second delay so this HTTP request can be
    answered before the current process dies. Raises HTTP 500 when the script
    is missing or cannot be launched.
    """
    backend_dir = Path(__file__).parent.parent.parent  # backend/
    restart_script = backend_dir / "restart.sh"
    if not restart_script.exists():
        raise HTTPException(status_code=500, detail=f"找不到重启脚本: {restart_script}")

    requested_at = _beijing_time_str()
    requested_at_ms = int(time.time() * 1000)
    cur_pid = os.getpid()

    # Persist the restart marker BEFORE the current process is killed.
    _system_meta_write(
        "backend:last_restart",
        {
            "requested_at": requested_at,
            "requested_at_ms": requested_at_ms,
            "pid_before": cur_pid,
            "script": str(restart_script),
        },
    )

    # Detached background run: sleep 1 first so the response can be sent.
    cmd = ["bash", "-lc", f"sleep 1; '{restart_script}'"]
    try:
        subprocess.Popen(
            cmd,
            cwd=str(backend_dir),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # New session so the child survives this process's death.
            start_new_session=True,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"启动重启脚本失败: {e}")

    return {
        "message": "已发起后端重启(1s 后执行)",
        "pid_before": cur_pid,
        "requested_at": requested_at,
        "requested_at_ms": requested_at_ms,
        "script": str(restart_script),
        "note": "重启期间接口可能短暂不可用,页面可等待 3-5 秒后刷新状态。",
    }
|
||
|