auto_trade_sys/backend/api/routes/system.py
薇薇安 38ebbebd95 a
2026-01-18 16:39:57 +08:00

241 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import subprocess
from typing import Any, Dict, Optional, Tuple
from fastapi import APIRouter, HTTPException, Header
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# All routes are mounted under /api/system; the frontend calls /api/system/... directly.
router = APIRouter(prefix="/api/system")
def _require_admin(token: Optional[str], provided: Optional[str]) -> None:
    """
    Optional, lightweight guard for the system-control endpoints.

    If ``token`` is empty/None (the callers in this module pass the stripped
    SYSTEM_CONTROL_TOKEN environment variable), protection is disabled and the
    call returns immediately. Otherwise ``provided`` (the X-Admin-Token request
    header) must match ``token`` exactly.

    For production it is strongly recommended to add authentication and/or an
    IP allow-list at the Nginx layer in addition to this check.

    Args:
        token: The configured admin token, or empty/None when not configured.
        provided: The token sent by the client, or None when the header is absent.

    Raises:
        HTTPException: 401 when a token is configured and ``provided`` is
            missing or does not match.
    """
    if not token:
        # No token configured: the guard is intentionally a no-op.
        return

    # Imported locally so this function does not depend on a file-level import.
    import hmac

    # Compare in constant time so response timing does not reveal how many
    # leading characters of the secret were guessed correctly. Both sides are
    # encoded to bytes because compare_digest() rejects non-ASCII str values;
    # "surrogateescape" mirrors how os.environ decodes undecodable bytes.
    matches = bool(provided) and hmac.compare_digest(
        provided.encode("utf-8", "surrogateescape"),
        token.encode("utf-8", "surrogateescape"),
    )
    if not matches:
        raise HTTPException(status_code=401, detail="Unauthorized")
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
    """
    Assemble the argv list used to invoke supervisorctl.

    Environment variables read on every call:
      - SUPERVISORCTL_PATH: executable to run; unset or blank falls back to
        "supervisorctl".
      - SUPERVISOR_CONF: when non-blank, passed through as ``-c <path>``.
      - SUPERVISOR_USE_SUDO: "true" (case-insensitive, surrounding whitespace
        ignored) prefixes the command with ``sudo -n``.

    Args:
        args: supervisorctl sub-command and its arguments, e.g. ["status", "auto_sys"].

    Returns:
        A new list suitable for ``subprocess.run``; ``args`` is not modified.
    """
    # A blank value would otherwise yield an empty executable name, so fall
    # back to the default the same way _get_program_name() does.
    supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "").strip() or "supervisorctl"
    supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip()
    use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").strip().lower() == "true"

    cmd: list[str] = []
    if use_sudo:
        # "sudo -n" never prompts for a password, so NOPASSWD must be
        # configured in sudoers; otherwise sudo fails instead of hanging.
        cmd += ["sudo", "-n"]
    cmd.append(supervisorctl_path)
    if supervisor_conf:
        cmd += ["-c", supervisor_conf]
    cmd.extend(args)
    return cmd
def _run_supervisorctl(args: list[str]) -> str:
    """
    Run supervisorctl with ``args`` and return its textual output.

    The command line is produced by ``_build_supervisorctl_cmd``. Output is
    stdout followed by stderr (each stripped, empty parts dropped), joined
    with a newline.

    Args:
        args: supervisorctl sub-command and arguments, e.g. ["status", "auto_sys"].

    Returns:
        The combined, stripped output (may be an empty string).

    Raises:
        RuntimeError: when the command does not finish within 10 seconds, when
            the executable cannot be launched, or when it exits with a failure
            status. The message carries the command output when there is any.
    """
    cmd = _build_supervisorctl_cmd(args)
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("supervisorctl 超时10s") from exc
    except OSError as exc:
        # Missing binary / permission problems: surface them as the same
        # exception type this function documents instead of leaking OSError.
        raise RuntimeError(f"supervisorctl could not be executed: {exc}") from exc

    out = (res.stdout or "").strip()
    err = (res.stderr or "").strip()
    combined = "\n".join(part for part in (out, err) if part)

    # Supervisor 4.x follows the LSB convention for "status": it exits with 3
    # when a queried process is not RUNNING, while still printing the normal
    # status line on stdout. That is a valid answer (e.g. STOPPED), not a
    # failure, so hand the output back for parsing instead of raising.
    is_status_query = bool(args) and args[0] == "status"
    if is_status_query and res.returncode == 3 and out:
        return combined

    if res.returncode != 0:
        raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})")
    return combined
def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
    """
    Extract (running, pid, state) from ``supervisorctl status`` output.

    Typical input:
      - auto_sys RUNNING pid 1234, uptime 0:10:00
      - auto_sys STOPPED Not started

    State names are matched as whole whitespace-delimited tokens, so a program
    whose name merely contains a state word (e.g. "RUNNING_bot") is not
    mistaken for that state.

    Args:
        raw: Text printed by supervisorctl.

    Returns:
        A tuple ``(running, pid, state)``:
          - running: True only when a RUNNING token is present.
          - pid: the integer following "pid" when running and present, else None.
          - state: the recognised state name, or "UNKNOWN" when none is found.
    """
    text = raw or ""
    tokens = set(text.split())

    if "RUNNING" in tokens:
        pid_match = re.search(r"\bpid\s+(\d+)\b", text)
        pid = int(pid_match.group(1)) if pid_match else None
        return True, pid, "RUNNING"

    # Remaining supervisor process states, checked in a fixed order.
    # STOPPING is included so a process that is shutting down is reported as
    # such rather than falling through to UNKNOWN.
    for state in ("STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "STOPPING", "UNKNOWN"):
        if state in tokens:
            return False, None, state
    return False, None, "UNKNOWN"
def _get_program_name() -> str:
    """
    Return the supervisor program name of the trading system.

    Read from the SUPERVISOR_TRADING_PROGRAM environment variable; an unset or
    blank value yields "auto_sys", which is the name used by the
    [program:auto_sys] section of the BaoTa-panel supervisor config supplied
    for this deployment.
    """
    fallback = "auto_sys"
    configured = os.environ.get("SUPERVISOR_TRADING_PROGRAM", fallback).strip()
    if configured:
        return configured
    return fallback
def _delete_redis_cache_keys(cm: Any) -> list[str]:
    """
    Best-effort removal of the cached Redis keys held via ``cm``.

    Uses ``cm._redis_client`` only when it exists, ``cm._redis_connected`` is
    truthy and a ``ping()`` succeeds. Never raises: every failure is logged.

    Returns:
        The key names whose ``delete()`` call completed without raising.
    """
    removed: list[str] = []
    try:
        redis_client = getattr(cm, "_redis_client", None)
        redis_connected = getattr(cm, "_redis_connected", False)
        if redis_client is None or not redis_connected:
            return removed

        # Verify the connection is actually alive before issuing deletes.
        try:
            redis_client.ping()
        except Exception as e:
            logger.warning("Redis ping failed, skipping Redis cache cleanup: %s", e)
            return removed

        # Main config cache (Redis hash: trading_config).
        try:
            redis_client.delete("trading_config")
            removed.append("trading_config")
        except Exception as e:
            logger.warning("删除 Redis key trading_config 失败: %s", e)

        # Optional: realtime recommendation cache, if it exists.
        try:
            redis_client.delete("recommendations:realtime")
            removed.append("recommendations:realtime")
        except Exception as e:
            # Optional key: a failure here is not important, but keep a trace.
            logger.debug("Failed to delete Redis key recommendations:realtime: %s", e)
    except Exception as e:
        logger.warning("清 Redis 缓存失败: %s", e)
    return removed


@router.post("/clear-cache")
async def clear_cache(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
    """
    Clear the configuration cache (Redis hash ``trading_config``) and trigger a
    reload so Redis is repopulated.

    Steps, each of them best-effort (failures are logged, not raised):
      1. Reset the backend's in-process cache (``cm._cache``).
      2. Delete the Redis keys ``trading_config`` and ``recommendations:realtime``.
      3. Call ``cm.reload()`` right away so the trading system does not read an
         empty cache (per the original notes, this refills Redis from the DB).

    Returns:
        A dict with ``message``, ``deleted_keys`` (keys whose delete call
        succeeded) and a ``note`` for the operator.

    Raises:
        HTTPException: 401 from the admin-token check; 500 when
            ``config_manager.config_manager`` is missing or an unexpected
            error occurs.
    """
    _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
    try:
        import config_manager

        cm = getattr(config_manager, "config_manager", None)
        if cm is None:
            raise HTTPException(status_code=500, detail="config_manager 未初始化")

        # 1) Reset the backend's local (in-process) cache.
        try:
            cm._cache = {}
        except Exception as e:
            logger.warning("Failed to reset the in-process config cache: %s", e)

        # 2) Drop the cached keys in Redis.
        deleted_keys = _delete_redis_cache_keys(cm)

        # 3) Reload immediately so readers do not observe an empty cache.
        try:
            cm.reload()
        except Exception as e:
            logger.warning("回灌配置到 Redis 失败仍可能使用DB/本地cache: %s", e)

        return {
            "message": "缓存已清理并回灌",
            "deleted_keys": deleted_keys,
            "note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("清理缓存失败: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/trading/status")
async def trading_status(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
    """
    Report the supervisor status of the trading-system program.

    Returns:
        A dict with ``mode`` ("supervisor"), ``program``, ``running``, ``pid``,
        ``state`` and the ``raw`` supervisorctl output.

    Raises:
        HTTPException: 401 from the admin-token check; 500 when querying or
            parsing the supervisorctl status fails.
    """
    _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
    program = _get_program_name()

    # Imported locally so this handler does not depend on a file-level import.
    import asyncio

    try:
        # _run_supervisorctl blocks on a subprocess (up to its 10 s timeout).
        # Run it in a worker thread so the event loop keeps serving requests.
        raw = await asyncio.to_thread(_run_supervisorctl, ["status", program])
        running, pid, state = _parse_supervisor_status(raw)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO",
        ) from e

    return {
        "mode": "supervisor",
        "program": program,
        "running": running,
        "pid": pid,
        "state": state,
        "raw": raw,
    }
@router.post("/trading/start")
async def trading_start(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
    """
    Start the trading-system program via supervisorctl, then report its status.

    Returns:
        A dict with ``message``, the ``output`` of the start command and a
        ``status`` dict (``mode``, ``program``, ``running``, ``pid``, ``state``,
        ``raw``) read right after starting.

    Raises:
        HTTPException: 401 from the admin-token check; 500 when the start
            command or the follow-up status query fails.
    """
    _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
    program = _get_program_name()

    # Imported locally so this handler does not depend on a file-level import.
    import asyncio

    try:
        # Both supervisorctl calls block on a subprocess; run them in a worker
        # thread so the event loop is not stalled while they complete.
        out = await asyncio.to_thread(_run_supervisorctl, ["start", program])
        raw = await asyncio.to_thread(_run_supervisorctl, ["status", program])
        running, pid, state = _parse_supervisor_status(raw)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}") from e

    return {
        "message": "交易系统已启动supervisor",
        "output": out,
        "status": {
            "mode": "supervisor",
            "program": program,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
        },
    }
@router.post("/trading/stop")
async def trading_stop(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
    """
    Stop the trading-system program via supervisorctl, then report its status.

    Returns:
        A dict with ``message``, the ``output`` of the stop command and a
        ``status`` dict (``mode``, ``program``, ``running``, ``pid``, ``state``,
        ``raw``) read right after stopping.

    Raises:
        HTTPException: 401 from the admin-token check; 500 when the stop
            command or the follow-up status query fails.
    """
    _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
    program = _get_program_name()

    # Imported locally so this handler does not depend on a file-level import.
    import asyncio

    try:
        # Both supervisorctl calls block on a subprocess; run them in a worker
        # thread so the event loop is not stalled while they complete.
        out = await asyncio.to_thread(_run_supervisorctl, ["stop", program])
        raw = await asyncio.to_thread(_run_supervisorctl, ["status", program])
        running, pid, state = _parse_supervisor_status(raw)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}") from e

    return {
        "message": "交易系统已停止supervisor",
        "output": out,
        "status": {
            "mode": "supervisor",
            "program": program,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
        },
    }
@router.post("/trading/restart")
async def trading_restart(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
    """
    Restart the trading-system program via supervisorctl, then report its status.

    Returns:
        A dict with ``message``, the ``output`` of the restart command and a
        ``status`` dict (``mode``, ``program``, ``running``, ``pid``, ``state``,
        ``raw``) read right after restarting.

    Raises:
        HTTPException: 401 from the admin-token check; 500 when the restart
            command or the follow-up status query fails.
    """
    _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
    program = _get_program_name()

    # Imported locally so this handler does not depend on a file-level import.
    import asyncio

    try:
        # Both supervisorctl calls block on a subprocess; run them in a worker
        # thread so the event loop is not stalled while they complete.
        out = await asyncio.to_thread(_run_supervisorctl, ["restart", program])
        raw = await asyncio.to_thread(_run_supervisorctl, ["status", program])
        running, pid, state = _parse_supervisor_status(raw)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}") from e

    return {
        "message": "交易系统已重启supervisor",
        "output": out,
        "status": {
            "mode": "supervisor",
            "program": program,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
        },
    }