42 lines
1.2 KiB
Python
42 lines
1.2 KiB
Python
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import bcrypt
|
|
import jwt
|
|
|
|
from app.config import settings
|
|
|
|
# Module-level logger shared by the helpers in this file; it is registered
# under the fixed name "app.services.auth".
logger = logging.getLogger("app.services.auth")
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* as text.

    The password is encoded to bytes, hashed with a salt freshly generated
    by ``bcrypt.gensalt()``, and the resulting hash bytes are decoded back
    to ``str``.

    Args:
        password: Plain-text password to hash.

    Returns:
        The bcrypt hash (salt included by bcrypt) decoded to a string.
    """
    raw_password = password.encode()
    salt = bcrypt.gensalt()
    hashed_bytes = bcrypt.hashpw(raw_password, salt)
    return hashed_bytes.decode()
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plain-text password against a bcrypt hash.

    Both arguments are encoded to bytes and handed to ``bcrypt.checkpw``,
    whose result is returned unchanged.

    Args:
        plain: Candidate password in plain text.
        hashed: Previously computed bcrypt hash, as a string.

    Returns:
        Whatever ``bcrypt.checkpw`` reports for the pair.
    """
    candidate_bytes = plain.encode()
    stored_bytes = hashed.encode()
    return bcrypt.checkpw(candidate_bytes, stored_bytes)
|
|
|
|
|
|
def create_access_token(user_id: int, email: str) -> str:
    """Build and sign a JWT access token for a user.

    The token carries three claims: ``sub`` (the user id as a string),
    ``email``, and ``exp`` (current UTC time plus
    ``settings.JWT_EXPIRE_HOURS`` hours). It is signed with
    ``settings.JWT_SECRET`` using ``settings.JWT_ALGORITHM``, and an
    info-level log entry records the issuance.

    Args:
        user_id: Identifier stored in the ``sub`` claim.
        email: Address stored in the ``email`` claim.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    now_utc = datetime.now(timezone.utc)
    expires_at = now_utc + timedelta(hours=settings.JWT_EXPIRE_HOURS)

    # Claim order is kept as sub, email, exp so the serialized payload is
    # identical to what was issued before.
    claims = dict(sub=str(user_id), email=email, exp=expires_at)

    encoded = jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
    logger.info(
        "Created JWT for user_id=%d, email=%s, expires=%s",
        user_id,
        email,
        expires_at,
    )
    return encoded
|
|
|
|
|
|
def decode_access_token(token: str) -> dict | None:
    """Validate a JWT and return its claims, or ``None`` if it is rejected.

    The token is verified with ``settings.JWT_SECRET`` and accepted only
    when signed with ``settings.JWT_ALGORITHM``. The ``exp`` claim is
    mandatory: PyJWT checks expiry only when the claim is present, so
    without this requirement a correctly signed token that lacks ``exp``
    would never expire.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded payload, or ``None`` when the token is expired, lacks
        ``exp``, or is otherwise invalid. Each rejection is logged as a
        warning.
    """
    try:
        return jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
            options={"require": ["exp"]},
        )
    except jwt.ExpiredSignatureError:
        logger.warning("JWT expired")
        return None
    except jwt.InvalidTokenError as e:
        # Also covers jwt.MissingRequiredClaimError (a subclass) raised when
        # the mandatory "exp" claim is absent.
        logger.warning("Invalid JWT: %s", e)
        return None
|