diff --git a/adapters/__pycache__/google_adapter.cpython-313.pyc b/adapters/__pycache__/google_adapter.cpython-313.pyc index 5179ccb..21119cc 100644 Binary files a/adapters/__pycache__/google_adapter.cpython-313.pyc and b/adapters/__pycache__/google_adapter.cpython-313.pyc differ diff --git a/adapters/__pycache__/s3_adapter.cpython-313.pyc b/adapters/__pycache__/s3_adapter.cpython-313.pyc index fbe35de..3d604c1 100644 Binary files a/adapters/__pycache__/s3_adapter.cpython-313.pyc and b/adapters/__pycache__/s3_adapter.cpython-313.pyc differ diff --git a/adapters/google_adapter.py b/adapters/google_adapter.py index 2b48d8f..adcfe0a 100644 --- a/adapters/google_adapter.py +++ b/adapters/google_adapter.py @@ -23,10 +23,10 @@ class GoogleAdapter: self.TEXT_MODEL = "gemini-3-pro-preview" self.IMAGE_MODEL = "gemini-3-pro-image-preview" - def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> tuple: + def _prepare_contents(self, prompt: str, images_list: List[bytes] | None = None) -> tuple: """Вспомогательный метод для подготовки контента (текст + картинки). Returns (contents, opened_images) — caller MUST close opened_images after use.""" - contents = [prompt] + contents : list [Any]= [prompt] opened_images = [] if images_list: logger.info(f"Preparing content with {len(images_list)} images") @@ -41,7 +41,7 @@ class GoogleAdapter: logger.info("Preparing content with no images") return contents, opened_images - def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str: + def generate_text(self, prompt: str, images_list: List[bytes] | None = None) -> str: """ Генерация текста (Чат или Vision). Возвращает строку с ответом. @@ -74,7 +74,7 @@ class GoogleAdapter: for img in opened_images: img.close() - def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]: + def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] | None = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]: """ Генерация изображений (Text-to-Image или Image-to-Image). Возвращает список байтовых потоков (готовых к отправке). @@ -130,7 +130,9 @@ class GoogleAdapter: try: # 1. Берем сырые байты raw_data = part.inline_data.data - byte_arr = io.BytesIO(raw_data) + if raw_data is None: + raise GoogleGenerationException("Generation returned no data") + byte_arr : io.BytesIO = io.BytesIO(raw_data) # 2. Нейминг (формально, для TG) timestamp = datetime.now().timestamp() diff --git a/adapters/s3_adapter.py b/adapters/s3_adapter.py index 957cbec..28ba113 100644 --- a/adapters/s3_adapter.py +++ b/adapters/s3_adapter.py @@ -18,7 +18,7 @@ class S3Adapter: @asynccontextmanager async def _get_client(self): - async with self.session.client( + async with self.session.client( # type: ignore[reportGeneralTypeIssues] "s3", endpoint_url=self.endpoint_url, aws_access_key_id=self.aws_access_key_id, diff --git a/aiws.py b/aiws.py index e08bdda..f755f37 100644 --- a/aiws.py +++ b/aiws.py @@ -64,6 +64,8 @@ def setup_logging(): # --- ИНИЦИАЛИЗАЦИЯ ЗАВИСИМОСТЕЙ --- +if BOT_TOKEN is None: + raise ValueError("BOT_TOKEN is not set") bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) # Клиент БД создаем глобально, чтобы он был доступен и боту (Storage), и API @@ -83,8 +85,12 @@ s3_adapter = S3Adapter( ) dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота +if GEMINI_API_KEY is None: + raise ValueError("GEMINI_API_KEY is not set") gemini = GoogleAdapter(api_key=GEMINI_API_KEY) -generation_service = GenerationService(dao, gemini, bot) +if bot is None: + raise ValueError("bot is not set") +generation_service = GenerationService(dao=dao, gemini=gemini, s3_adapter=s3_adapter, bot=bot) album_service = AlbumService(dao) # Dispatcher @@ -126,11 +132,12 @@ async def start_scheduler(service: GenerationService): try: logger.info("Running scheduler for stacked generation killing") await service.cleanup_stale_generations() + await service.cleanup_old_data(days=2) except asyncio.CancelledError: break except Exception as e: logger.error(f"Scheduler error: {e}") - await asyncio.sleep(60) # Check every 10 minutes + await asyncio.sleep(60) # Check every 60 seconds # --- LIFESPAN (Запуск FastAPI + Bot) --- @asynccontextmanager diff --git a/api/__pycache__/dependency.cpython-313.pyc b/api/__pycache__/dependency.cpython-313.pyc index c32ebf6..94bb23c 100644 Binary files a/api/__pycache__/dependency.cpython-313.pyc and b/api/__pycache__/dependency.cpython-313.pyc differ diff --git a/api/dependency.py b/api/dependency.py index 51674c5..42fba46 100644 --- a/api/dependency.py +++ b/api/dependency.py @@ -5,6 +5,7 @@ from motor.motor_asyncio import AsyncIOMotorClient from adapters.google_adapter import GoogleAdapter from api.service.generation_service import GenerationService from repos.dao import DAO +from api.service.album_service import AlbumService # ... ваши импорты ... @@ -53,4 +54,7 @@ def get_idea_service(dao: DAO = Depends(get_dao)) -> IdeaService: from fastapi import Header async def get_project_id(x_project_id: Optional[str] = Header(None, alias="X-Project-ID")) -> Optional[str]: - return x_project_id \ No newline at end of file + return x_project_id + +async def get_album_service(dao: DAO = Depends(get_dao)) -> AlbumService: + return AlbumService(dao) \ No newline at end of file diff --git a/api/endpoints/__pycache__/admin.cpython-313.pyc b/api/endpoints/__pycache__/admin.cpython-313.pyc index 4654462..5e1f99e 100644 Binary files a/api/endpoints/__pycache__/admin.cpython-313.pyc and b/api/endpoints/__pycache__/admin.cpython-313.pyc differ diff --git a/api/endpoints/admin.py b/api/endpoints/admin.py index 5c16ec8..262712b 100644 --- a/api/endpoints/admin.py +++ b/api/endpoints/admin.py @@ -23,7 +23,7 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], repo: ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - username: str = payload.get("sub") + username: str | None = payload.get("sub") if username is None: raise credentials_exception except JWTError: diff --git a/api/endpoints/album_router.py b/api/endpoints/album_router.py index 3c4b74f..252a619 100644 --- a/api/endpoints/album_router.py +++ b/api/endpoints/album_router.py @@ -1,10 +1,13 @@ from typing import List, Optional -from fastapi import APIRouter, Depends, HTTPException, status, Request +from fastapi import APIRouter, HTTPException, status, Request from pydantic import BaseModel from api.models.GenerationRequest import GenerationResponse from models.Album import Album from repos.dao import DAO +from api.dependency import get_album_service +from api.service.album_service import AlbumService + router = APIRouter(prefix="/api/albums", tags=["Albums"]) diff --git a/api/service/__pycache__/generation_service.cpython-313.pyc b/api/service/__pycache__/generation_service.cpython-313.pyc index 99d983a..20d4140 100644 Binary files a/api/service/__pycache__/generation_service.cpython-313.pyc and b/api/service/__pycache__/generation_service.cpython-313.pyc differ diff --git a/api/service/generation_service.py b/api/service/generation_service.py index 9417706..3d1824e 100644 --- a/api/service/generation_service.py +++ b/api/service/generation_service.py @@ -77,7 +77,7 @@ class GenerationService: self.bot = bot - async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str: + async def ask_prompt_assistant(self, prompt: str, assets: list[str] | None = None) -> str: future_prompt = """You are an prompt-assistant. You improving user-entered prompts for image generation. User may upload reference image too. I will provide sources prompt entered by user. Understand user needs and generate best variation of prompt. ANSWER ONLY PROMPT STRING!!! USER_ENTERED_PROMPT: """ @@ -157,8 +157,9 @@ class GenerationService: # если генерация уже пошла и упала — пометим FAILED try: db_gen = await self.dao.generations.get_generation(gen.id) - db_gen.status = GenerationStatus.FAILED - await self.dao.generations.update_generation(db_gen) + if db_gen is not None: + db_gen.status = GenerationStatus.FAILED + await self.dao.generations.update_generation(db_gen) except Exception: logger.exception("Failed to mark generation as FAILED") logger.exception("create_generation task failed") @@ -172,8 +173,9 @@ class GenerationService: if gen_id is not None: try: gen = await self.dao.generations.get_generation(gen_id) - gen.status = GenerationStatus.FAILED - await self.dao.generations.update_generation(gen) + if gen is not None: + gen.status = GenerationStatus.FAILED + await self.dao.generations.update_generation(gen) except Exception: logger.exception("Failed to mark generation as FAILED in create_generation_task") raise @@ -201,9 +203,10 @@ class GenerationService: if char_info is None: raise Exception(f"Character ID {generation.linked_character_id} not found") if generation.use_profile_image: - avatar_asset = await self.dao.assets.get_asset(char_info.avatar_asset_id) - if avatar_asset: - media_group_bytes.append(avatar_asset.data) + if char_info.avatar_asset_id is not None: + avatar_asset = await self.dao.assets.get_asset(char_info.avatar_asset_id) + if avatar_asset and avatar_asset.data: + media_group_bytes.append(avatar_asset.data) # generation_prompt = generation_prompt.replace("$char_bio_inserted", f"1. CHARACTER BIO (Must be strictly followed): {char_info.character_bio}") reference_assets = await self.dao.assets.get_assets_by_ids(generation.assets_list) @@ -304,7 +307,9 @@ class GenerationService: # 5. (Опционально) Обновляем запись генерации ссылками на результаты # Предполагаем, что у модели Generation есть поле result_asset_ids - result_ids = [a.id for a in created_assets] + result_ids = [] + for a in created_assets: + result_ids.append(a.id) generation.result_list = result_ids generation.status = GenerationStatus.DONE @@ -479,4 +484,26 @@ class GenerationService: if count > 0: logger.info(f"Cleaned up {count} stale generations (timeout)") except Exception as e: - logger.error(f"Error cleaning up stale generations: {e}") \ No newline at end of file + logger.error(f"Error cleaning up stale generations: {e}") + + async def cleanup_old_data(self, days: int = 2): + """ + Очистка старых данных: + 1. Мягко удаляет генерации старше N дней + 2. Мягко удаляет связанные ассеты + жёстко удаляет файлы из S3 + """ + try: + # 1. Мягко удаляем генерации и собираем asset IDs + gen_count, asset_ids = await self.dao.generations.soft_delete_old_generations(days=days) + + if gen_count > 0: + logger.info(f"Soft-deleted {gen_count} generations older than {days} days. " + f"Found {len(asset_ids)} associated asset IDs.") + + # 2. Мягко удаляем ассеты + жёстко удаляем файлы из S3 + if asset_ids: + purged = await self.dao.assets.soft_delete_and_purge_assets(asset_ids) + logger.info(f"Purged {purged} assets (soft-deleted + S3 files removed).") + + except Exception as e: + logger.error(f"Error during old data cleanup: {e}") \ No newline at end of file diff --git a/models/Asset.py b/models/Asset.py index ff4eeef..6c4d3dc 100644 --- a/models/Asset.py +++ b/models/Asset.py @@ -30,6 +30,7 @@ class Asset(BaseModel): tags: List[str] = [] created_by: Optional[str] = None project_id: Optional[str] = None + is_deleted: bool = False created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) diff --git a/models/__pycache__/Asset.cpython-313.pyc b/models/__pycache__/Asset.cpython-313.pyc index 90efbf5..e5c7d04 100644 Binary files a/models/__pycache__/Asset.cpython-313.pyc and b/models/__pycache__/Asset.cpython-313.pyc differ diff --git a/repos/__pycache__/assets_repo.cpython-313.pyc b/repos/__pycache__/assets_repo.cpython-313.pyc index 2dc1d3b..acfb672 100644 Binary files a/repos/__pycache__/assets_repo.cpython-313.pyc and b/repos/__pycache__/assets_repo.cpython-313.pyc differ diff --git a/repos/__pycache__/generation_repo.cpython-313.pyc b/repos/__pycache__/generation_repo.cpython-313.pyc index a18e31f..d7d90a6 100644 Binary files a/repos/__pycache__/generation_repo.cpython-313.pyc and b/repos/__pycache__/generation_repo.cpython-313.pyc differ diff --git a/repos/assets_repo.py b/repos/assets_repo.py index 651b8cb..93e7005 100644 --- a/repos/assets_repo.py +++ b/repos/assets_repo.py @@ -1,5 +1,6 @@ -from typing import List, Optional +from typing import Any, List, Optional import logging +from datetime import datetime, UTC from bson import ObjectId from uuid import uuid4 from motor.motor_asyncio import AsyncIOMotorClient @@ -50,7 +51,7 @@ class AssetsRepo: return str(res.inserted_id) async def get_assets(self, asset_type: Optional[str] = None, limit: int = 10, offset: int = 0, with_data: bool = False, created_by: Optional[str] = None, project_id: Optional[str] = None) -> List[Asset]: - filter = {} + filter: dict[str, Any]= {"is_deleted": {"$ne": True}} if asset_type: filter["type"] = asset_type args = {} @@ -202,6 +203,61 @@ class AssetsRepo: res = await self.collection.delete_one({"_id": ObjectId(asset_id)}) return res.deleted_count > 0 + async def soft_delete_and_purge_assets(self, asset_ids: List[str]) -> int: + """ + Мягко удаляет ассеты и жёстко удаляет их файлы из S3. + Возвращает количество обработанных ассетов. + """ + if not asset_ids: + return 0 + + object_ids = [ObjectId(aid) for aid in asset_ids if ObjectId.is_valid(aid)] + if not object_ids: + return 0 + + # Находим ассеты, которые ещё не удалены + cursor = self.collection.find( + {"_id": {"$in": object_ids}, "is_deleted": {"$ne": True}}, + {"minio_object_name": 1, "minio_thumbnail_object_name": 1} + ) + + purged_count = 0 + ids_to_update = [] + + async for doc in cursor: + ids_to_update.append(doc["_id"]) + + # Жёсткое удаление файлов из S3 + if self.s3: + if doc.get("minio_object_name"): + try: + await self.s3.delete_file(doc["minio_object_name"]) + except Exception as e: + logger.error(f"Failed to delete S3 object {doc['minio_object_name']}: {e}") + if doc.get("minio_thumbnail_object_name"): + try: + await self.s3.delete_file(doc["minio_thumbnail_object_name"]) + except Exception as e: + logger.error(f"Failed to delete S3 thumbnail {doc['minio_thumbnail_object_name']}: {e}") + + purged_count += 1 + + # Мягкое удаление + очистка ссылок на S3 + if ids_to_update: + await self.collection.update_many( + {"_id": {"$in": ids_to_update}}, + { + "$set": { + "is_deleted": True, + "minio_object_name": None, + "minio_thumbnail_object_name": None, + "updated_at": datetime.now(UTC) + } + } + ) + + return purged_count + async def migrate_to_minio(self) -> dict: """Переносит данные и thumbnails из Mongo в MinIO.""" if not self.s3: diff --git a/repos/generation_repo.py b/repos/generation_repo.py index f77ceee..5132eaa 100644 --- a/repos/generation_repo.py +++ b/repos/generation_repo.py @@ -1,4 +1,4 @@ -from typing import Optional, List +from typing import Any, Optional, List from datetime import datetime, timedelta, UTC from PIL.ImageChops import offset @@ -17,7 +17,7 @@ class GenerationRepo: res = await self.collection.insert_one(generation.model_dump()) return str(res.inserted_id) - async def get_generation(self, generation_id: str) -> Optional[Generation]: + async def get_generation(self, generation_id: str) -> Generation | None: res = await self.collection.find_one({"_id": ObjectId(generation_id)}) if res is None: return None @@ -28,7 +28,7 @@ class GenerationRepo: async def get_generations(self, character_id: Optional[str] = None, status: Optional[GenerationStatus] = None, limit: int = 10, offset: int = 0, created_by: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> List[Generation]: - filter = {"is_deleted": False} + filter: dict[str, Any] = {"is_deleted": False} if character_id is not None: filter["linked_character_id"] = character_id if status is not None: @@ -69,6 +69,8 @@ class GenerationRepo: args["project_id"] = project_id if idea_id is not None: args["idea_id"] = idea_id + if album_id is not None: + args["album_id"] = album_id return await self.collection.count_documents(args) async def get_generations_by_ids(self, generation_ids: List[str]) -> List[Generation]: @@ -114,3 +116,37 @@ class GenerationRepo: } ) return res.modified_count + + async def soft_delete_old_generations(self, days: int = 2) -> tuple[int, List[str]]: + """ + Мягко удаляет генерации старше N дней. + Возвращает (количество удалённых, список asset IDs для очистки). + """ + cutoff_time = datetime.now(UTC) - timedelta(days=days) + filter_query = { + "is_deleted": False, + "status": {"$in": [GenerationStatus.DONE, GenerationStatus.FAILED]}, + "created_at": {"$lt": cutoff_time} + } + + # Сначала собираем asset IDs из удаляемых генераций + asset_ids: List[str] = [] + cursor = self.collection.find(filter_query, {"result_list": 1, "assets_list": 1}) + async for doc in cursor: + asset_ids.extend(doc.get("result_list", [])) + asset_ids.extend(doc.get("assets_list", [])) + + # Мягкое удаление + res = await self.collection.update_many( + filter_query, + { + "$set": { + "is_deleted": True, + "updated_at": datetime.now(UTC) + } + } + ) + + # Убираем дубликаты + unique_asset_ids = list(set(asset_ids)) + return res.modified_count, unique_asset_ids