auto_trade_sys/backend/api/routes/accounts.py
薇薇安 e80fd1059b a
2026-01-21 21:26:44 +08:00

336 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
账号管理 API多账号
说明:
- 这是“多账号第一步”的管理入口:创建/禁用/更新密钥
- 交易/配置/统计接口通过 X-Account-Id 头来选择账号(默认 1
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import logging
from database.models import Account, UserAccountMembership
from api.auth_deps import get_current_user, get_admin_user, require_account_access, require_account_owner
from api.supervisor_account import (
ensure_account_program,
run_supervisorctl,
parse_supervisor_status,
program_name_for_account,
tail_supervisor,
)
logger = logging.getLogger(__name__)
router = APIRouter()
class AccountCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
api_key: Optional[str] = ""
api_secret: Optional[str] = ""
use_testnet: bool = False
status: str = Field("active", pattern="^(active|disabled)$")
class AccountUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
status: Optional[str] = Field(None, pattern="^(active|disabled)$")
use_testnet: Optional[bool] = None
class AccountCredentialsUpdate(BaseModel):
api_key: Optional[str] = None
api_secret: Optional[str] = None
use_testnet: Optional[bool] = None
def _mask(s: str) -> str:
s = "" if s is None else str(s)
if not s:
return ""
if len(s) <= 8:
return "****"
return f"{s[:4]}...{s[-4:]}"
@router.get("")
@router.get("/")
async def list_accounts(user: Dict[str, Any] = Depends(get_current_user)) -> List[Dict[str, Any]]:
try:
is_admin = (user.get("role") or "user") == "admin"
out: List[Dict[str, Any]] = []
if is_admin:
rows = Account.list_all()
for r in rows or []:
aid = int(r.get("id"))
api_key, api_secret, use_testnet = Account.get_credentials(aid)
out.append(
{
"id": aid,
"name": r.get("name") or "",
"status": r.get("status") or "active",
"use_testnet": bool(use_testnet),
"has_api_key": bool(api_key),
"has_api_secret": bool(api_secret),
"api_key_masked": _mask(api_key),
}
)
return out
memberships = UserAccountMembership.list_for_user(int(user["id"]))
membership_map = {int(m.get("account_id")): (m.get("role") or "viewer") for m in (memberships or []) if m.get("account_id") is not None}
account_ids = list(membership_map.keys())
for aid in account_ids:
r = Account.get(int(aid))
if not r:
continue
# 普通用户:不返回密钥明文,但返回“是否已配置”的状态,方便前端提示
api_key, api_secret, use_testnet = Account.get_credentials(int(aid))
out.append(
{
"id": int(aid),
"name": r.get("name") or "",
"status": r.get("status") or "active",
"use_testnet": bool(use_testnet),
"role": membership_map.get(int(aid), "viewer"),
"has_api_key": bool(api_key),
"has_api_secret": bool(api_secret),
}
)
return out
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取账号列表失败: {e}")
@router.post("")
@router.post("/")
async def create_account(payload: AccountCreate, _admin: Dict[str, Any] = Depends(get_admin_user)):
try:
aid = Account.create(
name=payload.name,
api_key=payload.api_key or "",
api_secret=payload.api_secret or "",
use_testnet=bool(payload.use_testnet),
status=payload.status,
)
# 自动为该账号生成 supervisor program 配置(失败不影响账号创建)
sup = ensure_account_program(int(aid))
return {
"success": True,
"id": int(aid),
"message": "账号已创建",
"supervisor": {
"ok": bool(sup.ok),
"program": sup.program,
"program_dir": sup.program_dir,
"ini_path": sup.ini_path,
"supervisor_conf": sup.supervisor_conf,
"reread": sup.reread,
"update": sup.update,
"error": sup.error,
"note": "如需自动启停:请确保 backend 进程有写入 program_dir 权限,并允许执行 supervisorctl可选 sudo -n",
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"创建账号失败: {e}")
@router.put("/{account_id}")
async def update_account(account_id: int, payload: AccountUpdate, _admin: Dict[str, Any] = Depends(get_admin_user)):
try:
row = Account.get(int(account_id))
if not row:
raise HTTPException(status_code=404, detail="账号不存在")
# name/status
fields = []
params = []
if payload.name is not None:
fields.append("name = %s")
params.append(payload.name)
if payload.status is not None:
fields.append("status = %s")
params.append(payload.status)
if payload.use_testnet is not None:
fields.append("use_testnet = %s")
params.append(bool(payload.use_testnet))
if fields:
params.append(int(account_id))
from database.connection import db
db.execute_update(f"UPDATE accounts SET {', '.join(fields)} WHERE id = %s", tuple(params))
return {"success": True, "message": "账号已更新"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"更新账号失败: {e}")
@router.put("/{account_id}/credentials")
async def update_credentials(account_id: int, payload: AccountCredentialsUpdate, user: Dict[str, Any] = Depends(get_current_user)):
try:
if (user.get("role") or "user") != "admin":
require_account_owner(int(account_id), user)
row = Account.get(int(account_id))
if not row:
raise HTTPException(status_code=404, detail="账号不存在")
Account.update_credentials(
int(account_id),
api_key=payload.api_key,
api_secret=payload.api_secret,
use_testnet=payload.use_testnet,
)
return {"success": True, "message": "账号密钥已更新(建议重启该账号交易进程)"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"更新账号密钥失败: {e}")
@router.post("/{account_id}/trading/ensure-program")
async def ensure_trading_program(account_id: int, user: Dict[str, Any] = Depends(get_current_user)):
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
# 允许管理员或该账号 owner 执行owner 用于“我重建配置再启动”)
if (user.get("role") or "user") != "admin":
require_account_owner(int(account_id), user)
sup = ensure_account_program(int(account_id))
if not sup.ok:
raise HTTPException(status_code=500, detail=sup.error or "生成 supervisor 配置失败")
return {
"ok": True,
"program": sup.program,
"ini_path": sup.ini_path,
"program_dir": sup.program_dir,
"supervisor_conf": sup.supervisor_conf,
"reread": sup.reread,
"update": sup.update,
}
@router.get("/{account_id}/trading/status")
async def trading_status_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)):
# 有访问权即可查看状态
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
require_account_access(int(account_id), user)
program = program_name_for_account(int(account_id))
try:
raw = run_supervisorctl(["status", program])
running, pid, state = parse_supervisor_status(raw)
# 仅 owner/admin 可看 tail便于自助排障
stderr_tail = ""
try:
is_admin = (user.get("role") or "user") == "admin"
role = UserAccountMembership.get_role(int(user["id"]), int(account_id)) if not is_admin else "admin"
if is_admin or role == "owner":
if state in {"FATAL", "EXITED", "BACKOFF"}:
stderr_tail = tail_supervisor(program, "stderr", 120)
except Exception:
stderr_tail = ""
resp = {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}
if stderr_tail:
resp["stderr_tail"] = stderr_tail
return resp
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取交易进程状态失败: {e}")
@router.get("/{account_id}/trading/tail")
async def trading_tail_for_account(
account_id: int,
stream: str = "stderr",
lines: int = 200,
user: Dict[str, Any] = Depends(get_current_user),
):
"""
读取该账号交易进程日志尾部(用于排障)。仅 owner/admin 可读。
"""
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
require_account_owner(int(account_id), user)
program = program_name_for_account(int(account_id))
try:
out = tail_supervisor(program, stream=stream, lines=lines)
return {"program": program, "stream": stream, "lines": lines, "tail": out}
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取交易进程日志失败: {e}")
@router.post("/{account_id}/trading/start")
async def trading_start_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)):
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
require_account_owner(int(account_id), user)
program = program_name_for_account(int(account_id))
try:
out = run_supervisorctl(["start", program])
raw = run_supervisorctl(["status", program])
running, pid, state = parse_supervisor_status(raw)
return {"message": "已启动", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}}
except Exception as e:
tail = ""
try:
tail = tail_supervisor(program, "stderr", 120)
except Exception:
tail = ""
raise HTTPException(
status_code=500,
detail={
"error": f"启动交易进程失败: {e}",
"program": program,
"hint": "如果是 spawn error重点看 stderr_tail常见python 路径不可执行/依赖缺失/权限/工作目录不存在)",
"stderr_tail": tail,
},
)
@router.post("/{account_id}/trading/stop")
async def trading_stop_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)):
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
require_account_owner(int(account_id), user)
program = program_name_for_account(int(account_id))
try:
out = run_supervisorctl(["stop", program])
raw = run_supervisorctl(["status", program])
running, pid, state = parse_supervisor_status(raw)
return {"message": "已停止", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}}
except Exception as e:
raise HTTPException(status_code=500, detail=f"停止交易进程失败: {e}")
@router.post("/{account_id}/trading/restart")
async def trading_restart_for_account(account_id: int, user: Dict[str, Any] = Depends(get_current_user)):
if int(account_id) <= 0:
raise HTTPException(status_code=400, detail="account_id 必须 >= 1")
require_account_owner(int(account_id), user)
program = program_name_for_account(int(account_id))
try:
out = run_supervisorctl(["restart", program])
raw = run_supervisorctl(["status", program])
running, pid, state = parse_supervisor_status(raw)
return {"message": "已重启", "output": out, "status": {"program": program, "running": running, "pid": pid, "state": state, "raw": raw}}
except Exception as e:
tail = ""
try:
tail = tail_supervisor(program, "stderr", 120)
except Exception:
tail = ""
raise HTTPException(
status_code=500,
detail={
"error": f"重启交易进程失败: {e}",
"program": program,
"hint": "如果是 spawn error重点看 stderr_tail常见python 路径不可执行/依赖缺失/权限/工作目录不存在)",
"stderr_tail": tail,
},
)