383 lines
14 KiB
Python
383 lines
14 KiB
Python
from typing import Any
|
||
|
||
from aiogram.types import BufferedInputFile
|
||
from bson import ObjectId
|
||
from fastapi import APIRouter, UploadFile, File, Form, Depends
|
||
from fastapi.openapi.models import MediaType
|
||
from motor.motor_asyncio import AsyncIOMotorClient
|
||
from pymongo import MongoClient
|
||
from starlette import status
|
||
from starlette.exceptions import HTTPException
|
||
from starlette.requests import Request
|
||
from starlette.responses import Response, JSONResponse, StreamingResponse
|
||
|
||
from adapters.s3_adapter import S3Adapter
|
||
from api.models import AssetsResponse, AssetResponse
|
||
from models.Asset import Asset, AssetType, AssetContentType
|
||
from repos.dao import DAO
|
||
from api.dependency import get_dao, get_mongo_client, get_s3_adapter
|
||
import asyncio
|
||
|
||
import logging
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
from api.endpoints.auth import get_current_user
|
||
from api.dependency import get_project_id
|
||
|
||
# Router for the asset endpoints; every route declared on it is prefixed with /api/assets.
router = APIRouter(prefix="/api/assets", tags=["Assets"])
|
||
|
||
|
||
@router.get("/{asset_id}")
async def get_asset(
    asset_id: str,
    request: Request,
    thumbnail: bool = False,
    dao: DAO = Depends(get_dao),
    s3_adapter: S3Adapter = Depends(get_s3_adapter),
) -> Response:
    """Serve an asset's bytes, or its thumbnail when ``thumbnail`` is true.

    Sources are tried in this order:

    1. Thumbnail (only when requested): object storage first, then the
       thumbnail held on the asset record. If neither yields bytes the
       request falls through to the main content.
    2. Main content streamed from object storage. A single-range ``Range``
       header is answered with 206 and a ``Content-Range``; a range that
       starts at or past the end of the object is answered with 416; a
       malformed or unsupported ``Range`` header is ignored and the full
       object is streamed.
    3. Legacy payload held on the asset record itself.

    Args:
        asset_id: Identifier passed to ``dao.assets.get_asset``.
        request: Incoming request; only its ``Range`` header is read.
        thumbnail: When true, prefer the thumbnail over the main content.
        dao: Data-access object providing ``assets.get_asset``.
        s3_adapter: Object-storage adapter; may be falsy, in which case only
            the record-backed fallbacks are used.

    Returns:
        A ``Response`` or ``StreamingResponse`` carrying the selected bytes.

    Raises:
        HTTPException: 404 when the asset record is missing or none of the
            sources above holds any data.
    """
    logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
    # Load metadata only (no data/thumbnail bytes).
    # NOTE(review): the asset.thumbnail / asset.data fallbacks below can only
    # fire if with_data=False still populates those fields — confirm in the DAO.
    asset = await dao.assets.get_asset(asset_id, with_data=False)
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    base_headers = {
        "Cache-Control": "public, max-age=31536000, immutable",
        "Accept-Ranges": "bytes",
    }

    # Thumbnail: small enough to load fully into memory.
    if thumbnail:
        if asset.minio_thumbnail_object_name and s3_adapter:
            thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
            if thumb_bytes:
                return Response(content=thumb_bytes, media_type="image/jpeg", headers=base_headers)
        # Fallback: thumbnail stored on the record.
        if asset.thumbnail:
            return Response(content=asset.thumbnail, media_type="image/jpeg", headers=base_headers)
        # No thumbnail available — fall through to the main content.

    # Main content: stream from object storage instead of buffering it.
    if asset.minio_object_name and s3_adapter:
        object_name = asset.minio_object_name
        content_type = _guess_media_type(
            object_name,
            is_video=asset.content_type == AssetContentType.VIDEO,
        )

        # Range requests are what make video seeking work in browsers.
        range_header = request.headers.get("range")
        file_size = await s3_adapter.get_file_size(object_name)

        if range_header and file_size:
            try:
                start, end = _parse_byte_range(range_header, file_size)
            except ValueError:
                # A server may ignore a Range header it cannot interpret;
                # fall back to the full-content response below.
                pass
            else:
                if start >= file_size:
                    # 416 Range Not Satisfiable
                    return Response(
                        status_code=416,
                        headers={"Content-Range": f"bytes */{file_size}"},
                    )

                headers = {
                    **base_headers,
                    "Content-Range": f"bytes {start}-{end}/{file_size}",
                    "Content-Length": str(end - start + 1),
                }
                # Forward the normalised (clamped) range so the streamed body
                # matches the advertised Content-Range exactly.
                return StreamingResponse(
                    s3_adapter.stream_file(object_name, range_header=f"bytes={start}-{end}"),
                    status_code=206,
                    headers=headers,
                    media_type=content_type,
                )

        # Full content response.
        headers = base_headers.copy()
        if file_size:
            headers["Content-Length"] = str(file_size)

        return StreamingResponse(
            s3_adapter.stream_file(object_name),
            media_type=content_type,
            headers=headers,
        )

    # Fallback: payload stored on the record (legacy).
    if asset.data:
        return Response(content=asset.data, media_type="image/png", headers=base_headers)

    raise HTTPException(status_code=404, detail="Asset data not found")


# Object-name suffix -> media type; takes precedence over the asset's declared kind.
_EXTENSION_MEDIA_TYPES = {
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
}


def _guess_media_type(object_name: str, is_video: bool) -> str:
    """Pick a media type from the object's extension, else from the asset kind."""
    lowered = object_name.lower()
    for suffix, media_type in _EXTENSION_MEDIA_TYPES.items():
        if lowered.endswith(suffix):
            return media_type
    return "video/mp4" if is_video else "image/png"


def _parse_byte_range(range_header: str, file_size: int) -> tuple[int, int]:
    """Parse a single-range ``Range`` header into inclusive ``(start, end)`` offsets.

    ``end`` is clamped to ``file_size - 1``. ``start`` is returned as sent, so
    the caller must check ``start >= file_size`` (unsatisfiable range) before
    using the pair. Suffix ranges (``bytes=-N``) select the last N bytes.

    Raises:
        ValueError: for a non-``bytes`` unit, multiple ranges, non-numeric
            positions, an empty/zero suffix length, or ``end < start``.
    """
    unit, _, spec = range_header.partition("=")
    if unit.strip().lower() != "bytes":
        raise ValueError(f"unsupported range unit: {unit!r}")

    first, dash, last = spec.strip().partition("-")
    if not dash:
        raise ValueError(f"malformed range spec: {spec!r}")
    first, last = first.strip(), last.strip()

    if not first:
        # Suffix form: "bytes=-N" means the final N bytes of the object.
        suffix_length = int(last)
        if suffix_length <= 0:
            raise ValueError(f"invalid suffix length: {last!r}")
        return max(file_size - suffix_length, 0), file_size - 1

    start = int(first)
    end = int(last) if last else file_size - 1
    if end < start:
        raise ValueError(f"range end precedes start: {spec!r}")
    return start, min(end, file_size - 1)
|
||
|
||
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
async def delete_orphan_assets_from_minio(
    mongo: AsyncIOMotorClient = Depends(get_mongo_client),
    minio_client: S3Adapter = Depends(get_s3_adapter),
    *,
    assets_collection: str = "assets",
    generations_collection: str = "generations",
    asset_type: str | None = "generated",
    project_id: str | None = None,
    dry_run: bool = True,
    mark_assets_deleted: bool = False,
    batch_size: int = 500,
) -> dict[str, Any]:
    """Find assets no live generation references and remove their stored objects.

    An asset counts as an orphan when no document in ``generations_collection``
    with ``is_deleted != True`` lists the asset's stringified ``_id`` in its
    ``result_list``.

    Args:
        mongo: Mongo handle; the ``bot_db`` database is selected from it.
        minio_client: Object-storage adapter used for ``delete_file``.
        assets_collection: Name of the collection holding asset documents.
        generations_collection: Name of the collection holding generations.
        asset_type: When not None, restrict the scan to assets of this ``type``.
        project_id: When not None, restrict the scan to this ``project_id``.
        dry_run: When true (the default), only log the orphans; delete nothing.
        mark_assets_deleted: When true and not a dry run, set
            ``is_deleted: True`` on the orphan asset documents afterwards.
        batch_size: Cursor batch size passed to the aggregation.

    Returns:
        A summary dict: the effective filter, number of orphans found, number
        of assets and storage objects deleted, number of documents marked
        deleted, and a list of per-asset errors.
    """
    # NOTE(review): assets_collection / generations_collection are plain str
    # parameters, which FastAPI exposes as query parameters — any authenticated
    # caller can point this at arbitrary collections. Consider pinning them.
    # NOTE(review): the original comment here said the database is already
    # selected in get_mongo_client, yet 'bot_db' is indexed again — confirm
    # which object the dependency actually returns.
    db = mongo['bot_db']
    assets = db[assets_collection]

    match_assets: dict[str, Any] = {}
    if asset_type is not None:
        match_assets["type"] = asset_type
    if project_id is not None:
        match_assets["project_id"] = project_id

    pipeline: list[dict[str, Any]] = [
        # An empty filter matches every asset.
        {"$match": match_assets},
        {
            "$lookup": {
                "from": generations_collection,
                "let": {"assetIdStr": {"$toString": "$_id"}},
                "pipeline": [
                    # "Alive" generations are those where is_deleted != True
                    # (i.e. false, or the field is absent).
                    {"$match": {"is_deleted": {"$ne": True}}},
                    {
                        "$match": {
                            "$expr": {
                                "$in": [
                                    "$$assetIdStr",
                                    {"$ifNull": ["$result_list", []]},
                                ]
                            }
                        }
                    },
                    # One referencing generation is enough to keep the asset.
                    {"$limit": 1},
                ],
                "as": "alive_generations",
            }
        },
        {
            "$match": {
                "$expr": {"$eq": [{"$size": "$alive_generations"}, 0]}
            }
        },
        {
            "$project": {
                "_id": 1,
                "minio_object_name": 1,
                "minio_thumbnail_object_name": 1,
            }
        },
    ]
    logger.debug("Orphan asset pipeline: %s", pipeline)
    cursor = assets.aggregate(pipeline, allowDiskUse=True, batchSize=batch_size)

    deleted_objects = 0
    deleted_assets = 0
    errors: list[dict[str, Any]] = []
    orphan_asset_ids: list[ObjectId] = []

    async for asset in cursor:
        aid = asset["_id"]
        obj = asset.get("minio_object_name")
        thumb = asset.get("minio_thumbnail_object_name")

        orphan_asset_ids.append(aid)

        if dry_run:
            logger.info("[DRY RUN] orphan asset=%s obj=%s thumb=%s", aid, obj, thumb)
            continue

        # Best effort: one failing asset must not abort the whole sweep.
        try:
            if obj:
                await minio_client.delete_file(obj)
                deleted_objects += 1

            if thumb:
                await minio_client.delete_file(thumb)
                deleted_objects += 1

            deleted_assets += 1
        except Exception as e:
            logger.warning("Failed to delete objects of orphan asset %s: %s", aid, e)
            errors.append({"asset_id": str(aid), "error": str(e)})

    # NOTE(review): this marks every orphan found, including those whose
    # storage deletion failed above — confirm that is intended.
    if (not dry_run) and mark_assets_deleted and orphan_asset_ids:
        res = await assets.update_many(
            {"_id": {"$in": orphan_asset_ids}},
            {"$set": {"is_deleted": True}},
        )
        marked = res.modified_count
    else:
        marked = 0

    return {
        "dry_run": dry_run,
        "filter": {
            "asset_type": asset_type,
            "project_id": project_id,
        },
        "orphans_found": len(orphan_asset_ids),
        "deleted_assets": deleted_assets,
        "deleted_objects": deleted_objects,
        "marked_assets_deleted": marked,
        "errors": errors,
    }
|
||
|
||
@router.delete("/{asset_id}", status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(get_current_user)])
async def delete_asset(
    asset_id: str,
    dao: DAO = Depends(get_dao),
):
    """Delete a single asset by ID.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when the DAO reports that nothing was deleted.
    """
    logger.info(f"delete_asset called for ID: {asset_id}")
    # No separate existence check: the DAO's falsy result is what drives the 404.
    was_removed = await dao.assets.delete_asset(asset_id)
    if was_removed:
        logger.info(f"Asset {asset_id} deleted successfully")
        return None

    raise HTTPException(status_code=404, detail="Asset not found")
|
||
|
||
|
||
@router.get("", dependencies=[Depends(get_current_user)])
async def get_assets(
    request: Request,
    dao: DAO = Depends(get_dao),
    type: str | None = None,
    limit: int = 10,
    offset: int = 0,
    current_user: dict = Depends(get_current_user),
    project_id: str | None = Depends(get_project_id),
) -> AssetsResponse:
    """Return one page of assets plus a total count.

    Without a project, results are restricted to assets created by the
    current user. With a project, the caller must be listed in the project's
    members; the creator filter is then dropped and the project filter is
    used instead.

    Raises:
        HTTPException: 403 when ``project_id`` is given but the project is
            missing or the current user is not among its members.
    """
    logger.info(f"get_assets called. Limit: {limit}, Offset: {offset}")

    # NOTE(review): this reads current_user["id"] while upload_asset reads
    # current_user["_id"] — confirm which key get_current_user provides.
    requester_id = current_user["id"]
    created_by_filter = requester_id

    if project_id:
        project = await dao.projects.get_project(project_id)
        has_access = project and str(requester_id) in project.members
        if not has_access:
            raise HTTPException(status_code=403, detail="Project access denied")
        # Inside a project, every member's assets are visible.
        created_by_filter = None

    page = await dao.assets.get_assets(
        type,
        limit,
        offset,
        created_by=created_by_filter,
        project_id=project_id,
    )
    # NOTE(review): the count is not given the `type` filter, so it may exceed
    # the number of assets reachable through paging when `type` is set.
    total_count = await dao.assets.get_asset_count(
        created_by=created_by_filter,
        project_id=project_id,
    )

    # Round-trip each asset through model_dump so the response list holds
    # validated AssetResponse instances.
    asset_responses = [AssetResponse.model_validate(item.model_dump()) for item in page]

    return AssetsResponse(assets=asset_responses, total_count=total_count)
|
||
|
||
|
||
|
||
@router.post("/upload", response_model=AssetResponse, status_code=status.HTTP_201_CREATED)
async def upload_asset(
    file: UploadFile = File(...),
    linked_char_id: str | None = Form(None),
    dao: DAO = Depends(get_dao),
    current_user: dict = Depends(get_current_user),
    project_id: str | None = Depends(get_project_id),
):
    """Store an uploaded image as a new asset and return its metadata.

    The upload is read fully into memory, a thumbnail is generated in a worker
    thread, and both are stored on the new asset record.

    Raises:
        HTTPException: 400 when the content type is missing or not
            ``image/*``, or when the file is empty; 403 when ``project_id`` is
            given but the project is missing or the current user is not among
            its members.
    """
    logger.info(f"upload_asset called. Filename: {file.filename}, ContentType: {file.content_type}, LinkedCharId: {linked_char_id}")

    mime = file.content_type
    if not mime:
        raise HTTPException(status_code=400, detail="Unknown file type")
    if not mime.startswith("image/"):
        raise HTTPException(status_code=400, detail=f"Unsupported content type: {mime}")

    if project_id:
        project = await dao.projects.get_project(project_id)
        # NOTE(review): this reads current_user["_id"] while get_assets reads
        # current_user["id"] — confirm which key get_current_user provides.
        is_member = project and str(current_user["_id"]) in project.members
        if not is_member:
            raise HTTPException(status_code=403, detail="Project access denied")

    payload = await file.read()
    if not payload:
        raise HTTPException(status_code=400, detail="Empty file")

    # Thumbnail generation runs in a worker thread so the event loop stays free.
    from utils.image_utils import create_thumbnail
    thumbnail_bytes = await asyncio.to_thread(create_thumbnail, payload)

    asset = Asset(
        name=file.filename or "upload",
        type=AssetType.UPLOADED,
        content_type=AssetContentType.IMAGE,
        linked_char_id=linked_char_id,
        data=payload,
        thumbnail=thumbnail_bytes,
        created_by=str(current_user["_id"]),
        project_id=project_id,
    )

    new_id = await dao.assets.create_asset(asset)
    asset.id = str(new_id)
    logger.info(f"Asset created successfully. ID: {new_id}")

    # Fields may hold enum members or plain values; emit the plain value either way.
    return AssetResponse(
        id=asset.id,
        name=asset.name,
        type=getattr(asset.type, "value", asset.type),
        content_type=getattr(asset.content_type, "value", asset.content_type),
        linked_char_id=asset.linked_char_id,
        created_at=asset.created_at,
    )
|
||
|
||
|
||
@router.post("/regenerate_thumbnails", dependencies=[Depends(get_current_user)])
async def regenerate_thumbnails(dao: DAO = Depends(get_dao)):
    """Rebuild thumbnails for image assets that carry their payload in ``data``.

    Only the first 1000 assets returned by the DAO are examined (there is no
    paging loop). Every image asset with non-empty ``data`` gets a freshly
    generated thumbnail, overwriting any existing one; assets without
    ``data`` are skipped.

    Returns:
        A dict with ``status``, ``processed`` (image assets attempted,
        including failures) and ``updated`` (thumbnails actually written).
    """
    logger.info("Starting thumbnail regeneration task")
    from utils.image_utils import create_thumbnail

    # TODO: iterate in pages (or via a dedicated repo method) for large datasets.
    assets = await dao.assets.get_assets(limit=1000, offset=0, with_data=True)
    logger.info(f"Found {len(assets)} assets")
    count = 0
    updated = 0

    for asset in assets:
        is_image = asset.content_type == AssetContentType.IMAGE
        if not (is_image and asset.data):
            continue

        # Best effort: one broken image must not abort the whole run.
        try:
            thumb = await asyncio.to_thread(create_thumbnail, asset.data)
            if thumb:
                asset.thumbnail = thumb
                await dao.assets.update_asset(asset.id, asset)
                updated += 1
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")
        count += 1

    return {"status": "completed", "processed": count, "updated": updated}
|
||
|
||
@router.post("/migrate_to_minio", dependencies=[Depends(get_current_user)])
async def migrate_to_minio(dao: DAO = Depends(get_dao)):
    """Run the DAO's MongoDB-to-MinIO asset migration.

    Returns:
        Whatever ``dao.assets.migrate_to_minio()`` returns, unchanged.
    """
    logger.info("Starting migration to MinIO")
    outcome = await dao.assets.migrate_to_minio()
    logger.info(f"Migration result: {outcome}")
    return outcome
|
||
|