"""Login authentication API (JWT)."""
import os
from typing import Optional, Dict, Any

from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field

from database.models import User
from api.auth_utils import verify_password, jwt_encode
from api.auth_deps import get_current_user

router = APIRouter(prefix="/api/auth", tags=["auth"])


class LoginReq(BaseModel):
    """Request body for ``POST /api/auth/login``."""

    # Both fields are required and must be non-empty; lengths are capped.
    username: str = Field(..., min_length=1, max_length=64)
    password: str = Field(..., min_length=1, max_length=200)


class LoginResp(BaseModel):
    """Response body for a successful login."""

    access_token: str
    token_type: str = "bearer"
    # Summary of the logged-in user: id, username, role, status.
    user: Dict[str, Any]


def _auth_enabled() -> bool:
    """Return whether login is enabled according to ``ATS_AUTH_ENABLED``.

    The variable defaults to ``"true"`` when unset or empty. After stripping
    and lower-casing, only ``"0"``, ``"false"`` and ``"no"`` disable login;
    every other value counts as enabled.
    """
    v = (os.getenv("ATS_AUTH_ENABLED") or "true").strip().lower()
    return v not in {"0", "false", "no"}


@router.post("/login", response_model=LoginResp)
async def login(payload: LoginReq):
    """Authenticate a user and return an access token.

    Args:
        payload: The submitted username and password.

    Returns:
        A dict with ``access_token``, ``token_type`` and a ``user`` summary
        (``id``, ``username``, ``role``, ``status``).

    Raises:
        HTTPException: 400 when login is disabled via ``ATS_AUTH_ENABLED``;
            401 when the user does not exist or the password does not
            match; 403 when the credentials are valid but the account
            status is not ``"active"``.
    """
    if not _auth_enabled():
        raise HTTPException(status_code=400, detail="当前环境未启用登录(ATS_AUTH_ENABLED=false)")

    # The user record is accessed as a mapping (``.get`` / ``[...]``).
    u = User.get_by_username(payload.username)

    # Check credentials BEFORE looking at the account status. Unknown user
    # and wrong password share one 401 message, so an unauthenticated caller
    # cannot distinguish "no such user" from "exists but disabled".
    if not u or not verify_password(payload.password, u.get("password_hash") or ""):
        raise HTTPException(status_code=401, detail="用户名或密码错误")

    # Only a caller who proved knowledge of the password learns that the
    # account is disabled. A missing/empty status is treated as "active".
    status = u.get("status") or "active"
    if status != "active":
        raise HTTPException(status_code=403, detail="用户已被禁用")

    # A missing/empty role falls back to "user".
    role = u.get("role") or "user"
    # exp_sec is 24 hours in seconds — presumably the token lifetime;
    # confirm against api.auth_utils.jwt_encode.
    token = jwt_encode({"sub": str(u["id"]), "role": role}, exp_sec=24 * 3600)
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": {
            "id": u["id"],
            "username": u["username"],
            "role": role,
            "status": status,
        },
    }


class MeResp(BaseModel):
    """Response body for ``GET /api/auth/me``."""

    id: int
    username: str
    role: str
    status: str


@router.get("/me", response_model=MeResp)
async def me(user: Dict[str, Any] = Depends(get_current_user)):
    """Return the profile of the current user.

    Args:
        user: Mapping supplied by the ``get_current_user`` dependency.

    Returns:
        A dict with ``id`` (coerced to ``int``), ``username`` (``""`` when
        missing), ``role`` (``"user"`` when missing) and ``status``
        (``"active"`` when missing).
    """
    return {
        "id": int(user["id"]),
        "username": user.get("username") or "",
        "role": user.get("role") or "user",
        "status": user.get("status") or "active",
    }