from typing import Any, List, Optional import logging from datetime import datetime, UTC from bson import ObjectId from uuid import uuid4 from motor.motor_asyncio import AsyncIOMotorClient from models.Asset import Asset from adapters.s3_adapter import S3Adapter logger = logging.getLogger(__name__) class AssetsRepo: def __init__(self, client: AsyncIOMotorClient, s3_adapter: Optional[S3Adapter] = None, db_name="bot_db"): self.collection = client[db_name]["assets"] self.s3 = s3_adapter async def create_asset(self, asset: Asset) -> str: # Если есть S3 и данные - грузим в S3 if self.s3: # Main data if asset.data: ts = int(asset.created_at.timestamp()) uid = uuid4().hex[:8] object_name = f"{asset.type.value}/{ts}_{uid}_{asset.name}" uploaded = await self.s3.upload_file(object_name, asset.data) if uploaded: asset.minio_object_name = object_name asset.minio_bucket = self.s3.bucket_name asset.data = None # Clear data else: logger.error(f"Failed to upload asset {asset.name} to MinIO") # Thumbnail if asset.thumbnail: ts = int(asset.created_at.timestamp()) uid = uuid4().hex[:8] thumb_name = f"{asset.type.value}/thumbs/{ts}_{uid}_{asset.name}_thumb.jpg" uploaded_thumb = await self.s3.upload_file(thumb_name, asset.thumbnail) if uploaded_thumb: asset.minio_thumbnail_object_name = thumb_name asset.minio_bucket = self.s3.bucket_name # Assumes same bucket asset.thumbnail = None # Clear thumbnail data else: logger.error(f"Failed to upload thumbnail for {asset.name} to MinIO") res = await self.collection.insert_one(asset.model_dump()) return str(res.inserted_id) async def get_assets(self, asset_type: Optional[str] = None, limit: int = 10, offset: int = 0, with_data: bool = False, created_by: Optional[str] = None, project_id: Optional[str] = None) -> List[Asset]: filter: dict[str, Any]= {"is_deleted": {"$ne": True}} if asset_type: filter["type"] = asset_type args = {} if not with_data: args["data"] = 0 # We assume thumbnails are fetched only if needed or kept sparse. # If they are in MinIO, we don't fetch them by default list unless specifically asked? # User requirement "Get bytes ... from minio" usually refers to full asset. used in detail view. # In list view, we might want thumbnails. # If thumbnails are in MinIO, list view will be slow if we fetch all. # Usually we return a URL. But this bot might serve bytes. # Let's assuming list view needs thumbnails if they are small. # But if we moved them to S3, we probably don't want to fetch 10x S3 requests for list. # For now: If minio_thumbnail_object_name is present, user might need to fetch separately # or we fetch if `with_data` is True? # Standard pattern: return URL or ID. # Let's keep existing logic: args["thumbnail"] = 0 if not with_data. # EXCEPT if we want to show thumbnails in list. # Original code: # if not with_data: args["data"] = 0; args["thumbnail"] = 0 # So list DOES NOT return thumbnails by default. args["thumbnail"] = 0 if created_by: filter["created_by"] = created_by filter['project_id'] = None if project_id: filter["project_id"] = project_id res = await self.collection.find(filter, args).sort("created_at", -1).skip(offset).limit(limit).to_list(None) assets = [] for doc in res: doc["id"] = str(doc.pop("_id")) asset = Asset(**doc) if with_data and self.s3: # Fetch data if asset.minio_object_name: data = await self.s3.get_file(asset.minio_object_name) if data: asset.data = data # Fetch thumbnail if asset.minio_thumbnail_object_name: thumb = await self.s3.get_file(asset.minio_thumbnail_object_name) if thumb: asset.thumbnail = thumb assets.append(asset) return assets async def get_asset(self, asset_id: str, with_data: bool = True) -> Asset: projection = None if not with_data: projection = {"data": 0, "thumbnail": 0} res = await self.collection.find_one({"_id": ObjectId(asset_id)}, projection) if not res: return None res["id"] = str(res.pop("_id")) asset = Asset(**res) if with_data and self.s3: if asset.minio_object_name: data = await self.s3.get_file(asset.minio_object_name) if data: asset.data = data if asset.minio_thumbnail_object_name: thumb = await self.s3.get_file(asset.minio_thumbnail_object_name) if thumb: asset.thumbnail = thumb return asset async def update_asset(self, asset_id: str, asset: Asset): if not asset.id: if asset_id: asset.id = asset_id else: raise Exception(f"Asset ID not found: {asset_id}") # NOTE: simplistic update. If asset has data/thumbnail bytes, we might need to upload? # Assuming for now we just save what's given. # If user wants to update data, they should probably use a specialized method or we handle it here. # Let's handle it: If data/thumbnail is present AND we have S3, upload it. if self.s3: if asset.data: ts = int(asset.created_at.timestamp()) uid = uuid4().hex[:8] object_name = f"{asset.type.value}/{ts}_{uid}_{asset.name}" if await self.s3.upload_file(object_name, asset.data): asset.minio_object_name = object_name asset.minio_bucket = self.s3.bucket_name asset.data = None if asset.thumbnail: ts = int(asset.created_at.timestamp()) uid = uuid4().hex[:8] thumb_name = f"{asset.type.value}/thumbs/{ts}_{uid}_{asset.name}_thumb.jpg" if await self.s3.upload_file(thumb_name, asset.thumbnail): asset.minio_thumbnail_object_name = thumb_name asset.thumbnail = None model_dump = asset.model_dump() await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": model_dump}) async def set_tg_photo_file_id(self, asset_id: str, tg_photo_file_id: str): await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": {"tg_photo_file_id": tg_photo_file_id}}) async def get_assets_by_char_id(self, character_id: str, limit: int = 10, offset: int = 0) -> List[Asset]: docs = await self.collection.find({"linked_char_id": character_id}, {"data": 0}, sort=[("created_at", -1)]).limit(limit).skip(offset).to_list( None) assets = [] for doc in docs: doc["id"] = str(doc.pop("_id")) assets.append(Asset(**doc)) return assets async def get_asset_count(self, character_id: Optional[str] = None, created_by: Optional[str] = None, project_id: Optional[str] = None) -> int: filter = {} if character_id: filter["linked_char_id"] = character_id if created_by: filter["created_by"] = created_by if project_id is None: filter["project_id"] = None if project_id: filter["project_id"] = project_id return await self.collection.count_documents(filter) async def get_assets_by_ids(self, asset_ids: List[str]) -> List[Asset]: object_ids = [ObjectId(asset_id) for asset_id in asset_ids] res = self.collection.find({"_id": {"$in": object_ids}}, {"data": 0}) # Exclude data but maybe allow thumbnail if small? # Original excluded thumbnail too. assets = [] async for doc in res: doc["id"] = str(doc.pop("_id")) assets.append(Asset(**doc)) return assets async def delete_asset(self, asset_id: str) -> bool: asset_doc = await self.collection.find_one({"_id": ObjectId(asset_id)}) if not asset_doc: return False if self.s3: if asset_doc.get("minio_object_name"): await self.s3.delete_file(asset_doc["minio_object_name"]) if asset_doc.get("minio_thumbnail_object_name"): await self.s3.delete_file(asset_doc["minio_thumbnail_object_name"]) res = await self.collection.delete_one({"_id": ObjectId(asset_id)}) return res.deleted_count > 0 async def soft_delete_and_purge_assets(self, asset_ids: List[str]) -> int: """ Мягко удаляет ассеты и жёстко удаляет их файлы из S3. Возвращает количество обработанных ассетов. """ if not asset_ids: return 0 object_ids = [ObjectId(aid) for aid in asset_ids if ObjectId.is_valid(aid)] if not object_ids: return 0 # Находим ассеты, которые ещё не удалены cursor = self.collection.find( {"_id": {"$in": object_ids}, "is_deleted": {"$ne": True}}, {"minio_object_name": 1, "minio_thumbnail_object_name": 1} ) purged_count = 0 ids_to_update = [] async for doc in cursor: ids_to_update.append(doc["_id"]) # Жёсткое удаление файлов из S3 if self.s3: if doc.get("minio_object_name"): try: await self.s3.delete_file(doc["minio_object_name"]) except Exception as e: logger.error(f"Failed to delete S3 object {doc['minio_object_name']}: {e}") if doc.get("minio_thumbnail_object_name"): try: await self.s3.delete_file(doc["minio_thumbnail_object_name"]) except Exception as e: logger.error(f"Failed to delete S3 thumbnail {doc['minio_thumbnail_object_name']}: {e}") purged_count += 1 # Мягкое удаление + очистка ссылок на S3 if ids_to_update: await self.collection.update_many( {"_id": {"$in": ids_to_update}}, { "$set": { "is_deleted": True, "minio_object_name": None, "minio_thumbnail_object_name": None, "updated_at": datetime.now(UTC) } } ) return purged_count async def migrate_to_minio(self) -> dict: """Переносит данные и thumbnails из Mongo в MinIO.""" if not self.s3: return {"error": "MinIO adapter not initialized"} # 1. Migrate Data cursor_data = self.collection.find({"data": {"$ne": None}, "minio_object_name": {"$eq": None}}) count_data = 0 errors_data = 0 async for doc in cursor_data: try: asset_id = doc["_id"] data = doc.get("data") name = doc.get("name", "unknown") type_ = doc.get("type", "image") created_at = doc.get("created_at") ts = int(created_at.timestamp()) if created_at else 0 uid = uuid4().hex[:8] object_name = f"{type_}/{ts}_{uid}_{asset_id}_{name}" if await self.s3.upload_file(object_name, data): await self.collection.update_one( {"_id": asset_id}, {"$set": {"minio_object_name": object_name, "minio_bucket": self.s3.bucket_name, "data": None}} ) count_data += 1 else: errors_data += 1 except Exception as e: logger.error(f"Data migration error for {doc.get('_id')}: {e}") errors_data += 1 # 2. Migrate Thumbnails cursor_thumb = self.collection.find({"thumbnail": {"$ne": None}, "minio_thumbnail_object_name": {"$eq": None}}) count_thumb = 0 errors_thumb = 0 async for doc in cursor_thumb: try: asset_id = doc["_id"] thumb = doc.get("thumbnail") name = doc.get("name", "unknown") type_ = doc.get("type", "image") created_at = doc.get("created_at") ts = int(created_at.timestamp()) if created_at else 0 uid = uuid4().hex[:8] thumb_name = f"{type_}/thumbs/{ts}_{uid}_{asset_id}_{name}_thumb.jpg" if await self.s3.upload_file(thumb_name, thumb): await self.collection.update_one( {"_id": asset_id}, {"$set": {"minio_thumbnail_object_name": thumb_name, "minio_bucket": self.s3.bucket_name, "thumbnail": None}} ) count_thumb += 1 else: errors_thumb += 1 except Exception as e: logger.error(f"Thumbnail migration error for {doc.get('_id')}: {e}") errors_thumb += 1 return { "migrated_data": count_data, "errors_data": errors_data, "migrated_thumbnails": count_thumb, "errors_thumbnails": errors_thumb }