37 lines
1.4 KiB
Python
37 lines
1.4 KiB
Python
import logging
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import get_db
|
|
from app.models.admin_user import AdminUser
|
|
from app.services.auth import decode_access_token
|
|
|
|
logger = logging.getLogger("app.dependencies")
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
async def get_current_admin(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> AdminUser:
|
|
"""Dependency that validates JWT and returns the current admin user."""
|
|
token = credentials.credentials
|
|
payload = decode_access_token(token)
|
|
if not payload or payload.get("type", "admin") != "admin":
|
|
logger.warning("Invalid or expired token")
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Невалидный или просроченный токен")
|
|
|
|
user_id = int(payload["sub"])
|
|
result = await db.execute(select(AdminUser).where(AdminUser.id == user_id, AdminUser.is_active == True))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
logger.warning("Admin user not found or inactive: id=%d", user_id)
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Пользователь не найден")
|
|
|
|
logger.debug("Authenticated admin: id=%d, email=%s", user.id, user.email)
|
|
return user
|