+ s3
This commit is contained in:
6
.env
6
.env
@@ -2,4 +2,8 @@ BOT_TOKEN=8495170789:AAHyjjhHwwVtd9_ROnjHqPHRdnmyVr1aeaY
|
|||||||
# BOT_TOKEN=8011562605:AAF3kyzrZJgii0Jx-H8Sum5Njbo0BdbsiAo
|
# BOT_TOKEN=8011562605:AAF3kyzrZJgii0Jx-H8Sum5Njbo0BdbsiAo
|
||||||
GEMINI_API_KEY=AIzaSyAHzDYhgjOqZZnvOnOFRGaSkKu4OAN3kZE
|
GEMINI_API_KEY=AIzaSyAHzDYhgjOqZZnvOnOFRGaSkKu4OAN3kZE
|
||||||
MONGO_HOST=mongodb://admin:super_secure_password@31.59.58.220:27017/
|
MONGO_HOST=mongodb://admin:super_secure_password@31.59.58.220:27017/
|
||||||
ADMIN_ID=567047
|
ADMIN_ID=567047
|
||||||
|
MINIO_ENDPOINT=http://localhost:9000
|
||||||
|
MINIO_ACCESS_KEY=admin
|
||||||
|
MINIO_SECRET_KEY=SuperSecretPassword123!
|
||||||
|
MINIO_BUCKET=ai-char
|
||||||
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
minio_backup.tar.gz
|
||||||
Binary file not shown.
BIN
adapters/__pycache__/s3_adapter.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/s3_adapter.cpython-313.pyc
Normal file
Binary file not shown.
81
adapters/s3_adapter.py
Normal file
81
adapters/s3_adapter.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Optional, BinaryIO
|
||||||
|
import aioboto3
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
import os
|
||||||
|
|
||||||
|
class S3Adapter:
|
||||||
|
def __init__(self,
|
||||||
|
endpoint_url: str,
|
||||||
|
aws_access_key_id: str,
|
||||||
|
aws_secret_access_key: str,
|
||||||
|
bucket_name: str):
|
||||||
|
self.endpoint_url = endpoint_url
|
||||||
|
self.aws_access_key_id = aws_access_key_id
|
||||||
|
self.aws_secret_access_key = aws_secret_access_key
|
||||||
|
self.bucket_name = bucket_name
|
||||||
|
self.session = aioboto3.Session()
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _get_client(self):
|
||||||
|
async with self.session.client(
|
||||||
|
"s3",
|
||||||
|
endpoint_url=self.endpoint_url,
|
||||||
|
aws_access_key_id=self.aws_access_key_id,
|
||||||
|
aws_secret_access_key=self.aws_secret_access_key,
|
||||||
|
) as client:
|
||||||
|
yield client
|
||||||
|
|
||||||
|
async def upload_file(self, object_name: str, data: bytes, content_type: Optional[str] = None):
|
||||||
|
"""Uploads bytes data to S3."""
|
||||||
|
try:
|
||||||
|
extra_args = {}
|
||||||
|
if content_type:
|
||||||
|
extra_args["ContentType"] = content_type
|
||||||
|
|
||||||
|
async with self._get_client() as client:
|
||||||
|
await client.put_object(
|
||||||
|
Bucket=self.bucket_name,
|
||||||
|
Key=object_name,
|
||||||
|
Body=data,
|
||||||
|
**extra_args
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except ClientError as e:
|
||||||
|
# logging.error(e)
|
||||||
|
print(f"Error uploading to S3: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def get_file(self, object_name: str) -> Optional[bytes]:
|
||||||
|
"""Downloads a file from S3 and returns bytes."""
|
||||||
|
try:
|
||||||
|
async with self._get_client() as client:
|
||||||
|
response = await client.get_object(Bucket=self.bucket_name, Key=object_name)
|
||||||
|
return await response['Body'].read()
|
||||||
|
except ClientError as e:
|
||||||
|
print(f"Error downloading from S3: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def delete_file(self, object_name: str):
|
||||||
|
"""Deletes a file from S3."""
|
||||||
|
try:
|
||||||
|
async with self._get_client() as client:
|
||||||
|
await client.delete_object(Bucket=self.bucket_name, Key=object_name)
|
||||||
|
return True
|
||||||
|
except ClientError as e:
|
||||||
|
print(f"Error deleting from S3: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def get_presigned_url(self, object_name: str, expiration: int = 3600) -> Optional[str]:
|
||||||
|
"""Generate a presigned URL to share an S3 object."""
|
||||||
|
try:
|
||||||
|
async with self._get_client() as client:
|
||||||
|
response = await client.generate_presigned_url(
|
||||||
|
'get_object',
|
||||||
|
Params={'Bucket': self.bucket_name, 'Key': object_name},
|
||||||
|
ExpiresIn=expiration
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
except ClientError as e:
|
||||||
|
print(f"Error generating presigned URL: {e}")
|
||||||
|
return None
|
||||||
Binary file not shown.
@@ -11,6 +11,9 @@ from repos.dao import DAO
|
|||||||
|
|
||||||
from aiogram import Bot
|
from aiogram import Bot
|
||||||
|
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
# Провайдеры "сырых" клиентов из состояния приложения
|
# Провайдеры "сырых" клиентов из состояния приложения
|
||||||
def get_mongo_client(request: Request) -> AsyncIOMotorClient:
|
def get_mongo_client(request: Request) -> AsyncIOMotorClient:
|
||||||
return request.app.state.mongo_client
|
return request.app.state.mongo_client
|
||||||
@@ -21,11 +24,17 @@ def get_gemini_client(request: Request) -> GoogleAdapter:
|
|||||||
def get_bot_client(request: Request) -> Bot:
|
def get_bot_client(request: Request) -> Bot:
|
||||||
return request.app.state.bot
|
return request.app.state.bot
|
||||||
|
|
||||||
|
def get_s3_adapter(request: Request) -> Optional[S3Adapter]:
|
||||||
|
return getattr(request.app.state, "s3_adapter", None)
|
||||||
|
|
||||||
# Провайдер DAO (собирается из mongo_client)
|
# Провайдер DAO (собирается из mongo_client)
|
||||||
def get_dao(mongo_client: AsyncIOMotorClient = Depends(get_mongo_client)) -> DAO:
|
def get_dao(
|
||||||
|
mongo_client: AsyncIOMotorClient = Depends(get_mongo_client),
|
||||||
|
s3_adapter: Optional[S3Adapter] = Depends(get_s3_adapter)
|
||||||
|
) -> DAO:
|
||||||
# FastAPI кэширует результат Depends в рамках одного запроса,
|
# FastAPI кэширует результат Depends в рамках одного запроса,
|
||||||
# так что DAO создастся один раз за запрос.
|
# так что DAO создастся один раз за запрос.
|
||||||
return DAO(mongo_client)
|
return DAO(mongo_client, s3_adapter)
|
||||||
|
|
||||||
# Провайдер сервиса (собирается из DAO и Gemini)
|
# Провайдер сервиса (собирается из DAO и Gemini)
|
||||||
def get_generation_service(
|
def get_generation_service(
|
||||||
|
|||||||
Binary file not shown.
@@ -156,4 +156,15 @@ async def regenerate_thumbnails(dao: DAO = Depends(get_dao)):
|
|||||||
logger.error(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")
|
logger.error(f"Failed to regenerate thumbnail for asset {asset.id}: {e}")
|
||||||
count += 1
|
count += 1
|
||||||
|
|
||||||
return {"status": "completed", "processed": count, "updated": updated}
|
|
||||||
|
return {"status": "completed", "processed": count, "updated": updated}
|
||||||
|
|
||||||
|
@router.post("/migrate_to_minio")
|
||||||
|
async def migrate_to_minio(dao: DAO = Depends(get_dao)):
|
||||||
|
"""
|
||||||
|
Migrates assets from MongoDB to MinIO.
|
||||||
|
"""
|
||||||
|
logger.info("Starting migration to MinIO")
|
||||||
|
result = await dao.assets.migrate_to_minio()
|
||||||
|
logger.info(f"Migration result: {result}")
|
||||||
|
return result
|
||||||
@@ -4,6 +4,26 @@ services:
|
|||||||
container_name: ai-bot
|
container_name: ai-bot
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
network: host
|
# УБРАЛИ network_mode: host
|
||||||
network_mode: host
|
ports:
|
||||||
|
- "8090:8090" # Вернули проброс порта
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
depends_on:
|
||||||
|
- minio
|
||||||
|
environment:
|
||||||
|
# Важно: внутри докера к другим контейнерам обращаемся по имени сервиса!
|
||||||
|
MINIO_ENDPOINT: "http://minio:9000"
|
||||||
|
|
||||||
|
minio:
|
||||||
|
image: minio/minio:latest
|
||||||
|
container_name: minio
|
||||||
|
restart: unless-stopped
|
||||||
|
command: server /data --console-address ":9001"
|
||||||
|
environment:
|
||||||
|
MINIO_ROOT_USER: admin
|
||||||
|
MINIO_ROOT_PASSWORD: SuperSecretPassword123!
|
||||||
|
ports:
|
||||||
|
- "9000:9000"
|
||||||
|
- "9001:9001"
|
||||||
|
volumes:
|
||||||
|
- ./minio_data:/data
|
||||||
15
main.py
15
main.py
@@ -16,6 +16,7 @@ from starlette.middleware.cors import CORSMiddleware
|
|||||||
|
|
||||||
# --- ИМПОРТЫ ПРОЕКТА ---
|
# --- ИМПОРТЫ ПРОЕКТА ---
|
||||||
from adapters.google_adapter import GoogleAdapter
|
from adapters.google_adapter import GoogleAdapter
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
from api.service.generation_service import GenerationService
|
from api.service.generation_service import GenerationService
|
||||||
from middlewares.album import AlbumMiddleware
|
from middlewares.album import AlbumMiddleware
|
||||||
from middlewares.auth import AuthMiddleware
|
from middlewares.auth import AuthMiddleware
|
||||||
@@ -60,11 +61,20 @@ bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTM
|
|||||||
# Клиент БД создаем глобально, чтобы он был доступен и боту (Storage), и API
|
# Клиент БД создаем глобально, чтобы он был доступен и боту (Storage), и API
|
||||||
mongo_client = AsyncIOMotorClient(MONGO_HOST)
|
mongo_client = AsyncIOMotorClient(MONGO_HOST)
|
||||||
|
|
||||||
|
# Репозитории
|
||||||
# Репозитории
|
# Репозитории
|
||||||
users_repo = UsersRepo(mongo_client)
|
users_repo = UsersRepo(mongo_client)
|
||||||
char_repo = CharacterRepo(mongo_client)
|
char_repo = CharacterRepo(mongo_client)
|
||||||
dao = DAO(mongo_client) # Главный DAO для бота
|
|
||||||
dao = DAO(mongo_client) # Главный DAO для бота
|
# S3 Adapter
|
||||||
|
s3_adapter = S3Adapter(
|
||||||
|
endpoint_url=os.getenv("MINIO_ENDPOINT", "http://localhost:9000"),
|
||||||
|
aws_access_key_id=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
|
||||||
|
aws_secret_access_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
|
||||||
|
bucket_name=os.getenv("MINIO_BUCKET", "ai-char")
|
||||||
|
)
|
||||||
|
|
||||||
|
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
||||||
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
||||||
generation_service = GenerationService(dao, gemini, bot)
|
generation_service = GenerationService(dao, gemini, bot)
|
||||||
|
|
||||||
@@ -119,6 +129,7 @@ async def lifespan(app: FastAPI):
|
|||||||
app.state.mongo_client = mongo_client
|
app.state.mongo_client = mongo_client
|
||||||
app.state.gemini_client = gemini
|
app.state.gemini_client = gemini
|
||||||
app.state.bot = bot
|
app.state.bot = bot
|
||||||
|
app.state.s3_adapter = s3_adapter
|
||||||
|
|
||||||
print("✅ DB & DAO initialized")
|
print("✅ DB & DAO initialized")
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,9 @@ class Asset(BaseModel):
|
|||||||
data: Optional[bytes] = None
|
data: Optional[bytes] = None
|
||||||
tg_doc_file_id: Optional[str] = None
|
tg_doc_file_id: Optional[str] = None
|
||||||
tg_photo_file_id: Optional[str] = None
|
tg_photo_file_id: Optional[str] = None
|
||||||
|
minio_object_name: Optional[str] = None
|
||||||
|
minio_bucket: Optional[str] = None
|
||||||
|
minio_thumbnail_object_name: Optional[str] = None
|
||||||
thumbnail: Optional[bytes] = None
|
thumbnail: Optional[bytes] = None
|
||||||
tags: List[str] = []
|
tags: List[str] = []
|
||||||
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,52 +1,145 @@
|
|||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
import logging
|
||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
|
|
||||||
from models.Asset import Asset
|
from models.Asset import Asset
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class AssetsRepo:
|
class AssetsRepo:
|
||||||
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
|
def __init__(self, client: AsyncIOMotorClient, s3_adapter: Optional[S3Adapter] = None, db_name="bot_db"):
|
||||||
self.collection = client[db_name]["assets"]
|
self.collection = client[db_name]["assets"]
|
||||||
|
self.s3 = s3_adapter
|
||||||
|
|
||||||
async def create_asset(self, asset: Asset) -> str:
|
async def create_asset(self, asset: Asset) -> str:
|
||||||
res = await self.collection.insert_one(asset.model_dump())
|
# Если есть S3 и данные - грузим в S3
|
||||||
|
if self.s3:
|
||||||
|
# Main data
|
||||||
|
if asset.data:
|
||||||
|
ts = int(asset.created_at.timestamp())
|
||||||
|
object_name = f"{asset.type.value}/{ts}_{asset.name}"
|
||||||
|
|
||||||
|
uploaded = await self.s3.upload_file(object_name, asset.data)
|
||||||
|
if uploaded:
|
||||||
|
asset.minio_object_name = object_name
|
||||||
|
asset.minio_bucket = self.s3.bucket_name
|
||||||
|
asset.data = None # Clear data
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to upload asset {asset.name} to MinIO")
|
||||||
|
|
||||||
|
# Thumbnail
|
||||||
|
if asset.thumbnail:
|
||||||
|
ts = int(asset.created_at.timestamp())
|
||||||
|
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
|
||||||
|
|
||||||
|
uploaded_thumb = await self.s3.upload_file(thumb_name, asset.thumbnail)
|
||||||
|
if uploaded_thumb:
|
||||||
|
asset.minio_thumbnail_object_name = thumb_name
|
||||||
|
asset.minio_bucket = self.s3.bucket_name # Assumes same bucket
|
||||||
|
asset.thumbnail = None # Clear thumbnail data
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to upload thumbnail for {asset.name} to MinIO")
|
||||||
|
|
||||||
|
|
||||||
|
res = await self.collection.insert_one(asset.model_dump())
|
||||||
return str(res.inserted_id)
|
return str(res.inserted_id)
|
||||||
|
|
||||||
async def get_assets(self, limit: int = 10, offset: int = 0, with_data: bool = False) -> List[Asset]:
|
async def get_assets(self, limit: int = 10, offset: int = 0, with_data: bool = False) -> List[Asset]:
|
||||||
args = {}
|
args = {}
|
||||||
if not with_data:
|
if not with_data:
|
||||||
args["data"] = 0
|
args["data"] = 0
|
||||||
|
# We assume thumbnails are fetched only if needed or kept sparse.
|
||||||
|
# If they are in MinIO, we don't fetch them by default list unless specifically asked?
|
||||||
|
# User requirement "Get bytes ... from minio" usually refers to full asset. used in detail view.
|
||||||
|
# In list view, we might want thumbnails.
|
||||||
|
# If thumbnails are in MinIO, list view will be slow if we fetch all.
|
||||||
|
# Usually we return a URL. But this bot might serve bytes.
|
||||||
|
# Let's assuming list view needs thumbnails if they are small.
|
||||||
|
# But if we moved them to S3, we probably don't want to fetch 10x S3 requests for list.
|
||||||
|
# For now: If minio_thumbnail_object_name is present, user might need to fetch separately
|
||||||
|
# or we fetch if `with_data` is True?
|
||||||
|
# Standard pattern: return URL or ID.
|
||||||
|
# Let's keep existing logic: args["thumbnail"] = 0 if not with_data.
|
||||||
|
# EXCEPT if we want to show thumbnails in list.
|
||||||
|
# Original code:
|
||||||
|
# if not with_data: args["data"] = 0; args["thumbnail"] = 0
|
||||||
|
# So list DOES NOT return thumbnails by default.
|
||||||
args["thumbnail"] = 0
|
args["thumbnail"] = 0
|
||||||
|
|
||||||
res = await self.collection.find({}, args).sort("created_at", -1).skip(offset).limit(limit).to_list(None)
|
res = await self.collection.find({}, args).sort("created_at", -1).skip(offset).limit(limit).to_list(None)
|
||||||
assets = []
|
assets = []
|
||||||
for doc in res:
|
for doc in res:
|
||||||
# Конвертируем ObjectId в строку и кладем в поле id
|
|
||||||
doc["id"] = str(doc.pop("_id"))
|
doc["id"] = str(doc.pop("_id"))
|
||||||
|
asset = Asset(**doc)
|
||||||
# Создаем объект
|
|
||||||
assets.append(Asset(**doc))
|
if with_data and self.s3:
|
||||||
|
# Fetch data
|
||||||
|
if asset.minio_object_name:
|
||||||
|
data = await self.s3.get_file(asset.minio_object_name)
|
||||||
|
if data: asset.data = data
|
||||||
|
|
||||||
|
# Fetch thumbnail
|
||||||
|
if asset.minio_thumbnail_object_name:
|
||||||
|
thumb = await self.s3.get_file(asset.minio_thumbnail_object_name)
|
||||||
|
if thumb: asset.thumbnail = thumb
|
||||||
|
|
||||||
|
assets.append(asset)
|
||||||
|
|
||||||
return assets
|
return assets
|
||||||
|
|
||||||
|
|
||||||
async def get_asset(self, asset_id: str, with_data: bool = True) -> Asset:
|
async def get_asset(self, asset_id: str, with_data: bool = True) -> Asset:
|
||||||
projection = {"_id": 1, "name": 1, "type": 1, "tg_doc_file_id": 1}
|
projection = None
|
||||||
if with_data:
|
if not with_data:
|
||||||
projection["data"] = 1
|
projection = {"data": 0, "thumbnail": 0}
|
||||||
projection["thumbnail"] = 1
|
|
||||||
|
|
||||||
res = await self.collection.find_one({"_id": ObjectId(asset_id)},
|
res = await self.collection.find_one({"_id": ObjectId(asset_id)}, projection)
|
||||||
projection)
|
if not res:
|
||||||
|
return None
|
||||||
|
|
||||||
res["id"] = str(res.pop("_id"))
|
res["id"] = str(res.pop("_id"))
|
||||||
return Asset(**res)
|
asset = Asset(**res)
|
||||||
|
|
||||||
|
if with_data and self.s3:
|
||||||
|
if asset.minio_object_name:
|
||||||
|
data = await self.s3.get_file(asset.minio_object_name)
|
||||||
|
if data: asset.data = data
|
||||||
|
|
||||||
|
if asset.minio_thumbnail_object_name:
|
||||||
|
thumb = await self.s3.get_file(asset.minio_thumbnail_object_name)
|
||||||
|
if thumb: asset.thumbnail = thumb
|
||||||
|
|
||||||
|
return asset
|
||||||
|
|
||||||
async def update_asset(self, asset_id: str, asset: Asset):
|
async def update_asset(self, asset_id: str, asset: Asset):
|
||||||
if not asset.id:
|
if not asset.id:
|
||||||
raise Exception(f"Asset ID not found: {asset_id}")
|
if asset_id: asset.id = asset_id
|
||||||
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": asset.model_dump()})
|
else: raise Exception(f"Asset ID not found: {asset_id}")
|
||||||
|
|
||||||
|
# NOTE: simplistic update. If asset has data/thumbnail bytes, we might need to upload?
|
||||||
|
# Assuming for now we just save what's given.
|
||||||
|
# If user wants to update data, they should probably use a specialized method or we handle it here.
|
||||||
|
# Let's handle it: If data/thumbnail is present AND we have S3, upload it.
|
||||||
|
|
||||||
|
if self.s3:
|
||||||
|
if asset.data:
|
||||||
|
ts = int(asset.created_at.timestamp())
|
||||||
|
object_name = f"{asset.type.value}/{ts}_{asset.name}"
|
||||||
|
if await self.s3.upload_file(object_name, asset.data):
|
||||||
|
asset.minio_object_name = object_name
|
||||||
|
asset.minio_bucket = self.s3.bucket_name
|
||||||
|
asset.data = None
|
||||||
|
|
||||||
|
if asset.thumbnail:
|
||||||
|
ts = int(asset.created_at.timestamp())
|
||||||
|
thumb_name = f"{asset.type.value}/thumbs/{ts}_{asset.name}_thumb.jpg"
|
||||||
|
if await self.s3.upload_file(thumb_name, asset.thumbnail):
|
||||||
|
asset.minio_thumbnail_object_name = thumb_name
|
||||||
|
asset.thumbnail = None
|
||||||
|
|
||||||
|
model_dump = asset.model_dump()
|
||||||
|
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": model_dump})
|
||||||
|
|
||||||
async def set_tg_photo_file_id(self, asset_id: str, tg_photo_file_id: str):
|
async def set_tg_photo_file_id(self, asset_id: str, tg_photo_file_id: str):
|
||||||
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": {"tg_photo_file_id": tg_photo_file_id}})
|
await self.collection.update_one({"_id": ObjectId(asset_id)}, {"$set": {"tg_photo_file_id": tg_photo_file_id}})
|
||||||
@@ -64,11 +157,10 @@ class AssetsRepo:
|
|||||||
async def get_asset_count(self, character_id: Optional[str] = None) -> int:
|
async def get_asset_count(self, character_id: Optional[str] = None) -> int:
|
||||||
return await self.collection.count_documents({"linked_char_id": character_id} if character_id else {})
|
return await self.collection.count_documents({"linked_char_id": character_id} if character_id else {})
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def get_assets_by_ids(self, asset_ids: List[str]) -> List[Asset]:
|
async def get_assets_by_ids(self, asset_ids: List[str]) -> List[Asset]:
|
||||||
object_ids = [ObjectId(asset_id) for asset_id in asset_ids]
|
object_ids = [ObjectId(asset_id) for asset_id in asset_ids]
|
||||||
res = self.collection.find({"_id": {"$in": object_ids}}, {"thumbnail": 0})
|
res = self.collection.find({"_id": {"$in": object_ids}}, {"data": 0}) # Exclude data but maybe allow thumbnail if small?
|
||||||
|
# Original excluded thumbnail too.
|
||||||
assets = []
|
assets = []
|
||||||
async for doc in res:
|
async for doc in res:
|
||||||
doc["id"] = str(doc.pop("_id"))
|
doc["id"] = str(doc.pop("_id"))
|
||||||
@@ -76,5 +168,81 @@ class AssetsRepo:
|
|||||||
return assets
|
return assets
|
||||||
|
|
||||||
async def delete_asset(self, asset_id: str) -> bool:
|
async def delete_asset(self, asset_id: str) -> bool:
|
||||||
|
asset_doc = await self.collection.find_one({"_id": ObjectId(asset_id)})
|
||||||
|
if not asset_doc:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self.s3:
|
||||||
|
if asset_doc.get("minio_object_name"):
|
||||||
|
await self.s3.delete_file(asset_doc["minio_object_name"])
|
||||||
|
if asset_doc.get("minio_thumbnail_object_name"):
|
||||||
|
await self.s3.delete_file(asset_doc["minio_thumbnail_object_name"])
|
||||||
|
|
||||||
res = await self.collection.delete_one({"_id": ObjectId(asset_id)})
|
res = await self.collection.delete_one({"_id": ObjectId(asset_id)})
|
||||||
return res.deleted_count > 0
|
return res.deleted_count > 0
|
||||||
|
|
||||||
|
async def migrate_to_minio(self) -> dict:
|
||||||
|
"""Переносит данные и thumbnails из Mongo в MinIO."""
|
||||||
|
if not self.s3:
|
||||||
|
return {"error": "MinIO adapter not initialized"}
|
||||||
|
|
||||||
|
# 1. Migrate Data
|
||||||
|
cursor_data = self.collection.find({"data": {"$ne": None}, "minio_object_name": {"$eq": None}})
|
||||||
|
count_data = 0
|
||||||
|
errors_data = 0
|
||||||
|
|
||||||
|
async for doc in cursor_data:
|
||||||
|
try:
|
||||||
|
asset_id = doc["_id"]
|
||||||
|
data = doc.get("data")
|
||||||
|
name = doc.get("name", "unknown")
|
||||||
|
type_ = doc.get("type", "image")
|
||||||
|
created_at = doc.get("created_at")
|
||||||
|
ts = int(created_at.timestamp()) if created_at else 0
|
||||||
|
|
||||||
|
object_name = f"{type_}/{ts}_{asset_id}_{name}"
|
||||||
|
if await self.s3.upload_file(object_name, data):
|
||||||
|
await self.collection.update_one(
|
||||||
|
{"_id": asset_id},
|
||||||
|
{"$set": {"minio_object_name": object_name, "minio_bucket": self.s3.bucket_name, "data": None}}
|
||||||
|
)
|
||||||
|
count_data += 1
|
||||||
|
else:
|
||||||
|
errors_data += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Data migration error for {doc.get('_id')}: {e}")
|
||||||
|
errors_data += 1
|
||||||
|
|
||||||
|
# 2. Migrate Thumbnails
|
||||||
|
cursor_thumb = self.collection.find({"thumbnail": {"$ne": None}, "minio_thumbnail_object_name": {"$eq": None}})
|
||||||
|
count_thumb = 0
|
||||||
|
errors_thumb = 0
|
||||||
|
|
||||||
|
async for doc in cursor_thumb:
|
||||||
|
try:
|
||||||
|
asset_id = doc["_id"]
|
||||||
|
thumb = doc.get("thumbnail")
|
||||||
|
name = doc.get("name", "unknown")
|
||||||
|
type_ = doc.get("type", "image")
|
||||||
|
created_at = doc.get("created_at")
|
||||||
|
ts = int(created_at.timestamp()) if created_at else 0
|
||||||
|
|
||||||
|
thumb_name = f"{type_}/thumbs/{ts}_{asset_id}_{name}_thumb.jpg"
|
||||||
|
if await self.s3.upload_file(thumb_name, thumb):
|
||||||
|
await self.collection.update_one(
|
||||||
|
{"_id": asset_id},
|
||||||
|
{"$set": {"minio_thumbnail_object_name": thumb_name, "minio_bucket": self.s3.bucket_name, "thumbnail": None}}
|
||||||
|
)
|
||||||
|
count_thumb += 1
|
||||||
|
else:
|
||||||
|
errors_thumb += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Thumbnail migration error for {doc.get('_id')}: {e}")
|
||||||
|
errors_thumb += 1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"migrated_data": count_data,
|
||||||
|
"errors_data": errors_data,
|
||||||
|
"migrated_thumbnails": count_thumb,
|
||||||
|
"errors_thumbnails": errors_thumb
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,8 +6,11 @@ from repos.generation_repo import GenerationRepo
|
|||||||
from repos.user_repo import UsersRepo
|
from repos.user_repo import UsersRepo
|
||||||
|
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
|
|
||||||
class DAO:
|
class DAO:
|
||||||
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
|
def __init__(self, client: AsyncIOMotorClient, s3_adapter: Optional[S3Adapter] = None, db_name="bot_db"):
|
||||||
self.chars = CharacterRepo(client, db_name)
|
self.chars = CharacterRepo(client, db_name)
|
||||||
self.assets = AssetsRepo(client, db_name)
|
self.assets = AssetsRepo(client, s3_adapter, db_name)
|
||||||
self.generations = GenerationRepo(client, db_name)
|
self.generations = GenerationRepo(client, db_name)
|
||||||
|
|||||||
@@ -45,3 +45,4 @@ urllib3==2.6.3
|
|||||||
uvicorn==0.40.0
|
uvicorn==0.40.0
|
||||||
websockets==15.0.1
|
websockets==15.0.1
|
||||||
yarl==1.22.0
|
yarl==1.22.0
|
||||||
|
aioboto3==13.3.0
|
||||||
|
|||||||
44
tests/test_s3_connection.py
Normal file
44
tests/test_s3_connection.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
|
|
||||||
|
async def test_s3():
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
endpoint = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
|
||||||
|
access_key = os.getenv("MINIO_ACCESS_KEY")
|
||||||
|
secret_key = os.getenv("MINIO_SECRET_KEY")
|
||||||
|
bucket = os.getenv("MINIO_BUCKET")
|
||||||
|
|
||||||
|
print(f"Connecting to {endpoint}, bucket: {bucket}")
|
||||||
|
|
||||||
|
s3 = S3Adapter(endpoint, access_key, secret_key, bucket)
|
||||||
|
|
||||||
|
test_filename = "test_connection.txt"
|
||||||
|
test_data = b"Hello MinIO!"
|
||||||
|
|
||||||
|
print("Uploading...")
|
||||||
|
success = await s3.upload_file(test_filename, test_data)
|
||||||
|
if success:
|
||||||
|
print("Upload successful!")
|
||||||
|
else:
|
||||||
|
print("Upload failed!")
|
||||||
|
return
|
||||||
|
|
||||||
|
print("Downloading...")
|
||||||
|
data = await s3.get_file(test_filename)
|
||||||
|
if data == test_data:
|
||||||
|
print("Download successful and data matches!")
|
||||||
|
else:
|
||||||
|
print(f"Download mismatch: {data}")
|
||||||
|
|
||||||
|
print("Deleting...")
|
||||||
|
deleted = await s3.delete_file(test_filename)
|
||||||
|
if deleted:
|
||||||
|
print("Delete successful!")
|
||||||
|
else:
|
||||||
|
print("Delete failed!")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(test_s3())
|
||||||
84
tests/verify_minio_integration.py
Normal file
84
tests/verify_minio_integration.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
|
|
||||||
|
from models.Asset import Asset, AssetType
|
||||||
|
from repos.assets_repo import AssetsRepo
|
||||||
|
from adapters.s3_adapter import S3Adapter
|
||||||
|
|
||||||
|
# Load env to get credentials
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
async def test_integration():
|
||||||
|
print("🚀 Starting integration test...")
|
||||||
|
|
||||||
|
# 1. Setup Dependencies
|
||||||
|
mongo_uri = os.getenv("MONGO_HOST", "mongodb://localhost:27017")
|
||||||
|
client = AsyncIOMotorClient(mongo_uri)
|
||||||
|
db_name = os.getenv("DB_NAME", "bot_db_test")
|
||||||
|
|
||||||
|
s3_adapter = S3Adapter(
|
||||||
|
endpoint_url=os.getenv("MINIO_ENDPOINT", "http://localhost:9000"),
|
||||||
|
aws_access_key_id=os.getenv("MINIO_ACCESS_KEY", "admin"),
|
||||||
|
aws_secret_access_key=os.getenv("MINIO_SECRET_KEY", "SuperSecretPassword123!"),
|
||||||
|
bucket_name=os.getenv("MINIO_BUCKET", "ai-char")
|
||||||
|
)
|
||||||
|
|
||||||
|
repo = AssetsRepo(client, s3_adapter, db_name=db_name)
|
||||||
|
|
||||||
|
# 2. Create Asset with Data and Thumbnail
|
||||||
|
print("📝 Creating asset...")
|
||||||
|
dummy_data = b"image_data_bytes"
|
||||||
|
dummy_thumb = b"thumbnail_bytes"
|
||||||
|
|
||||||
|
asset = Asset(
|
||||||
|
name="test_asset_with_thumb.png",
|
||||||
|
type=AssetType.IMAGE,
|
||||||
|
data=dummy_data,
|
||||||
|
thumbnail=dummy_thumb
|
||||||
|
)
|
||||||
|
|
||||||
|
asset_id = await repo.create_asset(asset)
|
||||||
|
print(f"✅ Asset created with ID: {asset_id}")
|
||||||
|
|
||||||
|
# 3. Verify object names in Mongo (Raw check)
|
||||||
|
print("🔍 Verifying Mongo metadata...")
|
||||||
|
# Used repo to fetch is better
|
||||||
|
fetched_asset = await repo.get_asset(asset_id, with_data=False)
|
||||||
|
|
||||||
|
if fetched_asset.minio_object_name:
|
||||||
|
print(f"✅ minio_object_name set: {fetched_asset.minio_object_name}")
|
||||||
|
else:
|
||||||
|
print("❌ minio_object_name NOT set!")
|
||||||
|
|
||||||
|
if fetched_asset.minio_thumbnail_object_name:
|
||||||
|
print(f"✅ minio_thumbnail_object_name set: {fetched_asset.minio_thumbnail_object_name}")
|
||||||
|
else:
|
||||||
|
print("❌ minio_thumbnail_object_name NOT set!")
|
||||||
|
|
||||||
|
# 4. Fetch Data from S3 via Repo
|
||||||
|
print("📥 Fetching data from MinIO...")
|
||||||
|
full_asset = await repo.get_asset(asset_id, with_data=True)
|
||||||
|
|
||||||
|
if full_asset.data == dummy_data:
|
||||||
|
print("✅ Data matches!")
|
||||||
|
else:
|
||||||
|
print(f"❌ Data mismatch! Got: {full_asset.data}")
|
||||||
|
|
||||||
|
if full_asset.thumbnail == dummy_thumb:
|
||||||
|
print("✅ Thumbnail matches!")
|
||||||
|
else:
|
||||||
|
print(f"❌ Thumbnail mismatch! Got: {full_asset.thumbnail}")
|
||||||
|
|
||||||
|
# 5. Clean up
|
||||||
|
print("🧹 Cleaning up...")
|
||||||
|
deleted = await repo.delete_asset(asset_id)
|
||||||
|
if deleted:
|
||||||
|
print("✅ Asset deleted")
|
||||||
|
else:
|
||||||
|
print("❌ Failed to delete asset")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(test_integration())
|
||||||
Reference in New Issue
Block a user