""" FastAPI 依赖:解析 JWT、获取当前用户、校验 admin、校验 account_id 访问权 """ from __future__ import annotations from fastapi import Header, HTTPException, Depends, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional, Dict, Any import os from api.auth_utils import jwt_decode from database.models import User, UserAccountMembership def _auth_enabled() -> bool: v = (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower() return v not in {"0", "false", "no"} _bearer_scheme = HTTPBearer(auto_error=False) def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer_scheme)) -> Dict[str, Any]: if not _auth_enabled(): # 未启用登录:视为超级管理员(兼容开发/灰度) return {"id": 0, "username": "dev", "role": "admin", "status": "active"} if not credentials: raise HTTPException(status_code=401, detail="未登录") if (credentials.scheme or "").lower() != "bearer": raise HTTPException(status_code=401, detail="未登录") token = (credentials.credentials or "").strip() if not token: raise HTTPException(status_code=401, detail="未登录") try: payload = jwt_decode(token) except Exception: raise HTTPException(status_code=401, detail="登录已失效") sub = payload.get("sub") try: uid = int(sub) except Exception: raise HTTPException(status_code=401, detail="登录已失效") u = User.get_by_id(uid) if not u: raise HTTPException(status_code=401, detail="登录已失效") if (u.get("status") or "active") != "active": raise HTTPException(status_code=403, detail="用户已被禁用") return {"id": int(u["id"]), "username": u.get("username") or "", "role": u.get("role") or "user", "status": u.get("status") or "active"} def require_admin(user: Dict[str, Any]) -> Dict[str, Any]: if (user.get("role") or "user") != "admin": raise HTTPException(status_code=403, detail="需要管理员权限") return user def require_account_access(account_id: int, user: Dict[str, Any]) -> int: aid = int(account_id or 1) if (user.get("role") or "user") == "admin": return aid if UserAccountMembership.has_access(int(user["id"]), aid): return aid raise HTTPException(status_code=403, detail="无权访问该账号") def require_account_owner(account_id: int, user: Dict[str, Any]) -> int: """ 账号“拥有者”权限:用于启停交易进程等高危操作。 """ aid = int(account_id or 1) if (user.get("role") or "user") == "admin": return aid role = UserAccountMembership.get_role(int(user["id"]), aid) if role == "owner": return aid raise HTTPException(status_code=403, detail="需要该账号 owner 权限") def get_admin_user(user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]: return require_admin(user) def get_account_id( x_account_id: Optional[int] = Header(None, alias="X-Account-Id"), user: Dict[str, Any] = Depends(get_current_user), ) -> int: import logging logger = logging.getLogger(__name__) # 注意:x_account_id 可能是 None,需要处理 raw_header_value = x_account_id aid = int(x_account_id or 1) logger.info(f"get_account_id: X-Account-Id header={raw_header_value}, parsed account_id={aid}, user_id={user.get('id')}, username={user.get('username')}") result = require_account_access(aid, user) logger.info(f"get_account_id: 最终返回 account_id={result}") return result def require_system_admin( x_admin_token: Optional[str] = Header(default=None, alias="X-Admin-Token"), user: Dict[str, Any] = Depends(get_admin_user), ) -> Dict[str, Any]: """ /api/system/* 管理员保护: - 启用登录(ATS_AUTH_ENABLED=true):要求 JWT 为 admin - 未启用登录:兼容旧逻辑,若配置了 SYSTEM_CONTROL_TOKEN,则要求 X-Admin-Token """ if _auth_enabled(): return user token = (os.getenv("SYSTEM_CONTROL_TOKEN") or "").strip() if not token: return user if not x_admin_token or x_admin_token != token: raise HTTPException(status_code=401, detail="Unauthorized") return user