252 lines
11 KiB
Python
252 lines
11 KiB
Python
from typing import List, Optional
|
|
import logging
|
|
from bson import ObjectId
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
from models.Asset import Asset
|
|
from adapters.s3_adapter import S3Adapter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AssetsRepo:
|
|
def __init__(self, client: AsyncIOMotorClient, s3_adapter: Optional[S3Adapter] = None, db_name="bot_db"):
|
|
self.collection = client[db_name]["assets"]
|
|
self.s3 = s3_adapter
|
|
|
|
async def create_asset(self, asset: Asset) -> str:
|
|
# Если есть S3 и данные - грузим в S3
|
|
if self.s3:
|
|
# Main data
|
|
if asset.data:
|
|
ts = int(asset.created_at.timestamp())
|
|
object_name = f"{asset.type.value}/{ts}_{asset.name}"
|
|
|
|
uploaded = await self.s3.upload_file(object_name, asset.data)
|
|
if uploaded:
|
|
asset.minio_object_name = object_name
|
|
asset.minio_bucket = self.s3.bucket_name
|
|
asset.data = None # Clear data
|
|
else:
|
|
logger.error(f"Failed to upload asset {asset.name} to MinIO")
|
|
|
|
# Thumbnail
|
|
if asset.thumbnail:
|
|
ts = int(asset.created_at.timestamp())
|
|
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
|
|
|
|
uploaded_thumb = await self.s3.upload_file(thumb_name, asset.thumbnail)
|
|
if uploaded_thumb:
|
|
asset.minio_thumbnail_object_name = thumb_name
|
|
asset.minio_bucket = self.s3.bucket_name # Assumes same bucket
|
|
asset.thumbnail = None # Clear thumbnail data
|
|
else:
|
|
logger.error(f"Failed to upload thumbnail for {asset.name} to MinIO")
|
|
|
|
|
|
res = await self.collection.insert_one(asset.model_dump())
|
|
return str(res.inserted_id)
|
|
|
|
async def get_assets(self, asset_type: Optional[str] = None, limit: int = 10, offset: int = 0, with_data: bool = False) -> List[Asset]:
|
|
filter = {}
|
|
if asset_type:
|
|
filter["type"] = asset_type
|
|
args = {}
|
|
if not with_data:
|
|
args["data"] = 0
|
|
# We assume thumbnails are fetched only if needed or kept sparse.
|
|
# If they are in MinIO, we don't fetch them by default list unless specifically asked?
|
|
# User requirement "Get bytes ... from minio" usually refers to full asset. used in detail view.
|
|
# In list view, we might want thumbnails.
|
|
# If thumbnails are in MinIO, list view will be slow if we fetch all.
|
|
# Usually we return a URL. But this bot might serve bytes.
|
|
# Let's assuming list view needs thumbnails if they are small.
|
|
# But if we moved them to S3, we probably don't want to fetch 10x S3 requests for list.
|
|
# For now: If minio_thumbnail_object_name is present, user might need to fetch separately
|
|
# or we fetch if `with_data` is True?
|
|
# Standard pattern: return URL or ID.
|
|
# Let's keep existing logic: args["thumbnail"] = 0 if not with_data.
|
|
# EXCEPT if we want to show thumbnails in list.
|
|
# Original code:
|
|
# if not with_data: args["data"] = 0; args["thumbnail"] = 0
|
|
# So list DOES NOT return thumbnails by default.
|
|
args["thumbnail"] = 0
|
|
|
|
res = await self.collection.find(filter, args).sort("created_at", -1).skip(offset).limit(limit).to_list(None)
|
|
assets = []
|
|
for doc in res:
|
|
doc["id"] = str(doc.pop("_id"))
|
|
asset = Asset(**doc)
|
|
|
|
if with_data and self.s3:
|
|
# Fetch data
|
|
if asset.minio_object_name:
|
|
data = await self.s3.get_file(asset.minio_object_name)
|
|
if data: asset.data = data
|
|
|
|
# Fetch thumbnail
|
|
if asset.minio_thumbnail_object_name:
|
|
thumb = await self.s3.get_file(asset.minio_thumbnail_object_name)
|
|
if thumb: asset.thumbnail = thumb
|
|
|
|
assets.append(asset)
|
|
|
|
return assets
|
|
|
|
async def get_asset(self, asset_id: str, with_data: bool = True) -> Asset:
|
|
projection = None
|
|
if not with_data:
|
|
projection = {"data": 0, "thumbnail": 0}
|
|
|
|
res = await self.collection.find_one({"_id": ObjectId(asset_id)}, projection)
|
|
if not res:
|
|
return None
|
|
|
|
res["id"] = str(res.pop("_id"))
|
|
asset = Asset(**res)
|
|
|
|
if with_data and self.s3:
|
|
if asset.minio_object_name:
|
|
data = await self.s3.get_file(asset.minio_object_name)
|
|
if data: asset.data = data
|
|
|
|
if asset.minio_thumbnail_object_name:
|
|
thumb = await self.s3.get_file(asset.minio_thumbnail_object_name)
|
|
if thumb: asset.thumbnail = thumb
|
|
|
|
return asset
|
|
|
|
async def update_asset(self, asset_id: str, asset: Asset):
|
|
if not asset.id:
|
|
if asset_id: asset.id = asset_id
|
|
else: raise Exception(f"Asset ID not found: {asset_id}")
|
|
|
|
# NOTE: simplistic update. If asset has data/thumbnail bytes, we might need to upload?
|
|
# Assuming for now we just save what's given.
|
|
# If user wants to update data, they should probably use a specialized method or we handle it here.
|
|
# Let's handle it: If data/thumbnail is present AND we have S3, upload it.
|
|
|
|
if self.s3:
|
|
if asset.data:
|
|
ts = int(asset.created_at.timestamp())
|
|
object_name = f"{asset.type.value}/{ts}_{asset.name}"
|
|
if await self.s3.upload_file(object_name, asset.data):
|
|
asset.minio_object_name = object_name
|
|
asset.minio_bucket = self.s3.bucket_name
|
|
asset.data = None
|
|
|
|
if asset.thumbnail:
|
|
ts = int(asset.created_at.timestamp())
|
|
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
|
|
if await self.s3.upload_file(thumb_name, asset.thumbnail):
|
|
asset.minio_thumbnail_object_name = thumb_name
|
|
asset.thumbnail = None
|
|
|
|
model_dump = asset.model_dump()
|
|
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": model_dump})
|
|
|
|
async def set_tg_photo_file_id(self, asset_id: str, tg_photo_file_id: str):
|
|
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": {"tg_photo_file_id": tg_photo_file_id}})
|
|
|
|
async def get_assets_by_char_id(self, character_id: str, limit: int = 10, offset: int = 0) -> List[Asset]:
|
|
docs = await self.collection.find({"linked_char_id": character_id},
|
|
{"data": 0}, sort=[("created_at", -1)]).limit(limit).skip(offset).to_list(
|
|
None)
|
|
assets = []
|
|
for doc in docs:
|
|
doc["id"] = str(doc.pop("_id"))
|
|
assets.append(Asset(**doc))
|
|
return assets
|
|
|
|
async def get_asset_count(self, character_id: Optional[str] = None) -> int:
|
|
return await self.collection.count_documents({"linked_char_id": character_id} if character_id else {})
|
|
|
|
async def get_assets_by_ids(self, asset_ids: List[str]) -> List[Asset]:
|
|
object_ids = [ObjectId(asset_id) for asset_id in asset_ids]
|
|
res = self.collection.find({"_id": {"$in": object_ids}}, {"data": 0}) # Exclude data but maybe allow thumbnail if small?
|
|
# Original excluded thumbnail too.
|
|
assets = []
|
|
async for doc in res:
|
|
doc["id"] = str(doc.pop("_id"))
|
|
assets.append(Asset(**doc))
|
|
return assets
|
|
|
|
async def delete_asset(self, asset_id: str) -> bool:
|
|
asset_doc = await self.collection.find_one({"_id": ObjectId(asset_id)})
|
|
if not asset_doc:
|
|
return False
|
|
|
|
if self.s3:
|
|
if asset_doc.get("minio_object_name"):
|
|
await self.s3.delete_file(asset_doc["minio_object_name"])
|
|
if asset_doc.get("minio_thumbnail_object_name"):
|
|
await self.s3.delete_file(asset_doc["minio_thumbnail_object_name"])
|
|
|
|
res = await self.collection.delete_one({"_id": ObjectId(asset_id)})
|
|
return res.deleted_count > 0
|
|
|
|
async def migrate_to_minio(self) -> dict:
|
|
"""Переносит данные и thumbnails из Mongo в MinIO."""
|
|
if not self.s3:
|
|
return {"error": "MinIO adapter not initialized"}
|
|
|
|
# 1. Migrate Data
|
|
cursor_data = self.collection.find({"data": {"$ne": None}, "minio_object_name": {"$eq": None}})
|
|
count_data = 0
|
|
errors_data = 0
|
|
|
|
async for doc in cursor_data:
|
|
try:
|
|
asset_id = doc["_id"]
|
|
data = doc.get("data")
|
|
name = doc.get("name", "unknown")
|
|
type_ = doc.get("type", "image")
|
|
created_at = doc.get("created_at")
|
|
ts = int(created_at.timestamp()) if created_at else 0
|
|
|
|
object_name = f"{type_}/{ts}_{asset_id}_{name}"
|
|
if await self.s3.upload_file(object_name, data):
|
|
await self.collection.update_one(
|
|
{"_id": asset_id},
|
|
{"$set": {"minio_object_name": object_name, "minio_bucket": self.s3.bucket_name, "data": None}}
|
|
)
|
|
count_data += 1
|
|
else:
|
|
errors_data += 1
|
|
except Exception as e:
|
|
logger.error(f"Data migration error for {doc.get('_id')}: {e}")
|
|
errors_data += 1
|
|
|
|
# 2. Migrate Thumbnails
|
|
cursor_thumb = self.collection.find({"thumbnail": {"$ne": None}, "minio_thumbnail_object_name": {"$eq": None}})
|
|
count_thumb = 0
|
|
errors_thumb = 0
|
|
|
|
async for doc in cursor_thumb:
|
|
try:
|
|
asset_id = doc["_id"]
|
|
thumb = doc.get("thumbnail")
|
|
name = doc.get("name", "unknown")
|
|
type_ = doc.get("type", "image")
|
|
created_at = doc.get("created_at")
|
|
ts = int(created_at.timestamp()) if created_at else 0
|
|
|
|
thumb_name = f"{type_}/thumbs/{ts}_{asset_id}_{name}_thumb.jpg"
|
|
if await self.s3.upload_file(thumb_name, thumb):
|
|
await self.collection.update_one(
|
|
{"_id": asset_id},
|
|
{"$set": {"minio_thumbnail_object_name": thumb_name, "minio_bucket": self.s3.bucket_name, "thumbnail": None}}
|
|
)
|
|
count_thumb += 1
|
|
else:
|
|
errors_thumb += 1
|
|
except Exception as e:
|
|
logger.error(f"Thumbnail migration error for {doc.get('_id')}: {e}")
|
|
errors_thumb += 1
|
|
|
|
return {
|
|
"migrated_data": count_data,
|
|
"errors_data": errors_data,
|
|
"migrated_thumbnails": count_thumb,
|
|
"errors_thumbnails": errors_thumb
|
|
}
|