12 Commits

46 changed files with 708 additions and 94 deletions

16
.gitignore vendored
View File

@@ -8,4 +8,18 @@ minio_backup.tar.gz
.idea/ai-char-bot.iml
.idea
.venv
.vscode
.vscode
.vscode/launch.json
middlewares/__pycache__/
middlewares/*.pyc
api/__pycache__/
api/*.pyc
repos/__pycache__/
repos/*.pyc
adapters/__pycache__/
adapters/*.pyc
services/__pycache__/
services/*.pyc
utils/__pycache__/
utils/*.pyc
.vscode/launch.json

27
.vscode/launch.json vendored
View File

@@ -7,7 +7,7 @@
"request": "launch",
"module": "uvicorn",
"args": [
"main:app",
"aiws:app",
"--reload",
"--port",
"8090",
@@ -16,31 +16,6 @@
],
"jinja": true,
"justMyCode": true
},
{
"name": "Python: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
},
{
"name": "Debug Tests: Current File",
"type": "debugpy",
"request": "launch",
"module": "pytest",
"args": [
"${file}"
],
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
}
]
}

View File

@@ -23,28 +23,30 @@ class GoogleAdapter:
self.TEXT_MODEL = "gemini-3-pro-preview"
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> list:
"""Вспомогательный метод для подготовки контента (текст + картинки)"""
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> tuple:
"""Вспомогательный метод для подготовки контента (текст + картинки).
Returns (contents, opened_images) — caller MUST close opened_images after use."""
contents = [prompt]
opened_images = []
if images_list:
logger.info(f"Preparing content with {len(images_list)} images")
for img_bytes in images_list:
try:
# Gemini API требует PIL Image на входе
image = Image.open(io.BytesIO(img_bytes))
contents.append(image)
opened_images.append(image)
except Exception as e:
logger.error(f"Error processing input image: {e}")
else:
logger.info("Preparing content with no images")
return contents
return contents, opened_images
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
"""
Генерация текста (Чат или Vision).
Возвращает строку с ответом.
"""
contents = self._prepare_contents(prompt, images_list)
contents, opened_images = self._prepare_contents(prompt, images_list)
logger.info(f"Generating text: {prompt}")
try:
response = self.client.models.generate_content(
@@ -68,6 +70,9 @@ class GoogleAdapter:
except Exception as e:
logger.error(f"Gemini Text API Error: {e}")
raise GoogleGenerationException(f"Gemini Text API Error: {e}")
finally:
for img in opened_images:
img.close()
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
"""
@@ -75,7 +80,7 @@ class GoogleAdapter:
Возвращает список байтовых потоков (готовых к отправке).
"""
contents = self._prepare_contents(prompt, images_list)
contents, opened_images = self._prepare_contents(prompt, images_list)
logger.info(f"Generating image. Prompt length: {len(prompt)}, Ratio: {aspect_ratio}, Quality: {quality}")
start_time = datetime.now()
@@ -100,9 +105,21 @@ class GoogleAdapter:
if response.usage_metadata:
token_usage = response.usage_metadata.total_token_count
if response.parts is None and response.candidates[0].finish_reason is not None:
raise GoogleGenerationException(f"Generation blocked in cause of {response.candidates[0].finish_reason.value}")
# Check prompt-level block (e.g. PROHIBITED_CONTENT) — no candidates in this case
if response.prompt_feedback and response.prompt_feedback.block_reason:
raise GoogleGenerationException(
f"Generation blocked at prompt level: {response.prompt_feedback.block_reason.value}"
)
# Check candidate-level block
if response.parts is None:
response_reason = (
response.candidates[0].finish_reason
if response.candidates and len(response.candidates) > 0
else "Unknown"
)
raise GoogleGenerationException(f"Generation blocked: {response_reason}")
generated_images = []
@@ -147,4 +164,8 @@ class GoogleAdapter:
except Exception as e:
logger.error(f"Gemini Image API Error: {e}")
raise GoogleGenerationException(f"Gemini Image API Error: {e}")
raise GoogleGenerationException(f"Gemini Image API Error: {e}")
finally:
for img in opened_images:
img.close()
del contents

View File

@@ -56,6 +56,21 @@ class S3Adapter:
print(f"Error downloading from S3: {e}")
return None
async def stream_file(self, object_name: str, chunk_size: int = 65536):
"""Streams a file from S3 yielding chunks. Memory-efficient for large files."""
try:
async with self._get_client() as client:
response = await client.get_object(Bucket=self.bucket_name, Key=object_name)
# aioboto3 Body is an aiohttp StreamReader wrapper
body = response['Body']
data = await body.read()
# Yield in chunks to avoid holding entire response in StreamingResponse buffer
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
except ClientError as e:
print(f"Error streaming from S3: {e}")
return
async def delete_file(self, object_name: str):
"""Deletes a file from S3."""
try:

34
aiws.py
View File

@@ -43,6 +43,7 @@ from api.endpoints.auth import router as api_auth_router
from api.endpoints.admin import router as api_admin_router
from api.endpoints.album_router import router as api_album_router
from api.endpoints.project_router import router as project_api_router
from api.endpoints.idea_router import router as idea_api_router
load_dotenv()
logger = logging.getLogger(__name__)
@@ -120,6 +121,17 @@ assets_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_
gen_router.message.middleware(AlbumMiddleware(latency=0.8))
async def start_scheduler(service: GenerationService):
while True:
try:
logger.info("Running scheduler for stacked generation killing")
await service.cleanup_stale_generations()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Scheduler error: {e}")
await asyncio.sleep(60) # Check every 10 minutes
# --- LIFESPAN (Запуск FastAPI + Bot) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -151,17 +163,28 @@ async def lifespan(app: FastAPI):
# )
# print("🤖 Bot polling started")
# 3. ЗАПУСК ШЕДУЛЕРА
scheduler_task = asyncio.create_task(start_scheduler(generation_service))
print("⏰ Scheduler started")
yield
# --- SHUTDOWN ---
print("🛑 Shutting down...")
# 3. Остановка бота
polling_task.cancel()
# 4. Остановка шедулера
scheduler_task.cancel()
try:
await polling_task
await scheduler_task
except asyncio.CancelledError:
print("🤖 Bot polling stopped")
print("⏰ Scheduler stopped")
# 3. Остановка бота
# polling_task.cancel()
# try:
# await polling_task
# except asyncio.CancelledError:
# print("🤖 Bot polling stopped")
# 4. Отключение БД
# Обычно Motor закрывать не обязательно при выходе, но хорошим тоном считается
@@ -188,6 +211,7 @@ app.include_router(api_char_router)
app.include_router(api_gen_router)
app.include_router(api_album_router)
app.include_router(project_api_router)
app.include_router(idea_api_router)
# Prometheus Metrics (Instrument after all routers are added)
Instrumentator(

View File

@@ -45,6 +45,11 @@ def get_generation_service(
) -> GenerationService:
return GenerationService(dao, gemini, s3_adapter, bot)
from api.service.idea_service import IdeaService
def get_idea_service(dao: DAO = Depends(get_dao)) -> IdeaService:
return IdeaService(dao)
from fastapi import Header
async def get_project_id(x_project_id: Optional[str] = Header(None, alias="X-Project-ID")) -> Optional[str]:

View File

@@ -9,7 +9,7 @@ from pymongo import MongoClient
from starlette import status
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from starlette.responses import Response, JSONResponse, StreamingResponse
from adapters.s3_adapter import S3Adapter
from api.models.AssetDTO import AssetsResponse, AssetResponse
@@ -33,27 +33,46 @@ async def get_asset(
asset_id: str,
request: Request,
thumbnail: bool = False,
dao: DAO = Depends(get_dao)
dao: DAO = Depends(get_dao),
s3_adapter: S3Adapter = Depends(get_s3_adapter),
) -> Response:
logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
asset = await dao.assets.get_asset(asset_id)
# 2. Проверка на существование
# Загружаем только метаданные (без data/thumbnail bytes)
asset = await dao.assets.get_asset(asset_id, with_data=False)
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
headers = {
# Кэшировать на 1 год (31536000 сек)
"Cache-Control": "public, max-age=31536000, immutable"
}
content = asset.data
media_type = "image/png" # Default, or detect
# Thumbnail: маленький, можно грузить в RAM
if thumbnail:
if asset.minio_thumbnail_object_name and s3_adapter:
thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
if thumb_bytes:
return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
# Fallback: thumbnail in DB
if asset.thumbnail:
return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
# No thumbnail available — fall through to main content
if thumbnail and asset.thumbnail:
content = asset.thumbnail
media_type = "image/jpeg"
return Response(content=content, media_type=media_type, headers=headers)
# Main content: стримим из S3 без загрузки в RAM
if asset.minio_object_name and s3_adapter:
content_type = "image/png"
# if asset.content_type == AssetContentType.VIDEO:
# content_type = "video/mp4"
return StreamingResponse(
s3_adapter.stream_file(asset.minio_object_name),
media_type=content_type,
headers=headers,
)
# Fallback: data stored in DB (legacy)
if asset.data:
return Response(content=asset.data, media_type="image/png", headers=headers)
raise HTTPException(status_code=404, detail="Asset data not found")
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
async def delete_orphan_assets_from_minio(

View File

@@ -8,7 +8,7 @@ from api import service
from api.dependency import get_generation_service, get_project_id, get_dao
from repos.dao import DAO
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest, GenerationGroupResponse
from api.service.generation_service import GenerationService
from models.Generation import Generation
@@ -68,12 +68,12 @@ async def get_generations(character_id: Optional[str] = None, limit: int = 10, o
return await generation_service.get_generations(character_id, limit=limit, offset=offset, user_id=user_id_filter, project_id=project_id)
@router.post("/_run", response_model=GenerationResponse)
@router.post("/_run", response_model=GenerationGroupResponse)
async def post_generation(generation: GenerationRequest, request: Request,
generation_service: GenerationService = Depends(get_generation_service),
current_user: dict = Depends(get_current_user),
project_id: Optional[str] = Depends(get_project_id),
dao: DAO = Depends(get_dao)) -> GenerationResponse:
dao: DAO = Depends(get_dao)) -> GenerationGroupResponse:
logger.info(f"post_generation (run) called. LinkedCharId: {generation.linked_character_id}, PromptLength: {len(generation.prompt)}")
if project_id:
@@ -85,16 +85,6 @@ async def post_generation(generation: GenerationRequest, request: Request,
return await generation_service.create_generation_task(generation, user_id=str(current_user.get("_id")))
@router.get("/{generation_id}", response_model=GenerationResponse)
async def get_generation(generation_id: str,
generation_service: GenerationService = Depends(get_generation_service),
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
logger.debug(f"get_generation called for ID: {generation_id}")
gen = await generation_service.get_generation(generation_id)
if gen and gen.created_by != str(current_user["_id"]):
raise HTTPException(status_code=403, detail="Access denied")
return gen
@router.get("/running")
async def get_running_generations(request: Request,
@@ -113,6 +103,27 @@ async def get_running_generations(request: Request,
return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id)
@router.get("/group/{group_id}", response_model=GenerationGroupResponse)
async def get_generation_group(group_id: str,
generation_service: GenerationService = Depends(get_generation_service),
current_user: dict = Depends(get_current_user)):
logger.info(f"get_generation_group called for group_id: {group_id}")
generations = await generation_service.dao.generations.get_generations_by_group(group_id)
gen_responses = [GenerationResponse(**gen.model_dump()) for gen in generations]
return GenerationGroupResponse(generation_group_id=group_id, generations=gen_responses)
@router.get("/{generation_id}", response_model=GenerationResponse)
async def get_generation(generation_id: str,
generation_service: GenerationService = Depends(get_generation_service),
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
logger.debug(f"get_generation called for ID: {generation_id}")
gen = await generation_service.get_generation(generation_id)
if gen and gen.created_by != str(current_user["_id"]):
raise HTTPException(status_code=403, detail="Access denied")
return gen
@router.post("/import", response_model=GenerationResponse)

View File

@@ -0,0 +1,103 @@
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Body
from api.dependency import get_idea_service, get_project_id, get_generation_service
from api.endpoints.auth import get_current_user
from api.service.idea_service import IdeaService
from api.service.generation_service import GenerationService
from models.Idea import Idea
from api.models.GenerationRequest import GenerationResponse, GenerationsResponse
from api.models.IdeaRequest import IdeaCreateRequest, IdeaUpdateRequest, IdeaResponse
router = APIRouter(prefix="/api/ideas", tags=["ideas"])
@router.post("", response_model=Idea)
async def create_idea(
request: IdeaCreateRequest,
project_id: Optional[str] = Depends(get_project_id),
current_user: dict = Depends(get_current_user),
idea_service: IdeaService = Depends(get_idea_service)
):
pid = project_id or request.project_id
return await idea_service.create_idea(request.name, request.description, pid, str(current_user["_id"]))
@router.get("", response_model=List[IdeaResponse])
async def get_ideas(
project_id: Optional[str] = Depends(get_project_id),
limit: int = 20,
offset: int = 0,
current_user: dict = Depends(get_current_user),
idea_service: IdeaService = Depends(get_idea_service)
):
return await idea_service.get_ideas(project_id, str(current_user["_id"]), limit, offset)
@router.get("/{idea_id}", response_model=Idea)
async def get_idea(
idea_id: str,
idea_service: IdeaService = Depends(get_idea_service)
):
idea = await idea_service.get_idea(idea_id)
if not idea:
raise HTTPException(status_code=404, detail="Idea not found")
return idea
@router.put("/{idea_id}", response_model=Idea)
async def update_idea(
idea_id: str,
request: IdeaUpdateRequest,
idea_service: IdeaService = Depends(get_idea_service)
):
idea = await idea_service.update_idea(idea_id, request.name, request.description)
if not idea:
raise HTTPException(status_code=404, detail="Idea not found")
return idea
@router.delete("/{idea_id}")
async def delete_idea(
idea_id: str,
idea_service: IdeaService = Depends(get_idea_service)
):
success = await idea_service.delete_idea(idea_id)
if not success:
raise HTTPException(status_code=404, detail="Idea not found or could not be deleted")
return {"status": "success"}
@router.get("/{idea_id}/generations", response_model=GenerationsResponse)
async def get_idea_generations(
idea_id: str,
limit: int = 50,
offset: int = 0,
generation_service: GenerationService = Depends(get_generation_service)
):
# Depending on how generation service implements filtering by idea_id.
# We might need to update generation_service to support getting by idea_id directly
# or ensure generic get_generations supports it.
# Looking at generation_router.py, get_generations doesn't have idea_id arg?
# Let's check generation_service.get_generations signature again.
# It has: (character_id, limit, offset, user_id, project_id). NO IDEA_ID.
# I need to update GenerationService.get_generations too!
# For now, let's assume I will update it.
return await generation_service.get_generations(idea_id=idea_id, limit=limit, offset=offset)
@router.post("/{idea_id}/generations/{generation_id}")
async def add_generation_to_idea(
idea_id: str,
generation_id: str,
idea_service: IdeaService = Depends(get_idea_service)
):
success = await idea_service.add_generation_to_idea(idea_id, generation_id)
if not success:
raise HTTPException(status_code=404, detail="Idea or Generation not found")
return {"status": "success"}
@router.delete("/{idea_id}/generations/{generation_id}")
async def remove_generation_from_idea(
idea_id: str,
generation_id: str,
idea_service: IdeaService = Depends(get_idea_service)
):
success = await idea_service.remove_generation_from_idea(idea_id, generation_id)
if not success:
raise HTTPException(status_code=404, detail="Idea or Generation not found")
return {"status": "success"}

View File

@@ -1,7 +1,7 @@
from datetime import datetime, UTC
from typing import List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
from models.Asset import Asset
from models.Generation import GenerationStatus
@@ -17,6 +17,8 @@ class GenerationRequest(BaseModel):
use_profile_image: bool = True
assets_list: List[str]
project_id: Optional[str] = None
idea_id: Optional[str] = None
count: int = Field(default=1, ge=1, le=10)
class GenerationsResponse(BaseModel):
@@ -45,10 +47,16 @@ class GenerationResponse(BaseModel):
progress: int = 0
cost: Optional[float] = None
created_by: Optional[str] = None
generation_group_id: Optional[str] = None
idea_id: Optional[str] = None
created_at: datetime = datetime.now(UTC)
updated_at: datetime = datetime.now(UTC)
class GenerationGroupResponse(BaseModel):
generation_group_id: str
generations: List[GenerationResponse]
class PromptRequest(BaseModel):
prompt: str

16
api/models/IdeaRequest.py Normal file
View File

@@ -0,0 +1,16 @@
from typing import Optional
from pydantic import BaseModel
from models.Idea import Idea
from api.models.GenerationRequest import GenerationResponse
class IdeaCreateRequest(BaseModel):
name: str
description: Optional[str] = None
project_id: Optional[str] = None # Optional in body if passed via header/dependency
class IdeaUpdateRequest(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
class IdeaResponse(Idea):
last_generation: Optional[GenerationResponse] = None

View File

@@ -5,13 +5,14 @@ import base64
from datetime import datetime, UTC
from typing import List, Optional, Tuple, Any, Dict
from io import BytesIO
from uuid import uuid4
import httpx
from aiogram import Bot
from aiogram.types import BufferedInputFile
from adapters.Exception import GoogleGenerationException
from adapters.google_adapter import GoogleAdapter
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse, GenerationGroupResponse
# Импортируйте ваши модели DAO, Asset, Generation корректно
from models.Asset import Asset, AssetType, AssetContentType
from models.Generation import Generation, GenerationStatus
@@ -21,6 +22,9 @@ from adapters.s3_adapter import S3Adapter
logger = logging.getLogger(__name__)
# Limit concurrent generations to 4
generation_semaphore = asyncio.Semaphore(4)
# --- Вспомогательная функция генерации ---
async def generate_image_task(
@@ -50,16 +54,18 @@ async def generate_image_task(
logger.info(f"generate_image_task completed, received {len(generated_images_io) if generated_images_io else 0} images")
except GoogleGenerationException as e:
raise e
finally:
# Освобождаем входные данные — они больше не нужны
del media_group_bytes
images_bytes = []
if generated_images_io:
for img_io in generated_images_io:
# Читаем байты из BytesIO
img_io.seek(0)
content = img_io.read()
images_bytes.append(content)
# Закрываем поток
images_bytes.append(img_io.read())
img_io.close()
# Освобождаем список BytesIO сразу
del generated_images_io
return images_bytes, metrics
@@ -94,10 +100,9 @@ class GenerationService:
return await asyncio.to_thread(self.gemini.generate_text, prompt=technical_prompt, images_list=images)
async def get_generations(self, character_id: Optional[str] = None, limit: int = 10, offset: int = 0, user_id: Optional[str] = None, project_id: Optional[str] = None) -> List[
Generation]:
generations = await self.dao.generations.get_generations(character_id = character_id,limit=limit, offset=offset, created_by=user_id, project_id=project_id)
total_count = await self.dao.generations.count_generations(character_id = character_id, created_by=user_id, project_id=project_id)
async def get_generations(self, character_id: Optional[str] = None, limit: int = 10, offset: int = 0, user_id: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> GenerationsResponse:
generations = await self.dao.generations.get_generations(character_id = character_id,limit=limit, offset=offset, created_by=user_id, project_id=project_id, idea_id=idea_id)
total_count = await self.dao.generations.count_generations(character_id = character_id, created_by=user_id, project_id=project_id, idea_id=idea_id)
generations = [GenerationResponse(**gen.model_dump()) for gen in generations]
return GenerationsResponse(generations=generations, total_count=total_count)
@@ -111,23 +116,43 @@ class GenerationService:
async def get_running_generations(self, user_id: Optional[str] = None, project_id: Optional[str] = None) -> List[Generation]:
return await self.dao.generations.get_generations(status=GenerationStatus.RUNNING, created_by=user_id, project_id=project_id)
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationGroupResponse:
count = generation_request.count
if generation_group_id is None:
generation_group_id = str(uuid4())
results = []
for _ in range(count):
gen_response = await self._create_single_generation(generation_request, user_id, generation_group_id)
results.append(gen_response)
return GenerationGroupResponse(generation_group_id=generation_group_id, generations=results)
async def _create_single_generation(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationResponse:
gen_id = None
generation_model = None
try:
generation_model = Generation(**generation_request.model_dump())
generation_model = Generation(**generation_request.model_dump(exclude={'count'}))
if user_id:
generation_model.created_by = user_id
if generation_group_id:
generation_model.generation_group_id = generation_group_id
# Explicitly set idea_id from request if present (already in model_dump, but ensuring clarity)
if generation_request.idea_id:
generation_model.idea_id = generation_request.idea_id
gen_id = await self.dao.generations.create_generation(generation_model)
generation_model.id = gen_id
async def runner(gen):
logger.info(f"Starting background generation task for ID: {gen.id}")
logger.info(f"Generation {gen.id} entered queue (waiting for slot)...")
try:
await self.create_generation(gen)
logger.info(f"Background generation task finished for ID: {gen.id}")
async with generation_semaphore:
logger.info(f"Starting background generation task for ID: {gen.id}")
await self.create_generation(gen)
logger.info(f"Background generation task finished for ID: {gen.id}")
except Exception:
# если генерация уже пошла и упала — пометим FAILED
try:
@@ -443,4 +468,15 @@ class GenerationService:
return True
except Exception as e:
logger.error(f"Error deleting generation {generation_id}: {e}")
return False
return False
async def cleanup_stale_generations(self):
"""
Cancels generations that have been running for more than 1 hour.
"""
try:
count = await self.dao.generations.cancel_stale_generations(timeout_minutes=60)
if count > 0:
logger.info(f"Cleaned up {count} stale generations (timeout)")
except Exception as e:
logger.error(f"Error cleaning up stale generations: {e}")

View File

@@ -0,0 +1,75 @@
from typing import List, Optional
from datetime import datetime
from repos.dao import DAO
from models.Idea import Idea
class IdeaService:
def __init__(self, dao: DAO):
self.dao = dao
async def create_idea(self, name: str, description: Optional[str], project_id: Optional[str], user_id: str) -> Idea:
idea = Idea(name=name, description=description, project_id=project_id, created_by=user_id)
idea_id = await self.dao.ideas.create_idea(idea)
idea.id = idea_id
return idea
async def get_ideas(self, project_id: Optional[str], user_id: str, limit: int = 20, offset: int = 0) -> List[dict]:
return await self.dao.ideas.get_ideas(project_id, user_id, limit, offset)
async def get_idea(self, idea_id: str) -> Optional[Idea]:
return await self.dao.ideas.get_idea(idea_id)
async def update_idea(self, idea_id: str, name: Optional[str] = None, description: Optional[str] = None) -> Optional[Idea]:
idea = await self.dao.ideas.get_idea(idea_id)
if not idea:
return None
if name is not None:
idea.name = name
if description is not None:
idea.description = description
idea.updated_at = datetime.now()
await self.dao.ideas.update_idea(idea)
return idea
async def delete_idea(self, idea_id: str) -> bool:
return await self.dao.ideas.delete_idea(idea_id)
async def add_generation_to_idea(self, idea_id: str, generation_id: str) -> bool:
# Verify idea exists
idea = await self.dao.ideas.get_idea(idea_id)
if not idea:
return False
# Get generation
gen = await self.dao.generations.get_generation(generation_id)
if not gen:
return False
# Link
gen.idea_id = idea_id
gen.updated_at = datetime.now()
await self.dao.generations.update_generation(gen)
return True
async def remove_generation_from_idea(self, idea_id: str, generation_id: str) -> bool:
# Verify idea exists (optional, but good for validation)
idea = await self.dao.ideas.get_idea(idea_id)
if not idea:
return False
# Get generation
gen = await self.dao.generations.get_generation(generation_id)
if not gen:
return False
# Unlink only if currently linked to this idea
if gen.idea_id == idea_id:
gen.idea_id = None
gen.updated_at = datetime.now()
await self.dao.generations.update_generation(gen)
return True
return False

View File

@@ -35,8 +35,10 @@ class Generation(BaseModel):
output_token_usage: Optional[int] = None
is_deleted: bool = False
album_id: Optional[str] = None
generation_group_id: Optional[str] = None
created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId)
project_id: Optional[str] = None
idea_id: Optional[str] = None
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

13
models/Idea.py Normal file
View File

@@ -0,0 +1,13 @@
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field
class Idea(BaseModel):
id: Optional[str] = None
name: str = "New Idea"
description: Optional[str] = None
project_id: Optional[str] = None
created_by: str # User ID
is_deleted: bool = False
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)

View File

@@ -1,6 +1,7 @@
from typing import List, Optional
import logging
from bson import ObjectId
from uuid import uuid4
from motor.motor_asyncio import AsyncIOMotorClient
from models.Asset import Asset
@@ -19,7 +20,8 @@ class AssetsRepo:
# Main data
if asset.data:
ts = int(asset.created_at.timestamp())
object_name = f"{asset.type.value}/{ts}_{asset.name}"
uid = uuid4().hex[:8]
object_name = f"{asset.type.value}/{ts}_{uid}_{asset.name}"
uploaded = await self.s3.upload_file(object_name, asset.data)
if uploaded:
@@ -32,7 +34,8 @@ class AssetsRepo:
# Thumbnail
if asset.thumbnail:
ts = int(asset.created_at.timestamp())
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
uid = uuid4().hex[:8]
thumb_name = f"{asset.type.value}/thumbs/{ts}_{uid}_{asset.name}_thumb.jpg"
uploaded_thumb = await self.s3.upload_file(thumb_name, asset.thumbnail)
if uploaded_thumb:
@@ -134,7 +137,8 @@ class AssetsRepo:
if self.s3:
if asset.data:
ts = int(asset.created_at.timestamp())
object_name = f"{asset.type.value}/{ts}_{asset.name}"
uid = uuid4().hex[:8]
object_name = f"{asset.type.value}/{ts}_{uid}_{asset.name}"
if await self.s3.upload_file(object_name, asset.data):
asset.minio_object_name = object_name
asset.minio_bucket = self.s3.bucket_name
@@ -142,7 +146,8 @@ class AssetsRepo:
if asset.thumbnail:
ts = int(asset.created_at.timestamp())
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
uid = uuid4().hex[:8]
thumb_name = f"{asset.type.value}/thumbs/{ts}_{uid}_{asset.name}_thumb.jpg"
if await self.s3.upload_file(thumb_name, asset.thumbnail):
asset.minio_thumbnail_object_name = thumb_name
asset.thumbnail = None
@@ -216,7 +221,8 @@ class AssetsRepo:
created_at = doc.get("created_at")
ts = int(created_at.timestamp()) if created_at else 0
object_name = f"{type_}/{ts}_{asset_id}_{name}"
uid = uuid4().hex[:8]
object_name = f"{type_}/{ts}_{uid}_{asset_id}_{name}"
if await self.s3.upload_file(object_name, data):
await self.collection.update_one(
{"_id": asset_id},
@@ -243,7 +249,8 @@ class AssetsRepo:
created_at = doc.get("created_at")
ts = int(created_at.timestamp()) if created_at else 0
thumb_name = f"{type_}/thumbs/{ts}_{asset_id}_{name}_thumb.jpg"
uid = uuid4().hex[:8]
thumb_name = f"{type_}/thumbs/{ts}_{uid}_{asset_id}_{name}_thumb.jpg"
if await self.s3.upload_file(thumb_name, thumb):
await self.collection.update_one(
{"_id": asset_id},

View File

@@ -6,6 +6,7 @@ from repos.generation_repo import GenerationRepo
from repos.user_repo import UsersRepo
from repos.albums_repo import AlbumsRepo
from repos.project_repo import ProjectRepo
from repos.idea_repo import IdeaRepo
from typing import Optional
@@ -19,3 +20,4 @@ class DAO:
self.albums = AlbumsRepo(client, db_name)
self.projects = ProjectRepo(client, db_name)
self.users = UsersRepo(client, db_name)
self.ideas = IdeaRepo(client, db_name)

View File

@@ -1,4 +1,5 @@
from typing import Optional, List
from datetime import datetime, timedelta, UTC
from PIL.ImageChops import offset
from bson import ObjectId
@@ -25,7 +26,7 @@ class GenerationRepo:
return Generation(**res)
async def get_generations(self, character_id: Optional[str] = None, status: Optional[GenerationStatus] = None,
limit: int = 10, offset: int = 10, created_by: Optional[str] = None, project_id: Optional[str] = None) -> List[Generation]:
limit: int = 10, offset: int = 0, created_by: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> List[Generation]:
filter = {"is_deleted": False}
if character_id is not None:
@@ -34,11 +35,20 @@ class GenerationRepo:
filter["status"] = status
if created_by is not None:
filter["created_by"] = created_by
filter["project_id"] = None
# If filtering by created_by user (e.g. "My Generations"), we typically imply personal scope if project_id is None.
# But if project_id is passed, we filter by that.
if project_id is None:
filter["project_id"] = None
if project_id is not None:
filter["project_id"] = project_id
if idea_id is not None:
filter["idea_id"] = idea_id
res = await self.collection.find(filter).sort("created_at", -1).skip(
# If fetching for an idea, sort by created_at ascending (cronological)
# Otherwise typically descending (newest first)
sort_order = 1 if idea_id else -1
res = await self.collection.find(filter).sort("created_at", sort_order).skip(
offset).limit(limit).to_list(None)
generations: List[Generation] = []
for generation in res:
@@ -47,7 +57,7 @@ class GenerationRepo:
return generations
async def count_generations(self, character_id: Optional[str] = None, status: Optional[GenerationStatus] = None,
album_id: Optional[str] = None, created_by: Optional[str] = None, project_id: Optional[str] = None) -> int:
album_id: Optional[str] = None, created_by: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> int:
args = {}
if character_id is not None:
args["linked_character_id"] = character_id
@@ -57,6 +67,8 @@ class GenerationRepo:
args["created_by"] = created_by
if project_id is not None:
args["project_id"] = project_id
if idea_id is not None:
args["idea_id"] = idea_id
return await self.collection.count_documents(args)
async def get_generations_by_ids(self, generation_ids: List[str]) -> List[Generation]:
@@ -77,3 +89,28 @@ class GenerationRepo:
async def update_generation(self, generation: Generation, ):
res = await self.collection.update_one({"_id": ObjectId(generation.id)}, {"$set": generation.model_dump()})
async def get_generations_by_group(self, group_id: str) -> List[Generation]:
res = await self.collection.find({"generation_group_id": group_id, "is_deleted": False}).sort("created_at", 1).to_list(None)
generations: List[Generation] = []
for generation in res:
generation["id"] = str(generation.pop("_id"))
generations.append(Generation(**generation))
return generations
async def cancel_stale_generations(self, timeout_minutes: int = 5) -> int:
cutoff_time = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
res = await self.collection.update_many(
{
"status": GenerationStatus.RUNNING,
"created_at": {"$lt": cutoff_time}
},
{
"$set": {
"status": GenerationStatus.FAILED,
"failed_reason": "Timeout: Execution time limit exceeded",
"updated_at": datetime.now(UTC)
}
}
)
return res.modified_count

82
repos/idea_repo.py Normal file
View File

@@ -0,0 +1,82 @@
from typing import Optional, List
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient
from models.Idea import Idea
class IdeaRepo:
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
self.collection = client[db_name]["ideas"]
async def create_idea(self, idea: Idea) -> str:
res = await self.collection.insert_one(idea.model_dump())
return str(res.inserted_id)
async def get_idea(self, idea_id: str) -> Optional[Idea]:
if not ObjectId.is_valid(idea_id):
return None
res = await self.collection.find_one({"_id": ObjectId(idea_id)})
if res:
res["id"] = str(res.pop("_id"))
return Idea(**res)
return None
async def get_ideas(self, project_id: Optional[str], user_id: str, limit: int = 20, offset: int = 0) -> List[dict]:
if project_id:
match_stage = {"project_id": project_id, "is_deleted": False}
else:
match_stage = {"created_by": user_id, "project_id": None, "is_deleted": False}
pipeline = [
{"$match": match_stage},
{"$sort": {"updated_at": -1}},
{"$skip": offset},
{"$limit": limit},
# Add string id field for lookup
{"$addFields": {"str_id": {"$toString": "$_id"}}},
# Lookup generations
{
"$lookup": {
"from": "generations",
"let": {"idea_id": "$str_id"},
"pipeline": [
{"$match": {"$expr": {"$eq": ["$idea_id", "$$idea_id"]}}},
{"$sort": {"created_at": -1}}, # Ensure we get the latest
{"$limit": 1}
],
"as": "generations"
}
},
# Unwind generations array (preserve ideas without generations)
{"$unwind": {"path": "$generations", "preserveNullAndEmptyArrays": True}},
# Rename for clarity
{"$addFields": {
"last_generation": "$generations",
"id": "$str_id"
}},
{"$project": {"generations": 0, "str_id": 0, "_id": 0}}
]
return await self.collection.aggregate(pipeline).to_list(None)
async def delete_idea(self, idea_id: str) -> bool:
if not ObjectId.is_valid(idea_id):
return False
res = await self.collection.update_one(
{"_id": ObjectId(idea_id)},
{"$set": {"is_deleted": True}}
)
return res.modified_count > 0
async def update_idea(self, idea: Idea) -> bool:
if not idea.id or not ObjectId.is_valid(idea.id):
return False
idea_dict = idea.model_dump()
if "id" in idea_dict:
del idea_dict["id"]
res = await self.collection.update_one(
{"_id": ObjectId(idea.id)},
{"$set": idea_dict}
)
return res.modified_count > 0

97
tests/test_idea.py Normal file
View File

@@ -0,0 +1,97 @@
import asyncio
import os
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
# Import from project root (requires PYTHONPATH=.)
from api.service.idea_service import IdeaService
from repos.dao import DAO
from models.Idea import Idea
from models.Generation import Generation, GenerationStatus
from models.enums import AspectRatios, Quality
load_dotenv()
MONGO_HOST = os.getenv("MONGO_HOST", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "bot_db")
print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}")
async def test_idea_flow():
client = AsyncIOMotorClient(MONGO_HOST)
dao = DAO(client, db_name=DB_NAME)
service = IdeaService(dao)
# 1. Create an Idea
print("Creating idea...")
user_id = "test_user_123"
project_id = "test_project_abc"
idea = await service.create_idea("My Test Idea", "Initial Description", project_id, user_id)
print(f"Idea created: {idea.id} - {idea.name}")
# 2. Update Idea
print("Updating idea...")
updated_idea = await service.update_idea(idea.id, description="Updated description")
print(f"Idea updated: {updated_idea.description}")
if updated_idea.description == "Updated description":
print("✅ Idea update successful")
else:
print("❌ Idea update FAILED")
# 3. Add Generation linked to Idea
print("Creating generation linked to idea...")
gen = Generation(
prompt="idea generation 1",
# idea_id=idea.id, <-- Intentionally NOT linking initially to test linking method
project_id=project_id,
created_by=user_id,
aspect_ratio=AspectRatios.NINESIXTEEN,
quality=Quality.ONEK,
assets_list=[]
)
gen_id = await dao.generations.create_generation(gen)
print(f"Created generation: {gen_id}")
# Link generation to idea
print("Linking generation to idea...")
success = await service.add_generation_to_idea(idea.id, gen_id)
if success:
print("✅ Linking successful")
else:
print("❌ Linking FAILED")
# Debug: Check if generation was saved with idea_id
saved_gen = await dao.generations.collection.find_one({"_id": ObjectId(gen_id)})
print(f"DEBUG: Saved Generation in DB idea_id: {saved_gen.get('idea_id')}")
# 4. Fetch Generations for Idea (Verify filtering and ordering)
print("Fetching generations for idea...")
gens = await service.dao.generations.get_generations(idea_id=idea.id) # using repo directly as service might return wrapper
print(f"Found {len(gens)} generations in idea")
if len(gens) == 1 and gens[0].id == gen_id:
print("✅ Generation retrieval successful")
else:
print("❌ Generation retrieval FAILED")
# 5. Fetch Ideas for Project
ideas = await service.get_ideas(project_id)
print(f"Found {len(ideas)} ideas for project")
# Cleaning up
print("Cleaning up...")
await service.delete_idea(idea.id)
await dao.generations.collection.delete_one({"_id": ObjectId(gen_id)})
# Verify deletion
deleted_idea = await service.get_idea(idea.id)
# IdeaRepo.delete_idea logic sets is_deleted=True
if deleted_idea and deleted_idea.is_deleted:
print(f"✅ Idea deleted successfully")
# Hard delete for cleanup
await dao.ideas.collection.delete_one({"_id": ObjectId(idea.id)})
if __name__ == "__main__":
asyncio.run(test_idea_flow())

52
tests/test_scheduler.py Normal file
View File

@@ -0,0 +1,52 @@
import asyncio
import os
from datetime import datetime, timedelta, UTC
from motor.motor_asyncio import AsyncIOMotorClient
from models.Generation import Generation, GenerationStatus
from repos.generation_repo import GenerationRepo
from dotenv import load_dotenv
load_dotenv()
# Mock configs if not present in env
MONGO_HOST = os.getenv("MONGO_HOST", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "bot_db")
print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}")
async def test_scheduler():
client = AsyncIOMotorClient(MONGO_HOST)
repo = GenerationRepo(client, db_name=DB_NAME)
# 1. Create a "stale" generation (2 hours ago)
stale_gen = Generation(
prompt="stale test",
status=GenerationStatus.RUNNING,
created_at=datetime.now(UTC) - timedelta(minutes=120),
assets_list=[],
aspect_ratio="NINESIXTEEN",
quality="ONEK"
)
gen_id = await repo.create_generation(stale_gen)
print(f"Created stale generation: {gen_id}")
# 2. Run cleanup
print("Running cleanup...")
count = await repo.cancel_stale_generations(timeout_minutes=60)
print(f"Cleaned up {count} generations")
# 3. Verify status
updated_gen = await repo.get_generation(gen_id)
print(f"Generation status: {updated_gen.status}")
print(f"Failed reason: {updated_gen.failed_reason}")
if updated_gen.status == GenerationStatus.FAILED:
print("✅ SUCCESS: Generation marked as FAILED")
else:
print("❌ FAILURE: Generation status not updated")
# Cleanup
await repo.collection.delete_one({"_id": updated_gen.id}) # Remove test data
if __name__ == "__main__":
asyncio.run(test_scheduler())