import os import re import subprocess import json from pathlib import Path from typing import Any, Dict, Optional, Tuple from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel import logging logger = logging.getLogger(__name__) # 路由统一挂在 /api/system 下,前端直接调用 /api/system/... router = APIRouter(prefix="/api/system") LOG_GROUPS = ("error", "warning", "info") def _logs_prefix() -> str: return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs") def _logs_key_for_group(group: str) -> str: group = (group or "error").strip().lower() # 兼容旧配置:REDIS_LOG_LIST_KEY 仅用于 error if group == "error": legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip() if legacy: return legacy env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip() if env_key: return env_key return f"{_logs_prefix()}:{group}" def _logs_config_key() -> str: return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config") def _logs_stats_prefix() -> str: return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added") def _beijing_yyyymmdd() -> str: from datetime import datetime, timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) return datetime.now(tz=beijing_tz).strftime("%Y%m%d") def _default_logs_config() -> Dict[str, Any]: return { "max_len": {"error": 2000, "warning": 2000, "info": 2000}, "enabled": {"error": True, "warning": True, "info": True}, "dedupe_consecutive": True, "include_debug_in_info": False, "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS}, "config_key": _logs_config_key(), "stats_prefix": _logs_stats_prefix(), } def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]: cfg = defaults for g in LOG_GROUPS: v = redis_hash.get(f"max_len:{g}") if v is not None: try: n = int(str(v).strip()) if n > 0: cfg["max_len"][g] = n except Exception: pass ev = redis_hash.get(f"enabled:{g}") if ev is not None: s = str(ev).strip().lower() cfg["enabled"][g] = s in ("1", "true", "yes", "y", "on") for k in ("dedupe_consecutive", "include_debug_in_info"): vv = redis_hash.get(k) if vv is not None: s = str(vv).strip().lower() cfg[k] = s in ("1", "true", "yes", "y", "on") return cfg def _read_logs_config(client) -> Dict[str, Any]: defaults = _default_logs_config() try: raw = client.hgetall(_logs_config_key()) or {} return _merge_logs_config(defaults, raw) except Exception: return defaults def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]: mapping: Dict[str, str] = {} for g in LOG_GROUPS: mapping[f"max_len:{g}"] = str(int(cfg["max_len"][g])) mapping[f"enabled:{g}"] = "1" if cfg["enabled"][g] else "0" mapping["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0" mapping["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0" pipe = client.pipeline() pipe.hset(_logs_config_key(), mapping=mapping) for g in LOG_GROUPS: key = _logs_key_for_group(g) max_len = int(cfg["max_len"][g]) if max_len > 0: pipe.ltrim(key, 0, max_len - 1) pipe.execute() return cfg class LogsConfigUpdate(BaseModel): max_len: Optional[Dict[str, int]] = None enabled: Optional[Dict[str, bool]] = None dedupe_consecutive: Optional[bool] = None include_debug_in_info: Optional[bool] = None def _beijing_time_str() -> str: from datetime import datetime, timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) return datetime.now(tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S") @router.post("/logs/test-write") async def logs_test_write( x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), ) -> Dict[str, Any]: """ 写入 3 条测试日志到 Redis(error/warning/info),用于验证“是否写入到同一台 Redis、同一组 key”。 """ _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法写入测试日志") cfg = _read_logs_config(client) now_ms = int(__import__("time").time() * 1000) time_str = _beijing_time_str() entries = { "error": {"level": "ERROR", "message": f"[TEST] backend 写入 error 测试日志 @ {time_str}"}, "warning": {"level": "WARNING", "message": f"[TEST] backend 写入 warning 测试日志 @ {time_str}"}, "info": {"level": "INFO", "message": f"[TEST] backend 写入 info 测试日志 @ {time_str}"}, } day = _beijing_yyyymmdd() stats_prefix = _logs_stats_prefix() pipe = client.pipeline() for g in LOG_GROUPS: key = _logs_key_for_group(g) max_len = int(cfg["max_len"][g]) entry = { "ts": now_ms, "time": time_str, "service": "backend", "level": entries[g]["level"], "logger": "api.routes.system", "message": entries[g]["message"], "hostname": os.getenv("HOSTNAME", ""), "signature": f"backend|{entries[g]['level']}|test|{entries[g]['message']}", "count": 1, } pipe.lpush(key, json.dumps(entry, ensure_ascii=False)) if max_len > 0: pipe.ltrim(key, 0, max_len - 1) pipe.incr(f"{stats_prefix}:{day}:{g}", 1) pipe.expire(f"{stats_prefix}:{day}:{g}", 14 * 24 * 3600) pipe.execute() # 返回写入后的 LLEN,便于你确认 pipe2 = client.pipeline() for g in LOG_GROUPS: pipe2.llen(_logs_key_for_group(g)) llens = pipe2.execute() return { "message": "ok", "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS}, "llen": {g: int(llens[i] or 0) for i, g in enumerate(LOG_GROUPS)}, "note": "如果你在前端仍看不到,说明前端请求的后端实例/Redis key/筛选条件不一致。", } def _get_redis_client_for_logs(): """ 获取 Redis 客户端(优先复用 config_manager 的连接;失败则自行创建)。 返回:redis.Redis 或 None """ # 1) 复用 config_manager(避免重复连接) try: import config_manager # backend/config_manager.py(已负责加载 .env) cm = getattr(config_manager, "config_manager", None) if cm is not None: redis_client = getattr(cm, "_redis_client", None) redis_connected = getattr(cm, "_redis_connected", False) if redis_client is not None and redis_connected: try: redis_client.ping() return redis_client except Exception: pass except Exception: pass # 2) 自行创建 try: import redis # type: ignore redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true" redis_username = os.getenv("REDIS_USERNAME", None) redis_password = os.getenv("REDIS_PASSWORD", None) ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required") ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None) kwargs: Dict[str, Any] = { "decode_responses": True, "username": redis_username, "password": redis_password, "socket_connect_timeout": 1, "socket_timeout": 1, } if redis_url.startswith("rediss://") or redis_use_tls: kwargs["ssl_cert_reqs"] = ssl_cert_reqs if ssl_ca_certs: kwargs["ssl_ca_certs"] = ssl_ca_certs if ssl_cert_reqs == "none": kwargs["ssl_check_hostname"] = False elif ssl_cert_reqs == "required": kwargs["ssl_check_hostname"] = True else: kwargs["ssl_check_hostname"] = False client = redis.from_url(redis_url, **kwargs) client.ping() return client except Exception: return None @router.get("/logs") async def get_logs( limit: int = 200, group: str = "error", start: int = 0, service: Optional[str] = None, level: Optional[str] = None, x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), ) -> Dict[str, Any]: """ 从 Redis List 读取最新日志(默认 group=error -> ats:logs:error)。 参数: - limit: 返回条数(最大 2000) - group: 日志分组(error / warning / info) - service: 过滤(backend / trading_system) - level: 过滤(ERROR / CRITICAL ...) """ _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) if limit <= 0: limit = 200 if limit > 20000: limit = 20000 if start < 0: start = 0 group = (group or "error").strip().lower() if group not in LOG_GROUPS: raise HTTPException(status_code=400, detail=f"非法 group:{group}(可选:{', '.join(LOG_GROUPS)})") list_key = _logs_key_for_group(group) client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志") try: llen_total = int(client.llen(list_key) or 0) except Exception as e: raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}") if llen_total <= 0: return { "group": group, "key": list_key, "start": start, "limit": limit, "llen_total": 0, "next_start": start, "has_more": False, "count": 0, "items": [], } # 分页扫描:为了支持 service/level 过滤,这里会向后多取一些直到凑够 limit 或到末尾 # 保护:最多扫描 limit*10 条,避免过滤太严格导致无限扫描 max_scan = min(llen_total, start + limit * 10) pos = start scanned = 0 items: list[Dict[str, Any]] = [] try: while len(items) < limit and pos < llen_total and pos < max_scan: chunk_size = min(500, limit, max_scan - pos) end = pos + chunk_size - 1 raw_batch = client.lrange(list_key, pos, end) scanned += len(raw_batch or []) for raw in raw_batch or []: try: obj = raw if isinstance(raw, bytes): obj = raw.decode("utf-8", errors="ignore") if not isinstance(obj, str): continue parsed = json.loads(obj) if not isinstance(parsed, dict): continue if service and str(parsed.get("service")) != service: continue if level and str(parsed.get("level")) != level: continue items.append(parsed) if len(items) >= limit: break except Exception: continue pos = end + 1 except Exception as e: raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}") return { "group": group, "key": list_key, "start": start, "limit": limit, "llen_total": llen_total, "scanned": scanned, "next_start": pos, "has_more": pos < llen_total, "count": len(items), "items": items, } @router.get("/logs/overview") async def logs_overview(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览") cfg = _read_logs_config(client) day = _beijing_yyyymmdd() stats_prefix = _logs_stats_prefix() pipe = client.pipeline() for g in LOG_GROUPS: pipe.llen(_logs_key_for_group(g)) for g in LOG_GROUPS: pipe.get(f"{stats_prefix}:{day}:{g}") res = pipe.execute() llen_vals = res[: len(LOG_GROUPS)] added_vals = res[len(LOG_GROUPS) :] llen: Dict[str, int] = {} added_today: Dict[str, int] = {} for i, g in enumerate(LOG_GROUPS): try: llen[g] = int(llen_vals[i] or 0) except Exception: llen[g] = 0 try: added_today[g] = int(added_vals[i] or 0) except Exception: added_today[g] = 0 return { "config": cfg, "stats": { "day": day, "llen": llen, "added_today": added_today, }, } @router.put("/logs/config") async def update_logs_config( payload: LogsConfigUpdate, x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), ) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) client = _get_redis_client_for_logs() if client is None: raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置") cfg = _read_logs_config(client) if payload.max_len: for g, v in payload.max_len.items(): gg = (g or "").strip().lower() if gg not in LOG_GROUPS: continue try: n = int(v) if n < 100: n = 100 if n > 20000: n = 20000 cfg["max_len"][gg] = n except Exception: continue if payload.enabled: for g, v in payload.enabled.items(): gg = (g or "").strip().lower() if gg not in LOG_GROUPS: continue cfg["enabled"][gg] = bool(v) if payload.dedupe_consecutive is not None: cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive) if payload.include_debug_in_info is not None: cfg["include_debug_in_info"] = bool(payload.include_debug_in_info) cfg = _write_logs_config_and_trim(client, cfg) return {"message": "ok", "config": cfg} def _require_admin(token: Optional[str], provided: Optional[str]) -> None: """ 可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN,则要求请求携带 X-Admin-Token。 生产环境强烈建议通过 Nginx 额外做鉴权 / IP 白名单。 """ if not token: return if not provided or provided != token: raise HTTPException(status_code=401, detail="Unauthorized") def _build_supervisorctl_cmd(args: list[str]) -> list[str]: supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl") supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip() use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").lower() == "true" # 如果没显式配置 SUPERVISOR_CONF,就尝试自动探测常见路径(宝塔/系统) if not supervisor_conf: candidates = [ "/www/server/panel/plugin/supervisor/supervisord.conf", "/www/server/panel/plugin/supervisor/supervisor.conf", "/etc/supervisor/supervisord.conf", "/etc/supervisord.conf", ] for p in candidates: try: if Path(p).exists(): supervisor_conf = p break except Exception: continue cmd: list[str] = [] if use_sudo: # 需要你在 sudoers 配置 NOPASSWD(sudo -n 才不会卡住) cmd += ["sudo", "-n"] cmd += [supervisorctl_path] if supervisor_conf: cmd += ["-c", supervisor_conf] cmd += args return cmd def _run_supervisorctl(args: list[str]) -> str: cmd = _build_supervisorctl_cmd(args) try: res = subprocess.run(cmd, capture_output=True, text=True, timeout=10) except subprocess.TimeoutExpired: raise RuntimeError("supervisorctl 超时(10s)") out = (res.stdout or "").strip() err = (res.stderr or "").strip() combined = "\n".join([s for s in [out, err] if s]).strip() if res.returncode != 0: raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})") return combined or out def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]: """ 典型输出: - auto_sys RUNNING pid 1234, uptime 0:10:00 - auto_sys STOPPED Not started """ if "RUNNING" in raw: m = re.search(r"\bpid\s+(\d+)\b", raw) pid = int(m.group(1)) if m else None return True, pid, "RUNNING" for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]: if state in raw: return False, None, state return False, None, "UNKNOWN" def _get_program_name() -> str: # 你给的宝塔配置是 [program:auto_sys] return os.getenv("SUPERVISOR_TRADING_PROGRAM", "auto_sys").strip() or "auto_sys" def _select_best_process_name(program: str, status_all_raw: str) -> Optional[str]: """ 从 `supervisorctl status` 全量输出中,找到最匹配的真实进程名。 兼容 supervisor 的 group:process 格式,例如:auto_sys:auto_sys_00 """ if not status_all_raw: return None lines = [ln.strip() for ln in status_all_raw.splitlines() if ln.strip()] names: list[str] = [] for ln in lines: name = ln.split(None, 1)[0].strip() if name: names.append(name) # 精确优先:program / program_00 / program:program_00 preferred = [program, f"{program}_00", f"{program}:{program}_00"] for cand in preferred: if cand in names: return cand # 次优:任意以 program_ 开头 for name in names: if name.startswith(program + "_"): return name # 次优:任意以 program: 开头 for name in names: if name.startswith(program + ":"): return name return None def _status_with_fallback(program: str) -> Tuple[str, Optional[str], Optional[str]]: """ - 优先 `status ` - 若 no such process:返回全量 status,并尝试解析真实 name(例如 auto_sys:auto_sys_00) 返回:(raw, resolved_name, status_all) """ try: raw = _run_supervisorctl(["status", program]) return raw, program, None except Exception as e: msg = str(e).lower() if "no such process" not in msg: raise status_all = _run_supervisorctl(["status"]) resolved = _select_best_process_name(program, status_all) if resolved: try: raw = _run_supervisorctl(["status", resolved]) return raw, resolved, status_all except Exception: # 兜底:至少把全量输出返回,方便你确认真实进程名 return status_all, None, status_all return status_all, None, status_all def _action_with_fallback(action: str, program: str) -> Tuple[str, Optional[str], Optional[str]]: """ 对 start/stop/restart 做兜底:如果 program 不存在,尝试解析真实 name 再执行。 返回:(output, resolved_name, status_all) """ try: out = _run_supervisorctl([action, program]) return out, program, None except Exception as e: msg = str(e).lower() if "no such process" not in msg: raise status_all = _run_supervisorctl(["status"]) resolved = _select_best_process_name(program, status_all) if not resolved: # 没找到就把全量输出带上,方便定位 raise RuntimeError(f"no such process: {program}. 当前 supervisor 进程列表:\n{status_all}") out = _run_supervisorctl([action, resolved]) return out, resolved, status_all @router.post("/clear-cache") async def clear_cache(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: """ 清理配置缓存(Redis Hash: trading_config),并从数据库回灌到 Redis。 """ _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) try: import config_manager cm = getattr(config_manager, "config_manager", None) if cm is None: raise HTTPException(status_code=500, detail="config_manager 未初始化") deleted_keys: list[str] = [] # 1) 清 backend 本地 cache try: cm._cache = {} except Exception: pass # 2) 清 Redis 缓存 key(Hash: trading_config) try: redis_client = getattr(cm, "_redis_client", None) redis_connected = getattr(cm, "_redis_connected", False) if redis_client is not None and redis_connected: try: redis_client.ping() except Exception: redis_connected = False if redis_client is not None and redis_connected: try: redis_client.delete("trading_config") deleted_keys.append("trading_config") except Exception as e: logger.warning(f"删除 Redis key trading_config 失败: {e}") # 可选:实时推荐缓存(如果存在) try: redis_client.delete("recommendations:realtime") deleted_keys.append("recommendations:realtime") except Exception: pass except Exception as e: logger.warning(f"清 Redis 缓存失败: {e}") # 3) 立刻从 DB 回灌到 Redis(避免 trading_system 读到空) try: cm.reload() except Exception as e: logger.warning(f"回灌配置到 Redis 失败(仍可能使用DB/本地cache): {e}") return { "message": "缓存已清理并回灌", "deleted_keys": deleted_keys, "note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。", } except HTTPException: raise except Exception as e: logger.error(f"清理缓存失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.get("/trading/status") async def trading_status(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) program = _get_program_name() try: raw, resolved_name, status_all = _status_with_fallback(program) running, pid, state = _parse_supervisor_status(raw) return { "mode": "supervisor", "program": program, "resolved_name": resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all, } except Exception as e: raise HTTPException( status_code=500, detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO", ) @router.post("/trading/start") async def trading_start(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) program = _get_program_name() try: out, resolved_name, status_all = _action_with_fallback("start", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) return { "message": "交易系统已启动(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or status_all, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}") @router.post("/trading/stop") async def trading_stop(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) program = _get_program_name() try: out, resolved_name, status_all = _action_with_fallback("stop", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) return { "message": "交易系统已停止(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or status_all, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}") @router.post("/trading/restart") async def trading_restart(x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token")) -> Dict[str, Any]: _require_admin(os.getenv("SYSTEM_CONTROL_TOKEN", "").strip(), x_admin_token) program = _get_program_name() try: out, resolved_name, status_all = _action_with_fallback("restart", program) raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program) running, pid, state = _parse_supervisor_status(raw) return { "message": "交易系统已重启(supervisor)", "output": out, "status": { "mode": "supervisor", "program": program, "resolved_name": resolved_name2 or resolved_name, "running": running, "pid": pid, "state": state, "raw": raw, "status_all": status_all2 or status_all, }, } except Exception as e: raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}")