"""HTTP endpoints for assets: fetch, list, upload, delete and maintenance tasks."""

import asyncio
import logging
from typing import List, Optional, Dict, Any

from aiogram.types import BufferedInputFile
from bson import ObjectId
from fastapi import APIRouter, UploadFile, File, Form, Depends
from fastapi.openapi.models import MediaType
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
from starlette import status
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import Response, JSONResponse, StreamingResponse

from adapters.s3_adapter import S3Adapter
from api.models import AssetsResponse, AssetResponse
from models.Asset import Asset, AssetType, AssetContentType
from repos.dao import DAO
from api.dependency import get_dao, get_mongo_client, get_s3_adapter
from api.endpoints.auth import get_current_user
from api.dependency import get_project_id

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/assets", tags=["Assets"])


@router.get("/{asset_id}")
async def get_asset(
    asset_id: str,
    request: Request,
    thumbnail: bool = False,
    dao: DAO = Depends(get_dao),
    s3_adapter: S3Adapter = Depends(get_s3_adapter),
) -> Response:
    """Return the binary content (or the thumbnail) of a single asset.

    Lookup order when ``thumbnail`` is true: thumbnail object in S3, then the
    thumbnail stored on the asset record, then the main content. Main content
    is streamed from S3 when an object name is present, otherwise served from
    the bytes stored on the asset record.

    Raises:
        HTTPException: 404 when the asset does not exist or has no content.
    """
    logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")

    # Load metadata only (without data/thumbnail bytes).
    # NOTE(review): with with_data=False the DB fallbacks below presumably only
    # fire if the DAO still populates those fields — confirm against the DAO.
    asset = await dao.assets.get_asset(asset_id, with_data=False)
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    headers = {
        "Cache-Control": "public, max-age=31536000, immutable"
    }

    # Thumbnail: small, so it is fine to load it into RAM.
    if thumbnail:
        if asset.minio_thumbnail_object_name and s3_adapter:
            thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
            if thumb_bytes:
                return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
        # Fallback: thumbnail in DB
        if asset.thumbnail:
            return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
        # No thumbnail available — fall through to main content

    # Main content: stream from S3 without loading it into RAM.
    if asset.minio_object_name and s3_adapter:
        content_type = "image/png"
        # if asset.content_type == AssetContentType.VIDEO:
        #     content_type = "video/mp4"
        return StreamingResponse(
            s3_adapter.stream_file(asset.minio_object_name),
            media_type=content_type,
            headers=headers,
        )

    # Fallback: data stored in DB (legacy)
    if asset.data:
        return Response(content=asset.data, media_type="image/png", headers=headers)

    raise HTTPException(status_code=404, detail="Asset data not found")


@router.delete("/orphans", dependencies=[Depends(get_current_user)])
async def delete_orphan_assets_from_minio(
    mongo: AsyncIOMotorClient = Depends(get_mongo_client),
    minio_client: S3Adapter = Depends(get_s3_adapter),
    *,
    assets_collection: str = "assets",
    generations_collection: str = "generations",
    asset_type: Optional[str] = "generated",
    project_id: Optional[str] = None,
    dry_run: bool = True,
    mark_assets_deleted: bool = False,
    batch_size: int = 500,
) -> Dict[str, Any]:
    """Find assets not referenced by any live generation and delete their files.

    An asset counts as an orphan when no document in ``generations_collection``
    with ``is_deleted != True`` lists the asset's id (as a string) in its
    ``result_list``.

    Args:
        assets_collection: Name of the collection holding assets.
        generations_collection: Name of the collection holding generations.
        asset_type: If not None, only assets whose ``type`` equals this value.
        project_id: If not None, only assets with this ``project_id``.
        dry_run: When true, orphans are only logged; nothing is deleted.
        mark_assets_deleted: When true (and not a dry run), set
            ``is_deleted=True`` on the orphan asset documents.
        batch_size: Cursor batch size passed to the aggregation.

    Returns:
        A summary dict with counts of found/deleted items and collected errors.
    """
    db = mongo['bot_db']  # The DB is already selected in get_mongo_client
    assets = db[assets_collection]

    match_assets: Dict[str, Any] = {}
    if asset_type is not None:
        match_assets["type"] = asset_type
    if project_id is not None:
        match_assets["project_id"] = project_id

    pipeline: List[Dict[str, Any]] = [
        {"$match": match_assets} if match_assets else {"$match": {}},
        {
            "$lookup": {
                "from": generations_collection,
                "let": {"assetIdStr": {"$toString": "$_id"}},
                "pipeline": [
                    # "Alive" generations are those where is_deleted != True
                    # (i.e. false or the field is missing).
                    {"$match": {"is_deleted": {"$ne": True}}},
                    {
                        "$match": {
                            "$expr": {
                                "$in": [
                                    "$$assetIdStr",
                                    {"$ifNull": ["$result_list", []]},
                                ]
                            }
                        }
                    },
                    # One match is enough to prove the asset is referenced.
                    {"$limit": 1},
                ],
                "as": "alive_generations",
            }
        },
        {
            "$match": {
                "$expr": {"$eq": [{"$size": "$alive_generations"}, 0]}
            }
        },
        {
            "$project": {
                "_id": 1,
                "minio_object_name": 1,
                "minio_thumbnail_object_name": 1,
            }
        },
    ]
    # Use the module logger instead of print so output follows logging config.
    logger.debug("Orphan lookup pipeline: %s", pipeline)

    cursor = assets.aggregate(pipeline, allowDiskUse=True, batchSize=batch_size)

    deleted_objects = 0
    deleted_assets = 0
    errors: List[Dict[str, Any]] = []
    orphan_asset_ids: List[ObjectId] = []

    async for asset in cursor:
        aid = asset["_id"]
        obj = asset.get("minio_object_name")
        thumb = asset.get("minio_thumbnail_object_name")
        orphan_asset_ids.append(aid)

        if dry_run:
            logger.info("[DRY RUN] orphan asset=%s obj=%s thumb=%s", aid, obj, thumb)
            continue

        try:
            if obj:
                await minio_client.delete_file(obj)
                deleted_objects += 1
            if thumb:
                await minio_client.delete_file(thumb)
                deleted_objects += 1
            deleted_assets += 1
        except Exception as e:
            # Best-effort cleanup: record the failure and keep going.
            errors.append({"asset_id": str(aid), "error": str(e)})

    # NOTE(review): this marks every orphan found, including ones whose object
    # deletion failed above — confirm that is intended.
    if (not dry_run) and mark_assets_deleted and orphan_asset_ids:
        res = await assets.update_many(
            {"_id": {"$in": orphan_asset_ids}},
            {"$set": {"is_deleted": True}},
        )
        marked = res.modified_count
    else:
        marked = 0

    return {
        "dry_run": dry_run,
        "filter": {
            "asset_type": asset_type,
            "project_id": project_id,
        },
        "orphans_found": len(orphan_asset_ids),
        "deleted_assets": deleted_assets,
        "deleted_objects": deleted_objects,
        "marked_assets_deleted": marked,
        "errors": errors,
    }


@router.delete("/{asset_id}", status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(get_current_user)])
async def delete_asset(
    asset_id: str,
    dao: DAO = Depends(get_dao)
):
    """Delete a single asset by id.

    Raises:
        HTTPException: 404 when the DAO reports that nothing was deleted.
    """
    logger.info(f"delete_asset called for ID: {asset_id}")
    # No separate existence check: attempt the delete and map a falsy result
    # to 404.
    deleted = await dao.assets.delete_asset(asset_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Asset not found")

    logger.info(f"Asset {asset_id} deleted successfully")
    return None


@router.get("", dependencies=[Depends(get_current_user)])
async def get_assets(
    request: Request,
    dao: DAO = Depends(get_dao),
    type: Optional[str] = None,
    limit: int = 10,
    offset: int = 0,
    current_user: dict = Depends(get_current_user),
    project_id: Optional[str] = Depends(get_project_id),
) -> AssetsResponse:
    """List assets with pagination.

    Without a project the listing is filtered by the current user's id. With a
    project, the user must be in ``project.members`` and the listing is
    filtered by project only.

    Raises:
        HTTPException: 403 when the project is missing or the user is not a member.
    """
    logger.info(f"get_assets called. Limit: {limit}, Offset: {offset}")

    # NOTE(review): this endpoint reads current_user["id"] while upload_asset
    # reads current_user["_id"] — confirm which key get_current_user provides.
    user_id_filter = current_user["id"]
    if project_id:
        project = await dao.projects.get_project(project_id)
        if not project or str(current_user["id"]) not in project.members:
            raise HTTPException(status_code=403, detail="Project access denied")
        # Inside a project, show assets from all members.
        user_id_filter = None

    assets = await dao.assets.get_assets(type, limit, offset, created_by=user_id_filter, project_id=project_id)
    total_count = await dao.assets.get_asset_count(created_by=user_id_filter, project_id=project_id)

    # Convert explicitly to the response DTO so the response list holds
    # validated AssetResponse models.
    asset_responses = [AssetResponse.model_validate(a.model_dump()) for a in assets]

    return AssetsResponse(assets=asset_responses, total_count=total_count)


@router.post("/upload", response_model=AssetResponse, status_code=status.HTTP_201_CREATED)
async def upload_asset(
    file: UploadFile = File(...),
    linked_char_id: Optional[str] = Form(None),
    dao: DAO = Depends(get_dao),
    current_user: dict = Depends(get_current_user),
    project_id: Optional[str] = Depends(get_project_id)
):
    """Upload an image file and store it as a new asset with a thumbnail.

    Raises:
        HTTPException: 400 for a missing/non-image content type or empty file;
            403 when the project is missing or the user is not a member.
    """
    logger.info(f"upload_asset called. Filename: {file.filename}, ContentType: {file.content_type}, LinkedCharId: {linked_char_id}")
    if not file.content_type:
        raise HTTPException(status_code=400, detail="Unknown file type")

    if not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail=f"Unsupported content type: {file.content_type}")

    if project_id:
        project = await dao.projects.get_project(project_id)
        if not project or str(current_user["_id"]) not in project.members:
            raise HTTPException(status_code=403, detail="Project access denied")

    data = await file.read()
    if not data:
        raise HTTPException(status_code=400, detail="Empty file")

    # Generate the thumbnail in a worker thread so the event loop is not blocked.
    from utils.image_utils import create_thumbnail
    thumbnail_bytes = await asyncio.to_thread(create_thumbnail, data)

    asset = Asset(
        name=file.filename or "upload",
        type=AssetType.UPLOADED,
        content_type=AssetContentType.IMAGE,
        linked_char_id=linked_char_id,
        data=data,
        thumbnail=thumbnail_bytes,
        created_by=str(current_user["_id"]),
        project_id=project_id,
    )

    asset_id = await dao.assets.create_asset(asset)
    asset.id = str(asset_id)

    logger.info(f"Asset created successfully. ID: {asset_id}")

    return AssetResponse(
        id=asset.id,
        name=asset.name,
        # type/content_type may be enum members or plain values.
        type=asset.type.value if hasattr(asset.type, "value") else asset.type,
        content_type=asset.content_type.value if hasattr(asset.content_type, "value") else asset.content_type,
        linked_char_id=asset.linked_char_id,
        created_at=asset.created_at
    )


@router.post("/regenerate_thumbnails", dependencies=[Depends(get_current_user)])
async def regenerate_thumbnails(dao: DAO = Depends(get_dao)):
    """
    Regenerates thumbnails for image assets that have data stored.

    Only the first 1000 assets returned by the DAO are considered; an existing
    thumbnail is overwritten. Returns the number of assets inspected
    ("processed") and the number whose thumbnail was updated ("updated").
    """
    logger.info("Starting thumbnail regeneration task")
    from utils.image_utils import create_thumbnail

    # TODO: iterate through the repository in pages for large datasets; for
    # now a single chunk of up to 1000 assets is fetched.
    assets = await dao.assets.get_assets(limit=1000, offset=0, with_data=True)
    logger.info(f"Found {len(assets)} assets")

    count = 0
    updated = 0

    for asset in assets:
        if asset.content_type == AssetContentType.IMAGE and asset.data:
            try:
                thumb = await asyncio.to_thread(create_thumbnail, asset.data)
                if thumb:
                    asset.thumbnail = thumb
                    await dao.assets.update_asset(asset.id, asset)
                    updated += 1
            except Exception as e:
                # Keep processing the remaining assets on failure.
                logger.error(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")
        count += 1

    return {"status": "completed", "processed": count, "updated": updated}


@router.post("/migrate_to_minio", dependencies=[Depends(get_current_user)])
async def migrate_to_minio(dao: DAO = Depends(get_dao)):
    """
    Migrates assets from MongoDB to MinIO.
    """
    logger.info("Starting migration to MinIO")
    result = await dao.assets.migrate_to_minio()
    logger.info(f"Migration result: {result}")
    return result