126 lines
4.2 KiB
Python
126 lines
4.2 KiB
Python
"""
|
|
Admin API: user management / account authorization management
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
from api.auth_deps import get_admin_user
|
|
from api.auth_utils import hash_password
|
|
from database.models import User, UserAccountMembership, Account
|
|
|
|
|
|
# Router collecting the admin endpoints; all routes registered on it are
# served under the /api/admin prefix and tagged "admin".
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
|
|
|
|
class UserCreateReq(BaseModel):
    """Request body for creating a user."""

    # Required, 1-64 characters.
    username: str = Field(default=..., min_length=1, max_length=64)
    # Required plaintext password, 1-200 characters.
    password: str = Field(default=..., min_length=1, max_length=200)
    # Either "admin" or "user"; defaults to "user".
    role: str = Field(default="user", pattern="^(admin|user)$")
    # Either "active" or "disabled"; defaults to "active".
    status: str = Field(default="active", pattern="^(active|disabled)$")
|
|
|
|
|
|
@router.get("/users")
async def list_users(
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Return whatever ``User.list_all()`` yields.

    The ``get_admin_user`` dependency must resolve before this handler runs;
    its value is not otherwise used.
    """
    all_users = User.list_all()
    return all_users
|
|
|
|
|
|
@router.post("/users")
async def create_user(
    payload: UserCreateReq,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Create a user from the request body.

    Returns:
        ``{"success": True, "id": <int>}`` where the id is the value returned
        by ``User.create`` converted to ``int``.

    Raises:
        HTTPException: 400 when ``User.get_by_username`` finds an existing
            record for the requested username.
    """
    # Guard clause: refuse duplicates before doing any hashing work.
    if User.get_by_username(payload.username):
        raise HTTPException(status_code=400, detail="用户名已存在")

    # Only the hash of the password is handed to the model layer.
    new_user_fields = {
        "username": payload.username,
        "password_hash": hash_password(payload.password),
        "role": payload.role,
        "status": payload.status,
    }
    created_id = User.create(**new_user_fields)
    return {"success": True, "id": int(created_id)}
|
|
|
|
|
|
class UserPasswordReq(BaseModel):
    """Request body for replacing a user's password."""

    # Required plaintext password, 1-200 characters.
    password: str = Field(default=..., min_length=1, max_length=200)
|
|
|
|
|
|
@router.put("/users/{user_id}/password")
async def set_user_password(
    user_id: int,
    payload: UserPasswordReq,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Replace the stored password hash of the given user.

    Returns:
        ``{"success": True}`` once ``User.set_password`` has been called.

    Raises:
        HTTPException: 404 when ``User.get_by_id`` returns a falsy value.
    """
    target_id = int(user_id)
    if not User.get_by_id(target_id):
        raise HTTPException(status_code=404, detail="用户不存在")

    # Hash only after the user is known to exist.
    new_hash = hash_password(payload.password)
    User.set_password(target_id, new_hash)
    return {"success": True}
|
|
|
|
|
|
class UserRoleReq(BaseModel):
    """Request body for changing a user's role."""

    # Required; must be either "admin" or "user".
    role: str = Field(default=..., pattern="^(admin|user)$")
|
|
|
|
|
|
@router.put("/users/{user_id}/role")
async def set_user_role(
    user_id: int,
    payload: UserRoleReq,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Set the role of the given user to ``payload.role``.

    Returns:
        ``{"success": True}`` once ``User.set_role`` has been called.

    Raises:
        HTTPException: 404 when ``User.get_by_id`` returns a falsy value.
    """
    target_id = int(user_id)
    if not User.get_by_id(target_id):
        raise HTTPException(status_code=404, detail="用户不存在")

    # NOTE(review): nothing in this handler stops an admin from demoting
    # their own account; confirm whether User.set_role guards against that.
    User.set_role(target_id, payload.role)
    return {"success": True}
|
|
|
|
|
|
class UserStatusReq(BaseModel):
    """Request body for changing a user's status."""

    # Required; must be either "active" or "disabled".
    status: str = Field(default=..., pattern="^(active|disabled)$")
|
|
|
|
|
|
@router.put("/users/{user_id}/status")
async def set_user_status(
    user_id: int,
    payload: UserStatusReq,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Set the status of the given user to ``payload.status``.

    Returns:
        ``{"success": True}`` once ``User.set_status`` has been called.

    Raises:
        HTTPException: 404 when ``User.get_by_id`` returns a falsy value.
    """
    target_id = int(user_id)
    if not User.get_by_id(target_id):
        raise HTTPException(status_code=404, detail="用户不存在")

    # NOTE(review): nothing in this handler stops an admin from disabling
    # their own account; confirm whether User.set_status guards against that.
    User.set_status(target_id, payload.status)
    return {"success": True}
|
|
|
|
|
|
@router.get("/users/{user_id}/accounts")
async def list_user_accounts(
    user_id: int,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """List the account memberships of a user, enriched with account info.

    Returns:
        A list of dicts with the keys ``user_id``, ``account_id``, ``role``,
        ``account_name`` and ``account_status``. A missing role falls back to
        ``"viewer"``; a missing account name or status falls back to ``""``.

    Raises:
        HTTPException: 404 when ``User.get_by_id`` returns a falsy value.
    """
    target_id = int(user_id)
    if not User.get_by_id(target_id):
        raise HTTPException(status_code=404, detail="用户不存在")

    def describe(membership):
        # Attach the account's name and status so the frontend can display
        # them. Account.get is called once per membership row.
        account_id = int(membership.get("account_id"))
        account = Account.get(account_id) or {}
        return {
            "user_id": int(membership.get("user_id")),
            "account_id": account_id,
            "role": membership.get("role") or "viewer",
            "account_name": account.get("name") or "",
            "account_status": account.get("status") or "",
        }

    rows = UserAccountMembership.list_for_user(target_id)
    # A falsy result from list_for_user is treated as "no memberships".
    return [describe(row) for row in rows or []]
|
|
|
|
|
|
class GrantReq(BaseModel):
    """Request body for granting a user access to an account."""

    # Either "owner" or "viewer"; defaults to "viewer".
    role: str = Field(default="viewer", pattern="^(owner|viewer)$")
|
|
|
|
|
|
@router.put("/users/{user_id}/accounts/{account_id}")
async def grant_user_account(
    user_id: int,
    account_id: int,
    payload: GrantReq,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Grant a user a membership on an account with ``payload.role``.

    Returns:
        ``{"success": True}`` once ``UserAccountMembership.add`` has been
        called.

    Raises:
        HTTPException: 404 when the user lookup returns a falsy value, or,
            checked second, when the account lookup does.
    """
    target_user = int(user_id)
    if not User.get_by_id(target_user):
        raise HTTPException(status_code=404, detail="用户不存在")

    target_account = int(account_id)
    if not Account.get(target_account):
        raise HTTPException(status_code=404, detail="账号不存在")

    UserAccountMembership.add(target_user, target_account, role=payload.role)
    return {"success": True}
|
|
|
|
|
|
@router.delete("/users/{user_id}/accounts/{account_id}")
async def revoke_user_account(
    user_id: int,
    account_id: int,
    _admin: Dict[str, Any] = Depends(get_admin_user),
):
    """Remove a user's membership on an account.

    Returns:
        ``{"success": True}`` after ``UserAccountMembership.remove`` has been
        called.
    """
    # NOTE(review): unlike grant_user_account, no existence check is made for
    # the user or the account here, so success is reported whenever remove()
    # returns. Confirm this is intended.
    membership_key = (int(user_id), int(account_id))
    UserAccountMembership.remove(*membership_key)
    return {"success": True}
|
|
|