auto_trade_sys/backend/api/auth_deps.py
2026-01-20 15:55:34 +08:00

73 lines
2.5 KiB
Python

"""
FastAPI 依赖:解析 JWT、获取当前用户、校验 admin、校验 account_id 访问权
"""
from __future__ import annotations
from fastapi import Header, HTTPException, Depends
from typing import Optional, Dict, Any
import os
from api.auth_utils import jwt_decode
from database.models import User, UserAccountMembership
def _auth_enabled() -> bool:
    """Report whether login enforcement is switched on.

    Reads the ``ATS_AUTH_ENABLED`` environment variable. An unset or empty
    variable counts as ``"true"``. After trimming whitespace and lower-casing,
    only ``"0"``, ``"false"`` and ``"no"`` disable authentication; every other
    value leaves it enabled.
    """
    raw = os.getenv("ATS_AUTH_ENABLED")
    if not raw:
        raw = "true"
    normalized = raw.strip().lower()
    disabled = normalized in ("0", "false", "no")
    return not disabled
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer challenge HTTP requires on 401."""
    return HTTPException(
        status_code=401,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(authorization: Optional[str] = Header(None, alias="Authorization")) -> Dict[str, Any]:
    """Resolve the caller from the ``Authorization: Bearer <token>`` header.

    When authentication is disabled (see ``_auth_enabled``) a fixed admin
    identity is returned without looking at the header.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` if absent.

    Returns:
        A dict with the keys ``id``, ``username``, ``role`` and ``status``.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer
            credential, the token cannot be decoded, its ``sub`` claim is not
            an integer, or no user exists for that id; 403 when the user's
            status is anything other than ``"active"``.
    """
    if not _auth_enabled():
        # Login disabled: treat the caller as a super admin (dev / gradual-rollout compatibility).
        return {"id": 0, "username": "dev", "role": "admin", "status": "active"}

    if not authorization or not authorization.lower().startswith("bearer "):
        raise _unauthorized("未登录")

    token = authorization.split(" ", 1)[1].strip()
    try:
        payload = jwt_decode(token)
    except Exception as exc:
        # jwt_decode's failure modes are not visible from here, so any error
        # is treated as an unusable token.
        raise _unauthorized("登录已失效") from exc

    try:
        # AttributeError covers a decoded payload that is not mapping-like, so
        # a malformed token is rejected as 401 instead of surfacing as a 500.
        uid = int(payload.get("sub"))
    except (AttributeError, TypeError, ValueError, OverflowError) as exc:
        raise _unauthorized("登录已失效") from exc

    # NOTE(review): assumes User.get_by_id returns a dict-like row or a falsy
    # value when the user does not exist — confirm against database.models.
    record = User.get_by_id(uid)
    if not record:
        raise _unauthorized("登录已失效")

    status = record.get("status") or "active"
    if status != "active":
        raise HTTPException(status_code=403, detail="用户已被禁用")

    return {
        "id": int(record["id"]),
        "username": record.get("username") or "",
        "role": record.get("role") or "user",
        "status": status,
    }
def require_admin(user: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``user`` unchanged when its role is admin, otherwise reject.

    A missing or empty ``role`` is treated as ``"user"``.

    Args:
        user: User dict as produced by ``get_current_user``.

    Returns:
        The same ``user`` object that was passed in.

    Raises:
        HTTPException: 403 when the effective role is not ``"admin"``.
    """
    role = user.get("role") or "user"
    if role == "admin":
        return user
    raise HTTPException(status_code=403, detail="需要管理员权限")
def require_account_access(account_id: int, user: Dict[str, Any]) -> int:
    """Check that ``user`` may operate on ``account_id`` and return the id.

    A falsy ``account_id`` (``None`` or ``0``) falls back to account ``1``.
    Admins are granted access without a membership lookup; everyone else must
    pass ``UserAccountMembership.has_access``.

    Args:
        account_id: Requested account id; falsy values mean "use account 1".
        user: User dict carrying at least ``role`` and, for non-admins, ``id``.

    Returns:
        The resolved account id as an ``int``.

    Raises:
        HTTPException: 403 when a non-admin user has no access to the account.
    """
    resolved_id = int(account_id or 1)
    is_admin = (user.get("role") or "user") == "admin"
    # Short-circuit keeps the membership lookup off the admin path.
    if is_admin or UserAccountMembership.has_access(int(user["id"]), resolved_id):
        return resolved_id
    raise HTTPException(status_code=403, detail="无权访问该账号")
def get_admin_user(
    user: Dict[str, Any] = Depends(get_current_user),
) -> Dict[str, Any]:
    """FastAPI dependency: resolve the current user and require the admin role.

    Args:
        user: Injected result of ``get_current_user``.

    Returns:
        The user dict, once ``require_admin`` has accepted it.

    Raises:
        HTTPException: Whatever ``get_current_user`` or ``require_admin`` raise
            (401 / 403).
    """
    admin_user = require_admin(user)
    return admin_user
def get_account_id(
    x_account_id: Optional[int] = Header(None, alias="X-Account-Id"),
    user: Dict[str, Any] = Depends(get_current_user),
) -> int:
    """FastAPI dependency: read ``X-Account-Id`` and verify the caller may use it.

    Args:
        x_account_id: Value of the ``X-Account-Id`` header; a missing or zero
            value selects account ``1``.
        user: Injected result of ``get_current_user``.

    Returns:
        The account id approved by ``require_account_access``.

    Raises:
        HTTPException: 403 when the user has no access to the account.
    """
    requested_id = int(x_account_id) if x_account_id else 1
    return require_account_access(requested_id, user)