auto_trade_sys/backend/api/supervisor_account.py
薇薇安 414607d566 a
2026-01-21 22:38:24 +08:00

490 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
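Supervisor multi-account hosting (compatible with the BaoTa / aaPanel supervisor plugin).

Goals:
- Generate one supervisor program config file (.ini) per account_id.
- Locate the include directory of supervisord.conf automatically (so the path rarely has to be entered by hand).
- Provide wrappers around the common supervisorctl calls (reread/update/status/start/stop/restart).

Important notes:
- This module only writes "program config files"; they never contain any API key/secret.
- The trading_system process selects its own account configuration through ATS_ACCOUNT_ID.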
"""
from __future__ import annotations
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
# Candidate locations of the supervisord main config file; they are probed in
# this order and the first one that exists is used.
DEFAULT_CANDIDATE_CONFS = [
    "/www/server/panel/plugin/supervisor/supervisord.conf",
    "/www/server/panel/plugin/supervisor/supervisor.conf",
    "/etc/supervisor/supervisord.conf",
    "/etc/supervisord.conf",
]
# Common candidate paths for the supervisord main log file (they differ a lot
# between distributions and panel plugins).
DEFAULT_SUPERVISORD_LOG_CANDIDATES = [
    # Typical for the aaPanel / BaoTa supervisor plugin
    "/www/server/panel/plugin/supervisor/log/supervisord.log",
    "/www/server/panel/plugin/supervisor/log/supervisor.log",
    "/www/server/panel/plugin/supervisor/supervisord.log",
    "/www/server/panel/plugin/supervisor/supervisor.log",
    # Typical for a system-wide supervisor installation
    "/var/log/supervisor/supervisord.log",
    "/var/log/supervisor/supervisor.log",
    "/var/log/supervisord.log",
    "/var/log/supervisord/supervisord.log",
    "/tmp/supervisord.log",
]
def _get_project_root() -> Path:
    """Return the project root, i.e. the directory three levels above this file.

    Layout: ``<project_root>/backend/api/<this file>`` -> ``api`` -> ``backend``
    -> ``<project_root>`` (for example ``/www/wwwroot/autosys_new``).
    """
    this_file = Path(__file__).resolve()
    ancestors = this_file.parents
    return ancestors[2]
def _detect_supervisor_conf_path() -> Optional[Path]:
    """Locate the supervisord main configuration file.

    Resolution order:
      1. The ``SUPERVISOR_CONF`` environment variable. It is returned as-is,
         even when the file does not exist, so that callers can show the
         configured path in their error messages.
      2. The first existing entry of ``DEFAULT_CANDIDATE_CONFS``.

    Returns:
        The detected path, or ``None`` when no override is set and no
        candidate exists.
    """
    override = (os.getenv("SUPERVISOR_CONF") or "").strip()
    if override:
        # The override wins unconditionally; no existence probe is needed
        # (and probing could raise on unusual paths).
        return Path(override)
    for cand in DEFAULT_CANDIDATE_CONFS:
        try:
            candidate = Path(cand)
            if candidate.exists():
                return candidate
        except (OSError, ValueError):
            # An unreadable candidate is simply not a match; try the next one.
            continue
    return None
def _parse_include_dir_from_conf(conf_path: Path) -> Optional[Path]:
    """Extract the directory referenced by ``[include] files=...``.

    Typical input::

        [include]
        files = /path/to/conf.d/*.ini

    Only the first glob pattern is considered when several are listed
    (whitespace separated). A relative pattern is resolved against the
    directory of ``conf_path``.

    Args:
        conf_path: Path of the supervisord configuration file to read.

    Returns:
        The parent directory of the first pattern, or ``None`` when the file
        cannot be read or has no usable ``files`` entry in ``[include]``.
    """
    try:
        text = conf_path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return None

    in_include = False
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and full-line comments.
        if not line or line.startswith((";", "#")):
            continue
        if re.match(r"^\[include\]\s*$", line, flags=re.I):
            in_include = True
            continue
        if not in_include:
            continue
        if line.startswith("[") and line.endswith("]"):
            # The next section header ends the [include] section.
            break
        m = re.match(r"^files\s*=\s*(.+)$", line, flags=re.I)
        if not m:
            continue
        val = (m.group(1) or "").strip().strip('"').strip("'")
        patterns = val.split()
        if not patterns:
            # Covers empty values and quoted whitespace such as `files = " "`,
            # which previously raised IndexError.
            continue
        p = Path(patterns[0])
        if not p.is_absolute():
            p = (conf_path.parent / p).resolve()
        return p.parent
    return None
def get_supervisor_program_dir() -> Path:
    """Return the directory that holds supervisor program config files.

    Priority:
      1. The ``SUPERVISOR_PROGRAM_DIR`` environment variable.
      2. The directory parsed from ``[include] files=`` of supervisord.conf.
      3. Fallback: ``/www/server/panel/plugin/supervisor``.
    """
    override = (os.getenv("SUPERVISOR_PROGRAM_DIR") or "").strip()
    if override:
        return Path(override)

    conf_file = _detect_supervisor_conf_path()
    if conf_file is not None and conf_file.exists():
        include_dir = _parse_include_dir_from_conf(conf_file)
        if include_dir:
            return include_dir

    return Path("/www/server/panel/plugin/supervisor")
def program_name_for_account(account_id: int) -> str:
    """Build the supervisor program name for an account.

    The name template comes from ``SUPERVISOR_TRADING_PROGRAM_TEMPLATE``
    (default ``auto_sys_acc{account_id}``). If the template cannot be
    formatted, the default naming scheme is used instead.
    """
    template = os.getenv("SUPERVISOR_TRADING_PROGRAM_TEMPLATE") or "auto_sys_acc{account_id}"
    template = template.strip()
    try:
        rendered = template.format(account_id=int(account_id))
    except Exception:
        rendered = f"auto_sys_acc{int(account_id)}"
    return rendered
def ini_filename_for_program(program_name: str) -> str:
    """Turn a program name into a safe ``.ini`` file name.

    Every run of characters outside ``[a-zA-Z0-9_\\-:.]`` becomes a single
    underscore, surrounding underscores are trimmed, and ``auto_sys`` is used
    when nothing remains.
    """
    sanitized = re.sub(r"[^a-zA-Z0-9_\-:.]+", "_", program_name)
    stem = sanitized.strip("_")
    if not stem:
        stem = "auto_sys"
    return stem + ".ini"
def render_program_ini(account_id: int, program_name: str) -> str:
    """Render the supervisor ``[program:...]`` ini text for one trading account.

    Besides building the text, this creates the log directory, because
    supervisor validates the logfile directory on reread/update.

    Args:
        account_id: Account whose id is exported to the process as
            ``ATS_ACCOUNT_ID``.
        program_name: Name used in the ``[program:<name>]`` header.

    Returns:
        The ini file content, terminated by a newline.
    """
    project_root = _get_project_root()
    aid = int(account_id)

    # Interpreter selection:
    # - TRADING_PYTHON_BIN wins when set (lets production point at the
    #   trading_system venv explicitly),
    # - then the current interpreter (backend venv),
    # - then a few common venv / system locations.
    interpreter_candidates = []
    explicit_python = (os.getenv("TRADING_PYTHON_BIN") or "").strip()
    if explicit_python:
        interpreter_candidates.append(explicit_python)
    interpreter_candidates.append(sys.executable)
    interpreter_candidates.extend(
        [
            str(project_root / "backend" / ".venv" / "bin" / "python"),
            str(project_root / ".venv" / "bin" / "python"),
            str(project_root / "trading_system" / ".venv" / "bin" / "python"),
            "/usr/bin/python3",
            "/usr/local/bin/python3",
        ]
    )

    def _as_executable(candidate):
        # Return the normalized path when it exists and is executable, else None.
        try:
            path = Path(candidate)
            if path.exists() and os.access(str(path), os.X_OK):
                return str(path)
        except Exception:
            pass
        return None

    # Last resort: sys.executable, so that a failure shows up in the logs.
    python_bin = next(filter(None, map(_as_executable, interpreter_candidates)), None) or sys.executable

    # The log directory can be overridden through the environment.
    log_dir, out_log, err_log = expected_trading_log_paths(project_root, aid)
    try:
        log_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Fall back to /tmp so that the directory is guaranteed to exist.
        log_dir = Path("/tmp") / "autosys_logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        out_log = log_dir / f"trading_{aid}.out.log"
        err_log = log_dir / f"trading_{aid}.err.log"

    # No autostart by default: creating an account must not start trading.
    autostart_flag = "false"
    if (os.getenv("TRADING_AUTOSTART_DEFAULT", "false") or "false").lower() == "true":
        autostart_flag = "true"

    run_user = (os.getenv("SUPERVISOR_RUN_USER") or "").strip()
    user_line = f"user={run_user}" if run_user else ""

    process_section = [
        f"[program:{program_name}]",
        f"directory={project_root}",
        f"command={python_bin} -m trading_system.main",
        f"autostart={autostart_flag}",
        # Restart only on unexpected exit codes, so a normal exit does not
        # get the process respawned over and over.
        "autorestart=unexpected",
        # Both 0 and 2 count as expected exits (e.g. the process quits on
        # purpose when its configuration is incomplete).
        "exitcodes=0,2",
        "startsecs=0",
        "stopasgroup=true",
        "killasgroup=true",
        "stopsignal=TERM",
        "",
    ]
    environment_section = [
        # PYTHONPATH points at the project root so that
        # `-m trading_system.main` is importable.
        f'environment=ATS_ACCOUNT_ID="{aid}",PYTHONUNBUFFERED="1",PYTHONPATH="{project_root}"',
        user_line,
        "",
    ]
    logging_section = [
        f"stdout_logfile={out_log}",
        f"stderr_logfile={err_log}",
        "stdout_logfile_maxbytes=20MB",
        "stdout_logfile_backups=5",
        "stderr_logfile_maxbytes=20MB",
        "stderr_logfile_backups=5",
        "",
    ]
    return "\n".join(process_section + environment_section + logging_section)
def write_program_ini(program_dir: Path, filename: str, content: str) -> Path:
    """Atomically write a program ini file into ``program_dir``.

    The content is first written to ``<filename>.tmp`` in the same directory
    and then moved over the target with ``os.replace``, so a reader never
    sees a half-written config. If either step fails, the temporary file is
    removed before the error is re-raised.

    Args:
        program_dir: Directory to write into (created if missing).
        filename: Name of the ini file.
        content: Text to write (UTF-8).

    Returns:
        Path of the written ini file.

    Raises:
        OSError: When the directory, the temporary file or the final rename
            cannot be written.
    """
    program_dir.mkdir(parents=True, exist_ok=True)
    target = program_dir / filename
    tmp = program_dir / (filename + ".tmp")
    try:
        tmp.write_text(content, encoding="utf-8")
        os.replace(str(tmp), str(target))
    except BaseException:
        # Do not leave a stale .tmp file behind on failure.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
    return target
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
    """Assemble the argv list for a supervisorctl invocation.

    Environment variables:
      - ``SUPERVISORCTL_PATH``: executable to run (default ``supervisorctl``;
        a blank value is treated as unset).
      - ``SUPERVISOR_CONF``: config passed via ``-c``; auto-detected when unset.
      - ``SUPERVISOR_USE_SUDO``: ``true`` prefixes the command with ``sudo -n``.

    Args:
        args: supervisorctl sub-command and its arguments.

    Returns:
        The full command as a list suitable for ``subprocess.run``.
    """
    # Blank values fall back to the default instead of producing an empty
    # executable name.
    supervisorctl_path = (os.getenv("SUPERVISORCTL_PATH") or "").strip() or "supervisorctl"
    supervisor_conf = (os.getenv("SUPERVISOR_CONF") or "").strip()
    use_sudo = (os.getenv("SUPERVISOR_USE_SUDO") or "false").strip().lower() == "true"
    if not supervisor_conf:
        conf = _detect_supervisor_conf_path()
        supervisor_conf = str(conf) if conf else ""

    cmd: list[str] = []
    if use_sudo:
        # -n: never prompt for a password.
        cmd += ["sudo", "-n"]
    cmd.append(supervisorctl_path)
    if supervisor_conf:
        cmd += ["-c", supervisor_conf]
    cmd += args
    return cmd
def run_supervisorctl(args: list[str], timeout_sec: int = 10) -> str:
    """Run supervisorctl and return its combined, stripped output.

    Args:
        args: supervisorctl sub-command and its arguments.
        timeout_sec: Maximum run time in seconds.

    Returns:
        stdout followed by stderr (non-empty parts joined by a newline).

    Raises:
        RuntimeError: On timeout or on an unexpected exit code.
    """
    command = _build_supervisorctl_cmd(args)
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=int(timeout_sec),
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("supervisorctl 超时")

    stdout_text = (completed.stdout or "").strip()
    stderr_text = (completed.stderr or "").strip()
    combined = "\n".join(part for part in (stdout_text, stderr_text) if part).strip()

    # `status` may exit with 3 when some process is e.g. STOPPED, while its
    # output is still valid.
    acceptable_codes = {0, 3} if (args and args[0] == "status") else {0}
    if completed.returncode not in acceptable_codes:
        raise RuntimeError(combined or f"supervisorctl failed (exit={completed.returncode})")
    return combined or stdout_text
def parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
    """Interpret the output of ``supervisorctl status <program>``.

    Args:
        raw: Raw status text; ``None`` or an empty string is treated as
            unknown.

    Returns:
        ``(running, pid, state)``. ``pid`` is only set for ``RUNNING`` and
        only when a ``pid <n>`` token is present. ``state`` is one of the
        supervisor process states, or ``"UNKNOWN"`` when none is found.
    """
    text = raw or ""
    if "RUNNING" in text:
        m = re.search(r"\bpid\s+(\d+)\b", text)
        pid = int(m.group(1)) if m else None
        return True, pid, "RUNNING"
    # STOPPING is a regular supervisor state too; without it a process that
    # is shutting down would be reported as UNKNOWN.
    for state in ("STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "STOPPING", "UNKNOWN"):
        if state in text:
            return False, None, state
    return False, None, "UNKNOWN"
def tail_supervisor(program: str, stream: str = "stderr", lines: int = 120) -> str:
    """Return the most recent stdout/stderr output of a supervisor program.

    Runs ``supervisorctl tail -<n> <program> [stdout|stderr]``. An unknown
    ``stream`` falls back to ``stderr``; the amount passed as ``-<n>`` is
    clamped to the range 20..500.
    """
    channel = (stream or "stderr").strip().lower()
    if channel not in ("stdout", "stderr"):
        channel = "stderr"
    amount = max(20, min(500, int(lines or 120)))
    return run_supervisorctl(["tail", f"-{amount}", str(program), channel])
def _tail_text_file(path: Path, lines: int = 200, max_bytes: int = 64 * 1024) -> str:
    """Return the last lines of a text file; never raises.

    Used e.g. for supervisor spawn errors, where the program's own stderr may
    be empty. At most the final ``max_bytes`` bytes are read, to keep memory
    use low on large files. The number of returned lines is clamped to
    20..500.

    If the direct read fails and ``SUPERVISOR_USE_SUDO`` is ``true``,
    ``sudo -n tail`` is tried instead (the backend usually cannot read
    root-owned logs itself). Any remaining failure yields ``""``.
    """

    def _line_budget() -> int:
        # Requested line count, clamped to 20..500.
        return max(20, min(500, int(lines or 200)))

    try:
        target = Path(path)
        if not target.exists():
            return ""
        total = target.stat().st_size
        window = min(int(max_bytes), int(total))
        with target.open("rb") as handle:
            if total > window:
                handle.seek(-window, os.SEEK_END)
            payload = handle.read()
        rows = payload.decode("utf-8", errors="ignore").splitlines()
        if not rows:
            return ""
        # Keep only the last N lines.
        return "\n".join(rows[-_line_budget():]).strip()
    except Exception:
        try:
            sudo_enabled = (os.getenv("SUPERVISOR_USE_SUDO", "false") or "false").lower() == "true"
            if not sudo_enabled:
                return ""
            completed = subprocess.run(
                ["sudo", "-n", "tail", "-n", str(_line_budget()), str(path)],
                capture_output=True,
                text=True,
                timeout=5,
            )
            stdout_text = (completed.stdout or "").strip()
            stderr_text = (completed.stderr or "").strip()
            # Deliberately lenient: an empty result is better than breaking
            # the caller's main flow.
            return (stdout_text or stderr_text or "").strip()
        except Exception:
            return ""
def _parse_supervisord_logfile_from_conf(conf_path: Path) -> Optional[Path]:
    """Read the ``logfile=`` setting of the ``[supervisord]`` section.

    Handles inline comments (``logfile=/x.log ; main log file``), optional
    surrounding quotes and the ``%(here)s`` expansion (directory of the
    config file). A relative value is resolved against the directory of
    ``conf_path``.

    Args:
        conf_path: Path of the supervisord configuration file to read.

    Returns:
        The log file path, or ``None`` when the file cannot be read or the
        section has no usable ``logfile`` entry.
    """
    try:
        text = conf_path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return None

    in_section = False
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and full-line comments.
        if not line or line.startswith((";", "#")):
            continue
        if re.match(r"^\[supervisord\]\s*$", line, flags=re.I):
            in_section = True
            continue
        if not in_section:
            continue
        if line.startswith("[") and line.endswith("]"):
            # The next section header ends the [supervisord] section.
            break
        m = re.match(r"^logfile\s*=\s*(.+)$", line, flags=re.I)
        if not m:
            continue
        # Drop an inline comment: ';' or '#' at the start of the value or
        # preceded by whitespace. Without this the comment text would become
        # part of the path.
        val = re.split(r"(?:^|\s+)[;#]", m.group(1) or "", maxsplit=1)[0]
        val = val.strip().strip('"').strip("'")
        val = val.replace("%(here)s", str(conf_path.parent))
        if not val:
            continue
        p = Path(val)
        if not p.is_absolute():
            p = (conf_path.parent / p).resolve()
        return p
    return None
def tail_supervisord_log(lines: int = 200) -> str:
    """Return the tail of the supervisord main log.

    The root cause of a spawn error is often found there. Lookup order:
    the ``SUPERVISOR_LOGFILE`` environment variable, the ``logfile=`` entry
    of the detected supervisord.conf, then the first well-known path that
    yields any text. Returns ``""`` when nothing is found.
    """
    explicit = (os.getenv("SUPERVISOR_LOGFILE") or "").strip()
    if explicit:
        return _tail_text_file(Path(explicit), lines=lines)

    conf_file = _detect_supervisor_conf_path()
    if conf_file is not None and conf_file.exists():
        declared = _parse_supervisord_logfile_from_conf(conf_file)
        if declared:
            return _tail_text_file(declared, lines=lines)

    # Last resort: probe the common locations.
    for guess in DEFAULT_SUPERVISORD_LOG_CANDIDATES:
        try:
            guess_path = Path(guess)
            if not guess_path.exists():
                continue
            tail = _tail_text_file(guess_path, lines=lines)
        except Exception:
            continue
        if tail:
            return tail
    return ""
def expected_trading_log_paths(project_root: Path, account_id: int) -> Tuple[Path, Path, Path]:
    """Compute the stdout/stderr logfile paths of an account's trading program.

    Must stay in sync with ``render_program_ini``, which writes these paths
    into the program config.

    Args:
        project_root: Project root; ``<project_root>/logs`` is the default
            log directory.
        account_id: Account id used in the file names.

    Returns:
        ``(log_dir, out_log, err_log)``. ``log_dir`` comes from the
        ``TRADING_LOG_DIR`` environment variable when it is set to a
        non-blank value.
    """
    aid = int(account_id)
    # A blank TRADING_LOG_DIR counts as unset; otherwise it would silently
    # resolve to the current working directory.
    override = (os.getenv("TRADING_LOG_DIR") or "").strip()
    log_dir = Path(override or str(project_root / "logs")).expanduser()
    out_log = log_dir / f"trading_{aid}.out.log"
    err_log = log_dir / f"trading_{aid}.err.log"
    return log_dir, out_log, err_log
def tail_trading_log_files(account_id: int, lines: int = 200) -> dict[str, str]:
    """Read the tail of an account's trading stdout/stderr logfiles directly.

    Does not depend on ``supervisorctl tail``.

    Args:
        account_id: Account whose log files are read.
        lines: Number of trailing lines requested per file.

    Returns:
        A dict with the keys ``log_dir``, ``out_log``, ``err_log`` (paths as
        strings) and ``stdout_tail_file``, ``stderr_tail_file`` (tail text,
        empty when a file cannot be read).
    """
    project_root = _get_project_root()
    log_dir, out_log, err_log = expected_trading_log_paths(project_root, int(account_id))
    return {
        "log_dir": str(log_dir),
        "out_log": str(out_log),
        "err_log": str(err_log),
        "stdout_tail_file": _tail_text_file(out_log, lines=lines),
        "stderr_tail_file": _tail_text_file(err_log, lines=lines),
    }
@dataclass
class EnsureProgramResult:
    """Outcome of creating/refreshing the supervisor program of one account."""

    # True when the ini file was written and reread/update both succeeded.
    ok: bool
    # Supervisor program name.
    program: str
    # Path of the written ini file ("" when writing failed).
    ini_path: str
    # Directory the ini file is written to.
    program_dir: str
    # supervisord config path in use ("" when none is known).
    supervisor_conf: str
    # Output of `supervisorctl reread`.
    reread: str = ""
    # Output of `supervisorctl update`.
    update: str = ""
    # Error description when ok is False.
    error: str = ""
def ensure_account_program(account_id: int) -> EnsureProgramResult:
    """Write the account's supervisor program config and make supervisor load it.

    Steps: render the ini text, write it into the program directory, then
    run ``supervisorctl reread`` and ``supervisorctl update``.

    Returns:
        An ``EnsureProgramResult``. Failures of the write or of supervisorctl
        are reported through ``ok=False`` and ``error`` instead of raising.
    """
    aid = int(account_id)
    program = program_name_for_account(aid)
    program_dir = get_supervisor_program_dir()
    ini_name = ini_filename_for_program(program)
    ini_text = render_program_ini(aid, program)
    conf = _detect_supervisor_conf_path()
    conf_s = str(conf) if conf else (os.getenv("SUPERVISOR_CONF") or "")

    # Step 1: write the ini file.
    try:
        written = write_program_ini(program_dir, ini_name, ini_text)
    except Exception as exc:
        return EnsureProgramResult(
            ok=False,
            program=program,
            ini_path="",
            program_dir=str(program_dir),
            supervisor_conf=conf_s,
            error=str(exc),
        )

    # Step 2: let supervisor pick up the new/changed program.
    reread_out = ""
    update_out = ""
    try:
        reread_out = run_supervisorctl(["reread"])
        update_out = run_supervisorctl(["update"])
    except Exception as exc:
        # The file was written but supervisorctl failed; still return
        # everything needed to diagnose the problem.
        return EnsureProgramResult(
            ok=False,
            program=program,
            ini_path=str(written),
            program_dir=str(program_dir),
            supervisor_conf=conf_s,
            reread=reread_out,
            update=update_out,
            error=f"写入配置成功,但执行 supervisorctl reread/update 失败: {exc}",
        )

    return EnsureProgramResult(
        ok=True,
        program=program,
        ini_path=str(written),
        program_dir=str(program_dir),
        supervisor_conf=conf_s,
        reread=reread_out,
        update=update_out,
    )