import pytest from fastapi.testclient import TestClient from unittest.mock import AsyncMock, MagicMock from datetime import datetime from main import app from api.endpoints.auth import get_users_repo from repos.user_repo import UsersRepo, UserStatus from utils.security import get_password_hash # Mock Repository class MockUsersRepo: def __init__(self): self.users = {} async def get_user_by_username(self, username: str): return self.users.get(username) async def create_user(self, username: str, password: str, full_name: str = None): if username in self.users: raise ValueError("User with this username already exists") user = { "username": username, "hashed_password": get_password_hash(password), "full_name": full_name, "status": UserStatus.PENDING, "is_email_user": False, "is_admin": False, "created_at": datetime.now() } self.users[username] = user return user async def get_pending_users(self): return [u for u in self.users.values() if u["status"] == UserStatus.PENDING] async def approve_user(self, username: str): if username in self.users: self.users[username]["status"] = UserStatus.ALLOWED async def deny_user(self, username: str): if username in self.users: self.users[username]["status"] = UserStatus.DENIED mock_repo = MockUsersRepo() # Override Dependency app.dependency_overrides[get_users_repo] = lambda: mock_repo client = TestClient(app) def test_auth_flow_with_approval(): # 1. Register (User) user_data = { "username": "newuser", "password": "password123", "full_name": "New User" } response = client.post("/auth/register", json=user_data) assert response.status_code == 200 assert response.json()["message"] == "Registration successful. Please wait for administrator approval." # 2. Try Login (User) -> Should Fail (Pending) login_data = { "username": "newuser", "password": "password123" } response = client.post("/auth/token", data=login_data) assert response.status_code == 403 assert "not approved" in response.json()["detail"] # 3. Setup Admin (Backdoor for test) mock_repo.users["admin"] = { "username": "admin", "hashed_password": get_password_hash("adminpass"), "status": UserStatus.ALLOWED, "is_admin": True, "created_at": datetime.now() } # 4. Admin Login admin_login = { "username": "admin", "password": "adminpass" } response = client.post("/auth/token", data=admin_login) assert response.status_code == 200 admin_token = response.json()["access_token"] admin_auth = {"Authorization": f"Bearer {admin_token}"} # 5. List Pending (Admin) response = client.get("/admin/approvals", headers=admin_auth) assert response.status_code == 200 users = response.json() assert len(users) >= 1 assert users[0]["username"] == "newuser" assert users[0]["status"] == "pending" # 6. Approve User (Admin) response = client.post("/admin/approve/newuser", headers=admin_auth) assert response.status_code == 200 assert response.json()["message"] == "User newuser approved" # 7. Login User (Again) -> Should Success response = client.post("/auth/token", data=login_data) assert response.status_code == 200 assert "access_token" in response.json()