from typing import Annotated, List from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel from repos.user_repo import UsersRepo, UserStatus from api.dependency import get_dao from repos.dao import DAO from utils.security import verify_password, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES, ALGORITHM, SECRET_KEY from jose import JWTError, jwt from starlette.requests import Request router = APIRouter(prefix="/api/admin", tags=["admin"]) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token") from api.endpoints.auth import get_users_repo async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], repo: Annotated[UsersRepo, Depends(get_users_repo)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str | None = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = await repo.get_user_by_username(username) if user is None: raise credentials_exception return user async def get_current_admin(user: Annotated[dict, Depends(get_current_user)]): if not user.get("is_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) return user class UserResponse(BaseModel): username: str full_name: str | None = None status: str created_at: str | None = None is_admin: bool class Config: from_attributes = True @router.get("/approvals", response_model=List[UserResponse]) async def list_pending_users( admin: Annotated[dict, Depends(get_current_admin)], repo: Annotated[UsersRepo, Depends(get_users_repo)] ): users = await repo.get_pending_users() # Pydantic conversion handles the list of dicts return [ UserResponse( username=u["username"], full_name=u.get("full_name"), status=u["status"], created_at=str(u.get("created_at")), is_admin=u.get("is_admin", False) ) for u in users ] @router.post("/approve/{username}") async def approve_user( username: str, admin: Annotated[dict, Depends(get_current_admin)], repo: Annotated[UsersRepo, Depends(get_users_repo)] ): user = await repo.get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") await repo.approve_user(username) return {"message": f"User {username} approved"} @router.post("/deny/{username}") async def deny_user( username: str, admin: Annotated[dict, Depends(get_current_admin)], repo: Annotated[UsersRepo, Depends(get_users_repo)] ): user = await repo.get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") await repo.deny_user(username) return {"message": f"User {username} denied"}