auto_trade_sys/backend/api/supervisor_account.py
薇薇安 6d48dc98d2 a
2026-01-21 11:04:44 +08:00

271 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Supervisor 多账号托管(宝塔插件兼容)
目标:
- 根据 account_id 自动生成一个 supervisor program 配置文件(.ini
- 自动定位 supervisord.conf 的 include 目录(尽量不要求你手填路径)
- 提供 supervisorctl 的常用调用封装reread/update/status/start/stop/restart
重要说明:
- 本模块只写入“程序配置文件”,不包含任何 API Key/Secret
- trading_system 进程通过 ATS_ACCOUNT_ID 选择自己的账号配置
"""
from __future__ import annotations
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
DEFAULT_CANDIDATE_CONFS = [
"/www/server/panel/plugin/supervisor/supervisord.conf",
"/www/server/panel/plugin/supervisor/supervisor.conf",
"/etc/supervisor/supervisord.conf",
"/etc/supervisord.conf",
]
def _get_project_root() -> Path:
# backend/api/supervisor_account.py -> api -> backend -> project_root
return Path(__file__).resolve().parents[2].parent
def _detect_supervisor_conf_path() -> Optional[Path]:
p = (os.getenv("SUPERVISOR_CONF") or "").strip()
if p:
pp = Path(p)
return pp if pp.exists() else pp # 允许不存在时也返回,便于报错信息
for cand in DEFAULT_CANDIDATE_CONFS:
try:
cp = Path(cand)
if cp.exists():
return cp
except Exception:
continue
return None
def _parse_include_dir_from_conf(conf_path: Path) -> Optional[Path]:
"""
尝试解析 supervisord.conf 的 [include] files=... 目录。
常见格式:
[include]
files = /path/to/conf.d/*.ini
"""
try:
text = conf_path.read_text(encoding="utf-8", errors="ignore")
except Exception:
return None
in_include = False
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith(";") or line.startswith("#"):
continue
if re.match(r"^\[include\]\s*$", line, flags=re.I):
in_include = True
continue
if in_include and line.startswith("[") and line.endswith("]"):
break
if not in_include:
continue
m = re.match(r"^files\s*=\s*(.+)$", line, flags=re.I)
if not m:
continue
val = (m.group(1) or "").strip().strip('"').strip("'")
if not val:
continue
# 只取第一个 pattern即使写了多个用空格分隔
first = val.split()[0]
p = Path(first)
if not p.is_absolute():
p = (conf_path.parent / p).resolve()
return p.parent
return None
def get_supervisor_program_dir() -> Path:
"""
获取 supervisor program 配置目录(优先级):
1) SUPERVISOR_PROGRAM_DIR
2) 从 supervisord.conf 的 [include] files= 解析
3) 兜底:/www/server/panel/plugin/supervisor你当前看到的目录
"""
env_dir = (os.getenv("SUPERVISOR_PROGRAM_DIR") or "").strip()
if env_dir:
return Path(env_dir)
conf = _detect_supervisor_conf_path()
if conf and conf.exists():
inc = _parse_include_dir_from_conf(conf)
if inc:
return inc
return Path("/www/server/panel/plugin/supervisor")
def program_name_for_account(account_id: int) -> str:
tmpl = (os.getenv("SUPERVISOR_TRADING_PROGRAM_TEMPLATE") or "auto_sys_acc{account_id}").strip()
try:
return tmpl.format(account_id=int(account_id))
except Exception:
return f"auto_sys_acc{int(account_id)}"
def ini_filename_for_program(program_name: str) -> str:
safe = re.sub(r"[^a-zA-Z0-9_\-:.]+", "_", program_name).strip("_") or "auto_sys"
return f"{safe}.ini"
def render_program_ini(account_id: int, program_name: str) -> str:
project_root = _get_project_root()
python_bin = sys.executable # 使用 backend 当前虚拟环境,通常已包含 trading_system 依赖
# 日志目录可通过环境变量覆盖
log_dir = Path(os.getenv("TRADING_LOG_DIR", str(project_root / "logs"))).expanduser()
out_log = log_dir / f"trading_{int(account_id)}.out.log"
err_log = log_dir / f"trading_{int(account_id)}.err.log"
# 默认不自动启动,避免“创建账号=立刻下单”
autostart = (os.getenv("TRADING_AUTOSTART_DEFAULT", "false") or "false").lower() == "true"
return "\n".join(
[
f"[program:{program_name}]",
f"directory={project_root}",
f"command={python_bin} -m trading_system.main",
"autostart=" + ("true" if autostart else "false"),
"autorestart=true",
"startsecs=3",
"stopasgroup=true",
"killasgroup=true",
"",
f'environment=ATS_ACCOUNT_ID="{int(account_id)}",PYTHONUNBUFFERED="1"',
"",
f"stdout_logfile={out_log}",
f"stderr_logfile={err_log}",
"",
]
)
def write_program_ini(program_dir: Path, filename: str, content: str) -> Path:
program_dir.mkdir(parents=True, exist_ok=True)
target = program_dir / filename
tmp = program_dir / (filename + ".tmp")
tmp.write_text(content, encoding="utf-8")
os.replace(str(tmp), str(target))
return target
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl")
supervisor_conf = (os.getenv("SUPERVISOR_CONF") or "").strip()
use_sudo = (os.getenv("SUPERVISOR_USE_SUDO", "false") or "false").lower() == "true"
if not supervisor_conf:
conf = _detect_supervisor_conf_path()
supervisor_conf = str(conf) if conf else ""
cmd: list[str] = []
if use_sudo:
cmd += ["sudo", "-n"]
cmd += [supervisorctl_path]
if supervisor_conf:
cmd += ["-c", supervisor_conf]
cmd += args
return cmd
def run_supervisorctl(args: list[str], timeout_sec: int = 10) -> str:
cmd = _build_supervisorctl_cmd(args)
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=int(timeout_sec))
except subprocess.TimeoutExpired:
raise RuntimeError("supervisorctl 超时")
out = (res.stdout or "").strip()
err = (res.stderr or "").strip()
combined = "\n".join([s for s in [out, err] if s]).strip()
if res.returncode != 0:
raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})")
return combined or out
def parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
if "RUNNING" in raw:
m = re.search(r"\bpid\s+(\d+)\b", raw)
pid = int(m.group(1)) if m else None
return True, pid, "RUNNING"
for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]:
if state in raw:
return False, None, state
return False, None, "UNKNOWN"
@dataclass
class EnsureProgramResult:
ok: bool
program: str
ini_path: str
program_dir: str
supervisor_conf: str
reread: str = ""
update: str = ""
error: str = ""
def ensure_account_program(account_id: int) -> EnsureProgramResult:
aid = int(account_id)
program = program_name_for_account(aid)
program_dir = get_supervisor_program_dir()
ini_name = ini_filename_for_program(program)
ini_text = render_program_ini(aid, program)
conf = _detect_supervisor_conf_path()
conf_s = str(conf) if conf else (os.getenv("SUPERVISOR_CONF") or "")
try:
path = write_program_ini(program_dir, ini_name, ini_text)
reread_out = ""
update_out = ""
try:
reread_out = run_supervisorctl(["reread"])
update_out = run_supervisorctl(["update"])
except Exception as e:
# 写文件成功但 supervisorctl 失败也要给出可诊断信息
return EnsureProgramResult(
ok=False,
program=program,
ini_path=str(path),
program_dir=str(program_dir),
supervisor_conf=conf_s,
reread=reread_out,
update=update_out,
error=f"写入配置成功,但执行 supervisorctl reread/update 失败: {e}",
)
return EnsureProgramResult(
ok=True,
program=program,
ini_path=str(path),
program_dir=str(program_dir),
supervisor_conf=conf_s,
reread=reread_out,
update=update_out,
)
except Exception as e:
return EnsureProgramResult(
ok=False,
program=program,
ini_path="",
program_dir=str(program_dir),
supervisor_conf=conf_s,
error=str(e),
)