108 lines
3.5 KiB
Python
108 lines
3.5 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
from datetime import datetime
|
|
from main import app
|
|
from api.endpoints.auth import get_users_repo
|
|
from repos.user_repo import UsersRepo, UserStatus
|
|
from utils.security import get_password_hash
|
|
|
|
# Mock Repository
|
|
class MockUsersRepo:
    """In-memory users repository used in place of the real one in tests.

    Every user is a plain ``dict`` stored in ``self.users`` under its
    username, so a test can seed or inspect records directly.
    """

    def __init__(self) -> None:
        # Maps username -> user record dict.
        self.users: dict = {}

    async def get_user_by_username(self, username: str):
        """Return the stored record for *username*, or ``None`` if absent."""
        return self.users.get(username)

    async def create_user(
        self, username: str, password: str, full_name: "str | None" = None
    ):
        """Store and return a new user record.

        The password is stored hashed via ``get_password_hash``; the record
        starts with status ``UserStatus.PENDING`` and both ``is_admin`` and
        ``is_email_user`` set to ``False``.

        Raises:
            ValueError: if a record for *username* already exists.
        """
        # Reject duplicates before doing any hashing work.
        if username in self.users:
            raise ValueError("User with this username already exists")

        user = {
            "username": username,
            "hashed_password": get_password_hash(password),
            "full_name": full_name,
            "status": UserStatus.PENDING,
            "is_email_user": False,
            "is_admin": False,
            "created_at": datetime.now(),
        }
        self.users[username] = user
        return user

    async def get_pending_users(self) -> list:
        """Return every stored record whose status is ``UserStatus.PENDING``."""
        return [u for u in self.users.values() if u["status"] == UserStatus.PENDING]

    async def approve_user(self, username: str) -> None:
        """Set the user's status to ``UserStatus.ALLOWED``; no-op if unknown."""
        if username in self.users:
            self.users[username]["status"] = UserStatus.ALLOWED

    async def deny_user(self, username: str) -> None:
        """Set the user's status to ``UserStatus.DENIED``; no-op if unknown."""
        if username in self.users:
            self.users[username]["status"] = UserStatus.DENIED
|
|
|
|
# One shared in-memory repository for this module; the test below seeds and
# inspects it directly.
mock_repo = MockUsersRepo()


def _provide_mock_repo():
    """Return the shared mock; installed in place of ``get_users_repo``."""
    return mock_repo


# Override Dependency
app.dependency_overrides[get_users_repo] = _provide_mock_repo

client = TestClient(app)
|
|
|
|
def test_auth_flow_with_approval():
    """Register a user, confirm login is blocked, approve as admin, log in."""
    # mock_repo is module-level state: start empty so a leftover "newuser"
    # from an earlier run or test cannot make registration fail.
    mock_repo.users.clear()

    # 1. Register (User)
    user_data = {
        "username": "newuser",
        "password": "password123",
        "full_name": "New User",
    }
    response = client.post("/auth/register", json=user_data)
    assert response.status_code == 200
    assert response.json()["message"] == "Registration successful. Please wait for administrator approval."

    # 2. Try Login (User) -> Should Fail (Pending)
    login_data = {
        "username": "newuser",
        "password": "password123",
    }
    response = client.post("/auth/token", data=login_data)
    assert response.status_code == 403
    assert "not approved" in response.json()["detail"]

    # 3. Setup Admin (Backdoor for test)
    # Seeded straight into the store with the same keys create_user writes,
    # but already allowed and flagged as admin.
    mock_repo.users["admin"] = {
        "username": "admin",
        "hashed_password": get_password_hash("adminpass"),
        "full_name": None,
        "status": UserStatus.ALLOWED,
        "is_email_user": False,
        "is_admin": True,
        "created_at": datetime.now(),
    }

    # 4. Admin Login
    admin_login = {
        "username": "admin",
        "password": "adminpass",
    }
    response = client.post("/auth/token", data=admin_login)
    assert response.status_code == 200
    admin_token = response.json()["access_token"]
    admin_auth = {"Authorization": f"Bearer {admin_token}"}

    # 5. List Pending (Admin)
    response = client.get("/admin/approvals", headers=admin_auth)
    assert response.status_code == 200
    # Look the user up by name instead of relying on list position.
    pending = {entry["username"]: entry for entry in response.json()}
    assert "newuser" in pending
    assert pending["newuser"]["status"] == "pending"

    # 6. Approve User (Admin)
    response = client.post("/admin/approve/newuser", headers=admin_auth)
    assert response.status_code == 200
    assert response.json()["message"] == "User newuser approved"

    # 7. Login User (Again) -> Should Succeed
    response = client.post("/auth/token", data=login_data)
    assert response.status_code == 200
    assert "access_token" in response.json()
|