auto_trade_sys/backend/api/routes/system.py
import os
import re
import subprocess
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel
import logging
logger = logging.getLogger(__name__)
# All routes are mounted under /api/system; the frontend calls /api/system/... directly
router = APIRouter(prefix="/api/system")
LOG_GROUPS = ("error", "warning", "info")
def _logs_prefix() -> str:
return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs")
def _logs_key_for_group(group: str) -> str:
group = (group or "error").strip().lower()
# Backward compatibility: REDIS_LOG_LIST_KEY applies to the "error" group only
if group == "error":
legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
if legacy:
return legacy
env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
if env_key:
return env_key
return f"{_logs_prefix()}:{group}"
def _logs_config_key() -> str:
return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config")
def _logs_stats_prefix() -> str:
return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added")
def _beijing_yyyymmdd() -> str:
from datetime import datetime, timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(tz=beijing_tz).strftime("%Y%m%d")
def _default_logs_config() -> Dict[str, Any]:
return {
"max_len": {"error": 2000, "warning": 2000, "info": 2000},
"enabled": {"error": True, "warning": True, "info": True},
"dedupe_consecutive": True,
"include_debug_in_info": False,
"keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
"config_key": _logs_config_key(),
"stats_prefix": _logs_stats_prefix(),
}
def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]:
cfg = defaults
for g in LOG_GROUPS:
v = redis_hash.get(f"max_len:{g}")
if v is not None:
try:
n = int(str(v).strip())
if n > 0:
cfg["max_len"][g] = n
except Exception:
pass
ev = redis_hash.get(f"enabled:{g}")
if ev is not None:
s = str(ev).strip().lower()
cfg["enabled"][g] = s in ("1", "true", "yes", "y", "on")
for k in ("dedupe_consecutive", "include_debug_in_info"):
vv = redis_hash.get(k)
if vv is not None:
s = str(vv).strip().lower()
cfg[k] = s in ("1", "true", "yes", "y", "on")
return cfg
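# Sketch of the Redis hash layout that _merge_logs_config expects (flat field names; the
# values shown are illustrative only):
#   HSET ats:logs:config max_len:error 2000 enabled:info 1 dedupe_consecutive 1
# Missing or malformed fields simply fall back to the defaults above.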
def _read_logs_config(client) -> Dict[str, Any]:
defaults = _default_logs_config()
try:
raw = client.hgetall(_logs_config_key()) or {}
return _merge_logs_config(defaults, raw)
except Exception:
return defaults
def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]:
mapping: Dict[str, str] = {}
for g in LOG_GROUPS:
mapping[f"max_len:{g}"] = str(int(cfg["max_len"][g]))
mapping[f"enabled:{g}"] = "1" if cfg["enabled"][g] else "0"
mapping["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0"
mapping["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0"
pipe = client.pipeline()
pipe.hset(_logs_config_key(), mapping=mapping)
for g in LOG_GROUPS:
key = _logs_key_for_group(g)
max_len = int(cfg["max_len"][g])
if max_len > 0:
pipe.ltrim(key, 0, max_len - 1)
pipe.execute()
return cfg
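# Note on the trim above: LTRIM keeps indexes 0..max_len-1. Since get_logs treats the head of
# the list as "latest", producers are assumed to LPUSH new entries, so the trim drops the
# oldest entries beyond max_len.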
class LogsConfigUpdate(BaseModel):
max_len: Optional[Dict[str, int]] = None
enabled: Optional[Dict[str, bool]] = None
dedupe_consecutive: Optional[bool] = None
include_debug_in_info: Optional[bool] = None
def _get_redis_client_for_logs():
"""
获取 Redis 客户端(优先复用 config_manager 的连接;失败则自行创建)。
返回redis.Redis 或 None
"""
# 1) Reuse config_manager's client (avoids opening a duplicate connection)
try:
import config_manager # backend/config_manager.py (already loads .env)
cm = getattr(config_manager, "config_manager", None)
if cm is not None:
redis_client = getattr(cm, "_redis_client", None)
redis_connected = getattr(cm, "_redis_connected", False)
if redis_client is not None and redis_connected:
try:
redis_client.ping()
return redis_client
except Exception:
pass
except Exception:
pass
# 2) Create a client ourselves
try:
import redis # type: ignore
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true"
redis_username = os.getenv("REDIS_USERNAME", None)
redis_password = os.getenv("REDIS_PASSWORD", None)
ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required")
ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)
kwargs: Dict[str, Any] = {
"decode_responses": True,
"username": redis_username,
"password": redis_password,
"socket_connect_timeout": 1,
"socket_timeout": 1,
}
if redis_url.startswith("rediss://") or redis_use_tls:
kwargs["ssl_cert_reqs"] = ssl_cert_reqs
if ssl_ca_certs:
kwargs["ssl_ca_certs"] = ssl_ca_certs
if ssl_cert_reqs == "none":
kwargs["ssl_check_hostname"] = False
elif ssl_cert_reqs == "required":
kwargs["ssl_check_hostname"] = True
else:
kwargs["ssl_check_hostname"] = False
client = redis.from_url(redis_url, **kwargs)
client.ping()
return client
except Exception:
return None
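# Hypothetical .env sketch for the standalone-client fallback above (variable names match the
# getenv calls; values are placeholders, not real endpoints):
#   REDIS_URL=rediss://redis.example.internal:6380
#   REDIS_USE_TLS=True
#   REDIS_PASSWORD=<secret>
#   REDIS_SSL_CERT_REQS=required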
@router.get("/logs")
async def get_logs(
limit: int = 200,
group: str = "error",
service: Optional[str] = None,
level: Optional[str] = None,
x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"),
) -> Dict[str, Any]:
"""
从 Redis List 读取最新日志(默认 group=error -> ats:logs:error
参数:
- limit: 返回条数(最大 2000
- group: 日志分组error / warning / info
- service: 过滤backend / trading_system
- level: 过滤ERROR / CRITICAL ...
"""
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
if limit <= 0:
limit = 200
if limit > 2000:
limit = 2000
group = (group or "error").strip().lower()
if group not in LOG_GROUPS:
raise HTTPException(status_code=400, detail=f"非法 group:{group}(可选:{', '.join(LOG_GROUPS)})")
list_key = _logs_key_for_group(group)
client = _get_redis_client_for_logs()
if client is None:
raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志")
try:
raw_items = client.lrange(list_key, 0, limit - 1)
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}")
items: list[Dict[str, Any]] = []
for raw in raw_items or []:
try:
obj = raw
if isinstance(raw, bytes):
obj = raw.decode("utf-8", errors="ignore")
if isinstance(obj, str):
parsed = json.loads(obj)
else:
continue
if not isinstance(parsed, dict):
continue
if service and str(parsed.get("service")) != service:
continue
if level and str(parsed.get("level")) != level:
continue
items.append(parsed)
except Exception:
continue
return {
"group": group,
"key": list_key,
"count": len(items),
"items": items,
}
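# Hypothetical call (assumes the backend listens on localhost:8000 and SYSTEM_CONTROL_TOKEN is set):
#   curl -H "X-Admin-Token: $SYSTEM_CONTROL_TOKEN" \
#     "http://localhost:8000/api/system/logs?group=warning&limit=100&service=backend"
# Items are returned exactly as the producers pushed them; entries that fail JSON parsing or the
# service/level filters are skipped silently.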
@router.get("/logs/overview")
async def logs_overview(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
client = _get_redis_client_for_logs()
if client is None:
raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览")
cfg = _read_logs_config(client)
day = _beijing_yyyymmdd()
stats_prefix = _logs_stats_prefix()
pipe = client.pipeline()
for g in LOG_GROUPS:
pipe.llen(_logs_key_for_group(g))
for g in LOG_GROUPS:
pipe.get(f"{stats_prefix}:{day}:{g}")
res = pipe.execute()
llen_vals = res[: len(LOG_GROUPS)]
added_vals = res[len(LOG_GROUPS) :]
llen: Dict[str, int] = {}
added_today: Dict[str, int] = {}
for i, g in enumerate(LOG_GROUPS):
try:
llen[g] = int(llen_vals[i] or 0)
except Exception:
llen[g] = 0
try:
added_today[g] = int(added_vals[i] or 0)
except Exception:
added_today[g] = 0
return {
"config": cfg,
"stats": {
"day": day,
"llen": llen,
"added_today": added_today,
},
}
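# Illustrative response shape for the overview above (values are examples, not real data):
#   {"config": {...},
#    "stats": {"day": "<YYYYMMDD, Beijing time>",
#              "llen": {"error": 152, "warning": 40, "info": 980},
#              "added_today": {"error": 37, "warning": 5, "info": 120}}}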
@router.put("/logs/config")
async def update_logs_config(
payload: LogsConfigUpdate,
x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"),
) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
client = _get_redis_client_for_logs()
if client is None:
raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置")
cfg = _read_logs_config(client)
if payload.max_len:
for g, v in payload.max_len.items():
gg = (g or "").strip().lower()
if gg not in LOG_GROUPS:
continue
try:
n = int(v)
if n < 100:
n = 100
if n > 20000:
n = 20000
cfg["max_len"][gg] = n
except Exception:
continue
if payload.enabled:
for g, v in payload.enabled.items():
gg = (g or "").strip().lower()
if gg not in LOG_GROUPS:
continue
cfg["enabled"][gg] = bool(v)
if payload.dedupe_consecutive is not None:
cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive)
if payload.include_debug_in_info is not None:
cfg["include_debug_in_info"] = bool(payload.include_debug_in_info)
cfg = _write_logs_config_and_trim(client, cfg)
return {"message": "ok", "config": cfg}
def _require_admin(token: Optional[str], provided: Optional[str]) -> None:
"""
可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN则要求请求携带 X-Admin-Token。
生产环境强烈建议通过 Nginx 额外做鉴权 / IP 白名单。
"""
if not token:
return
if not provided or provided != token:
raise HTTPException(status_code=401, detail="Unauthorized")
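# Example: with SYSTEM_CONTROL_TOKEN=s3cret exported for the backend, every endpoint in this
# module rejects requests that do not send "X-Admin-Token: s3cret"; with the variable unset,
# the check is a no-op (rely on Nginx-level protection instead).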
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl")
supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip()
use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").lower() == "true"
# If SUPERVISOR_CONF is not set explicitly, probe common config locations (BT Panel / system defaults)
if not supervisor_conf:
candidates = [
"/www/server/panel/plugin/supervisor/supervisord.conf",
"/www/server/panel/plugin/supervisor/supervisor.conf",
"/etc/supervisor/supervisord.conf",
"/etc/supervisord.conf",
]
for p in candidates:
try:
if Path(p).exists():
supervisor_conf = p
break
except Exception:
continue
cmd: list[str] = []
if use_sudo:
# Requires a NOPASSWD entry in sudoers (sudo -n fails fast instead of hanging on a password prompt)
cmd += ["sudo", "-n"]
cmd += [supervisorctl_path]
if supervisor_conf:
cmd += ["-c", supervisor_conf]
cmd += args
return cmd
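# Example of an assembled command, assuming SUPERVISOR_USE_SUDO=true and the BT Panel conf path
# being auto-detected:
#   ["sudo", "-n", "supervisorctl", "-c",
#    "/www/server/panel/plugin/supervisor/supervisord.conf", "status", "auto_sys"]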
def _run_supervisorctl(args: list[str]) -> str:
cmd = _build_supervisorctl_cmd(args)
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
except subprocess.TimeoutExpired:
raise RuntimeError("supervisorctl 超时10s")
out = (res.stdout or "").strip()
err = (res.stderr or "").strip()
combined = "\n".join([s for s in [out, err] if s]).strip()
if res.returncode != 0:
raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})")
return combined or out
def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
"""
Typical output:
- auto_sys RUNNING pid 1234, uptime 0:10:00
- auto_sys STOPPED Not started
"""
if "RUNNING" in raw:
m = re.search(r"\bpid\s+(\d+)\b", raw)
pid = int(m.group(1)) if m else None
return True, pid, "RUNNING"
for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]:
if state in raw:
return False, None, state
return False, None, "UNKNOWN"
def _get_program_name() -> str:
# The BT Panel supervisor config in use defines [program:auto_sys]
return os.getenv("SUPERVISOR_TRADING_PROGRAM", "auto_sys").strip() or "auto_sys"
def _select_best_process_name(program: str, status_all_raw: str) -> Optional[str]:
"""
从 `supervisorctl status` 全量输出中,找到最匹配的真实进程名。
兼容 supervisor 的 group:process 格式例如auto_sys:auto_sys_00
"""
if not status_all_raw:
return None
lines = [ln.strip() for ln in status_all_raw.splitlines() if ln.strip()]
names: list[str] = []
for ln in lines:
name = ln.split(None, 1)[0].strip()
if name:
names.append(name)
# Exact matches first: program / program_00 / program:program_00
preferred = [program, f"{program}_00", f"{program}:{program}_00"]
for cand in preferred:
if cand in names:
return cand
# Next best: any name starting with "<program>_"
for name in names:
if name.startswith(program + "_"):
return name
# Next best: any name starting with "<program>:"
for name in names:
if name.startswith(program + ":"):
return name
return None
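# Example: if the full status listing contains a line "auto_sys:auto_sys_00 RUNNING ...",
#   _select_best_process_name("auto_sys", status_all) -> "auto_sys:auto_sys_00"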
def _status_with_fallback(program: str) -> Tuple[str, Optional[str], Optional[str]]:
"""
- 优先 `status <program>`
- 若 no such process返回全量 status并尝试解析真实 name例如 auto_sys:auto_sys_00
返回:(raw, resolved_name, status_all)
"""
try:
raw = _run_supervisorctl(["status", program])
return raw, program, None
except Exception as e:
msg = str(e).lower()
if "no such process" not in msg:
raise
status_all = _run_supervisorctl(["status"])
resolved = _select_best_process_name(program, status_all)
if resolved:
try:
raw = _run_supervisorctl(["status", resolved])
return raw, resolved, status_all
except Exception:
# Fallback: at least return the full status output so the real process name can be confirmed
return status_all, None, status_all
return status_all, None, status_all
def _action_with_fallback(action: str, program: str) -> Tuple[str, Optional[str], Optional[str]]:
"""
对 start/stop/restart 做兜底:如果 program 不存在,尝试解析真实 name 再执行。
返回:(output, resolved_name, status_all)
"""
try:
out = _run_supervisorctl([action, program])
return out, program, None
except Exception as e:
msg = str(e).lower()
if "no such process" not in msg:
raise
status_all = _run_supervisorctl(["status"])
resolved = _select_best_process_name(program, status_all)
if not resolved:
# Nothing matched; include the full status listing in the error to aid troubleshooting
raise RuntimeError(f"no such process: {program}. 当前 supervisor 进程列表:\n{status_all}")
out = _run_supervisorctl([action, resolved])
return out, resolved, status_all
@router.post("/clear-cache")
async def clear_cache(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
"""
清理配置缓存Redis Hash: trading_config并从数据库回灌到 Redis。
"""
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
try:
import config_manager
cm = getattr(config_manager, "config_manager", None)
if cm is None:
raise HTTPException(status_code=500, detail="config_manager 未初始化")
deleted_keys: list[str] = []
# 1) Clear the backend's local in-process cache
try:
cm._cache = {}
except Exception:
pass
# 2) Delete the Redis cache key (hash: trading_config)
try:
redis_client = getattr(cm, "_redis_client", None)
redis_connected = getattr(cm, "_redis_connected", False)
if redis_client is not None and redis_connected:
try:
redis_client.ping()
except Exception:
redis_connected = False
if redis_client is not None and redis_connected:
try:
redis_client.delete("trading_config")
deleted_keys.append("trading_config")
except Exception as e:
logger.warning(f"删除 Redis key trading_config 失败: {e}")
# Optional: realtime recommendations cache (if present)
try:
redis_client.delete("recommendations:realtime")
deleted_keys.append("recommendations:realtime")
except Exception:
pass
except Exception as e:
logger.warning(f"清 Redis 缓存失败: {e}")
# 3) Immediately repopulate Redis from the DB (so trading_system never reads an empty config)
try:
cm.reload()
except Exception as e:
logger.warning(f"回灌配置到 Redis 失败仍可能使用DB/本地cache: {e}")
return {
"message": "缓存已清理并回灌",
"deleted_keys": deleted_keys,
"note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。",
}
except HTTPException:
raise
except Exception as e:
logger.error(f"清理缓存失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
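# Hypothetical invocation (assumes localhost:8000):
#   curl -X POST -H "X-Admin-Token: $SYSTEM_CONTROL_TOKEN" "http://localhost:8000/api/system/clear-cache"
# Expected response shape: {"message": "...", "deleted_keys": ["trading_config", ...], "note": "..."}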
@router.get("/trading/status")
async def trading_status(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
program = _get_program_name()
try:
raw, resolved_name, status_all = _status_with_fallback(program)
running, pid, state = _parse_supervisor_status(raw)
return {
"mode": "supervisor",
"program": program,
"resolved_name": resolved_name,
"running": running,
"pid": pid,
"state": state,
"raw": raw,
"status_all": status_all,
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO",
)
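# Hypothetical status check (assumes localhost:8000):
#   curl -H "X-Admin-Token: $SYSTEM_CONTROL_TOKEN" "http://localhost:8000/api/system/trading/status"
#   -> {"mode": "supervisor", "program": "auto_sys", "running": true, "pid": 1234,
#       "state": "RUNNING", ...}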
@router.post("/trading/start")
async def trading_start(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
program = _get_program_name()
try:
out, resolved_name, status_all = _action_with_fallback("start", program)
raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
running, pid, state = _parse_supervisor_status(raw)
return {
"message": "交易系统已启动supervisor",
"output": out,
"status": {
"mode": "supervisor",
"program": program,
"resolved_name": resolved_name2 or resolved_name,
"running": running,
"pid": pid,
"state": state,
"raw": raw,
"status_all": status_all2 or status_all,
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}")
@router.post("/trading/stop")
async def trading_stop(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
program = _get_program_name()
try:
out, resolved_name, status_all = _action_with_fallback("stop", program)
raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
running, pid, state = _parse_supervisor_status(raw)
return {
"message": "交易系统已停止supervisor",
"output": out,
"status": {
"mode": "supervisor",
"program": program,
"resolved_name": resolved_name2 or resolved_name,
"running": running,
"pid": pid,
"state": state,
"raw": raw,
"status_all": status_all2 or status_all,
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}")
@router.post("/trading/restart")
async def trading_restart(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]:
_require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token)
program = _get_program_name()
try:
out, resolved_name, status_all = _action_with_fallback("restart", program)
raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
running, pid, state = _parse_supervisor_status(raw)
return {
"message": "交易系统已重启supervisor",
"output": out,
"status": {
"mode": "supervisor",
"program": program,
"resolved_name": resolved_name2 or resolved_name,
"running": running,
"pid": pid,
"state": state,
"raw": raw,
"status_all": status_all2 or status_all,
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}")