from typing import List, Optional from bson import ObjectId from motor.motor_asyncio import AsyncIOMotorClient from models.Asset import Asset class AssetsRepo: def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"): self.collection = client[db_name]["assets"] async def create_asset(self, asset: Asset) -> str: res = await self.collection.insert_one(asset.model_dump()) return str(res.inserted_id) async def get_assets(self, limit: int = 10, offset: int = 0, with_data: bool = False) -> List[Asset]: args = {} if not with_data: args["data"] = 0 args["thumbnail"] = 0 res = await self.collection.find({}, args).sort("created_at", -1).skip(offset).limit(limit).to_list(None) assets = [] for doc in res: # Конвертируем ObjectId в строку и кладем в поле id doc["id"] = str(doc.pop("_id")) # Создаем объект assets.append(Asset(**doc)) return assets async def get_asset(self, asset_id: str, with_data: bool = True) -> Asset: projection = {"_id": 1, "name": 1, "type": 1, "tg_doc_file_id": 1} if with_data: projection["data"] = 1 projection["thumbnail"] = 1 res = await self.collection.find_one({"_id": ObjectId(asset_id)}, projection) res["id"] = str(res.pop("_id")) return Asset(**res) async def update_asset(self, asset_id: str, asset: Asset): if not asset.id: raise Exception(f"Asset ID not found: {asset_id}") await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": asset.model_dump()}) async def set_tg_photo_file_id(self, asset_id: str, tg_photo_file_id: str): await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": {"tg_photo_file_id": tg_photo_file_id}}) async def get_assets_by_char_id(self, character_id: str, limit: int = 10, offset: int = 0) -> List[Asset]: docs = await self.collection.find({"linked_char_id": character_id}, {"data": 0}, sort=[("created_at", -1)]).limit(limit).skip(offset).to_list( None) assets = [] for doc in docs: doc["id"] = str(doc.pop("_id")) assets.append(Asset(**doc)) return assets async def get_asset_count(self, character_id: Optional[str] = None) -> int: return await self.collection.count_documents({"linked_char_id": character_id} if character_id else {}) async def get_assets_by_ids(self, asset_ids: List[str]) -> List[Asset]: object_ids = [ObjectId(asset_id) for asset_id in asset_ids] res = self.collection.find({"_id": {"$in": object_ids}}, {"thumbnail": 0}) assets = [] async for doc in res: doc["id"] = str(doc.pop("_id")) assets.append(Asset(**doc)) return assets async def delete_asset(self, asset_id: str) -> bool: res = await self.collection.delete_one({"_id": ObjectId(asset_id)}) return res.deleted_count > 0