330 lines
12 KiB
Python
330 lines
12 KiB
Python
from typing import List, Optional, Dict, Any
|
||
|
||
from aiogram.types import BufferedInputFile
|
||
from bson import ObjectId
|
||
from fastapi import APIRouter, UploadFile, File, Form, Depends
|
||
from fastapi.openapi.models import MediaType
|
||
from motor.motor_asyncio import AsyncIOMotorClient
|
||
from pymongo import MongoClient
|
||
from starlette import status
|
||
from starlette.exceptions import HTTPException
|
||
from starlette.requests import Request
|
||
from starlette.responses import Response, JSONResponse, StreamingResponse
|
||
|
||
from adapters.s3_adapter import S3Adapter
|
||
from api.models.AssetDTO import AssetsResponse, AssetResponse
|
||
from models.Asset import Asset, AssetType, AssetContentType
|
||
from repos.dao import DAO
|
||
from api.dependency import get_dao, get_mongo_client, get_s3_adapter
|
||
import asyncio
|
||
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
from api.endpoints.auth import get_current_user
|
||
from api.dependency import get_project_id
|
||
|
||
router = APIRouter(prefix="/api/assets", tags=["Assets"])
|
||
|
||
|
||
@router.get("/{asset_id}")
|
||
async def get_asset(
|
||
asset_id: str,
|
||
request: Request,
|
||
thumbnail: bool = False,
|
||
dao: DAO = Depends(get_dao),
|
||
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
||
) -> Response:
|
||
logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
|
||
# Загружаем только метаданные (без data/thumbnail bytes)
|
||
asset = await dao.assets.get_asset(asset_id, with_data=False)
|
||
if not asset:
|
||
raise HTTPException(status_code=404, detail="Asset not found")
|
||
|
||
headers = {
|
||
"Cache-Control": "public, max-age=31536000, immutable"
|
||
}
|
||
|
||
# Thumbnail: маленький, можно грузить в RAM
|
||
if thumbnail:
|
||
if asset.minio_thumbnail_object_name and s3_adapter:
|
||
thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
|
||
if thumb_bytes:
|
||
return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
|
||
# Fallback: thumbnail in DB
|
||
if asset.thumbnail:
|
||
return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
|
||
# No thumbnail available — fall through to main content
|
||
|
||
# Main content: стримим из S3 без загрузки в RAM
|
||
if asset.minio_object_name and s3_adapter:
|
||
content_type = "image/png"
|
||
# if asset.content_type == AssetContentType.VIDEO:
|
||
# content_type = "video/mp4"
|
||
return StreamingResponse(
|
||
s3_adapter.stream_file(asset.minio_object_name),
|
||
media_type=content_type,
|
||
headers=headers,
|
||
)
|
||
|
||
# Fallback: data stored in DB (legacy)
|
||
if asset.data:
|
||
return Response(content=asset.data, media_type="image/png", headers=headers)
|
||
|
||
raise HTTPException(status_code=404, detail="Asset data not found")
|
||
|
||
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
|
||
async def delete_orphan_assets_from_minio(
|
||
mongo: AsyncIOMotorClient = Depends(get_mongo_client),
|
||
minio_client: S3Adapter = Depends(get_s3_adapter),
|
||
*,
|
||
assets_collection: str = "assets",
|
||
generations_collection: str = "generations",
|
||
asset_type: Optional[str] = "generated",
|
||
project_id: Optional[str] = None,
|
||
dry_run: bool = True,
|
||
mark_assets_deleted: bool = False,
|
||
batch_size: int = 500,
|
||
) -> Dict[str, Any]:
|
||
db = mongo['bot_db'] # БД уже выбрана в get_mongo_client
|
||
assets = db[assets_collection]
|
||
|
||
match_assets: Dict[str, Any] = {}
|
||
if asset_type is not None:
|
||
match_assets["type"] = asset_type
|
||
if project_id is not None:
|
||
match_assets["project_id"] = project_id
|
||
|
||
pipeline: List[Dict[str, Any]] = [
|
||
{"$match": match_assets} if match_assets else {"$match": {}},
|
||
{
|
||
"$lookup": {
|
||
"from": generations_collection,
|
||
"let": {"assetIdStr": {"$toString": "$_id"}},
|
||
"pipeline": [
|
||
# считаем "живыми" те, где is_deleted != True (т.е. false или поля нет)
|
||
{"$match": {"is_deleted": {"$ne": True}}},
|
||
{
|
||
"$match": {
|
||
"$expr": {
|
||
"$in": [
|
||
"$$assetIdStr",
|
||
{"$ifNull": ["$result_list", []]},
|
||
]
|
||
}
|
||
}
|
||
},
|
||
{"$limit": 1},
|
||
],
|
||
"as": "alive_generations",
|
||
}
|
||
},
|
||
{
|
||
"$match": {
|
||
"$expr": {"$eq": [{"$size": "$alive_generations"}, 0]}
|
||
}
|
||
},
|
||
{
|
||
"$project": {
|
||
"_id": 1,
|
||
"minio_object_name": 1,
|
||
"minio_thumbnail_object_name": 1,
|
||
}
|
||
},
|
||
]
|
||
print(pipeline)
|
||
cursor = assets.aggregate(pipeline, allowDiskUse=True, batchSize=batch_size)
|
||
|
||
deleted_objects = 0
|
||
deleted_assets = 0
|
||
errors: List[Dict[str, Any]] = []
|
||
orphan_asset_ids: List[ObjectId] = []
|
||
|
||
async for asset in cursor:
|
||
aid = asset["_id"]
|
||
obj = asset.get("minio_object_name")
|
||
thumb = asset.get("minio_thumbnail_object_name")
|
||
|
||
orphan_asset_ids.append(aid)
|
||
|
||
if dry_run:
|
||
print(f"[DRY RUN] orphan asset={aid} obj={obj} thumb={thumb}")
|
||
continue
|
||
|
||
try:
|
||
if obj:
|
||
await minio_client.delete_file(obj)
|
||
deleted_objects += 1
|
||
|
||
if thumb:
|
||
await minio_client.delete_file(thumb)
|
||
deleted_objects += 1
|
||
|
||
deleted_assets += 1
|
||
|
||
except Exception as e:
|
||
errors.append({"asset_id": str(aid), "error": str(e)})
|
||
|
||
if (not dry_run) and mark_assets_deleted and orphan_asset_ids:
|
||
res = await assets.update_many(
|
||
{"_id": {"$in": orphan_asset_ids}},
|
||
{"$set": {"is_deleted": True}},
|
||
)
|
||
marked = res.modified_count
|
||
else:
|
||
marked = 0
|
||
|
||
return {
|
||
"dry_run": dry_run,
|
||
"filter": {
|
||
"asset_type": asset_type,
|
||
"project_id": project_id,
|
||
},
|
||
"orphans_found": len(orphan_asset_ids),
|
||
"deleted_assets": deleted_assets,
|
||
"deleted_objects": deleted_objects,
|
||
"marked_assets_deleted": marked,
|
||
"errors": errors,
|
||
}
|
||
|
||
@router.delete("/{asset_id}", status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(get_current_user)])
|
||
async def delete_asset(
|
||
asset_id: str,
|
||
dao: DAO = Depends(get_dao)
|
||
):
|
||
logger.info(f"delete_asset called for ID: {asset_id}")
|
||
# 1. Проверяем наличие (опционально, delete_one вернет false если нет, но для 404 нужно знать)
|
||
# Можно просто попробовать удалить
|
||
deleted = await dao.assets.delete_asset(asset_id)
|
||
if not deleted:
|
||
raise HTTPException(status_code=404, detail="Asset not found")
|
||
|
||
logger.info(f"Asset {asset_id} deleted successfully")
|
||
return None
|
||
|
||
|
||
@router.get("", dependencies=[Depends(get_current_user)])
|
||
async def get_assets(request: Request, dao: DAO = Depends(get_dao), type: Optional[str] = None, limit: int = 10, offset: int = 0, current_user: dict = Depends(get_current_user), project_id: Optional[str] = Depends(get_project_id)) -> AssetsResponse:
|
||
logger.info(f"get_assets called. Limit: {limit}, Offset: {offset}")
|
||
|
||
user_id_filter = current_user["id"]
|
||
if project_id:
|
||
project = await dao.projects.get_project(project_id)
|
||
if not project or str(current_user["id"]) not in project.members:
|
||
raise HTTPException(status_code=403, detail="Project access denied")
|
||
user_id_filter = None
|
||
|
||
assets = await dao.assets.get_assets(type, limit, offset, created_by=user_id_filter, project_id=project_id)
|
||
# assets = await dao.assets.get_assets() # This line seemed redundant/conflicting in original code
|
||
total_count = await dao.assets.get_asset_count(created_by=user_id_filter, project_id=project_id)
|
||
|
||
# Manually map to DTO to trigger computed fields validation if necessary,
|
||
# but primarily to ensure valid Pydantic models for the response list.
|
||
# Asset.model_dump() generally includes computed fields (url) if configured.
|
||
# Let's ensure strict conversion.
|
||
asset_responses = [AssetResponse.model_validate(a.model_dump()) for a in assets]
|
||
|
||
return AssetsResponse(assets=asset_responses, total_count=total_count)
|
||
|
||
|
||
|
||
@router.post("/upload", response_model=AssetResponse, status_code=status.HTTP_201_CREATED)
|
||
async def upload_asset(
|
||
file: UploadFile = File(...),
|
||
linked_char_id: Optional[str] = Form(None),
|
||
dao: DAO = Depends(get_dao),
|
||
current_user: dict = Depends(get_current_user),
|
||
project_id: Optional[str] = Depends(get_project_id)
|
||
):
|
||
logger.info(f"upload_asset called. Filename: {file.filename}, ContentType: {file.content_type}, LinkedCharId: {linked_char_id}")
|
||
if not file.content_type:
|
||
raise HTTPException(status_code=400, detail="Unknown file type")
|
||
|
||
if not file.content_type.startswith("image/"):
|
||
raise HTTPException(status_code=400, detail=f"Unsupported content type: {file.content_type}")
|
||
|
||
if project_id:
|
||
project = await dao.projects.get_project(project_id)
|
||
if not project or str(current_user["_id"]) not in project.members:
|
||
raise HTTPException(status_code=403, detail="Project access denied")
|
||
|
||
data = await file.read()
|
||
if not data:
|
||
raise HTTPException(status_code=400, detail="Empty file")
|
||
|
||
# Generate thumbnail
|
||
from utils.image_utils import create_thumbnail
|
||
thumbnail_bytes = await asyncio.to_thread(create_thumbnail, data)
|
||
|
||
asset = Asset(
|
||
name=file.filename or "upload",
|
||
type=AssetType.UPLOADED,
|
||
content_type=AssetContentType.IMAGE,
|
||
linked_char_id=linked_char_id,
|
||
data=data,
|
||
thumbnail=thumbnail_bytes,
|
||
created_by=str(current_user["_id"]),
|
||
project_id=project_id,
|
||
)
|
||
|
||
asset_id = await dao.assets.create_asset(asset)
|
||
asset.id = str(asset_id)
|
||
logger.info(f"Asset created successfully. ID: {asset_id}")
|
||
|
||
return AssetResponse(
|
||
id=asset.id,
|
||
name=asset.name,
|
||
type=asset.type.value if hasattr(asset.type, "value") else asset.type,
|
||
content_type=asset.content_type.value if hasattr(asset.content_type, "value") else asset.content_type,
|
||
linked_char_id=asset.linked_char_id,
|
||
created_at=asset.created_at,
|
||
url=asset.url
|
||
)
|
||
|
||
|
||
@router.post("/regenerate_thumbnails", dependencies=[Depends(get_current_user)])
|
||
async def regenerate_thumbnails(dao: DAO = Depends(get_dao)):
|
||
"""
|
||
Regenerates thumbnails for all existing image assets that don't have one.
|
||
"""
|
||
logger.info("Starting thumbnail regeneration task")
|
||
from utils.image_utils import create_thumbnail
|
||
import asyncio
|
||
|
||
# Get all assets (pagination loop might be needed for huge datasets, but simple list for now)
|
||
# We'll rely on DAO providing a method or just fetch large chunk.
|
||
# Assuming get_assets might have limit, let's fetch in chunks or just all if possible within limit.
|
||
# Ideally should use a specific repo method for iteration.
|
||
# For now, let's fetch first 1000 or similar.
|
||
assets = await dao.assets.get_assets(limit=1000, offset=0, with_data=True)
|
||
logger.info(f"Found {len(assets)} assets")
|
||
count = 0
|
||
updated = 0
|
||
|
||
for asset in assets:
|
||
if asset.content_type == AssetContentType.IMAGE and asset.data :
|
||
try:
|
||
thumb = await asyncio.to_thread(create_thumbnail, asset.data)
|
||
if thumb:
|
||
asset.thumbnail = thumb
|
||
await dao.assets.update_asset(asset.id, asset)
|
||
updated += 1
|
||
except Exception as e:
|
||
logger.error(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")
|
||
count += 1
|
||
|
||
|
||
return {"status": "completed", "processed": count, "updated": updated}
|
||
|
||
@router.post("/migrate_to_minio", dependencies=[Depends(get_current_user)])
|
||
async def migrate_to_minio(dao: DAO = Depends(get_dao)):
|
||
"""
|
||
Migrates assets from MongoDB to MinIO.
|
||
"""
|
||
logger.info("Starting migration to MinIO")
|
||
result = await dao.assets.migrate_to_minio()
|
||
logger.info(f"Migration result: {result}")
|
||
return result
|
||
|