auto_trade_sys/backend/api/routes/system.py
薇薇安 45a654f654 a
2026-01-21 21:45:10 +08:00

1081 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import subprocess
import json
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from fastapi import APIRouter, HTTPException, Header, Depends
from pydantic import BaseModel
import logging
logger = logging.getLogger(__name__)
# 路由统一挂在 /api/system 下,前端直接调用 /api/system/...
router = APIRouter(prefix="/api/system")
# 管理员鉴权JWT未启用登录时兼容 X-Admin-Token
from api.auth_deps import require_system_admin # noqa: E402
from database.models import Account # noqa: E402
# Log groups exposed by the /logs endpoints; the tuple order matters because
# pipeline results elsewhere are mapped positionally via enumerate(LOG_GROUPS).
LOG_GROUPS = ("error", "warning", "info")
# Backend service start time in epoch-ms (lets the frontend show uptime / detect restarts).
_BACKEND_STARTED_AT_MS: int = int(time.time() * 1000)
# System metadata storage (prefers Redis); used to record restart times, etc.
def _system_meta_prefix() -> str:
return (os.getenv("SYSTEM_META_PREFIX", "ats:system").strip() or "ats:system")
def _system_meta_key(name: str) -> str:
    """Build the full Redis key for one system-meta entry."""
    cleaned = (name or "").strip()
    return f"{_system_meta_prefix()}:{cleaned}"
def _system_meta_read(name: str) -> Optional[Dict[str, Any]]:
    """Read a JSON system-meta payload from Redis; None when missing or unavailable."""
    client = _get_redis_client_for_logs()
    if client is None:
        return None
    try:
        stored = client.get(_system_meta_key(name))
        return json.loads(stored) if stored else None
    except Exception:
        # Best-effort read: any Redis/JSON failure degrades to "no metadata".
        return None
def _system_meta_write(name: str, payload: Dict[str, Any], ttl_sec: int = 30 * 24 * 3600) -> None:
    """Best-effort write of a JSON system-meta payload to Redis with a TTL."""
    client = _get_redis_client_for_logs()
    if client is None:
        return
    try:
        body = json.dumps(payload, ensure_ascii=False)
        client.setex(_system_meta_key(name), int(ttl_sec), body)
    except Exception:
        # Metadata is advisory; swallow failures rather than break the caller.
        pass
# Throttle timestamp so Redis connection errors don't flood the log
# (the frontend may auto-refresh and retry frequently).
_last_logs_redis_err_ts: float = 0.0
def _logs_prefix() -> str:
return (os.getenv("REDIS_LOG_LIST_PREFIX", "ats:logs").strip() or "ats:logs")
def _logs_key_for_group(group: str) -> str:
group = (group or "error").strip().lower()
# 兼容旧配置REDIS_LOG_LIST_KEY 仅用于 error
if group == "error":
legacy = os.getenv("REDIS_LOG_LIST_KEY", "").strip()
if legacy:
return legacy
env_key = os.getenv(f"REDIS_LOG_LIST_KEY_{group.upper()}", "").strip()
if env_key:
return env_key
return f"{_logs_prefix()}:{group}"
def _logs_config_key() -> str:
return (os.getenv("REDIS_LOG_CONFIG_KEY", "ats:logs:config").strip() or "ats:logs:config")
def _logs_stats_prefix() -> str:
return (os.getenv("REDIS_LOG_STATS_PREFIX", "ats:logs:stats:added").strip() or "ats:logs:stats:added")
def _beijing_yyyymmdd() -> str:
from datetime import datetime, timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(tz=beijing_tz).strftime("%Y%m%d")
def _default_logs_config() -> Dict[str, Any]:
    """Built-in defaults for the Redis log configuration."""
    return {
        "max_len": {group: 2000 for group in LOG_GROUPS},
        "enabled": {group: True for group in LOG_GROUPS},
        "dedupe_consecutive": True,
        "include_debug_in_info": False,
        "keys": {group: _logs_key_for_group(group) for group in LOG_GROUPS},
        "config_key": _logs_config_key(),
        "stats_prefix": _logs_stats_prefix(),
    }
def _merge_logs_config(defaults: Dict[str, Any], redis_hash: Dict[str, str]) -> Dict[str, Any]:
    """Overlay values from the Redis config hash onto *defaults* (mutated in place)."""
    truthy = ("1", "true", "yes", "y", "on")
    cfg = defaults
    for group in LOG_GROUPS:
        raw_len = redis_hash.get(f"max_len:{group}")
        if raw_len is not None:
            try:
                parsed = int(str(raw_len).strip())
            except Exception:
                parsed = None
            # Only positive lengths are honored; bad values keep the default.
            if parsed is not None and parsed > 0:
                cfg["max_len"][group] = parsed
        raw_enabled = redis_hash.get(f"enabled:{group}")
        if raw_enabled is not None:
            cfg["enabled"][group] = str(raw_enabled).strip().lower() in truthy
    for flag in ("dedupe_consecutive", "include_debug_in_info"):
        raw_flag = redis_hash.get(flag)
        if raw_flag is not None:
            cfg[flag] = str(raw_flag).strip().lower() in truthy
    return cfg
def _read_logs_config(client) -> Dict[str, Any]:
    """Load the log config from Redis, falling back to defaults on any error."""
    defaults = _default_logs_config()
    try:
        stored = client.hgetall(_logs_config_key()) or {}
        return _merge_logs_config(defaults, stored)
    except Exception:
        return defaults
def _write_logs_config_and_trim(client, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Persist *cfg* to the Redis config hash and trim each log list to its cap."""
    fields: Dict[str, str] = {}
    for group in LOG_GROUPS:
        fields[f"max_len:{group}"] = str(int(cfg["max_len"][group]))
        fields[f"enabled:{group}"] = "1" if cfg["enabled"][group] else "0"
    fields["dedupe_consecutive"] = "1" if cfg.get("dedupe_consecutive") else "0"
    fields["include_debug_in_info"] = "1" if cfg.get("include_debug_in_info") else "0"
    # NOTE: on AWS Valkey/Redis in cluster mode MULTI/EXEC cannot span slots;
    # this pipeline touches the config hash plus three lists, so transactions stay off.
    pipe = client.pipeline(transaction=False)
    pipe.hset(_logs_config_key(), mapping=fields)
    for group in LOG_GROUPS:
        cap = int(cfg["max_len"][group])
        if cap > 0:
            pipe.ltrim(_logs_key_for_group(group), 0, cap - 1)
    pipe.execute()
    return cfg
class LogsConfigUpdate(BaseModel):
    """Partial update payload for the log configuration; all fields optional."""
    # Per-group list length caps, e.g. {"error": 2000}.
    max_len: Optional[Dict[str, int]] = None
    # Per-group on/off switches, e.g. {"info": false}.
    enabled: Optional[Dict[str, bool]] = None
    # Presumably collapses consecutive duplicate entries — confirm with the log writer.
    dedupe_consecutive: Optional[bool] = None
    # Presumably routes DEBUG records into the "info" group — confirm with the log writer.
    include_debug_in_info: Optional[bool] = None
def _beijing_time_str() -> str:
from datetime import datetime, timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(tz=beijing_tz).strftime("%Y-%m-%d %H:%M:%S")
@router.post("/logs/test-write")
async def logs_test_write(
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """
    Write one test log entry per group (error/warning/info) to Redis.

    Lets an operator verify that the backend writes to the same Redis
    instance and the same list keys the frontend reads from.

    Raises:
        HTTPException(503): when no Redis client is available.
    """
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法写入测试日志")
    cfg = _read_logs_config(client)
    # Fix: use the module-level `time` import instead of a dynamic __import__.
    now_ms = int(time.time() * 1000)
    time_str = _beijing_time_str()
    entries = {
        "error": {"level": "ERROR", "message": f"[TEST] backend 写入 error 测试日志 @ {time_str}"},
        "warning": {"level": "WARNING", "message": f"[TEST] backend 写入 warning 测试日志 @ {time_str}"},
        "info": {"level": "INFO", "message": f"[TEST] backend 写入 info 测试日志 @ {time_str}"},
    }
    day = _beijing_yyyymmdd()
    stats_prefix = _logs_stats_prefix()
    # Cluster mode: disable transactions to avoid CROSSSLOT errors.
    pipe = client.pipeline(transaction=False)
    for g in LOG_GROUPS:
        key = _logs_key_for_group(g)
        max_len = int(cfg["max_len"][g])
        entry = {
            "ts": now_ms,
            "time": time_str,
            "service": "backend",
            "level": entries[g]["level"],
            "logger": "api.routes.system",
            "message": entries[g]["message"],
            "hostname": os.getenv("HOSTNAME", ""),
            "signature": f"backend|{entries[g]['level']}|test|{entries[g]['message']}",
            "count": 1,
        }
        pipe.lpush(key, json.dumps(entry, ensure_ascii=False))
        if max_len > 0:
            pipe.ltrim(key, 0, max_len - 1)
        # Daily "added" counters, expired after 14 days.
        pipe.incr(f"{stats_prefix}:{day}:{g}", 1)
        pipe.expire(f"{stats_prefix}:{day}:{g}", 14 * 24 * 3600)
    pipe.execute()
    # Return the post-write LLEN per group so the caller can confirm the write landed.
    # Single-key LLEN queries don't need a transaction either.
    pipe2 = client.pipeline(transaction=False)
    for g in LOG_GROUPS:
        pipe2.llen(_logs_key_for_group(g))
    llens = pipe2.execute()
    return {
        "message": "ok",
        "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
        "llen": {g: int(llens[i] or 0) for i, g in enumerate(LOG_GROUPS)},
        "note": "如果你在前端仍看不到,说明前端请求的后端实例/Redis key/筛选条件不一致。",
    }
def _get_redis_client_for_logs():
    """
    Return a Redis client for the log endpoints, or None when unavailable.

    Prefers the connection already held by config_manager; falls back to
    creating a short-timeout client from the REDIS_* environment variables.
    """
    # 1) Reuse config_manager's client to avoid opening extra connections.
    try:
        import config_manager  # backend/config_manager.py (already loads .env)
        cm = getattr(config_manager, "config_manager", None)
        if cm is not None:
            redis_client = getattr(cm, "_redis_client", None)
            redis_connected = getattr(cm, "_redis_connected", False)
            if redis_client is not None and redis_connected:
                try:
                    redis_client.ping()
                    return redis_client
                except Exception:
                    pass
    except Exception:
        pass
    # 2) Create our own client.
    try:
        import redis  # type: ignore
        redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
        redis_use_tls = os.getenv("REDIS_USE_TLS", "False").lower() == "true"
        redis_username = os.getenv("REDIS_USERNAME", None)
        redis_password = os.getenv("REDIS_PASSWORD", None)
        ssl_cert_reqs = os.getenv("REDIS_SSL_CERT_REQS", "required")
        ssl_ca_certs = os.getenv("REDIS_SSL_CA_CERTS", None)
        kwargs: Dict[str, Any] = {
            "decode_responses": True,
            "username": redis_username,
            "password": redis_password,
            "socket_connect_timeout": 1,
            "socket_timeout": 1,
        }
        if redis_url.startswith("rediss://") or redis_use_tls:
            kwargs["ssl_cert_reqs"] = ssl_cert_reqs
            if ssl_ca_certs:
                kwargs["ssl_ca_certs"] = ssl_ca_certs
            # Hostname verification only makes sense when certs are verified.
            if ssl_cert_reqs == "none":
                kwargs["ssl_check_hostname"] = False
            elif ssl_cert_reqs == "required":
                kwargs["ssl_check_hostname"] = True
            else:
                kwargs["ssl_check_hostname"] = False
        client = redis.from_url(redis_url, **kwargs)
        client.ping()
        return client
    except Exception as e:
        # Throttled warning (at most one every 30s) so frontend auto-refresh
        # cannot flood api.log with connection errors.
        global _last_logs_redis_err_ts
        # Fix: reuse the module-level `time` import instead of re-importing it.
        now = time.time()
        if now - _last_logs_redis_err_ts > 30:
            _last_logs_redis_err_ts = now
            logger.warning(f"日志模块 Redis 连接失败。REDIS_URL={os.getenv('REDIS_URL', '')} err={e}")
        return None
@router.get("/logs")
async def get_logs(
    limit: int = 200,
    group: str = "error",
    start: int = 0,
    service: Optional[str] = None,
    level: Optional[str] = None,
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """
    Read the latest log entries from a Redis list (default group=error -> ats:logs:error).

    Parameters:
    - limit: number of entries to return (clamped; code allows up to 20000)
    - group: log group — error / warning / info
    - start: list offset to begin scanning at (pagination cursor)
    - service: filter by producer, e.g. backend / trading_system
    - level: filter by level, e.g. ERROR / CRITICAL ...
    """
    # Clamp paging inputs to sane values.
    if limit <= 0:
        limit = 200
    if limit > 20000:
        limit = 20000
    if start < 0:
        start = 0
    group = (group or "error").strip().lower()
    if group not in LOG_GROUPS:
        raise HTTPException(status_code=400, detail=f"非法 group{group}(可选:{', '.join(LOG_GROUPS)}")
    list_key = _logs_key_for_group(group)
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志")
    try:
        llen_total = int(client.llen(list_key) or 0)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}")
    if llen_total <= 0:
        # Empty list: return an empty page but keep the paging metadata shape.
        return {
            "group": group,
            "key": list_key,
            "start": start,
            "limit": limit,
            "llen_total": 0,
            "next_start": start,
            "has_more": False,
            "count": 0,
            "items": [],
        }
    # Paged scan: to honor the service/level filters we may read past `limit`
    # entries until enough matches are collected or the end is reached.
    # Guard: scan at most limit*10 entries so an over-strict filter cannot
    # keep scanning indefinitely.
    max_scan = min(llen_total, start + limit * 10)
    pos = start
    scanned = 0
    items: list[Dict[str, Any]] = []
    try:
        while len(items) < limit and pos < llen_total and pos < max_scan:
            chunk_size = min(500, limit, max_scan - pos)
            end = pos + chunk_size - 1
            raw_batch = client.lrange(list_key, pos, end)
            scanned += len(raw_batch or [])
            for raw in raw_batch or []:
                try:
                    obj = raw
                    if isinstance(raw, bytes):
                        obj = raw.decode("utf-8", errors="ignore")
                    if not isinstance(obj, str):
                        continue
                    parsed = json.loads(obj)
                    if not isinstance(parsed, dict):
                        continue
                    # Apply optional filters; malformed entries are skipped silently.
                    if service and str(parsed.get("service")) != service:
                        continue
                    if level and str(parsed.get("level")) != level:
                        continue
                    items.append(parsed)
                    if len(items) >= limit:
                        break
                except Exception:
                    continue
            pos = end + 1
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"读取 Redis 日志失败: {e}")
    return {
        "group": group,
        "key": list_key,
        "start": start,
        "limit": limit,
        "llen_total": llen_total,
        "scanned": scanned,
        "next_start": pos,
        "has_more": pos < llen_total,
        "count": len(items),
        "items": items,
    }
@router.get("/logs/overview")
async def logs_overview(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Return the log config plus per-group list lengths and today's added counters."""
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法读取日志概览")
    try:
        cfg = _read_logs_config(client)
        day = _beijing_yyyymmdd()
        stats_prefix = _logs_stats_prefix()
        # Cluster mode: disable transactions to avoid CROSSSLOT errors.
        pipe = client.pipeline(transaction=False)
        for g in LOG_GROUPS:
            pipe.llen(_logs_key_for_group(g))
        for g in LOG_GROUPS:
            pipe.get(f"{stats_prefix}:{day}:{g}")
        res = pipe.execute()
        # Pipeline results are positional: the LLENs first, then today's counters.
        llen_vals = res[: len(LOG_GROUPS)]
        added_vals = res[len(LOG_GROUPS) :]
        llen: Dict[str, int] = {}
        added_today: Dict[str, int] = {}
        for i, g in enumerate(LOG_GROUPS):
            try:
                llen[g] = int(llen_vals[i] or 0)
            except Exception:
                llen[g] = 0
            try:
                added_today[g] = int(added_vals[i] or 0)
            except Exception:
                added_today[g] = 0
        return {
            "config": cfg,
            "stats": {
                "day": day,
                "llen": llen,
                "added_today": added_today,
            },
            "meta": {
                "redis_url": os.getenv("REDIS_URL", ""),
                "keys": {g: _logs_key_for_group(g) for g in LOG_GROUPS},
            },
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"logs_overview 失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"logs_overview failed: {e}")
@router.put("/logs/config")
async def update_logs_config(
    payload: LogsConfigUpdate,
    _admin: Dict[str, Any] = Depends(require_system_admin),
) -> Dict[str, Any]:
    """Apply a partial log-config update, clamp limits, persist, and trim the lists."""
    client = _get_redis_client_for_logs()
    if client is None:
        raise HTTPException(status_code=503, detail="Redis 不可用,无法更新日志配置")
    cfg = _read_logs_config(client)
    for raw_group, raw_value in (payload.max_len or {}).items():
        group_name = (raw_group or "").strip().lower()
        if group_name not in LOG_GROUPS:
            continue
        try:
            value = int(raw_value)
        except Exception:
            continue
        # Clamp to a sane range: 100..20000 entries per list.
        cfg["max_len"][group_name] = min(max(value, 100), 20000)
    for raw_group, flag in (payload.enabled or {}).items():
        group_name = (raw_group or "").strip().lower()
        if group_name in LOG_GROUPS:
            cfg["enabled"][group_name] = bool(flag)
    if payload.dedupe_consecutive is not None:
        cfg["dedupe_consecutive"] = bool(payload.dedupe_consecutive)
    if payload.include_debug_in_info is not None:
        cfg["include_debug_in_info"] = bool(payload.include_debug_in_info)
    cfg = _write_logs_config_and_trim(client, cfg)
    return {"message": "ok", "config": cfg}
def _require_admin(token: Optional[str], provided: Optional[str]) -> None:
"""
可选的简单保护:如果环境变量配置了 SYSTEM_CONTROL_TOKEN则要求请求携带 X-Admin-Token。
生产环境强烈建议通过 Nginx 额外做鉴权 / IP 白名单。
"""
if not token:
return
if not provided or provided != token:
raise HTTPException(status_code=401, detail="Unauthorized")
#
# NOTE: require_system_admin now lives in api.auth_deps — importing it from
# here led to inconsistent imports that could make uvicorn fail to start.
def _build_supervisorctl_cmd(args: list[str]) -> list[str]:
supervisorctl_path = os.getenv("SUPERVISORCTL_PATH", "supervisorctl")
supervisor_conf = os.getenv("SUPERVISOR_CONF", "").strip()
use_sudo = os.getenv("SUPERVISOR_USE_SUDO", "false").lower() == "true"
# 如果没显式配置 SUPERVISOR_CONF就尝试自动探测常见路径宝塔/系统)
if not supervisor_conf:
candidates = [
"/www/server/panel/plugin/supervisor/supervisord.conf",
"/www/server/panel/plugin/supervisor/supervisor.conf",
"/etc/supervisor/supervisord.conf",
"/etc/supervisord.conf",
]
for p in candidates:
try:
if Path(p).exists():
supervisor_conf = p
break
except Exception:
continue
cmd: list[str] = []
if use_sudo:
# 需要你在 sudoers 配置 NOPASSWDsudo -n 才不会卡住)
cmd += ["sudo", "-n"]
cmd += [supervisorctl_path]
if supervisor_conf:
cmd += ["-c", supervisor_conf]
cmd += args
return cmd
def _run_supervisorctl(args: list[str]) -> str:
    """Run supervisorctl and return its combined stdout/stderr; raises RuntimeError on failure."""
    cmd = _build_supervisorctl_cmd(args)
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except subprocess.TimeoutExpired:
        raise RuntimeError("supervisorctl 超时10s")
    out = (res.stdout or "").strip()
    err = (res.stderr or "").strip()
    combined = "\n".join(part for part in (out, err) if part).strip()
    # supervisorctl quirk: `status` exits 3 when any process is STOPPED/FATAL
    # etc., yet its output is still valid — treat that as success.
    acceptable = {0, 3} if (args and args[0] == "status") else {0}
    if res.returncode not in acceptable:
        raise RuntimeError(combined or f"supervisorctl failed (exit={res.returncode})")
    return combined or out
def _parse_supervisor_status(raw: str) -> Tuple[bool, Optional[int], str]:
"""
典型输出:
- auto_sys RUNNING pid 1234, uptime 0:10:00
- auto_sys STOPPED Not started
"""
if "RUNNING" in raw:
m = re.search(r"\bpid\s+(\d+)\b", raw)
pid = int(m.group(1)) if m else None
return True, pid, "RUNNING"
for state in ["STOPPED", "FATAL", "EXITED", "BACKOFF", "STARTING", "UNKNOWN"]:
if state in raw:
return False, None, state
return False, None, "UNKNOWN"
def _list_supervisor_process_names(status_all_raw: str) -> list[str]:
names: list[str] = []
if not status_all_raw:
return names
for ln in status_all_raw.splitlines():
s = (ln or "").strip()
if not s:
continue
# 每行格式:<name> <STATE> ...
name = s.split(None, 1)[0].strip()
if name:
names.append(name)
return names
def _get_program_name() -> str:
# 你给的宝塔配置是 [program:auto_sys]
return os.getenv("SUPERVISOR_TRADING_PROGRAM", "auto_sys").strip() or "auto_sys"
def _select_best_process_name(program: str, status_all_raw: str) -> Optional[str]:
"""
从 `supervisorctl status` 全量输出中,找到最匹配的真实进程名。
兼容 supervisor 的 group:process 格式例如auto_sys:auto_sys_00
"""
if not status_all_raw:
return None
lines = [ln.strip() for ln in status_all_raw.splitlines() if ln.strip()]
names: list[str] = []
for ln in lines:
name = ln.split(None, 1)[0].strip()
if name:
names.append(name)
# 精确优先program / program_00 / program:program_00
preferred = [program, f"{program}_00", f"{program}:{program}_00"]
for cand in preferred:
if cand in names:
return cand
# 次优:任意以 program_ 开头
for name in names:
if name.startswith(program + "_"):
return name
# 次优:任意以 program: 开头
for name in names:
if name.startswith(program + ":"):
return name
return None
def _status_with_fallback(program: str) -> Tuple[str, Optional[str], Optional[str]]:
    """
    `status <program>` with a "no such process" fallback: fetch the full status
    list, resolve the real name (e.g. auto_sys:auto_sys_00) and retry.
    Returns (raw, resolved_name, status_all).
    """
    try:
        return _run_supervisorctl(["status", program]), program, None
    except Exception as e:
        if "no such process" not in str(e).lower():
            raise
    status_all = _run_supervisorctl(["status"])
    resolved = _select_best_process_name(program, status_all)
    if not resolved:
        return status_all, None, status_all
    try:
        return _run_supervisorctl(["status", resolved]), resolved, status_all
    except Exception:
        # Last resort: return the full listing so the real process name can be inspected.
        return status_all, None, status_all
def _action_with_fallback(action: str, program: str) -> Tuple[str, Optional[str], Optional[str]]:
    """
    Run start/stop/restart with a "no such process" fallback that resolves the
    real supervisor name before retrying.
    Returns (output, resolved_name, status_all).
    """
    try:
        return _run_supervisorctl([action, program]), program, None
    except Exception as e:
        if "no such process" not in str(e).lower():
            raise
    status_all = _run_supervisorctl(["status"])
    resolved = _select_best_process_name(program, status_all)
    if not resolved:
        # Include the full listing so the caller can see which processes exist.
        raise RuntimeError(f"no such process: {program}. 当前 supervisor 进程列表:\n{status_all}")
    return _run_supervisorctl([action, resolved]), resolved, status_all
@router.post("/clear-cache")
async def clear_cache(
    _admin: Dict[str, Any] = Depends(require_system_admin),
    x_account_id: Optional[int] = Header(default=None, alias="X-Account-Id"),
) -> Dict[str, Any]:
    """
    Clear the config cache (Redis hash: trading_config) and re-populate it
    from the database.
    """
    try:
        import config_manager
        # Per-account manager when available; otherwise fall back to the
        # module-level singleton. Default account id is 1.
        account_id = int(x_account_id or 1)
        cm = None
        if hasattr(config_manager, "ConfigManager") and hasattr(config_manager.ConfigManager, "for_account"):
            cm = config_manager.ConfigManager.for_account(account_id)
        else:
            cm = getattr(config_manager, "config_manager", None)
        if cm is None:
            raise HTTPException(status_code=500, detail="config_manager 未初始化")
        deleted_keys: list[str] = []
        # 1) Drop the backend's in-process cache.
        try:
            cm._cache = {}
        except Exception:
            pass
        # 2) Delete the Redis cache keys (hash: trading_config).
        try:
            redis_client = getattr(cm, "_redis_client", None)
            redis_connected = getattr(cm, "_redis_connected", False)
            if redis_client is not None and redis_connected:
                try:
                    redis_client.ping()
                except Exception:
                    redis_connected = False
            if redis_client is not None and redis_connected:
                try:
                    key = getattr(cm, "_redis_hash_key", "trading_config")
                    redis_client.delete(key)
                    deleted_keys.append(str(key))
                    # Compatibility: legacy key (default account only).
                    legacy = getattr(cm, "_legacy_hash_key", None)
                    if legacy and legacy != key:
                        redis_client.delete(legacy)
                        deleted_keys.append(str(legacy))
                except Exception as e:
                    logger.warning(f"删除 Redis key 失败: {e}")
                # Optional: realtime recommendations cache (if present).
                try:
                    redis_client.delete("recommendations:realtime")
                    deleted_keys.append("recommendations:realtime")
                except Exception:
                    pass
        except Exception as e:
            logger.warning(f"清 Redis 缓存失败: {e}")
        # 3) Immediately reload from the DB into Redis so trading_system never
        #    observes an empty config.
        try:
            cm.reload()
        except Exception as e:
            logger.warning(f"回灌配置到 Redis 失败仍可能使用DB/本地cache: {e}")
        return {
            "message": "缓存已清理并回灌",
            "deleted_keys": deleted_keys,
            "note": "如果你使用 supervisor 管理交易系统,请点击“重启交易系统”让新 Key 立即生效。",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"清理缓存失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/trading/status")
async def trading_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Report the trading system's supervisor state plus last-restart metadata."""
    program = _get_program_name()
    try:
        raw, resolved_name, status_all = _status_with_fallback(program)
        running, pid, state = _parse_supervisor_status(raw)
        restart_meta = _system_meta_read("trading:last_restart") or {}
        return {
            "mode": "supervisor",
            "program": program,
            "resolved_name": resolved_name,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": status_all,
            "meta": restart_meta,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"supervisorctl status 失败: {e}. 你可能需要配置 SUPERVISOR_CONF / SUPERVISOR_TRADING_PROGRAM / SUPERVISOR_USE_SUDO",
        )
@router.post("/trading/start")
async def trading_start(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Start the trading system via supervisor and return its fresh status."""
    program = _get_program_name()
    try:
        out, resolved_name, status_all = _action_with_fallback("start", program)
        # Re-query status after the action so the caller sees the new state.
        raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
        running, pid, state = _parse_supervisor_status(raw)
        status_payload = {
            "mode": "supervisor",
            "program": program,
            "resolved_name": resolved_name2 or resolved_name,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": status_all2 or status_all,
        }
        return {
            "message": "交易系统已启动supervisor",
            "output": out,
            "status": status_payload,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl start 失败: {e}")
@router.post("/trading/stop")
async def trading_stop(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Stop the trading system via supervisor and return its fresh status."""
    program = _get_program_name()
    try:
        out, resolved_name, status_all = _action_with_fallback("stop", program)
        # Re-query status after the action so the caller sees the new state.
        raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
        running, pid, state = _parse_supervisor_status(raw)
        status_payload = {
            "mode": "supervisor",
            "program": program,
            "resolved_name": resolved_name2 or resolved_name,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": status_all2 or status_all,
        }
        return {
            "message": "交易系统已停止supervisor",
            "output": out,
            "status": status_payload,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl stop 失败: {e}")
@router.post("/trading/restart")
async def trading_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """Restart the trading system via supervisor, recording the restart time for the UI."""
    program = _get_program_name()
    try:
        requested_at = _beijing_time_str()
        requested_at_ms = int(time.time() * 1000)
        out, resolved_name, status_all = _action_with_fallback("restart", program)
        raw, resolved_name2, status_all2 = _status_with_fallback(resolved_name or program)
        running, pid, state = _parse_supervisor_status(raw)
        # Persist the restart time so the frontend can display it.
        _system_meta_write(
            "trading:last_restart",
            {
                "requested_at": requested_at,
                "requested_at_ms": requested_at_ms,
                "pid": pid,
                "program": resolved_name2 or resolved_name or program,
            },
        )
        status_payload = {
            "mode": "supervisor",
            "program": program,
            "resolved_name": resolved_name2 or resolved_name,
            "running": running,
            "pid": pid,
            "state": state,
            "raw": raw,
            "status_all": status_all2 or status_all,
        }
        return {
            "message": "交易系统已重启supervisor",
            "output": out,
            "status": status_payload,
            "meta": {
                "requested_at": requested_at,
                "requested_at_ms": requested_at_ms,
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"supervisorctl restart 失败: {e}")
@router.post("/trading/restart-all")
async def trading_restart_all(
    _admin: Dict[str, Any] = Depends(require_system_admin),
    prefix: str = "auto_sys_acc",
    include_default: bool = False,
    do_update: bool = True,
) -> Dict[str, Any]:
    """
    Restart all per-account trading processes (supervisor) in one call.

    - By default restarts every program whose name starts with auto_sys_acc
      (e.g. auto_sys_acc1/2/3...).
    - include_default=true also includes SUPERVISOR_TRADING_PROGRAM (default auto_sys).
    - do_update=true runs supervisorctl reread/update first so new ini files apply.
    """
    try:
        prefix = (prefix or "auto_sys_acc").strip()
        if not prefix:
            prefix = "auto_sys_acc"
        # Read the full status first to learn which processes exist.
        status_all = _run_supervisorctl(["status"])
        names = _list_supervisor_process_names(status_all)
        targets: list[str] = []
        skipped_disabled: list[Dict[str, Any]] = []
        for n in names:
            if n.startswith(prefix):
                # When an account_id can be parsed from the name, skip accounts
                # whose DB status is not "active".
                try:
                    m = re.match(rf"^{re.escape(prefix)}(\d+)$", n)
                    if m:
                        aid = int(m.group(1))
                        row = Account.get(aid)
                        st = (row.get("status") if isinstance(row, dict) else None) or "active"
                        if str(st).strip().lower() != "active":
                            skipped_disabled.append({"program": n, "account_id": aid, "status": st})
                            continue
                except Exception:
                    # Parse/DB failure must not break the batch restart.
                    pass
                targets.append(n)
        if include_default:
            default_prog = _get_program_name()
            if default_prog and default_prog not in targets and default_prog in names:
                targets.append(default_prog)
        if not targets:
            return {
                "message": "未找到可重启的交易进程",
                "prefix": prefix,
                "include_default": include_default,
                "count": 0,
                "targets": [],
                "status_all": status_all,
                "skipped_disabled": skipped_disabled,
            }
        # Optionally re-read and apply supervisor ini changes before restarting;
        # failures are recorded in the response instead of aborting.
        reread_out = ""
        update_out = ""
        if do_update:
            try:
                reread_out = _run_supervisorctl(["reread"])
            except Exception as e:
                reread_out = f"failed: {e}"
            try:
                update_out = _run_supervisorctl(["update"])
            except Exception as e:
                update_out = f"failed: {e}"
        # Restart each target sequentially, collecting per-program results.
        results: list[Dict[str, Any]] = []
        ok = 0
        failed = 0
        for prog in targets:
            try:
                out = _run_supervisorctl(["restart", prog])
                raw = _run_supervisorctl(["status", prog])
                running, pid, state = _parse_supervisor_status(raw)
                results.append(
                    {
                        "program": prog,
                        "ok": True,
                        "output": out,
                        "status": {"running": running, "pid": pid, "state": state, "raw": raw},
                    }
                )
                ok += 1
            except Exception as e:
                failed += 1
                results.append({"program": prog, "ok": False, "error": str(e)})
        return {
            "message": "已发起批量重启",
            "prefix": prefix,
            "include_default": include_default,
            "do_update": do_update,
            "count": len(targets),
            "ok": ok,
            "failed": failed,
            "reread": reread_out,
            "update": update_out,
            "targets": targets,
            "results": results,
            "skipped_disabled": skipped_disabled,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"批量重启失败: {e}")
@router.get("/backend/status")
async def backend_status(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Report the backend (uvicorn) process status.

    Notes:
    - pid is os.getpid() (the current FastAPI process)
    - last_restart metadata is read from Redis when available
    """
    from datetime import datetime, timedelta, timezone

    meta = _system_meta_read("backend:last_restart") or {}
    # Fix: "started_at" previously reported the *current* time on every call;
    # derive it from the recorded start timestamp instead (Beijing time, UTC+8).
    beijing_tz = timezone(timedelta(hours=8))
    started_at = datetime.fromtimestamp(
        _BACKEND_STARTED_AT_MS / 1000, tz=beijing_tz
    ).strftime("%Y-%m-%d %H:%M:%S")
    return {
        "running": True,
        "pid": os.getpid(),
        "started_at_ms": _BACKEND_STARTED_AT_MS,
        "started_at": started_at,
        "meta": meta,
    }
@router.post("/backend/restart")
async def backend_restart(_admin: Dict[str, Any] = Depends(require_system_admin)) -> Dict[str, Any]:
    """
    Restart the backend service (uvicorn).

    How it works:
    - the backend is launched as `nohup uvicorn ... &`
    - this endpoint launches backend/restart.sh in the background, which:
      1) greps for the `uvicorn api.main:app` process and kills it
      2) runs backend/start.sh to bring up a new process
    Note:
    - the script is delayed by 1s so this request can return first
    """
    backend_dir = Path(__file__).parent.parent.parent  # backend/
    restart_script = backend_dir / "restart.sh"
    if not restart_script.exists():
        raise HTTPException(status_code=500, detail=f"找不到重启脚本: {restart_script}")
    requested_at = _beijing_time_str()
    requested_at_ms = int(time.time() * 1000)
    cur_pid = os.getpid()
    # Record the restart request in Redis before this process goes down.
    _system_meta_write(
        "backend:last_restart",
        {
            "requested_at": requested_at,
            "requested_at_ms": requested_at_ms,
            "pid_before": cur_pid,
            "script": str(restart_script),
        },
    )
    # Run in the background: sleep 1 before restarting so this request can return.
    cmd = ["bash", "-lc", f"sleep 1; '{restart_script}'"]
    try:
        # start_new_session detaches the child so it survives this process's death.
        subprocess.Popen(
            cmd,
            cwd=str(backend_dir),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"启动重启脚本失败: {e}")
    return {
        "message": "已发起后端重启1s 后执行)",
        "pid_before": cur_pid,
        "requested_at": requested_at,
        "requested_at_ms": requested_at_ms,
        "script": str(restart_script),
        "note": "重启期间接口可能短暂不可用,页面可等待 3-5 秒后刷新状态。",
    }