114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
"""
|
||
FastAPI 依赖:解析 JWT、获取当前用户、校验 admin、校验 account_id 访问权
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from fastapi import Header, HTTPException, Depends, Security
|
||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
from typing import Optional, Dict, Any
|
||
import os
|
||
|
||
from api.auth_utils import jwt_decode
|
||
from database.models import User, UserAccountMembership
|
||
|
||
|
||
def _auth_enabled() -> bool:
|
||
v = (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower()
|
||
return v not in {"0", "false", "no"}
|
||
|
||
|
||
_bearer_scheme = HTTPBearer(auto_error=False)
|
||
|
||
|
||
def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer_scheme)) -> Dict[str, Any]:
|
||
if not _auth_enabled():
|
||
# 未启用登录:视为超级管理员(兼容开发/灰度)
|
||
return {"id": 0, "username": "dev", "role": "admin", "status": "active"}
|
||
|
||
if not credentials:
|
||
raise HTTPException(status_code=401, detail="未登录")
|
||
if (credentials.scheme or "").lower() != "bearer":
|
||
raise HTTPException(status_code=401, detail="未登录")
|
||
token = (credentials.credentials or "").strip()
|
||
if not token:
|
||
raise HTTPException(status_code=401, detail="未登录")
|
||
try:
|
||
payload = jwt_decode(token)
|
||
except Exception:
|
||
raise HTTPException(status_code=401, detail="登录已失效")
|
||
|
||
sub = payload.get("sub")
|
||
try:
|
||
uid = int(sub)
|
||
except Exception:
|
||
raise HTTPException(status_code=401, detail="登录已失效")
|
||
|
||
u = User.get_by_id(uid)
|
||
if not u:
|
||
raise HTTPException(status_code=401, detail="登录已失效")
|
||
if (u.get("status") or "active") != "active":
|
||
raise HTTPException(status_code=403, detail="用户已被禁用")
|
||
return {"id": int(u["id"]), "username": u.get("username") or "", "role": u.get("role") or "user", "status": u.get("status") or "active"}
|
||
|
||
|
||
def require_admin(user: Dict[str, Any]) -> Dict[str, Any]:
|
||
if (user.get("role") or "user") != "admin":
|
||
raise HTTPException(status_code=403, detail="需要管理员权限")
|
||
return user
|
||
|
||
|
||
def require_account_access(account_id: int, user: Dict[str, Any]) -> int:
|
||
aid = int(account_id or 1)
|
||
if (user.get("role") or "user") == "admin":
|
||
return aid
|
||
if UserAccountMembership.has_access(int(user["id"]), aid):
|
||
return aid
|
||
raise HTTPException(status_code=403, detail="无权访问该账号")
|
||
|
||
|
||
def require_account_owner(account_id: int, user: Dict[str, Any]) -> int:
|
||
"""
|
||
账号“拥有者”权限:用于启停交易进程等高危操作。
|
||
"""
|
||
aid = int(account_id or 1)
|
||
if (user.get("role") or "user") == "admin":
|
||
return aid
|
||
role = UserAccountMembership.get_role(int(user["id"]), aid)
|
||
if role == "owner":
|
||
return aid
|
||
raise HTTPException(status_code=403, detail="需要该账号 owner 权限")
|
||
|
||
|
||
def get_admin_user(user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
|
||
return require_admin(user)
|
||
|
||
|
||
def get_account_id(
|
||
x_account_id: Optional[int] = Header(None, alias="X-Account-Id"),
|
||
user: Dict[str, Any] = Depends(get_current_user),
|
||
) -> int:
|
||
aid = int(x_account_id or 1)
|
||
return require_account_access(aid, user)
|
||
|
||
|
||
def require_system_admin(
|
||
x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"),
|
||
user: Dict[str, Any] = Depends(get_admin_user),
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
/api/system/* 管理员保护:
|
||
- 启用登录(ATS_AUTH_ENABLED=true):要求 JWT 为 admin
|
||
- 未启用登录:兼容旧逻辑,若配置了 SYSTEM_CONTROL_TOKEN,则要求 X-Admin-Token
|
||
"""
|
||
if _auth_enabled():
|
||
return user
|
||
|
||
token = (os.getenv("SYSTEM_CONTROL_TOKEN") or "").strip()
|
||
if not token:
|
||
return user
|
||
if not x_admin_token or x_admin_token != token:
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
return user
|
||
|