from datetime import timedelta from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from jose import JWTError, jwt from repos.user_repo import UsersRepo, UserStatus from utils.security import verify_password, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES, ALGORITHM, SECRET_KEY from starlette.requests import Request router = APIRouter(prefix="/api/auth", tags=["auth"]) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") async def get_users_repo(request: Request) -> UsersRepo: if not hasattr(request.app.state, "users_repo"): raise HTTPException(status_code=500, detail="Users repo not initialized") return request.app.state.users_repo async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], repo: Annotated[UsersRepo, Depends(get_users_repo)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = await repo.get_user_by_username(username) if user is None: raise credentials_exception return user async def get_current_admin(user: Annotated[dict, Depends(get_current_user)]): if not user.get("is_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) return user class UserRegister(BaseModel): username: str password: str full_name: str | None = None class Token(BaseModel): access_token: str token_type: str class UserResponse(BaseModel): id: str username: str full_name: str | None = None status: str is_admin: bool = False @router.get("/me", response_model=UserResponse) async def read_users_me(current_user: Annotated[dict, Depends(get_current_user)]): return current_user @router.post("/register") async def register(user_data: UserRegister, repo: Annotated[UsersRepo, Depends(get_users_repo)]): try: await repo.create_user( username=user_data.username, password=user_data.password, full_name=user_data.full_name ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return {"message": "Registration successful. Please wait for administrator approval."} @router.post("/token", response_model=Token) async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], repo: Annotated[UsersRepo, Depends(get_users_repo)] ): user = await repo.get_user_by_username(form_data.username) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Проверяем пароль if not verify_password(form_data.password, user["hashed_password"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Проверка статуса if user.get("status") != UserStatus.ALLOWED: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Account is not approved yet. Please contact administrator.", ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user["username"]}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}