""" 账号管理 API(多账号) 说明: - 这是“多账号第一步”的管理入口:创建/禁用/更新密钥 - 交易/配置/统计接口通过 X-Account-Id 头来选择账号(默认 1) """ from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel, Field from typing import Optional, List, Dict, Any import logging from database.models import Account, UserAccountMembership from api.auth_deps import get_current_user, get_admin_user, require_account_access, require_account_owner from api.supervisor_account import ( ensure_account_program, run_supervisorctl, parse_supervisor_status, program_name_for_account, tail_supervisor, tail_supervisord_log, tail_trading_log_files, ) logger = logging.getLogger(__name__) router = APIRouter() class AccountCreate(BaseModel): name: str = Field(..., min_length=1, max_length=100) api_key: Optional[str] = "" api_secret: Optional[str] = "" use_testnet: bool = False status: str = Field("active", pattern="^(active|disabled)$") class AccountUpdate(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=100) status: Optional[str] = Field(None, pattern="^(active|disabled)$") use_testnet: Optional[bool] = None class AccountCredentialsUpdate(BaseModel): api_key: Optional[str] = None api_secret: Optional[str] = None use_testnet: Optional[bool] = None def _mask(s: str) -> str: s = "" if s is None else str(s) if not s: return "" if len(s) <= 8: return "****" return f"{s[:4]}...{s[-4:]}" def _ensure_account_active_for_start(account_id: int): row = Account.get(int(account_id)) if not row: raise HTTPException(status_code=404, detail="账号不存在") status = (row.get("status") or "active").strip().lower() if status != "active": raise HTTPException(status_code=400, detail="账号已禁用,不能启动/重启交易进程") @router.get("") @router.get("/") async def list_accounts(user: Dict[str, Any] = Depends(get_current_user)) -> List[Dict[str, Any]]: try: is_admin = (user.get("role") or "user") == "admin" out: List[Dict[str, Any]] = [] if is_admin: rows = Account.list_all() for r in rows or []: aid = int(r.get("id")) api_key, api_secret, use_testnet = Account.get_credentials(aid) out.append( { "id": aid, "name": r.get("name") or "", "status": r.get("status") or "active", "use_testnet": bool(use_testnet), "has_api_key": bool(api_key), "has_api_secret": bool(api_secret), "api_key_masked": _mask(api_key), } ) return out memberships = UserAccountMembership.list_for_user(int(user["id"])) membership_map = {int(m.get("account_id")): (m.get("role") or "viewer") for m in (memberships or []) if m.get("account_id") is not None} account_ids = list(membership_map.keys()) for aid in account_ids: r = Account.get(int(aid)) if not r: continue # 普通用户:不返回密钥明文,但返回“是否已配置”的状态,方便前端提示 api_key, api_secret, use_testnet = Account.get_credentials(int(aid)) out.append( { "id": int(aid), "name": r.get("name") or "", "status": r.get("status") or "active", "use_testnet": bool(use_testnet), "role": membership_map.get(int(aid), "viewer"), "has_api_key": bool(api_key), "has_api_secret": bool(api_secret), } ) return out except Exception as e: raise HTTPException(status_code=500, detail=f"获取账号列表失败: {e}") @router.post("") @router.post("/") async def create_account(payload: AccountCreate, _admin: Dict[str, Any] = Depends(get_admin_user)): try: aid = Account.create( name=payload.name, api_key=payload.api_key or "", api_secret=payload.api_secret or "", use_testnet=bool(payload.use_testnet), status=payload.status, ) # 自动为该账号生成 supervisor program 配置(失败不影响账号创建) sup = ensure_account_program(int(aid)) return { "success": True, "id": int(aid), "message": "账号已创建", "supervisor": { "ok": bool(sup.ok), "program": sup.program, "program_dir": sup.program_dir, "ini_path": sup.ini_path, "supervisor_conf": sup.supervisor_conf, "reread": sup.reread, "update": sup.update, "error": sup.error, "note": "如需自动启停:请确保 backend 进程有写入 program_dir 权限,并允许执行 supervisorctl(可选 sudo -n)。", }, } except Exception as e: raise HTTPException(status_code=500, detail=f"创建账号失败: {e}") @router.put("/{account_id}") async def update_account(account_id: int, payload: AccountUpdate, _admin: Dict[str, Any] = Depends(get_admin_user)): try: row = Account.get(int(account_id)) if not row: raise HTTPException(status_code=404, detail="账号不存在") # name/status fields = [] params = [] if payload.name is not None: fields.append("name = %s") params.append(payload.name) if payload.status is not None: fields.append("status = %s") params.append(payload.status) if payload.use_testnet is not None: fields.append("use_testnet = %s") params.append(bool(payload.use_testnet)) if fields: params.append(int(account_id)) from database.connection import db db.execute_update(f"UPDATE accounts SET {', '.join(fields)} WHERE id = %s", tuple(params)) return {"success": True, "message": "账号已更新"} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"更新账号失败: {e}") @router.put("/{account_id}/credentials") async def update_credentials(account_id: int, payload: AccountCredentialsUpdate, user: Dict[str, Any] = Depends(get_current_user)): try: if (user.get("role") or "user") != "admin": require_account_owner(int(account_id), user) row = Account.get(int(account_id)) if not row: raise HTTPException(status_code=404, detail="账号不存在") Account.update_credentials( int(account_id), api_key=payload.api_key, api_secret=payload.api_secret, use_testnet=payload.use_testnet, ) return {"success": True, "message": "账号密钥已更新(建议重启该账号交易进程)"} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"更新账号密钥失败: {e}") @router.post("/{account_id}/trading/ensure-program") async def ensure_trading_program(account_id: int, user: Dict[str, Any] = Depends(get_current_user)): if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") # 允许管理员或该账号 owner 执行(owner 用于“我重建配置再启动”) if (user.get("role") or "user") != "admin": require_account_owner(int(account_id), user) sup = ensure_account_program(int(account_id)) if not sup.ok: raise HTTPException(status_code=500, detail=sup.error or "生成 supervisor 配置失败") return { "ok": True, "program": sup.program, "ini_path": sup.ini_path, "program_dir": sup.program_dir, "supervisor_conf": sup.supervisor_conf, "reread": sup.reread, "update": sup.update, } @router.get("/{account_id}/trading/status") async def trading_status_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)): # 有访问权即可查看状态 if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") require_account_access(int(account_id), user) program = program_name_for_account(int(account_id)) try: raw = run_supervisorctl(["status", program]) running, pid, state = parse_supervisor_status(raw) # 仅 owner/admin 可看 tail(便于自助排障) stderr_tail = "" stdout_tail = "" stderr_tail_error = "" supervisord_tail = "" logfile_tail: Dict[str, Any] = {} try: is_admin = (user.get("role") or "user") == "admin" role = UserAccountMembership.get_role(int(user["id"]), int(account_id)) if not is_admin else "admin" if is_admin or role == "owner": if state in {"FATAL", "EXITED", "BACKOFF"}: stderr_tail = tail_supervisor(program, "stderr", 120) stdout_tail = tail_supervisor(program, "stdout", 200) try: logfile_tail = tail_trading_log_files(int(account_id), lines=200) except Exception: logfile_tail = {} if not stderr_tail: try: supervisord_tail = tail_supervisord_log(80) except Exception: supervisord_tail = "" except Exception as te: stderr_tail_error = str(te) stderr_tail = "" stdout_tail = "" # spawn error 时 program stderr 可能为空,尝试给 supervisord 主日志做兜底 try: supervisord_tail = tail_supervisord_log(80) except Exception: supervisord_tail = "" resp = {"program": program, "running": running, "pid": pid, "state": state, "raw": raw} if stderr_tail: resp["stderr_tail"] = stderr_tail if stdout_tail: resp["stdout_tail"] = stdout_tail if logfile_tail: resp["logfile_tail"] = logfile_tail if stderr_tail_error: resp["stderr_tail_error"] = stderr_tail_error if supervisord_tail: resp["supervisord_tail"] = supervisord_tail return resp except Exception as e: raise HTTPException(status_code=500, detail=f"读取交易进程状态失败: {e}") @router.get("/{account_id}/trading/tail") async def trading_tail_for_account( account_id: int, stream: str = "stderr", lines: int = 200, user: Dict[str, Any] = Depends(get_current_user), ): """ 读取该账号交易进程日志尾部(用于排障)。仅 owner/admin 可读。 """ if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") require_account_owner(int(account_id), user) program = program_name_for_account(int(account_id)) try: out = tail_supervisor(program, stream=stream, lines=lines) return {"program": program, "stream": stream, "lines": lines, "tail": out} except Exception as e: raise HTTPException(status_code=500, detail=f"读取交易进程日志失败: {e}") @router.post("/{account_id}/trading/start") async def trading_start_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)): if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") require_account_owner(int(account_id), user) _ensure_account_active_for_start(int(account_id)) program = program_name_for_account(int(account_id)) try: out = run_supervisorctl(["start", program]) raw = run_supervisorctl(["status", program]) running, pid, state = parse_supervisor_status(raw) return {"message": "已启动", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}} except Exception as e: tail = "" out_tail = "" tail_err = "" status_raw = "" supervisord_tail = "" logfile_tail: Dict[str, Any] = {} try: tail = tail_supervisor(program, "stderr", 120) except Exception as te: tail_err = str(te) tail = "" try: out_tail = tail_supervisor(program, "stdout", 200) except Exception: out_tail = "" try: logfile_tail = tail_trading_log_files(int(account_id), lines=200) except Exception: logfile_tail = {} try: status_raw = run_supervisorctl(["status", program]) except Exception: status_raw = "" if not tail: try: supervisord_tail = tail_supervisord_log(120) except Exception: supervisord_tail = "" raise HTTPException( status_code=500, detail={ "error": f"启动交易进程失败: {e}", "program": program, "hint": "如果是 spawn error,重点看 stderr_tail(常见:python 路径不可执行/依赖缺失/权限/工作目录不存在)", "stderr_tail": tail, "stdout_tail": out_tail, "logfile_tail": logfile_tail, "stderr_tail_error": tail_err, "status_raw": status_raw, "supervisord_tail": supervisord_tail, }, ) @router.post("/{account_id}/trading/stop") async def trading_stop_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)): if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") require_account_owner(int(account_id), user) program = program_name_for_account(int(account_id)) try: out = run_supervisorctl(["stop", program]) raw = run_supervisorctl(["status", program]) running, pid, state = parse_supervisor_status(raw) return {"message": "已停止", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}} except Exception as e: raise HTTPException(status_code=500, detail=f"停止交易进程失败: {e}") @router.post("/{account_id}/trading/restart") async def trading_restart_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)): if int(account_id) <= 0: raise HTTPException(status_code=400, detail="account_id 必须 >= 1") require_account_owner(int(account_id), user) _ensure_account_active_for_start(int(account_id)) program = program_name_for_account(int(account_id)) try: out = run_supervisorctl(["restart", program]) raw = run_supervisorctl(["status", program]) running, pid, state = parse_supervisor_status(raw) return {"message": "已重启", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}} except Exception as e: tail = "" out_tail = "" tail_err = "" status_raw = "" supervisord_tail = "" logfile_tail: Dict[str, Any] = {} try: tail = tail_supervisor(program, "stderr", 120) except Exception as te: tail_err = str(te) tail = "" try: out_tail = tail_supervisor(program, "stdout", 200) except Exception: out_tail = "" try: logfile_tail = tail_trading_log_files(int(account_id), lines=200) except Exception: logfile_tail = {} try: status_raw = run_supervisorctl(["status", program]) except Exception: status_raw = "" if not tail: try: supervisord_tail = tail_supervisord_log(120) except Exception: supervisord_tail = "" raise HTTPException( status_code=500, detail={ "error": f"重启交易进程失败: {e}", "program": program, "hint": "如果是 spawn error,重点看 stderr_tail(常见:python 路径不可执行/依赖缺失/权限/工作目录不存在)", "stderr_tail": tail, "stdout_tail": out_tail, "logfile_tail": logfile_tail, "stderr_tail_error": tail_err, "status_raw": status_raw, "supervisord_tail": supervisord_tail, }, )