# File-viewer metadata (extraction residue): 51 lines, 1.6 KiB, Python
import asyncio
|
|
import os
|
|
from datetime import datetime, timedelta, UTC
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
from models.Generation import Generation, GenerationStatus
|
|
from repos.generation_repo import GenerationRepo
|
|
from config import settings
|
|
|
|
# Connection parameters are read straight from the project settings; no
# fallback values are substituted here.
MONGO_HOST, DB_NAME = settings.MONGO_HOST, settings.DB_NAME

# Report the connection target as soon as the module is loaded.
print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}")
|
|
|
|
async def test_scheduler():
    """Manually check that stale generations get cancelled.

    Steps:
      1. Insert a RUNNING generation whose ``created_at`` is 120 minutes in
         the past.
      2. Call ``repo.cancel_stale_generations(timeout_minutes=60)``.
      3. Re-read the record and print whether its status is now FAILED.

    The inserted record is removed and the Mongo client is closed in a
    ``finally`` block, so a failure in any step does not leak the connection
    or skip the cleanup attempt.

    NOTE(review): ``cancel_stale_generations`` presumably acts on every stale
    generation in the target database, not only the one created here --
    confirm before running this against shared data.
    """
    client = AsyncIOMotorClient(MONGO_HOST)
    repo = GenerationRepo(client, db_name=DB_NAME)

    # Tracked outside the try block so the cleanup code can see how far we got.
    gen_id = None
    updated_gen = None

    try:
        # 1. Create a "stale" generation (2 hours ago)
        stale_gen = Generation(
            prompt="stale test",
            status=GenerationStatus.RUNNING,
            created_at=datetime.now(UTC) - timedelta(minutes=120),
            assets_list=[],
            aspect_ratio="NINESIXTEEN",
            quality="ONEK",
        )
        gen_id = await repo.create_generation(stale_gen)
        print(f"Created stale generation: {gen_id}")

        # 2. Run cleanup
        print("Running cleanup...")
        count = await repo.cancel_stale_generations(timeout_minutes=60)
        print(f"Cleaned up {count} generations")

        # 3. Verify status
        updated_gen = await repo.get_generation(gen_id)
        if updated_gen is None:
            # Guard against a missing record instead of failing with an
            # AttributeError on ``updated_gen.status``.
            print(f"❌ FAILURE: Generation {gen_id} not found after cleanup")
            return

        print(f"Generation status: {updated_gen.status}")
        print(f"Failed reason: {updated_gen.failed_reason}")

        if updated_gen.status == GenerationStatus.FAILED:
            print("✅ SUCCESS: Generation marked as FAILED")
        else:
            print("❌ FAILURE: Generation status not updated")
    finally:
        try:
            if updated_gen is not None:
                # Remove test data. The filter uses the id of the re-read
                # model, exactly as before, because the type of ``gen_id``
                # relative to the stored ``_id`` is not visible here.
                await repo.collection.delete_one({"_id": updated_gen.id})
            elif gen_id is not None:
                print(
                    f"⚠️ Test generation {gen_id} was not removed; "
                    "delete it manually"
                )
        finally:
            # Always release the connection pool, even if the delete fails.
            client.close()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the check coroutine to completion on a new
    # event loop when the file is executed directly.
    asyncio.run(test_scheduler())
|