""" Supervisor 多账号托管(宝塔插件兼容) 目标: - 根据 account_id 自动生成一个 supervisor program 配置文件(.ini) - 自动定位 supervisord.conf 的 include 目录(尽量不要求你手填路径) - 提供 supervisorctl 的常用调用封装(reread/update/status/start/stop/restart) 重要说明: - 本模块只写入“程序配置文件”,不包含任何 API Key/Secret - trading_system 进程通过 ATS_ACCOUNT_ID 选择自己的账号配置 """ from __future__ import annotations import os import re import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Optional, Tuple DEFAULT_CANDIDATE_CONFS = [ "/www/server/panel/plugin/supervisor/supervisord.conf", "/www/server/panel/plugin/supervisor/supervisor.conf", "/etc/supervisor/supervisord.conf", "/etc/supervisord.conf", ] def _get_project_root() -> Path: # backend/api/supervisor_account.py -> api -> backend -> project_root # 期望得到:(例如 /www/wwwroot/autosys_new) return Path(__file__).resolve().parents[2] def _detect_supervisor_conf_path() -> Optional[Path]: p = (os.getenv("SUPERVISOR_CONF") or "").strip() if p: pp = Path(p) return pp if pp.exists() else pp # 允许不存在时也返回,便于报错信息 for cand in DEFAULT_CANDIDATE_CONFS: try: cp = Path(cand) if cp.exists(): return cp except Exception: continue return None def _parse_include_dir_from_conf(conf_path: Path) -> Optional[Path]: """ 尝试解析 supervisord.conf 的 [include] files=... 目录。 常见格式: [include] files = /path/to/conf.d/*.ini """ try: text = conf_path.read_text(encoding="utf-8", errors="ignore") except Exception: return None in_include = False for raw in text.splitlines(): line = raw.strip() if not line or line.startswith(";") or line.startswith("#"): continue if re.match(r"^\[include\]\s*$", line, flags=re.I): in_include = True continue if in_include and line.startswith("[") and line.endswith("]"): break if not in_include: continue m = re.match(r"^files\s*=\s*(.+)$", line, flags=re.I) if not m: continue val = (m.group(1) or "").strip().strip('"').strip("'") if not val: continue # 只取第一个 pattern(即使写了多个用空格分隔) first = val.split()[0] p = Path(first) if not p.is_absolute(): p = (conf_path.parent / p).resolve() return p.parent return None def get_supervisor_program_dir() -> Path: """ 获取 supervisor program 配置目录(优先级): 1) SUPERVISOR_PROGRAM_DIR 2) 从 supervisord.conf 的 [include] files= 解析 3) 兜底:/www/server/panel/plugin/supervisor(你当前看到的目录) """ env_dir = (os.getenv("SUPERVISOR_PROGRAM_DIR") or "").strip() if env_dir: return Path(env_dir) conf = _detect_supervisor_conf_path() if conf and conf.exists(): inc = _parse_include_dir_from_conf(conf) if inc: return inc return Path("/www/server/panel/plugin/supervisor") def program_name_for_account(account_id: int) -> str: tmpl = (os.getenv("SUPERVISOR_TRADING_PROGRAM_TEMPLATE") or "auto_sys_acc{account_id}").strip() try: return tmpl.format(account_id=int(account_id)) except Exception: return f"auto_sys_acc{int(account_id)}" def ini_filename_for_program(program_name: str) -> str: safe = re.sub(r"[^a-zA-Z0-9_\-:.]+", "_", program_name).strip("_") or "auto_sys" return f"{safe}.ini" def render_program_ini(account_id: int, program_name: str) -> str: project_root = _get_project_root() python_bin = sys.executable # 使用 backend 当前虚拟环境,通常已包含 trading_system 依赖 # 日志目录可通过环境变量覆盖 log_dir = Path(os.getenv("TRADING_LOG_DIR", str(project_root / "logs"))).expanduser() # supervisor 在 reread/update 时会校验 logfile 目录是否存在;这里提前创建避免 CANT_REREAD try: log_dir.mkdir(parents=True, exist_ok=True) except Exception: # 最后兜底到 /tmp,确保一定存在 log_dir = Path("/tmp") / "autosys_logs" log_dir.mkdir(parents=True, exist_ok=True) out_log = log_dir / f"trading_{int(account_id)}.out.log" err_log = log_dir / f"trading_{int(account_id)}.err.log" # 默认不自动启动,避免“创建账号=立刻下单” autostart = (os.getenv("TRADING_AUTOSTART_DEFAULT", "false") or "false").lower() == "true" return "\n".join( [ f"[program:{program_name}]", f"directory={project_root}", f"command={python_bin} -m trading_system.main", "autostart=" + ("true" if autostart else "false"), "autorestart=true", "startsecs=3", "stopasgroup=true", "killasgroup=true", "", f'environment=ATS_ACCOUNT_ID="{int(account_id)}",PYTHONUNBUFFERED="1"', "", f"stdout_logfile={out_log}", f"stderr_logfile={err_log}", "", ] ) def write_program_ini(program_dir: Path, filename: str, content: str) -> Path: program_dir.mkdir(parents=True, exist_ok=True) target = program_dir / filename tmp = program_dir / (filename + ".tmp") tmp.write_text(content, encoding="utf-8") os.replace(str(tmp), str(target)) return target def _build_supervisorctl_cmd(args: list[str]) -> list[str]: supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl") supervisor_conf = (os.getenv("SUPERVISOR_CONF") or "").strip() use_sudo = (os.getenv("SUPERVISOR_USE_SUDO", "false") or "false").lower() == "true" if not supervisor_conf: conf = _detect_supervisor_conf_path() supervisor_conf = str(conf) if conf else "" cmd: list[str] = [] if use_sudo: cmd += ["sudo", "-n"] cmd += [supervisorctl_path] if supervisor_conf: cmd += ["-c", supervisor_conf] cmd += args return cmd def run_supervisorctl(args: list[str], timeout_sec: int = 10) -> str: cmd = _build_supervisorctl_cmd(args) try: res = subprocess.run(cmd, capture_output=True, text=True, timeout=int(timeout_sec)) except subprocess.TimeoutExpired: raise RuntimeError("supervisorctl 超时") out = (res.stdout or "").strip() err = (res.stderr or "").strip() combined = "\n".join([s for s in [out, err] if s]).strip() # supervisorctl: status 在存在 STOPPED 等进程时可能返回 exit=3,但输出仍然有效 ok_rc = {0} if args and args[0] == "status": ok_rc.add(3) if res.returncode not in ok_rc: raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})") return combined or out def parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]: if "RUNNING" in raw: m = re.search(r"\bpid\s+(\d+)\b", raw) pid = int(m.group(1)) if m else None return True, pid, "RUNNING" for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]: if state in raw: return False, None, state return False, None, "UNKNOWN" @dataclass class EnsureProgramResult: ok: bool program: str ini_path: str program_dir: str supervisor_conf: str reread: str = "" update: str = "" error: str = "" def ensure_account_program(account_id: int) -> EnsureProgramResult: aid = int(account_id) program = program_name_for_account(aid) program_dir = get_supervisor_program_dir() ini_name = ini_filename_for_program(program) ini_text = render_program_ini(aid, program) conf = _detect_supervisor_conf_path() conf_s = str(conf) if conf else (os.getenv("SUPERVISOR_CONF") or "") try: path = write_program_ini(program_dir, ini_name, ini_text) reread_out = "" update_out = "" try: reread_out = run_supervisorctl(["reread"]) update_out = run_supervisorctl(["update"]) except Exception as e: # 写文件成功但 supervisorctl 失败也要给出可诊断信息 return EnsureProgramResult( ok=False, program=program, ini_path=str(path), program_dir=str(program_dir), supervisor_conf=conf_s, reread=reread_out, update=update_out, error=f"写入配置成功,但执行 supervisorctl reread/update 失败: {e}", ) return EnsureProgramResult( ok=True, program=program, ini_path=str(path), program_dir=str(program_dir), supervisor_conf=conf_s, reread=reread_out, update=update_out, ) except Exception as e: return EnsureProgramResult( ok=False, program=program, ini_path="", program_dir=str(program_dir), supervisor_conf=conf_s, error=str(e), )