init auth
This commit is contained in:
Binary file not shown.
BIN
tests/__pycache__/test_auth_flow.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/__pycache__/test_auth_flow.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
22
tests/test_api_protection.py
Normal file
22
tests/test_api_protection.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from main import app
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
def test_api_protection():
|
||||
# 1. Assets
|
||||
response = client.get("/api/assets")
|
||||
assert response.status_code == 401
|
||||
|
||||
# 2. Characters
|
||||
response = client.get("/api/characters")
|
||||
assert response.status_code == 401
|
||||
|
||||
# 3. Generations
|
||||
response = client.get("/api/generations")
|
||||
assert response.status_code == 401
|
||||
|
||||
# 4. Upload Asset (POST)
|
||||
response = client.post("/api/assets/upload")
|
||||
assert response.status_code == 401
|
||||
107
tests/test_auth_flow.py
Normal file
107
tests/test_auth_flow.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
from datetime import datetime
|
||||
from main import app
|
||||
from api.endpoints.auth import get_users_repo
|
||||
from repos.user_repo import UsersRepo, UserStatus
|
||||
from utils.security import get_password_hash
|
||||
|
||||
# Mock Repository
|
||||
class MockUsersRepo:
|
||||
def __init__(self):
|
||||
self.users = {}
|
||||
|
||||
async def get_user_by_username(self, username: str):
|
||||
return self.users.get(username)
|
||||
|
||||
async def create_user(self, username: str, password: str, full_name: str = None):
|
||||
if username in self.users:
|
||||
raise ValueError("User with this username already exists")
|
||||
|
||||
user = {
|
||||
"username": username,
|
||||
"hashed_password": get_password_hash(password),
|
||||
"full_name": full_name,
|
||||
"status": UserStatus.PENDING,
|
||||
"is_email_user": False,
|
||||
"is_admin": False,
|
||||
"created_at": datetime.now()
|
||||
}
|
||||
self.users[username] = user
|
||||
return user
|
||||
|
||||
async def get_pending_users(self):
|
||||
return [u for u in self.users.values() if u["status"] == UserStatus.PENDING]
|
||||
|
||||
async def approve_user(self, username: str):
|
||||
if username in self.users:
|
||||
self.users[username]["status"] = UserStatus.ALLOWED
|
||||
|
||||
async def deny_user(self, username: str):
|
||||
if username in self.users:
|
||||
self.users[username]["status"] = UserStatus.DENIED
|
||||
|
||||
mock_repo = MockUsersRepo()
|
||||
|
||||
# Override Dependency
|
||||
app.dependency_overrides[get_users_repo] = lambda: mock_repo
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
def test_auth_flow_with_approval():
|
||||
# 1. Register (User)
|
||||
user_data = {
|
||||
"username": "newuser",
|
||||
"password": "password123",
|
||||
"full_name": "New User"
|
||||
}
|
||||
response = client.post("/auth/register", json=user_data)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["message"] == "Registration successful. Please wait for administrator approval."
|
||||
|
||||
# 2. Try Login (User) -> Should Fail (Pending)
|
||||
login_data = {
|
||||
"username": "newuser",
|
||||
"password": "password123"
|
||||
}
|
||||
response = client.post("/auth/token", data=login_data)
|
||||
assert response.status_code == 403
|
||||
assert "not approved" in response.json()["detail"]
|
||||
|
||||
# 3. Setup Admin (Backdoor for test)
|
||||
mock_repo.users["admin"] = {
|
||||
"username": "admin",
|
||||
"hashed_password": get_password_hash("adminpass"),
|
||||
"status": UserStatus.ALLOWED,
|
||||
"is_admin": True,
|
||||
"created_at": datetime.now()
|
||||
}
|
||||
|
||||
# 4. Admin Login
|
||||
admin_login = {
|
||||
"username": "admin",
|
||||
"password": "adminpass"
|
||||
}
|
||||
response = client.post("/auth/token", data=admin_login)
|
||||
assert response.status_code == 200
|
||||
admin_token = response.json()["access_token"]
|
||||
admin_auth = {"Authorization": f"Bearer {admin_token}"}
|
||||
|
||||
# 5. List Pending (Admin)
|
||||
response = client.get("/admin/approvals", headers=admin_auth)
|
||||
assert response.status_code == 200
|
||||
users = response.json()
|
||||
assert len(users) >= 1
|
||||
assert users[0]["username"] == "newuser"
|
||||
assert users[0]["status"] == "pending"
|
||||
|
||||
# 6. Approve User (Admin)
|
||||
response = client.post("/admin/approve/newuser", headers=admin_auth)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["message"] == "User newuser approved"
|
||||
|
||||
# 7. Login User (Again) -> Should Success
|
||||
response = client.post("/auth/token", data=login_data)
|
||||
assert response.status_code == 200
|
||||
assert "access_token" in response.json()
|
||||
Reference in New Issue
Block a user