Files
ai-char-bot/api/endpoints/assets_router.py
2026-02-12 15:32:28 +03:00

330 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Optional, Dict, Any
from aiogram.types import BufferedInputFile
from bson import ObjectId
from fastapi import APIRouter, UploadFile, File, Form, Depends
from fastapi.openapi.models import MediaType
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
from starlette import status
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import Response, JSONResponse, StreamingResponse
from adapters.s3_adapter import S3Adapter
from api.models.AssetDTO import AssetsResponse, AssetResponse
from models.Asset import Asset, AssetType, AssetContentType
from repos.dao import DAO
from api.dependency import get_dao, get_mongo_client, get_s3_adapter
import asyncio
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
from api.endpoints.auth import get_current_user
from api.dependency import get_project_id
# Router for the asset endpoints defined below; all routes share the
# "/api/assets" prefix and the "Assets" tag.
router = APIRouter(prefix="/api/assets", tags=["Assets"])
@router.get("/{asset_id}")
async def get_asset(
    asset_id: str,
    request: Request,
    thumbnail: bool = False,
    dao: DAO = Depends(get_dao),
    s3_adapter: S3Adapter = Depends(get_s3_adapter),
) -> Response:
    """Serve an asset's bytes, or its thumbnail when ``thumbnail`` is true.

    Lookup order:
      1. (thumbnail requests only) the thumbnail object in S3, then a
         thumbnail already present on the loaded record;
      2. the main object, streamed from S3;
      3. legacy fallback: bytes stored on the asset record itself.

    The record is first loaded with ``with_data=False`` (metadata only), so
    step 3 reloads it with ``with_data=True`` before looking at
    ``thumbnail``/``data``. Without that reload the legacy branch has no
    bytes to serve and always ends in a 404.

    Args:
        asset_id: Identifier passed to ``dao.assets.get_asset``.
        request: Incoming request (currently unused).
        thumbnail: When true, prefer the thumbnail over the main content.
        dao: Data-access object.
        s3_adapter: S3/MinIO adapter; S3 lookups are skipped when it is falsy.

    Returns:
        A ``Response`` or ``StreamingResponse`` with long-lived cache headers.

    Raises:
        HTTPException: 404 when the asset does not exist or no bytes can be
            found for it.
    """
    logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")

    # Metadata-only load: avoids pulling data/thumbnail bytes for the common
    # case where the content lives in S3.
    asset = await dao.assets.get_asset(asset_id, with_data=False)
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    headers = {
        "Cache-Control": "public, max-age=31536000, immutable"
    }

    # Thumbnail: small enough to load fully into memory.
    if thumbnail:
        if asset.minio_thumbnail_object_name and s3_adapter:
            thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
            if thumb_bytes:
                return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
        # Thumbnail already present on the loaded record, if any.
        if asset.thumbnail:
            return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
        # No thumbnail available: fall through to the main content.

    # Main content: stream from S3 instead of buffering it in memory.
    if asset.minio_object_name and s3_adapter:
        # TODO(review): the media type is hard-coded; non-PNG assets (e.g.
        # video) would need their own content type here.
        content_type = "image/png"
        return StreamingResponse(
            s3_adapter.stream_file(asset.minio_object_name),
            media_type=content_type,
            headers=headers,
        )

    # Legacy fallback: bytes stored in the database. The metadata-only load
    # above did not fetch them, so reload the record with its data.
    # NOTE(review): assumes get_asset(..., with_data=True) returns the same
    # record with data/thumbnail populated; confirm against the DAO.
    if not asset.data:
        asset = await dao.assets.get_asset(asset_id, with_data=True)
    if asset:
        if thumbnail and asset.thumbnail:
            return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
        if asset.data:
            return Response(content=asset.data, media_type="image/png", headers=headers)

    raise HTTPException(status_code=404, detail="Asset data not found")
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
async def delete_orphan_assets_from_minio(
    mongo: AsyncIOMotorClient = Depends(get_mongo_client),
    minio_client: S3Adapter = Depends(get_s3_adapter),
    *,
    assets_collection: str = "assets",
    generations_collection: str = "generations",
    asset_type: Optional[str] = "generated",
    project_id: Optional[str] = None,
    dry_run: bool = True,
    mark_assets_deleted: bool = False,
    batch_size: int = 500,
) -> Dict[str, Any]:
    """Find assets no live generation refers to and delete their stored objects.

    An asset is an orphan when no document in ``generations_collection`` with
    ``is_deleted != True`` lists the asset's stringified ``_id`` in its
    ``result_list``.

    Args:
        mongo: Mongo client; the ``bot_db`` database is used.
        minio_client: Adapter used to delete the stored objects.
        assets_collection: Name of the collection holding asset documents.
        generations_collection: Name of the collection holding generations.
        asset_type: When not ``None``, only assets with this ``type`` are scanned.
        project_id: When not ``None``, only assets of this project are scanned.
        dry_run: When true, orphans are only logged and counted; nothing is
            deleted or marked.
        mark_assets_deleted: When true (and not a dry run), set
            ``is_deleted=True`` on the assets whose objects were removed
            without error.
        batch_size: Cursor batch size passed to the aggregation.

    Returns:
        A summary dict with the applied filter, the number of orphans found,
        the number of assets/objects deleted, the number of asset documents
        marked as deleted, and per-asset errors.
    """
    # NOTE(review): the database name is hard-coded here even though the
    # original comment said the DB is already selected in get_mongo_client.
    # NOTE(review): both collection names are request parameters; consider
    # restricting them to known values.
    db = mongo['bot_db']
    assets = db[assets_collection]

    match_assets: Dict[str, Any] = {}
    if asset_type is not None:
        match_assets["type"] = asset_type
    if project_id is not None:
        match_assets["project_id"] = project_id

    pipeline: List[Dict[str, Any]] = [
        # An empty filter matches every asset.
        {"$match": match_assets},
        {
            "$lookup": {
                "from": generations_collection,
                "let": {"assetIdStr": {"$toString": "$_id"}},
                "pipeline": [
                    # "Alive" generations: is_deleted is false or absent.
                    {"$match": {"is_deleted": {"$ne": True}}},
                    {
                        "$match": {
                            "$expr": {
                                "$in": [
                                    "$$assetIdStr",
                                    {"$ifNull": ["$result_list", []]},
                                ]
                            }
                        }
                    },
                    # One hit is enough to prove the asset is referenced.
                    {"$limit": 1},
                ],
                "as": "alive_generations",
            }
        },
        # Keep only assets with no alive generation referencing them.
        {
            "$match": {
                "$expr": {"$eq": [{"$size": "$alive_generations"}, 0]}
            }
        },
        {
            "$project": {
                "_id": 1,
                "minio_object_name": 1,
                "minio_thumbnail_object_name": 1,
            }
        },
    ]
    logger.debug("Orphan asset pipeline: %s", pipeline)

    cursor = assets.aggregate(pipeline, allowDiskUse=True, batchSize=batch_size)

    orphans_found = 0
    deleted_objects = 0
    errors: List[Dict[str, Any]] = []
    # Only assets whose objects were removed without error end up here, so a
    # failed deletion is never reported (or marked) as deleted.
    cleaned_asset_ids: List[ObjectId] = []

    async for asset in cursor:
        aid = asset["_id"]
        obj = asset.get("minio_object_name")
        thumb = asset.get("minio_thumbnail_object_name")
        orphans_found += 1

        if dry_run:
            logger.info("[DRY RUN] orphan asset=%s obj=%s thumb=%s", aid, obj, thumb)
            continue

        try:
            if obj:
                await minio_client.delete_file(obj)
                deleted_objects += 1
            if thumb:
                await minio_client.delete_file(thumb)
                deleted_objects += 1
        except Exception as e:
            # Best effort: record the failure and continue with the next asset.
            logger.warning("Failed to delete objects for orphan asset %s: %s", aid, e)
            errors.append({"asset_id": str(aid), "error": str(e)})
        else:
            cleaned_asset_ids.append(aid)

    marked = 0
    if (not dry_run) and mark_assets_deleted and cleaned_asset_ids:
        res = await assets.update_many(
            {"_id": {"$in": cleaned_asset_ids}},
            {"$set": {"is_deleted": True}},
        )
        marked = res.modified_count

    return {
        "dry_run": dry_run,
        "filter": {
            "asset_type": asset_type,
            "project_id": project_id,
        },
        "orphans_found": orphans_found,
        "deleted_assets": len(cleaned_asset_ids),
        "deleted_objects": deleted_objects,
        "marked_assets_deleted": marked,
        "errors": errors,
    }
@router.delete(
    "/{asset_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[Depends(get_current_user)],
)
async def delete_asset(
    asset_id: str,
    dao: DAO = Depends(get_dao),
):
    """Delete a single asset by ID.

    Delegates to ``dao.assets.delete_asset`` and treats a falsy result as
    "nothing was deleted".

    Raises:
        HTTPException: 404 when the DAO reports that no asset was deleted.
    """
    logger.info(f"delete_asset called for ID: {asset_id}")

    # No separate existence check: the outcome of the delete itself decides
    # between success and 404.
    was_removed = await dao.assets.delete_asset(asset_id)
    if was_removed:
        logger.info(f"Asset {asset_id} deleted successfully")
        return None

    raise HTTPException(status_code=404, detail="Asset not found")
@router.get("", dependencies=[Depends(get_current_user)])
async def get_assets(
    request: Request,
    dao: DAO = Depends(get_dao),
    type: Optional[str] = None,
    limit: int = 10,
    offset: int = 0,
    current_user: dict = Depends(get_current_user),
    project_id: Optional[str] = Depends(get_project_id),
) -> AssetsResponse:
    """Return one page of assets plus a total count.

    Without a project, results are restricted to assets created by the
    current user. With a project, the user must be listed in the project's
    members; the creator filter is then dropped and the project filter is
    used instead.

    Raises:
        HTTPException: 403 when the project does not exist or the current
            user is not one of its members.
    """
    logger.info(f"get_assets called. Limit: {limit}, Offset: {offset}")

    # NOTE(review): this endpoint reads current_user["id"], while upload_asset
    # in this file reads current_user["_id"]; confirm which key
    # get_current_user actually provides.
    requester_id = current_user["id"]

    if project_id:
        project = await dao.projects.get_project(project_id)
        if not (project and str(requester_id) in project.members):
            raise HTTPException(status_code=403, detail="Project access denied")
        # Project scope: do not restrict by creator.
        owner_filter = None
    else:
        owner_filter = requester_id

    page = await dao.assets.get_assets(
        type,
        limit,
        offset,
        created_by=owner_filter,
        project_id=project_id,
    )
    total_count = await dao.assets.get_asset_count(
        created_by=owner_filter,
        project_id=project_id,
    )

    # Round-trip each asset through model_dump/model_validate so the response
    # list holds AssetResponse instances.
    asset_responses = []
    for item in page:
        asset_responses.append(AssetResponse.model_validate(item.model_dump()))

    return AssetsResponse(assets=asset_responses, total_count=total_count)
@router.post("/upload", response_model=AssetResponse, status_code=status.HTTP_201_CREATED)
async def upload_asset(
    file: UploadFile = File(...),
    linked_char_id: Optional[str] = Form(None),
    dao: DAO = Depends(get_dao),
    current_user: dict = Depends(get_current_user),
    project_id: Optional[str] = Depends(get_project_id)
):
    """Store an uploaded image as a new asset and return its metadata.

    The upload must declare an ``image/*`` content type and be non-empty.
    When a project is given, the current user must be one of its members.
    A thumbnail is generated in a worker thread before the asset is saved.

    Raises:
        HTTPException: 400 for a missing or non-image content type or an
            empty file; 403 when project access is denied.
    """
    logger.info(f"upload_asset called. Filename: {file.filename}, ContentType: {file.content_type}, LinkedCharId: {linked_char_id}")

    declared_type = file.content_type
    if not declared_type:
        raise HTTPException(status_code=400, detail="Unknown file type")
    if not declared_type.startswith("image/"):
        raise HTTPException(status_code=400, detail=f"Unsupported content type: {file.content_type}")

    # NOTE(review): this endpoint reads current_user["_id"], while get_assets
    # in this file reads current_user["id"]; confirm which key
    # get_current_user actually provides.
    if project_id:
        project = await dao.projects.get_project(project_id)
        if not (project and str(current_user["_id"]) in project.members):
            raise HTTPException(status_code=403, detail="Project access denied")

    payload = await file.read()
    if not payload:
        raise HTTPException(status_code=400, detail="Empty file")

    # Thumbnail creation runs in a thread so it does not block the event loop.
    from utils.image_utils import create_thumbnail
    preview = await asyncio.to_thread(create_thumbnail, payload)

    display_name = file.filename or "upload"
    uploader_id = str(current_user["_id"])
    asset = Asset(
        name=display_name,
        type=AssetType.UPLOADED,
        content_type=AssetContentType.IMAGE,
        linked_char_id=linked_char_id,
        data=payload,
        thumbnail=preview,
        created_by=uploader_id,
        project_id=project_id,
    )

    new_id = await dao.assets.create_asset(asset)
    asset.id = str(new_id)
    logger.info(f"Asset created successfully. ID: {new_id}")

    # Enum members are reduced to their .value; plain values pass through.
    type_value = getattr(asset.type, "value", asset.type)
    content_type_value = getattr(asset.content_type, "value", asset.content_type)

    return AssetResponse(
        id=asset.id,
        name=asset.name,
        type=type_value,
        content_type=content_type_value,
        linked_char_id=asset.linked_char_id,
        created_at=asset.created_at,
        url=asset.url
    )
@router.post("/regenerate_thumbnails", dependencies=[Depends(get_current_user)])
async def regenerate_thumbnails(dao: DAO = Depends(get_dao)):
    """Rebuild thumbnails for image assets that have data loaded.

    Every image asset with data gets a freshly generated thumbnail, whether
    or not it already had one. Only the first 1000 assets returned by
    ``dao.assets.get_assets`` are examined; larger datasets would need a
    pagination loop or a dedicated repository iterator.

    Returns:
        A dict with ``status``, ``processed`` (assets examined) and
        ``updated`` (assets whose thumbnail was rewritten).
    """
    logger.info("Starting thumbnail regeneration task")
    from utils.image_utils import create_thumbnail

    assets = await dao.assets.get_assets(limit=1000, offset=0, with_data=True)
    logger.info(f"Found {len(assets)} assets")

    count = 0
    updated = 0
    for asset in assets:
        count += 1
        if asset.content_type != AssetContentType.IMAGE or not asset.data:
            continue
        try:
            # Thumbnail creation runs in a thread so it does not block the
            # event loop.
            thumb = await asyncio.to_thread(create_thumbnail, asset.data)
            if thumb:
                asset.thumbnail = thumb
                await dao.assets.update_asset(asset.id, asset)
                updated += 1
        except Exception as e:
            # Best effort: one bad asset must not abort the whole run.
            # logger.exception keeps the traceback in the log.
            logger.exception(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")

    return {"status": "completed", "processed": count, "updated": updated}
@router.post("/migrate_to_minio", dependencies=[Depends(get_current_user)])
async def migrate_to_minio(dao: DAO = Depends(get_dao)):
    """Run the DAO's MongoDB-to-MinIO asset migration.

    Returns:
        Whatever ``dao.assets.migrate_to_minio`` returns, unchanged.
    """
    logger.info("Starting migration to MinIO")

    outcome = await dao.assets.migrate_to_minio()

    logger.info(f"Migration result: {outcome}")
    return outcome