auto_trade_sys/backend/api/auth_deps.py
薇薇安 4c20a7a488 a
2026-01-20 19:03:19 +08:00

93 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI 依赖:解析 JWT、获取当前用户、校验 admin、校验 account_id 访问权
"""
from __future__ import annotations
from fastapi import Header, HTTPException, Depends
from typing import Optional, Dict, Any
import os
from api.auth_utils import jwt_decode
from database.models import User, UserAccountMembership
def _auth_enabled() -> bool:
v = (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower()
return v not in {"0", "false", "no"}
def get_current_user(authorization: Optional[str] = Header(None, alias="Authorization")) -> Dict[str, Any]:
if not _auth_enabled():
# 未启用登录:视为超级管理员(兼容开发/灰度)
return {"id": 0, "username": "dev", "role": "admin", "status": "active"}
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="未登录")
token = authorization.split(" ", 1)[1].strip()
try:
payload = jwt_decode(token)
except Exception:
raise HTTPException(status_code=401, detail="登录已失效")
sub = payload.get("sub")
try:
uid = int(sub)
except Exception:
raise HTTPException(status_code=401, detail="登录已失效")
u = User.get_by_id(uid)
if not u:
raise HTTPException(status_code=401, detail="登录已失效")
if (u.get("status") or "active") != "active":
raise HTTPException(status_code=403, detail="用户已被禁用")
return {"id": int(u["id"]), "username": u.get("username") or "", "role": u.get("role") or "user", "status": u.get("status") or "active"}
def require_admin(user: Dict[str, Any]) -> Dict[str, Any]:
if (user.get("role") or "user") != "admin":
raise HTTPException(status_code=403, detail="需要管理员权限")
return user
def require_account_access(account_id: int, user: Dict[str, Any]) -> int:
aid = int(account_id or 1)
if (user.get("role") or "user") == "admin":
return aid
if UserAccountMembership.has_access(int(user["id"]), aid):
return aid
raise HTTPException(status_code=403, detail="无权访问该账号")
def get_admin_user(user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
return require_admin(user)
def get_account_id(
x_account_id: Optional[int] = Header(None, alias="X-Account-Id"),
user: Dict[str, Any] = Depends(get_current_user),
) -> int:
aid = int(x_account_id or 1)
return require_account_access(aid, user)
def require_system_admin(
x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"),
user: Dict[str, Any] = Depends(get_admin_user),
) -> Dict[str, Any]:
"""
/api/system/* 管理员保护:
- 启用登录(ATS_AUTH_ENABLED=true):要求 JWT 为 admin
- 未启用登录:兼容旧逻辑,若配置了 SYSTEM_CONTROL_TOKEN则要求 X-Admin-Token
"""
if _auth_enabled():
return user
token = (os.getenv("SYSTEM_CONTROL_TOKEN") or "").strip()
if not token:
return user
if not x_admin_token or x_admin_token != token:
raise HTTPException(status_code=401, detail="Unauthorized")
return user