feat: Implement asset soft deletion with S3 file purging, enhance type safety, and improve error handling in generation and adapter services.
This commit is contained in:
Binary file not shown.
Binary file not shown.
@@ -23,10 +23,10 @@ class GoogleAdapter:
|
|||||||
self.TEXT_MODEL = "gemini-3-pro-preview"
|
self.TEXT_MODEL = "gemini-3-pro-preview"
|
||||||
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
||||||
|
|
||||||
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> tuple:
|
def _prepare_contents(self, prompt: str, images_list: List[bytes] | None = None) -> tuple:
|
||||||
"""Вспомогательный метод для подготовки контента (текст + картинки).
|
"""Вспомогательный метод для подготовки контента (текст + картинки).
|
||||||
Returns (contents, opened_images) — caller MUST close opened_images after use."""
|
Returns (contents, opened_images) — caller MUST close opened_images after use."""
|
||||||
contents = [prompt]
|
contents : list [Any]= [prompt]
|
||||||
opened_images = []
|
opened_images = []
|
||||||
if images_list:
|
if images_list:
|
||||||
logger.info(f"Preparing content with {len(images_list)} images")
|
logger.info(f"Preparing content with {len(images_list)} images")
|
||||||
@@ -41,7 +41,7 @@ class GoogleAdapter:
|
|||||||
logger.info("Preparing content with no images")
|
logger.info("Preparing content with no images")
|
||||||
return contents, opened_images
|
return contents, opened_images
|
||||||
|
|
||||||
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
|
def generate_text(self, prompt: str, images_list: List[bytes] | None = None) -> str:
|
||||||
"""
|
"""
|
||||||
Генерация текста (Чат или Vision).
|
Генерация текста (Чат или Vision).
|
||||||
Возвращает строку с ответом.
|
Возвращает строку с ответом.
|
||||||
@@ -74,7 +74,7 @@ class GoogleAdapter:
|
|||||||
for img in opened_images:
|
for img in opened_images:
|
||||||
img.close()
|
img.close()
|
||||||
|
|
||||||
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
|
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] | None = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Генерация изображений (Text-to-Image или Image-to-Image).
|
Генерация изображений (Text-to-Image или Image-to-Image).
|
||||||
Возвращает список байтовых потоков (готовых к отправке).
|
Возвращает список байтовых потоков (готовых к отправке).
|
||||||
@@ -130,7 +130,9 @@ class GoogleAdapter:
|
|||||||
try:
|
try:
|
||||||
# 1. Берем сырые байты
|
# 1. Берем сырые байты
|
||||||
raw_data = part.inline_data.data
|
raw_data = part.inline_data.data
|
||||||
byte_arr = io.BytesIO(raw_data)
|
if raw_data is None:
|
||||||
|
raise GoogleGenerationException("Generation returned no data")
|
||||||
|
byte_arr : io.BytesIO = io.BytesIO(raw_data)
|
||||||
|
|
||||||
# 2. Нейминг (формально, для TG)
|
# 2. Нейминг (формально, для TG)
|
||||||
timestamp = datetime.now().timestamp()
|
timestamp = datetime.now().timestamp()
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ class S3Adapter:
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _get_client(self):
|
async def _get_client(self):
|
||||||
async with self.session.client(
|
async with self.session.client( # type: ignore[reportGeneralTypeIssues]
|
||||||
"s3",
|
"s3",
|
||||||
endpoint_url=self.endpoint_url,
|
endpoint_url=self.endpoint_url,
|
||||||
aws_access_key_id=self.aws_access_key_id,
|
aws_access_key_id=self.aws_access_key_id,
|
||||||
|
|||||||
11
aiws.py
11
aiws.py
@@ -64,6 +64,8 @@ def setup_logging():
|
|||||||
|
|
||||||
|
|
||||||
# --- ИНИЦИАЛИЗАЦИЯ ЗАВИСИМОСТЕЙ ---
|
# --- ИНИЦИАЛИЗАЦИЯ ЗАВИСИМОСТЕЙ ---
|
||||||
|
if BOT_TOKEN is None:
|
||||||
|
raise ValueError("BOT_TOKEN is not set")
|
||||||
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
||||||
|
|
||||||
# Клиент БД создаем глобально, чтобы он был доступен и боту (Storage), и API
|
# Клиент БД создаем глобально, чтобы он был доступен и боту (Storage), и API
|
||||||
@@ -83,8 +85,12 @@ s3_adapter = S3Adapter(
|
|||||||
)
|
)
|
||||||
|
|
||||||
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
||||||
|
if GEMINI_API_KEY is None:
|
||||||
|
raise ValueError("GEMINI_API_KEY is not set")
|
||||||
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
||||||
generation_service = GenerationService(dao, gemini, bot)
|
if bot is None:
|
||||||
|
raise ValueError("bot is not set")
|
||||||
|
generation_service = GenerationService(dao=dao, gemini=gemini, s3_adapter=s3_adapter, bot=bot)
|
||||||
album_service = AlbumService(dao)
|
album_service = AlbumService(dao)
|
||||||
|
|
||||||
# Dispatcher
|
# Dispatcher
|
||||||
@@ -126,11 +132,12 @@ async def start_scheduler(service: GenerationService):
|
|||||||
try:
|
try:
|
||||||
logger.info("Running scheduler for stacked generation killing")
|
logger.info("Running scheduler for stacked generation killing")
|
||||||
await service.cleanup_stale_generations()
|
await service.cleanup_stale_generations()
|
||||||
|
await service.cleanup_old_data(days=2)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Scheduler error: {e}")
|
logger.error(f"Scheduler error: {e}")
|
||||||
await asyncio.sleep(60) # Check every 10 minutes
|
await asyncio.sleep(60) # Check every 60 seconds
|
||||||
|
|
||||||
# --- LIFESPAN (Запуск FastAPI + Bot) ---
|
# --- LIFESPAN (Запуск FastAPI + Bot) ---
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
|||||||
Binary file not shown.
@@ -5,6 +5,7 @@ from motor.motor_asyncio import AsyncIOMotorClient
|
|||||||
from adapters.google_adapter import GoogleAdapter
|
from adapters.google_adapter import GoogleAdapter
|
||||||
from api.service.generation_service import GenerationService
|
from api.service.generation_service import GenerationService
|
||||||
from repos.dao import DAO
|
from repos.dao import DAO
|
||||||
|
from api.service.album_service import AlbumService
|
||||||
|
|
||||||
|
|
||||||
# ... ваши импорты ...
|
# ... ваши импорты ...
|
||||||
@@ -53,4 +54,7 @@ def get_idea_service(dao: DAO = Depends(get_dao)) -> IdeaService:
|
|||||||
from fastapi import Header
|
from fastapi import Header
|
||||||
|
|
||||||
async def get_project_id(x_project_id: Optional[str] = Header(None, alias="X-Project-ID")) -> Optional[str]:
|
async def get_project_id(x_project_id: Optional[str] = Header(None, alias="X-Project-ID")) -> Optional[str]:
|
||||||
return x_project_id
|
return x_project_id
|
||||||
|
|
||||||
|
async def get_album_service(dao: DAO = Depends(get_dao)) -> AlbumService:
|
||||||
|
return AlbumService(dao)
|
||||||
Binary file not shown.
@@ -23,7 +23,7 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], repo:
|
|||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||||
username: str = payload.get("sub")
|
username: str | None = payload.get("sub")
|
||||||
if username is None:
|
if username is None:
|
||||||
raise credentials_exception
|
raise credentials_exception
|
||||||
except JWTError:
|
except JWTError:
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
from fastapi import APIRouter, HTTPException, status, Request
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from api.models.GenerationRequest import GenerationResponse
|
from api.models.GenerationRequest import GenerationResponse
|
||||||
from models.Album import Album
|
from models.Album import Album
|
||||||
from repos.dao import DAO
|
from repos.dao import DAO
|
||||||
|
from api.dependency import get_album_service
|
||||||
|
from api.service.album_service import AlbumService
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/albums", tags=["Albums"])
|
router = APIRouter(prefix="/api/albums", tags=["Albums"])
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
@@ -77,7 +77,7 @@ class GenerationService:
|
|||||||
self.bot = bot
|
self.bot = bot
|
||||||
|
|
||||||
|
|
||||||
async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str:
|
async def ask_prompt_assistant(self, prompt: str, assets: list[str] | None = None) -> str:
|
||||||
future_prompt = """You are an prompt-assistant. You improving user-entered prompts for image generation. User may upload reference image too.
|
future_prompt = """You are an prompt-assistant. You improving user-entered prompts for image generation. User may upload reference image too.
|
||||||
I will provide sources prompt entered by user. Understand user needs and generate best variation of prompt.
|
I will provide sources prompt entered by user. Understand user needs and generate best variation of prompt.
|
||||||
ANSWER ONLY PROMPT STRING!!! USER_ENTERED_PROMPT: """
|
ANSWER ONLY PROMPT STRING!!! USER_ENTERED_PROMPT: """
|
||||||
@@ -157,8 +157,9 @@ class GenerationService:
|
|||||||
# если генерация уже пошла и упала — пометим FAILED
|
# если генерация уже пошла и упала — пометим FAILED
|
||||||
try:
|
try:
|
||||||
db_gen = await self.dao.generations.get_generation(gen.id)
|
db_gen = await self.dao.generations.get_generation(gen.id)
|
||||||
db_gen.status = GenerationStatus.FAILED
|
if db_gen is not None:
|
||||||
await self.dao.generations.update_generation(db_gen)
|
db_gen.status = GenerationStatus.FAILED
|
||||||
|
await self.dao.generations.update_generation(db_gen)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to mark generation as FAILED")
|
logger.exception("Failed to mark generation as FAILED")
|
||||||
logger.exception("create_generation task failed")
|
logger.exception("create_generation task failed")
|
||||||
@@ -172,8 +173,9 @@ class GenerationService:
|
|||||||
if gen_id is not None:
|
if gen_id is not None:
|
||||||
try:
|
try:
|
||||||
gen = await self.dao.generations.get_generation(gen_id)
|
gen = await self.dao.generations.get_generation(gen_id)
|
||||||
gen.status = GenerationStatus.FAILED
|
if gen is not None:
|
||||||
await self.dao.generations.update_generation(gen)
|
gen.status = GenerationStatus.FAILED
|
||||||
|
await self.dao.generations.update_generation(gen)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to mark generation as FAILED in create_generation_task")
|
logger.exception("Failed to mark generation as FAILED in create_generation_task")
|
||||||
raise
|
raise
|
||||||
@@ -201,9 +203,10 @@ class GenerationService:
|
|||||||
if char_info is None:
|
if char_info is None:
|
||||||
raise Exception(f"Character ID {generation.linked_character_id} not found")
|
raise Exception(f"Character ID {generation.linked_character_id} not found")
|
||||||
if generation.use_profile_image:
|
if generation.use_profile_image:
|
||||||
avatar_asset = await self.dao.assets.get_asset(char_info.avatar_asset_id)
|
if char_info.avatar_asset_id is not None:
|
||||||
if avatar_asset:
|
avatar_asset = await self.dao.assets.get_asset(char_info.avatar_asset_id)
|
||||||
media_group_bytes.append(avatar_asset.data)
|
if avatar_asset and avatar_asset.data:
|
||||||
|
media_group_bytes.append(avatar_asset.data)
|
||||||
# generation_prompt = generation_prompt.replace("$char_bio_inserted", f"1. CHARACTER BIO (Must be strictly followed): {char_info.character_bio}")
|
# generation_prompt = generation_prompt.replace("$char_bio_inserted", f"1. CHARACTER BIO (Must be strictly followed): {char_info.character_bio}")
|
||||||
|
|
||||||
reference_assets = await self.dao.assets.get_assets_by_ids(generation.assets_list)
|
reference_assets = await self.dao.assets.get_assets_by_ids(generation.assets_list)
|
||||||
@@ -304,7 +307,9 @@ class GenerationService:
|
|||||||
|
|
||||||
# 5. (Опционально) Обновляем запись генерации ссылками на результаты
|
# 5. (Опционально) Обновляем запись генерации ссылками на результаты
|
||||||
# Предполагаем, что у модели Generation есть поле result_asset_ids
|
# Предполагаем, что у модели Generation есть поле result_asset_ids
|
||||||
result_ids = [a.id for a in created_assets]
|
result_ids = []
|
||||||
|
for a in created_assets:
|
||||||
|
result_ids.append(a.id)
|
||||||
|
|
||||||
generation.result_list = result_ids
|
generation.result_list = result_ids
|
||||||
generation.status = GenerationStatus.DONE
|
generation.status = GenerationStatus.DONE
|
||||||
@@ -479,4 +484,26 @@ class GenerationService:
|
|||||||
if count > 0:
|
if count > 0:
|
||||||
logger.info(f"Cleaned up {count} stale generations (timeout)")
|
logger.info(f"Cleaned up {count} stale generations (timeout)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error cleaning up stale generations: {e}")
|
logger.error(f"Error cleaning up stale generations: {e}")
|
||||||
|
|
||||||
|
async def cleanup_old_data(self, days: int = 2):
|
||||||
|
"""
|
||||||
|
Очистка старых данных:
|
||||||
|
1. Мягко удаляет генерации старше N дней
|
||||||
|
2. Мягко удаляет связанные ассеты + жёстко удаляет файлы из S3
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 1. Мягко удаляем генерации и собираем asset IDs
|
||||||
|
gen_count, asset_ids = await self.dao.generations.soft_delete_old_generations(days=days)
|
||||||
|
|
||||||
|
if gen_count > 0:
|
||||||
|
logger.info(f"Soft-deleted {gen_count} generations older than {days} days. "
|
||||||
|
f"Found {len(asset_ids)} associated asset IDs.")
|
||||||
|
|
||||||
|
# 2. Мягко удаляем ассеты + жёстко удаляем файлы из S3
|
||||||
|
if asset_ids:
|
||||||
|
purged = await self.dao.assets.soft_delete_and_purge_assets(asset_ids)
|
||||||
|
logger.info(f"Purged {purged} assets (soft-deleted + S3 files removed).")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during old data cleanup: {e}")
|
||||||
@@ -30,6 +30,7 @@ class Asset(BaseModel):
|
|||||||
tags: List[str] = []
|
tags: List[str] = []
|
||||||
created_by: Optional[str] = None
|
created_by: Optional[str] = None
|
||||||
project_id: Optional[str] = None
|
project_id: Optional[str] = None
|
||||||
|
is_deleted: bool = False
|
||||||
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,5 +1,6 @@
|
|||||||
from typing import List, Optional
|
from typing import Any, List, Optional
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import datetime, UTC
|
||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
@@ -50,7 +51,7 @@ class AssetsRepo:
|
|||||||
return str(res.inserted_id)
|
return str(res.inserted_id)
|
||||||
|
|
||||||
async def get_assets(self, asset_type: Optional[str] = None, limit: int = 10, offset: int = 0, with_data: bool = False, created_by: Optional[str] = None, project_id: Optional[str] = None) -> List[Asset]:
|
async def get_assets(self, asset_type: Optional[str] = None, limit: int = 10, offset: int = 0, with_data: bool = False, created_by: Optional[str] = None, project_id: Optional[str] = None) -> List[Asset]:
|
||||||
filter = {}
|
filter: dict[str, Any]= {"is_deleted": {"$ne": True}}
|
||||||
if asset_type:
|
if asset_type:
|
||||||
filter["type"] = asset_type
|
filter["type"] = asset_type
|
||||||
args = {}
|
args = {}
|
||||||
@@ -202,6 +203,61 @@ class AssetsRepo:
|
|||||||
res = await self.collection.delete_one({"_id": ObjectId(asset_id)})
|
res = await self.collection.delete_one({"_id": ObjectId(asset_id)})
|
||||||
return res.deleted_count > 0
|
return res.deleted_count > 0
|
||||||
|
|
||||||
|
async def soft_delete_and_purge_assets(self, asset_ids: List[str]) -> int:
|
||||||
|
"""
|
||||||
|
Мягко удаляет ассеты и жёстко удаляет их файлы из S3.
|
||||||
|
Возвращает количество обработанных ассетов.
|
||||||
|
"""
|
||||||
|
if not asset_ids:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
object_ids = [ObjectId(aid) for aid in asset_ids if ObjectId.is_valid(aid)]
|
||||||
|
if not object_ids:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Находим ассеты, которые ещё не удалены
|
||||||
|
cursor = self.collection.find(
|
||||||
|
{"_id": {"$in": object_ids}, "is_deleted": {"$ne": True}},
|
||||||
|
{"minio_object_name": 1, "minio_thumbnail_object_name": 1}
|
||||||
|
)
|
||||||
|
|
||||||
|
purged_count = 0
|
||||||
|
ids_to_update = []
|
||||||
|
|
||||||
|
async for doc in cursor:
|
||||||
|
ids_to_update.append(doc["_id"])
|
||||||
|
|
||||||
|
# Жёсткое удаление файлов из S3
|
||||||
|
if self.s3:
|
||||||
|
if doc.get("minio_object_name"):
|
||||||
|
try:
|
||||||
|
await self.s3.delete_file(doc["minio_object_name"])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to delete S3 object {doc['minio_object_name']}: {e}")
|
||||||
|
if doc.get("minio_thumbnail_object_name"):
|
||||||
|
try:
|
||||||
|
await self.s3.delete_file(doc["minio_thumbnail_object_name"])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to delete S3 thumbnail {doc['minio_thumbnail_object_name']}: {e}")
|
||||||
|
|
||||||
|
purged_count += 1
|
||||||
|
|
||||||
|
# Мягкое удаление + очистка ссылок на S3
|
||||||
|
if ids_to_update:
|
||||||
|
await self.collection.update_many(
|
||||||
|
{"_id": {"$in": ids_to_update}},
|
||||||
|
{
|
||||||
|
"$set": {
|
||||||
|
"is_deleted": True,
|
||||||
|
"minio_object_name": None,
|
||||||
|
"minio_thumbnail_object_name": None,
|
||||||
|
"updated_at": datetime.now(UTC)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return purged_count
|
||||||
|
|
||||||
async def migrate_to_minio(self) -> dict:
|
async def migrate_to_minio(self) -> dict:
|
||||||
"""Переносит данные и thumbnails из Mongo в MinIO."""
|
"""Переносит данные и thumbnails из Mongo в MinIO."""
|
||||||
if not self.s3:
|
if not self.s3:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Optional, List
|
from typing import Any, Optional, List
|
||||||
from datetime import datetime, timedelta, UTC
|
from datetime import datetime, timedelta, UTC
|
||||||
|
|
||||||
from PIL.ImageChops import offset
|
from PIL.ImageChops import offset
|
||||||
@@ -17,7 +17,7 @@ class GenerationRepo:
|
|||||||
res = await self.collection.insert_one(generation.model_dump())
|
res = await self.collection.insert_one(generation.model_dump())
|
||||||
return str(res.inserted_id)
|
return str(res.inserted_id)
|
||||||
|
|
||||||
async def get_generation(self, generation_id: str) -> Optional[Generation]:
|
async def get_generation(self, generation_id: str) -> Generation | None:
|
||||||
res = await self.collection.find_one({"_id": ObjectId(generation_id)})
|
res = await self.collection.find_one({"_id": ObjectId(generation_id)})
|
||||||
if res is None:
|
if res is None:
|
||||||
return None
|
return None
|
||||||
@@ -28,7 +28,7 @@ class GenerationRepo:
|
|||||||
async def get_generations(self, character_id: Optional[str] = None, status: Optional[GenerationStatus] = None,
|
async def get_generations(self, character_id: Optional[str] = None, status: Optional[GenerationStatus] = None,
|
||||||
limit: int = 10, offset: int = 0, created_by: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> List[Generation]:
|
limit: int = 10, offset: int = 0, created_by: Optional[str] = None, project_id: Optional[str] = None, idea_id: Optional[str] = None) -> List[Generation]:
|
||||||
|
|
||||||
filter = {"is_deleted": False}
|
filter: dict[str, Any] = {"is_deleted": False}
|
||||||
if character_id is not None:
|
if character_id is not None:
|
||||||
filter["linked_character_id"] = character_id
|
filter["linked_character_id"] = character_id
|
||||||
if status is not None:
|
if status is not None:
|
||||||
@@ -69,6 +69,8 @@ class GenerationRepo:
|
|||||||
args["project_id"] = project_id
|
args["project_id"] = project_id
|
||||||
if idea_id is not None:
|
if idea_id is not None:
|
||||||
args["idea_id"] = idea_id
|
args["idea_id"] = idea_id
|
||||||
|
if album_id is not None:
|
||||||
|
args["album_id"] = album_id
|
||||||
return await self.collection.count_documents(args)
|
return await self.collection.count_documents(args)
|
||||||
|
|
||||||
async def get_generations_by_ids(self, generation_ids: List[str]) -> List[Generation]:
|
async def get_generations_by_ids(self, generation_ids: List[str]) -> List[Generation]:
|
||||||
@@ -114,3 +116,37 @@ class GenerationRepo:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
return res.modified_count
|
return res.modified_count
|
||||||
|
|
||||||
|
async def soft_delete_old_generations(self, days: int = 2) -> tuple[int, List[str]]:
|
||||||
|
"""
|
||||||
|
Мягко удаляет генерации старше N дней.
|
||||||
|
Возвращает (количество удалённых, список asset IDs для очистки).
|
||||||
|
"""
|
||||||
|
cutoff_time = datetime.now(UTC) - timedelta(days=days)
|
||||||
|
filter_query = {
|
||||||
|
"is_deleted": False,
|
||||||
|
"status": {"$in": [GenerationStatus.DONE, GenerationStatus.FAILED]},
|
||||||
|
"created_at": {"$lt": cutoff_time}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Сначала собираем asset IDs из удаляемых генераций
|
||||||
|
asset_ids: List[str] = []
|
||||||
|
cursor = self.collection.find(filter_query, {"result_list": 1, "assets_list": 1})
|
||||||
|
async for doc in cursor:
|
||||||
|
asset_ids.extend(doc.get("result_list", []))
|
||||||
|
asset_ids.extend(doc.get("assets_list", []))
|
||||||
|
|
||||||
|
# Мягкое удаление
|
||||||
|
res = await self.collection.update_many(
|
||||||
|
filter_query,
|
||||||
|
{
|
||||||
|
"$set": {
|
||||||
|
"is_deleted": True,
|
||||||
|
"updated_at": datetime.now(UTC)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Убираем дубликаты
|
||||||
|
unique_asset_ids = list(set(asset_ids))
|
||||||
|
return res.modified_count, unique_asset_ids
|
||||||
|
|||||||
Reference in New Issue
Block a user