Compare commits
8 Commits
video
...
e2c050515d
| Author | SHA1 | Date | |
|---|---|---|---|
| e2c050515d | |||
| 2d3da59de9 | |||
| 279cb5c6f6 | |||
| 30138bab38 | |||
| 977cab92f8 | |||
| dcab238d3e | |||
| 9d2e4e47de | |||
| c6142715d9 |
2
.env
2
.env
@@ -9,5 +9,3 @@ MINIO_SECRET_KEY=SuperSecretPassword123!
|
||||
MINIO_BUCKET=ai-char
|
||||
MODE=production
|
||||
EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW
|
||||
KLING_ACCESS_KEY=AngRfYYeLhPQB3pmr9CpHfgHPCrmeeM4
|
||||
KLING_SECRET_KEY=ndJfyayfQgbg4bMnE49yHnkACPChKMp4
|
||||
14
.gitignore
vendored
14
.gitignore
vendored
@@ -9,3 +9,17 @@ minio_backup.tar.gz
|
||||
.idea
|
||||
.venv
|
||||
.vscode
|
||||
.vscode/launch.json
|
||||
middlewares/__pycache__/
|
||||
middlewares/*.pyc
|
||||
api/__pycache__/
|
||||
api/*.pyc
|
||||
repos/__pycache__/
|
||||
repos/*.pyc
|
||||
adapters/__pycache__/
|
||||
adapters/*.pyc
|
||||
services/__pycache__/
|
||||
services/*.pyc
|
||||
utils/__pycache__/
|
||||
utils/*.pyc
|
||||
.vscode/launch.json
|
||||
|
||||
25
.vscode/launch.json
vendored
25
.vscode/launch.json
vendored
@@ -16,31 +16,6 @@
|
||||
],
|
||||
"jinja": true,
|
||||
"justMyCode": true
|
||||
},
|
||||
{
|
||||
"name": "Python: Current File",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"program": "${file}",
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"env": {
|
||||
"PYTHONPATH": "${workspaceFolder}"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Debug Tests: Current File",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "pytest",
|
||||
"args": [
|
||||
"${file}"
|
||||
],
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"env": {
|
||||
"PYTHONPATH": "${workspaceFolder}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
BIN
__pycache__/config.cpython-313.pyc
Normal file
BIN
__pycache__/config.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/keyboards.cpython-313.pyc
Normal file
BIN
__pycache__/keyboards.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/main.cpython-313.pyc
Normal file
BIN
__pycache__/main.cpython-313.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/Exception.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/Exception.cpython-313.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/google_adapter.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/google_adapter.cpython-313.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/kling_adapter.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/kling_adapter.cpython-313.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/s3_adapter.cpython-313.pyc
Normal file
BIN
adapters/__pycache__/s3_adapter.cpython-313.pyc
Normal file
Binary file not shown.
@@ -23,28 +23,30 @@ class GoogleAdapter:
|
||||
self.TEXT_MODEL = "gemini-3-pro-preview"
|
||||
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
||||
|
||||
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> list:
|
||||
"""Вспомогательный метод для подготовки контента (текст + картинки)"""
|
||||
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> tuple:
|
||||
"""Вспомогательный метод для подготовки контента (текст + картинки).
|
||||
Returns (contents, opened_images) — caller MUST close opened_images after use."""
|
||||
contents = [prompt]
|
||||
opened_images = []
|
||||
if images_list:
|
||||
logger.info(f"Preparing content with {len(images_list)} images")
|
||||
for img_bytes in images_list:
|
||||
try:
|
||||
# Gemini API требует PIL Image на входе
|
||||
image = Image.open(io.BytesIO(img_bytes))
|
||||
contents.append(image)
|
||||
opened_images.append(image)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing input image: {e}")
|
||||
else:
|
||||
logger.info("Preparing content with no images")
|
||||
return contents
|
||||
return contents, opened_images
|
||||
|
||||
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
|
||||
"""
|
||||
Генерация текста (Чат или Vision).
|
||||
Возвращает строку с ответом.
|
||||
"""
|
||||
contents = self._prepare_contents(prompt, images_list)
|
||||
contents, opened_images = self._prepare_contents(prompt, images_list)
|
||||
logger.info(f"Generating text: {prompt}")
|
||||
try:
|
||||
response = self.client.models.generate_content(
|
||||
@@ -68,6 +70,9 @@ class GoogleAdapter:
|
||||
except Exception as e:
|
||||
logger.error(f"Gemini Text API Error: {e}")
|
||||
raise GoogleGenerationException(f"Gemini Text API Error: {e}")
|
||||
finally:
|
||||
for img in opened_images:
|
||||
img.close()
|
||||
|
||||
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
|
||||
"""
|
||||
@@ -75,7 +80,7 @@ class GoogleAdapter:
|
||||
Возвращает список байтовых потоков (готовых к отправке).
|
||||
"""
|
||||
|
||||
contents = self._prepare_contents(prompt, images_list)
|
||||
contents, opened_images = self._prepare_contents(prompt, images_list)
|
||||
logger.info(f"Generating image. Prompt length: {len(prompt)}, Ratio: {aspect_ratio}, Quality: {quality}")
|
||||
|
||||
start_time = datetime.now()
|
||||
@@ -101,8 +106,20 @@ class GoogleAdapter:
|
||||
if response.usage_metadata:
|
||||
token_usage = response.usage_metadata.total_token_count
|
||||
|
||||
if response.parts is None and response.candidates[0].finish_reason is not None:
|
||||
raise GoogleGenerationException(f"Generation blocked in cause of {response.candidates[0].finish_reason.value}")
|
||||
# Check prompt-level block (e.g. PROHIBITED_CONTENT) — no candidates in this case
|
||||
if response.prompt_feedback and response.prompt_feedback.block_reason:
|
||||
raise GoogleGenerationException(
|
||||
f"Generation blocked at prompt level: {response.prompt_feedback.block_reason.value}"
|
||||
)
|
||||
|
||||
# Check candidate-level block
|
||||
if response.parts is None:
|
||||
response_reason = (
|
||||
response.candidates[0].finish_reason
|
||||
if response.candidates and len(response.candidates) > 0
|
||||
else "Unknown"
|
||||
)
|
||||
raise GoogleGenerationException(f"Generation blocked: {response_reason}")
|
||||
|
||||
generated_images = []
|
||||
|
||||
@@ -148,3 +165,7 @@ class GoogleAdapter:
|
||||
except Exception as e:
|
||||
logger.error(f"Gemini Image API Error: {e}")
|
||||
raise GoogleGenerationException(f"Gemini Image API Error: {e}")
|
||||
finally:
|
||||
for img in opened_images:
|
||||
img.close()
|
||||
del contents
|
||||
@@ -1,165 +0,0 @@
|
||||
import logging
|
||||
import time
|
||||
import asyncio
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
import httpx
|
||||
import jwt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
KLING_API_BASE = "https://api.klingai.com"
|
||||
|
||||
|
||||
class KlingApiException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class KlingAdapter:
|
||||
def __init__(self, access_key: str, secret_key: str):
|
||||
if not access_key or not secret_key:
|
||||
raise ValueError("Kling API credentials are missing")
|
||||
self.access_key = access_key
|
||||
self.secret_key = secret_key
|
||||
|
||||
def _generate_token(self) -> str:
|
||||
"""Generate a JWT token for Kling API authentication."""
|
||||
now = int(time.time())
|
||||
payload = {
|
||||
"iss": self.access_key,
|
||||
"exp": now + 1800, # 30 minutes
|
||||
"iat": now - 5, # небольшой запас назад
|
||||
"nbf": now - 5,
|
||||
}
|
||||
return jwt.encode(payload, self.secret_key, algorithm="HS256",
|
||||
headers={"typ": "JWT", "alg": "HS256"})
|
||||
|
||||
def _headers(self) -> dict:
|
||||
return {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {self._generate_token()}"
|
||||
}
|
||||
|
||||
async def create_video_task(
|
||||
self,
|
||||
image_url: str,
|
||||
prompt: str = "",
|
||||
negative_prompt: str = "",
|
||||
model_name: str = "kling-v2-6",
|
||||
duration: int = 5,
|
||||
mode: str = "std",
|
||||
cfg_scale: float = 0.5,
|
||||
aspect_ratio: str = "16:9",
|
||||
callback_url: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create an image-to-video generation task.
|
||||
Returns the full task data dict including task_id.
|
||||
"""
|
||||
body: Dict[str, Any] = {
|
||||
"model_name": model_name,
|
||||
"image": image_url,
|
||||
"prompt": prompt,
|
||||
"negative_prompt": negative_prompt,
|
||||
"duration": str(duration),
|
||||
"mode": mode,
|
||||
"cfg_scale": cfg_scale,
|
||||
"aspect_ratio": aspect_ratio,
|
||||
}
|
||||
if callback_url:
|
||||
body["callback_url"] = callback_url
|
||||
|
||||
logger.info(f"Creating Kling video task. Model: {model_name}, Duration: {duration}s, Mode: {mode}")
|
||||
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
response = await client.post(
|
||||
f"{KLING_API_BASE}/v1/videos/image2video",
|
||||
headers=self._headers(),
|
||||
json=body,
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
logger.info(f"Kling create task response: code={data.get('code')}, message={data.get('message')}")
|
||||
|
||||
if response.status_code != 200 or data.get("code") != 0:
|
||||
error_msg = data.get("message", "Unknown Kling API error")
|
||||
raise KlingApiException(f"Failed to create video task: {error_msg} (code={data.get('code')})")
|
||||
|
||||
task_data = data.get("data", {})
|
||||
task_id = task_data.get("task_id")
|
||||
if not task_id:
|
||||
raise KlingApiException("No task_id returned from Kling API")
|
||||
|
||||
logger.info(f"Kling video task created: task_id={task_id}")
|
||||
return task_data
|
||||
|
||||
async def get_task_status(self, task_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Query the status of a video generation task.
|
||||
Returns the full task data dict.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
response = await client.get(
|
||||
f"{KLING_API_BASE}/v1/videos/image2video/{task_id}",
|
||||
headers=self._headers(),
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
|
||||
if response.status_code != 200 or data.get("code") != 0:
|
||||
error_msg = data.get("message", "Unknown error")
|
||||
raise KlingApiException(f"Failed to query task {task_id}: {error_msg}")
|
||||
|
||||
return data.get("data", {})
|
||||
|
||||
async def wait_for_completion(
|
||||
self,
|
||||
task_id: str,
|
||||
poll_interval: int = 10,
|
||||
timeout: int = 600,
|
||||
progress_callback=None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Poll the task status until completion.
|
||||
|
||||
Args:
|
||||
task_id: Kling task ID
|
||||
poll_interval: seconds between polls
|
||||
timeout: max seconds to wait
|
||||
progress_callback: async callable(progress_pct: int) to report progress
|
||||
|
||||
Returns:
|
||||
Final task data dict with video URL on success.
|
||||
|
||||
Raises:
|
||||
KlingApiException on failure or timeout.
|
||||
"""
|
||||
start = time.time()
|
||||
attempt = 0
|
||||
|
||||
while True:
|
||||
elapsed = time.time() - start
|
||||
if elapsed > timeout:
|
||||
raise KlingApiException(f"Video generation timed out after {timeout}s for task {task_id}")
|
||||
|
||||
task_data = await self.get_task_status(task_id)
|
||||
status = task_data.get("task_status")
|
||||
|
||||
logger.info(f"Kling task {task_id}: status={status}, elapsed={elapsed:.0f}s")
|
||||
|
||||
if status == "succeed":
|
||||
logger.info(f"Kling task {task_id} completed successfully")
|
||||
return task_data
|
||||
|
||||
if status == "failed":
|
||||
fail_reason = task_data.get("task_status_msg", "Unknown failure")
|
||||
raise KlingApiException(f"Video generation failed: {fail_reason}")
|
||||
|
||||
# Report progress estimate (linear approximation based on typical time)
|
||||
if progress_callback:
|
||||
# Estimate: typical gen is ~120s, cap at 90%
|
||||
estimated_progress = min(int((elapsed / 120) * 90), 90)
|
||||
attempt += 1
|
||||
await progress_callback(estimated_progress)
|
||||
|
||||
await asyncio.sleep(poll_interval)
|
||||
@@ -56,6 +56,21 @@ class S3Adapter:
|
||||
print(f"Error downloading from S3: {e}")
|
||||
return None
|
||||
|
||||
async def stream_file(self, object_name: str, chunk_size: int = 65536):
|
||||
"""Streams a file from S3 yielding chunks. Memory-efficient for large files."""
|
||||
try:
|
||||
async with self._get_client() as client:
|
||||
response = await client.get_object(Bucket=self.bucket_name, Key=object_name)
|
||||
# aioboto3 Body is an aiohttp StreamReader wrapper
|
||||
body = response['Body']
|
||||
data = await body.read()
|
||||
# Yield in chunks to avoid holding entire response in StreamingResponse buffer
|
||||
for i in range(0, len(data), chunk_size):
|
||||
yield data[i:i + chunk_size]
|
||||
except ClientError as e:
|
||||
print(f"Error streaming from S3: {e}")
|
||||
return
|
||||
|
||||
async def delete_file(self, object_name: str):
|
||||
"""Deletes a file from S3."""
|
||||
try:
|
||||
|
||||
45
aiws.py
45
aiws.py
@@ -18,7 +18,6 @@ from prometheus_fastapi_instrumentator import Instrumentator
|
||||
|
||||
# --- ИМПОРТЫ ПРОЕКТА ---
|
||||
from adapters.google_adapter import GoogleAdapter
|
||||
from adapters.kling_adapter import KlingAdapter
|
||||
from adapters.s3_adapter import S3Adapter
|
||||
from api.service.generation_service import GenerationService
|
||||
from api.service.album_service import AlbumService
|
||||
@@ -84,18 +83,7 @@ s3_adapter = S3Adapter(
|
||||
|
||||
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
||||
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
||||
|
||||
# Kling Adapter (optional, for video generation)
|
||||
kling_access_key = os.getenv("KLING_ACCESS_KEY", "")
|
||||
kling_secret_key = os.getenv("KLING_SECRET_KEY", "")
|
||||
kling_adapter = None
|
||||
if kling_access_key and kling_secret_key:
|
||||
kling_adapter = KlingAdapter(access_key=kling_access_key, secret_key=kling_secret_key)
|
||||
logger.info("Kling adapter initialized")
|
||||
else:
|
||||
logger.warning("KLING_ACCESS_KEY / KLING_SECRET_KEY not set — video generation disabled")
|
||||
|
||||
generation_service = GenerationService(dao, gemini, s3_adapter, bot, kling_adapter)
|
||||
generation_service = GenerationService(dao, gemini, bot)
|
||||
album_service = AlbumService(dao)
|
||||
|
||||
# Dispatcher
|
||||
@@ -132,6 +120,17 @@ assets_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_
|
||||
gen_router.message.middleware(AlbumMiddleware(latency=0.8))
|
||||
|
||||
|
||||
async def start_scheduler(service: GenerationService):
|
||||
while True:
|
||||
try:
|
||||
logger.info("Running scheduler for stacked generation killing")
|
||||
await service.cleanup_stale_generations()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Scheduler error: {e}")
|
||||
await asyncio.sleep(600) # Check every 10 minutes
|
||||
|
||||
# --- LIFESPAN (Запуск FastAPI + Bot) ---
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
@@ -150,7 +149,6 @@ async def lifespan(app: FastAPI):
|
||||
app.state.gemini_client = gemini
|
||||
app.state.bot = bot
|
||||
app.state.s3_adapter = s3_adapter
|
||||
app.state.kling_adapter = kling_adapter
|
||||
app.state.album_service = album_service
|
||||
app.state.users_repo = users_repo # Добавляем репозиторий в state
|
||||
|
||||
@@ -164,17 +162,28 @@ async def lifespan(app: FastAPI):
|
||||
# )
|
||||
# print("🤖 Bot polling started")
|
||||
|
||||
# 3. ЗАПУСК ШЕДУЛЕРА
|
||||
scheduler_task = asyncio.create_task(start_scheduler(generation_service))
|
||||
print("⏰ Scheduler started")
|
||||
|
||||
yield
|
||||
|
||||
# --- SHUTDOWN ---
|
||||
print("🛑 Shutting down...")
|
||||
|
||||
# 3. Остановка бота
|
||||
polling_task.cancel()
|
||||
# 4. Остановка шедулера
|
||||
scheduler_task.cancel()
|
||||
try:
|
||||
await polling_task
|
||||
await scheduler_task
|
||||
except asyncio.CancelledError:
|
||||
print("🤖 Bot polling stopped")
|
||||
print("⏰ Scheduler stopped")
|
||||
|
||||
# 3. Остановка бота
|
||||
# polling_task.cancel()
|
||||
# try:
|
||||
# await polling_task
|
||||
# except asyncio.CancelledError:
|
||||
# print("🤖 Bot polling stopped")
|
||||
|
||||
# 4. Отключение БД
|
||||
# Обычно Motor закрывать не обязательно при выходе, но хорошим тоном считается
|
||||
|
||||
BIN
api/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
api/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/__pycache__/dependency.cpython-313.pyc
Normal file
BIN
api/__pycache__/dependency.cpython-313.pyc
Normal file
Binary file not shown.
@@ -3,7 +3,6 @@ from fastapi import Request, Depends
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
|
||||
from adapters.google_adapter import GoogleAdapter
|
||||
from adapters.kling_adapter import KlingAdapter
|
||||
from api.service.generation_service import GenerationService
|
||||
from repos.dao import DAO
|
||||
|
||||
@@ -37,18 +36,14 @@ def get_dao(
|
||||
# так что DAO создастся один раз за запрос.
|
||||
return DAO(mongo_client, s3_adapter)
|
||||
|
||||
def get_kling_adapter(request: Request) -> Optional[KlingAdapter]:
|
||||
return request.app.state.kling_adapter
|
||||
|
||||
# Провайдер сервиса (собирается из DAO и Gemini)
|
||||
def get_generation_service(
|
||||
dao: DAO = Depends(get_dao),
|
||||
gemini: GoogleAdapter = Depends(get_gemini_client),
|
||||
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
||||
bot: Bot = Depends(get_bot_client),
|
||||
kling_adapter: Optional[KlingAdapter] = Depends(get_kling_adapter),
|
||||
) -> GenerationService:
|
||||
return GenerationService(dao, gemini, s3_adapter, bot, kling_adapter=kling_adapter)
|
||||
return GenerationService(dao, gemini, s3_adapter, bot)
|
||||
|
||||
from fastapi import Header
|
||||
|
||||
|
||||
BIN
api/endpoints/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/endpoints/__pycache__/admin.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/admin.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/endpoints/__pycache__/assets_router.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/assets_router.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/endpoints/__pycache__/auth.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/auth.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/endpoints/__pycache__/character_router.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/character_router.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/endpoints/__pycache__/generation_router.cpython-313.pyc
Normal file
BIN
api/endpoints/__pycache__/generation_router.cpython-313.pyc
Normal file
Binary file not shown.
@@ -9,7 +9,7 @@ from pymongo import MongoClient
|
||||
from starlette import status
|
||||
from starlette.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import Response, JSONResponse
|
||||
from starlette.responses import Response, JSONResponse, StreamingResponse
|
||||
|
||||
from adapters.s3_adapter import S3Adapter
|
||||
from api.models.AssetDTO import AssetsResponse, AssetResponse
|
||||
@@ -33,27 +33,46 @@ async def get_asset(
|
||||
asset_id: str,
|
||||
request: Request,
|
||||
thumbnail: bool = False,
|
||||
dao: DAO = Depends(get_dao)
|
||||
dao: DAO = Depends(get_dao),
|
||||
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
||||
) -> Response:
|
||||
logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
|
||||
asset = await dao.assets.get_asset(asset_id)
|
||||
# 2. Проверка на существование
|
||||
# Загружаем только метаданные (без data/thumbnail bytes)
|
||||
asset = await dao.assets.get_asset(asset_id, with_data=False)
|
||||
if not asset:
|
||||
raise HTTPException(status_code=404, detail="Asset not found")
|
||||
|
||||
headers = {
|
||||
# Кэшировать на 1 год (31536000 сек)
|
||||
"Cache-Control": "public, max-age=31536000, immutable"
|
||||
}
|
||||
|
||||
content = asset.data
|
||||
media_type = "image/png" # Default, or detect
|
||||
# Thumbnail: маленький, можно грузить в RAM
|
||||
if thumbnail:
|
||||
if asset.minio_thumbnail_object_name and s3_adapter:
|
||||
thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
|
||||
if thumb_bytes:
|
||||
return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
|
||||
# Fallback: thumbnail in DB
|
||||
if asset.thumbnail:
|
||||
return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
|
||||
# No thumbnail available — fall through to main content
|
||||
|
||||
if thumbnail and asset.thumbnail:
|
||||
content = asset.thumbnail
|
||||
media_type = "image/jpeg"
|
||||
# Main content: стримим из S3 без загрузки в RAM
|
||||
if asset.minio_object_name and s3_adapter:
|
||||
content_type = "image/png"
|
||||
# if asset.content_type == AssetContentType.VIDEO:
|
||||
# content_type = "video/mp4"
|
||||
return StreamingResponse(
|
||||
s3_adapter.stream_file(asset.minio_object_name),
|
||||
media_type=content_type,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
return Response(content=content, media_type=media_type, headers=headers)
|
||||
# Fallback: data stored in DB (legacy)
|
||||
if asset.data:
|
||||
return Response(content=asset.data, media_type="image/png", headers=headers)
|
||||
|
||||
raise HTTPException(status_code=404, detail="Asset data not found")
|
||||
|
||||
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
|
||||
async def delete_orphan_assets_from_minio(
|
||||
|
||||
@@ -8,8 +8,7 @@ from api import service
|
||||
from api.dependency import get_generation_service, get_project_id, get_dao
|
||||
from repos.dao import DAO
|
||||
|
||||
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest
|
||||
from api.models.VideoGenerationRequest import VideoGenerationRequest
|
||||
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest, GenerationGroupResponse
|
||||
from api.service.generation_service import GenerationService
|
||||
from models.Generation import Generation
|
||||
|
||||
@@ -69,12 +68,12 @@ async def get_generations(character_id: Optional[str] = None, limit: int = 10, o
|
||||
return await generation_service.get_generations(character_id, limit=limit, offset=offset, user_id=user_id_filter, project_id=project_id)
|
||||
|
||||
|
||||
@router.post("/_run", response_model=GenerationResponse)
|
||||
@router.post("/_run", response_model=GenerationGroupResponse)
|
||||
async def post_generation(generation: GenerationRequest, request: Request,
|
||||
generation_service: GenerationService = Depends(get_generation_service),
|
||||
current_user: dict = Depends(get_current_user),
|
||||
project_id: Optional[str] = Depends(get_project_id),
|
||||
dao: DAO = Depends(get_dao)) -> GenerationResponse:
|
||||
dao: DAO = Depends(get_dao)) -> GenerationGroupResponse:
|
||||
logger.info(f"post_generation (run) called. LinkedCharId: {generation.linked_character_id}, PromptLength: {len(generation.prompt)}")
|
||||
|
||||
if project_id:
|
||||
@@ -86,16 +85,6 @@ async def post_generation(generation: GenerationRequest, request: Request,
|
||||
return await generation_service.create_generation_task(generation, user_id=str(current_user.get("_id")))
|
||||
|
||||
|
||||
@router.get("/{generation_id}", response_model=GenerationResponse)
|
||||
async def get_generation(generation_id: str,
|
||||
generation_service: GenerationService = Depends(get_generation_service),
|
||||
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
|
||||
logger.debug(f"get_generation called for ID: {generation_id}")
|
||||
gen = await generation_service.get_generation(generation_id)
|
||||
if gen and gen.created_by != str(current_user["_id"]):
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
return gen
|
||||
|
||||
|
||||
@router.get("/running")
|
||||
async def get_running_generations(request: Request,
|
||||
@@ -114,25 +103,27 @@ async def get_running_generations(request: Request,
|
||||
return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id)
|
||||
|
||||
|
||||
@router.post("/video/_run", response_model=GenerationResponse)
|
||||
async def post_video_generation(
|
||||
video_request: VideoGenerationRequest,
|
||||
request: Request,
|
||||
generation_service: GenerationService = Depends(get_generation_service),
|
||||
current_user: dict = Depends(get_current_user),
|
||||
project_id: Optional[str] = Depends(get_project_id),
|
||||
dao: DAO = Depends(get_dao),
|
||||
) -> GenerationResponse:
|
||||
"""Start image-to-video generation using Kling AI."""
|
||||
logger.info(f"post_video_generation called. AssetId: {video_request.image_asset_id}, Duration: {video_request.duration}s, Mode: {video_request.mode}")
|
||||
@router.get("/group/{group_id}", response_model=GenerationGroupResponse)
|
||||
async def get_generation_group(group_id: str,
|
||||
generation_service: GenerationService = Depends(get_generation_service),
|
||||
current_user: dict = Depends(get_current_user)):
|
||||
logger.info(f"get_generation_group called for group_id: {group_id}")
|
||||
generations = await generation_service.dao.generations.get_generations_by_group(group_id)
|
||||
gen_responses = [GenerationResponse(**gen.model_dump()) for gen in generations]
|
||||
return GenerationGroupResponse(generation_group_id=group_id, generations=gen_responses)
|
||||
|
||||
|
||||
@router.get("/{generation_id}", response_model=GenerationResponse)
|
||||
async def get_generation(generation_id: str,
|
||||
generation_service: GenerationService = Depends(get_generation_service),
|
||||
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
|
||||
logger.debug(f"get_generation called for ID: {generation_id}")
|
||||
gen = await generation_service.get_generation(generation_id)
|
||||
if gen and gen.created_by != str(current_user["_id"]):
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
return gen
|
||||
|
||||
if project_id:
|
||||
project = await dao.projects.get_project(project_id)
|
||||
if not project or str(current_user["_id"]) not in project.members:
|
||||
raise HTTPException(status_code=403, detail="Project access denied")
|
||||
video_request.project_id = project_id
|
||||
|
||||
return await generation_service.create_video_generation_task(video_request, user_id=str(current_user.get("_id")))
|
||||
|
||||
|
||||
@router.post("/import", response_model=GenerationResponse)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from datetime import datetime, UTC
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from models.Asset import Asset
|
||||
from models.Generation import GenerationStatus
|
||||
@@ -17,6 +17,7 @@ class GenerationRequest(BaseModel):
|
||||
use_profile_image: bool = True
|
||||
assets_list: List[str]
|
||||
project_id: Optional[str] = None
|
||||
count: int = Field(default=1, ge=1, le=10)
|
||||
|
||||
|
||||
class GenerationsResponse(BaseModel):
|
||||
@@ -27,7 +28,6 @@ class GenerationsResponse(BaseModel):
|
||||
class GenerationResponse(BaseModel):
|
||||
id: str
|
||||
status: GenerationStatus
|
||||
gen_type: GenType = GenType.IMAGE
|
||||
failed_reason: Optional[str] = None
|
||||
|
||||
linked_character_id: Optional[str] = None
|
||||
@@ -46,14 +46,15 @@ class GenerationResponse(BaseModel):
|
||||
progress: int = 0
|
||||
cost: Optional[float] = None
|
||||
created_by: Optional[str] = None
|
||||
# Video-specific
|
||||
kling_task_id: Optional[str] = None
|
||||
video_duration: Optional[int] = None
|
||||
video_mode: Optional[str] = None
|
||||
generation_group_id: Optional[str] = None
|
||||
created_at: datetime = datetime.now(UTC)
|
||||
updated_at: datetime = datetime.now(UTC)
|
||||
|
||||
|
||||
class GenerationGroupResponse(BaseModel):
|
||||
generation_group_id: str
|
||||
generations: List[GenerationResponse]
|
||||
|
||||
|
||||
class PromptRequest(BaseModel):
|
||||
prompt: str
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class VideoGenerationRequest(BaseModel):
|
||||
prompt: str = ""
|
||||
negative_prompt: Optional[str] = ""
|
||||
image_asset_id: str # ID ассета-картинки для source image
|
||||
duration: int = 5 # 5 or 10 seconds
|
||||
mode: str = "std" # "std" or "pro"
|
||||
model_name: str = "kling-v2-1"
|
||||
cfg_scale: float = 0.5
|
||||
aspect_ratio: str = "16:9"
|
||||
linked_character_id: Optional[str] = None
|
||||
project_id: Optional[str] = None
|
||||
BIN
api/models/__pycache__/AssetDTO.cpython-313.pyc
Normal file
BIN
api/models/__pycache__/AssetDTO.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/models/__pycache__/GenerationRequest.cpython-313.pyc
Normal file
BIN
api/models/__pycache__/GenerationRequest.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/models/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
api/models/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/service/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
api/service/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
api/service/__pycache__/generation_service.cpython-313.pyc
Normal file
BIN
api/service/__pycache__/generation_service.cpython-313.pyc
Normal file
Binary file not shown.
@@ -5,15 +5,14 @@ import base64
|
||||
from datetime import datetime, UTC
|
||||
from typing import List, Optional, Tuple, Any, Dict
|
||||
from io import BytesIO
|
||||
from uuid import uuid4
|
||||
import httpx
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import BufferedInputFile
|
||||
from adapters.Exception import GoogleGenerationException
|
||||
from adapters.google_adapter import GoogleAdapter
|
||||
from adapters.kling_adapter import KlingAdapter, KlingApiException
|
||||
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse
|
||||
from api.models.VideoGenerationRequest import VideoGenerationRequest
|
||||
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse, GenerationGroupResponse
|
||||
# Импортируйте ваши модели DAO, Asset, Generation корректно
|
||||
from models.Asset import Asset, AssetType, AssetContentType
|
||||
from models.Generation import Generation, GenerationStatus
|
||||
@@ -23,6 +22,9 @@ from adapters.s3_adapter import S3Adapter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Limit concurrent generations to 4
|
||||
generation_semaphore = asyncio.Semaphore(4)
|
||||
|
||||
|
||||
# --- Вспомогательная функция генерации ---
|
||||
async def generate_image_task(
|
||||
@@ -52,26 +54,27 @@ async def generate_image_task(
|
||||
logger.info(f"generate_image_task completed, received {len(generated_images_io) if generated_images_io else 0} images")
|
||||
except GoogleGenerationException as e:
|
||||
raise e
|
||||
finally:
|
||||
# Освобождаем входные данные — они больше не нужны
|
||||
del media_group_bytes
|
||||
|
||||
images_bytes = []
|
||||
if generated_images_io:
|
||||
for img_io in generated_images_io:
|
||||
# Читаем байты из BytesIO
|
||||
img_io.seek(0)
|
||||
content = img_io.read()
|
||||
images_bytes.append(content)
|
||||
|
||||
# Закрываем поток
|
||||
images_bytes.append(img_io.read())
|
||||
img_io.close()
|
||||
# Освобождаем список BytesIO сразу
|
||||
del generated_images_io
|
||||
|
||||
return images_bytes, metrics
|
||||
|
||||
class GenerationService:
|
||||
def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None, kling_adapter: Optional[KlingAdapter] = None):
|
||||
def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None):
|
||||
self.dao = dao
|
||||
self.gemini = gemini
|
||||
self.s3_adapter = s3_adapter
|
||||
self.bot = bot
|
||||
self.kling_adapter = kling_adapter
|
||||
|
||||
|
||||
async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str:
|
||||
@@ -114,23 +117,39 @@ class GenerationService:
|
||||
async def get_running_generations(self, user_id: Optional[str] = None, project_id: Optional[str] = None) -> List[Generation]:
|
||||
return await self.dao.generations.get_generations(status=GenerationStatus.RUNNING, created_by=user_id, project_id=project_id)
|
||||
|
||||
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
|
||||
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationGroupResponse:
|
||||
count = generation_request.count
|
||||
|
||||
if generation_group_id is None:
|
||||
generation_group_id = str(uuid4())
|
||||
|
||||
results = []
|
||||
for _ in range(count):
|
||||
gen_response = await self._create_single_generation(generation_request, user_id, generation_group_id)
|
||||
results.append(gen_response)
|
||||
return GenerationGroupResponse(generation_group_id=generation_group_id, generations=results)
|
||||
|
||||
async def _create_single_generation(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationResponse:
|
||||
gen_id = None
|
||||
generation_model = None
|
||||
|
||||
try:
|
||||
generation_model = Generation(**generation_request.model_dump())
|
||||
generation_model = Generation(**generation_request.model_dump(exclude={'count'}))
|
||||
if user_id:
|
||||
generation_model.created_by = user_id
|
||||
if generation_group_id:
|
||||
generation_model.generation_group_id = generation_group_id
|
||||
|
||||
gen_id = await self.dao.generations.create_generation(generation_model)
|
||||
generation_model.id = gen_id
|
||||
|
||||
async def runner(gen):
|
||||
logger.info(f"Starting background generation task for ID: {gen.id}")
|
||||
logger.info(f"Generation {gen.id} entered queue (waiting for slot)...")
|
||||
try:
|
||||
await self.create_generation(gen)
|
||||
logger.info(f"Background generation task finished for ID: {gen.id}")
|
||||
async with generation_semaphore:
|
||||
logger.info(f"Starting background generation task for ID: {gen.id}")
|
||||
await self.create_generation(gen)
|
||||
logger.info(f"Background generation task finished for ID: {gen.id}")
|
||||
except Exception:
|
||||
# если генерация уже пошла и упала — пометим FAILED
|
||||
try:
|
||||
@@ -431,168 +450,6 @@ class GenerationService:
|
||||
|
||||
return generation
|
||||
|
||||
# === VIDEO GENERATION (Kling) ===
|
||||
|
||||
async def create_video_generation_task(self, request: VideoGenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
|
||||
"""Create a video generation task (async, returns immediately)."""
|
||||
if not self.kling_adapter:
|
||||
raise Exception("Kling adapter is not configured")
|
||||
|
||||
generation = Generation(
|
||||
status=GenerationStatus.RUNNING,
|
||||
gen_type=GenType.VIDEO,
|
||||
linked_character_id=request.linked_character_id,
|
||||
aspect_ratio=AspectRatios.SIXTEENNINE, # default for video
|
||||
quality=Quality.ONEK,
|
||||
prompt=request.prompt,
|
||||
assets_list=[request.image_asset_id],
|
||||
video_duration=request.duration,
|
||||
video_mode=request.mode,
|
||||
project_id=request.project_id,
|
||||
)
|
||||
if user_id:
|
||||
generation.created_by = user_id
|
||||
|
||||
gen_id = await self.dao.generations.create_generation(generation)
|
||||
generation.id = gen_id
|
||||
|
||||
async def runner(gen, req):
|
||||
logger.info(f"Starting background video generation task for ID: {gen.id}")
|
||||
try:
|
||||
await self.create_video_generation(gen, req)
|
||||
logger.info(f"Background video generation task finished for ID: {gen.id}")
|
||||
except Exception:
|
||||
try:
|
||||
db_gen = await self.dao.generations.get_generation(gen.id)
|
||||
if db_gen and db_gen.status != GenerationStatus.FAILED:
|
||||
db_gen.status = GenerationStatus.FAILED
|
||||
db_gen.updated_at = datetime.now(UTC)
|
||||
await self.dao.generations.update_generation(db_gen)
|
||||
except Exception:
|
||||
logger.exception("Failed to mark video generation as FAILED")
|
||||
logger.exception("create_video_generation task failed")
|
||||
|
||||
asyncio.create_task(runner(generation, request))
|
||||
return GenerationResponse(**generation.model_dump())
|
||||
|
||||
async def create_video_generation(self, generation: Generation, request: VideoGenerationRequest):
|
||||
"""Background video generation: call Kling API, poll, download result, save asset."""
|
||||
start_time = datetime.now()
|
||||
|
||||
try:
|
||||
# 1. Get source image presigned URL
|
||||
asset = await self.dao.assets.get_asset(request.image_asset_id)
|
||||
if not asset:
|
||||
raise Exception(f"Asset {request.image_asset_id} not found")
|
||||
|
||||
if not asset.minio_object_name:
|
||||
raise Exception(f"Asset {request.image_asset_id} has no S3 object")
|
||||
|
||||
presigned_url = await self.s3_adapter.get_presigned_url(asset.minio_object_name, expiration=3600)
|
||||
if not presigned_url:
|
||||
raise Exception("Failed to generate presigned URL for source image")
|
||||
|
||||
logger.info(f"Video gen {generation.id}: got presigned URL for asset {request.image_asset_id}")
|
||||
|
||||
# 2. Create Kling task
|
||||
task_data = await self.kling_adapter.create_video_task(
|
||||
image_url=presigned_url,
|
||||
prompt=request.prompt,
|
||||
negative_prompt=request.negative_prompt or "",
|
||||
model_name=request.model_name,
|
||||
duration=request.duration,
|
||||
mode=request.mode,
|
||||
cfg_scale=request.cfg_scale,
|
||||
aspect_ratio=request.aspect_ratio,
|
||||
)
|
||||
|
||||
task_id = task_data.get("task_id")
|
||||
generation.kling_task_id = task_id
|
||||
await self.dao.generations.update_generation(generation)
|
||||
|
||||
logger.info(f"Video gen {generation.id}: Kling task created, task_id={task_id}")
|
||||
|
||||
# 3. Poll for completion with progress updates
|
||||
async def progress_callback(progress_pct: int):
|
||||
generation.progress = progress_pct
|
||||
generation.updated_at = datetime.now(UTC)
|
||||
await self.dao.generations.update_generation(generation)
|
||||
|
||||
result = await self.kling_adapter.wait_for_completion(
|
||||
task_id=task_id,
|
||||
poll_interval=10,
|
||||
timeout=600,
|
||||
progress_callback=progress_callback,
|
||||
)
|
||||
|
||||
# 4. Extract video URL and download
|
||||
works = result.get("task_result", {}).get("videos", [])
|
||||
if not works:
|
||||
raise Exception("No video in Kling result")
|
||||
|
||||
video_url = works[0].get("url")
|
||||
video_duration = works[0].get("duration", request.duration)
|
||||
if not video_url:
|
||||
raise Exception("No video URL in Kling result")
|
||||
|
||||
logger.info(f"Video gen {generation.id}: downloading video from {video_url}")
|
||||
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
video_response = await client.get(video_url)
|
||||
video_response.raise_for_status()
|
||||
video_bytes = video_response.content
|
||||
|
||||
logger.info(f"Video gen {generation.id}: downloaded {len(video_bytes)} bytes")
|
||||
|
||||
# 5. Upload to S3
|
||||
filename = f"generated_video/{generation.linked_character_id or 'no_char'}/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{random.randint(1000, 9999)}.mp4"
|
||||
await self.s3_adapter.upload_file(filename, video_bytes, content_type="video/mp4")
|
||||
|
||||
# 6. Create Asset
|
||||
new_asset = Asset(
|
||||
name=f"Video_{generation.linked_character_id or 'gen'}",
|
||||
type=AssetType.GENERATED,
|
||||
content_type=AssetContentType.VIDEO,
|
||||
linked_char_id=generation.linked_character_id,
|
||||
data=None,
|
||||
minio_object_name=filename,
|
||||
minio_bucket=self.s3_adapter.bucket_name,
|
||||
thumbnail=None, # видео thumbnails можно добавить позже
|
||||
created_by=generation.created_by,
|
||||
project_id=generation.project_id,
|
||||
)
|
||||
|
||||
asset_id = await self.dao.assets.create_asset(new_asset)
|
||||
new_asset.id = str(asset_id)
|
||||
|
||||
# 7. Finalize generation
|
||||
end_time = datetime.now()
|
||||
generation.result_list = [new_asset.id]
|
||||
generation.result = new_asset.id
|
||||
generation.status = GenerationStatus.DONE
|
||||
generation.progress = 100
|
||||
generation.video_duration = video_duration
|
||||
generation.execution_time_seconds = (end_time - start_time).total_seconds()
|
||||
generation.updated_at = datetime.now(UTC)
|
||||
await self.dao.generations.update_generation(generation)
|
||||
|
||||
logger.info(f"Video generation {generation.id} completed. Asset: {new_asset.id}, Time: {generation.execution_time_seconds:.1f}s")
|
||||
|
||||
except KlingApiException as e:
|
||||
logger.error(f"Kling API error for generation {generation.id}: {e}")
|
||||
generation.status = GenerationStatus.FAILED
|
||||
generation.failed_reason = str(e)
|
||||
generation.updated_at = datetime.now(UTC)
|
||||
await self.dao.generations.update_generation(generation)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Video generation {generation.id} failed: {e}")
|
||||
generation.status = GenerationStatus.FAILED
|
||||
generation.failed_reason = str(e)
|
||||
generation.updated_at = datetime.now(UTC)
|
||||
await self.dao.generations.update_generation(generation)
|
||||
raise
|
||||
|
||||
async def delete_generation(self, generation_id: str) -> bool:
|
||||
"""
|
||||
Soft delete generation by marking it as deleted.
|
||||
@@ -609,3 +466,14 @@ class GenerationService:
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting generation {generation_id}: {e}")
|
||||
return False
|
||||
|
||||
async def cleanup_stale_generations(self):
|
||||
"""
|
||||
Cancels generations that have been running for more than 1 hour.
|
||||
"""
|
||||
try:
|
||||
count = await self.dao.generations.cancel_stale_generations(timeout_minutes=60)
|
||||
if count > 0:
|
||||
logger.info(f"Cleaned up {count} stale generations (timeout)")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up stale generations: {e}")
|
||||
BIN
middlewares/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
middlewares/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
middlewares/__pycache__/album.cpython-313.pyc
Normal file
BIN
middlewares/__pycache__/album.cpython-313.pyc
Normal file
Binary file not shown.
BIN
middlewares/__pycache__/auth.cpython-313.pyc
Normal file
BIN
middlewares/__pycache__/auth.cpython-313.pyc
Normal file
Binary file not shown.
BIN
middlewares/__pycache__/dao.cpython-313.pyc
Normal file
BIN
middlewares/__pycache__/dao.cpython-313.pyc
Normal file
Binary file not shown.
@@ -7,7 +7,6 @@ from pydantic import BaseModel, computed_field, Field, model_validator
|
||||
|
||||
class AssetContentType(str, Enum):
|
||||
IMAGE = 'image'
|
||||
VIDEO = 'video'
|
||||
PROMPT = 'prompt'
|
||||
|
||||
class AssetType(str, Enum):
|
||||
|
||||
@@ -16,7 +16,6 @@ class GenerationStatus(str, Enum):
|
||||
class Generation(BaseModel):
|
||||
id: Optional[str] = None
|
||||
status: GenerationStatus = GenerationStatus.RUNNING
|
||||
gen_type: GenType = GenType.IMAGE
|
||||
failed_reason: Optional[str] = None
|
||||
linked_character_id: Optional[str] = None
|
||||
telegram_id: Optional[int] = None
|
||||
@@ -36,12 +35,9 @@ class Generation(BaseModel):
|
||||
output_token_usage: Optional[int] = None
|
||||
is_deleted: bool = False
|
||||
album_id: Optional[str] = None
|
||||
generation_group_id: Optional[str] = None
|
||||
created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId)
|
||||
project_id: Optional[str] = None
|
||||
# Video-specific fields
|
||||
kling_task_id: Optional[str] = None
|
||||
video_duration: Optional[int] = None # 5 or 10 seconds
|
||||
video_mode: Optional[str] = None # "std" or "pro"
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||
|
||||
|
||||
BIN
models/__pycache__/Asset.cpython-313.pyc
Normal file
BIN
models/__pycache__/Asset.cpython-313.pyc
Normal file
Binary file not shown.
BIN
models/__pycache__/Character.cpython-313.pyc
Normal file
BIN
models/__pycache__/Character.cpython-313.pyc
Normal file
Binary file not shown.
BIN
models/__pycache__/Generation.cpython-313.pyc
Normal file
BIN
models/__pycache__/Generation.cpython-313.pyc
Normal file
Binary file not shown.
BIN
models/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
models/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
models/__pycache__/enums.cpython-313.pyc
Normal file
BIN
models/__pycache__/enums.cpython-313.pyc
Normal file
Binary file not shown.
@@ -34,12 +34,10 @@ class Quality(str, Enum):
|
||||
class GenType(str, Enum):
|
||||
TEXT = 'Text'
|
||||
IMAGE = 'Image'
|
||||
VIDEO = 'Video'
|
||||
|
||||
@property
|
||||
def value_type(self) -> str:
|
||||
return {
|
||||
GenType.TEXT: 'Text',
|
||||
GenType.IMAGE: 'Image',
|
||||
GenType.VIDEO: 'Video',
|
||||
}[self]
|
||||
|
||||
BIN
repos/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
repos/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
repos/__pycache__/assets_repo.cpython-313.pyc
Normal file
BIN
repos/__pycache__/assets_repo.cpython-313.pyc
Normal file
Binary file not shown.
BIN
repos/__pycache__/char_repo.cpython-313.pyc
Normal file
BIN
repos/__pycache__/char_repo.cpython-313.pyc
Normal file
Binary file not shown.
BIN
repos/__pycache__/dao.cpython-313.pyc
Normal file
BIN
repos/__pycache__/dao.cpython-313.pyc
Normal file
Binary file not shown.
BIN
repos/__pycache__/generation_repo.cpython-313.pyc
Normal file
BIN
repos/__pycache__/generation_repo.cpython-313.pyc
Normal file
Binary file not shown.
BIN
repos/__pycache__/user_repo.cpython-313.pyc
Normal file
BIN
repos/__pycache__/user_repo.cpython-313.pyc
Normal file
Binary file not shown.
@@ -1,4 +1,5 @@
|
||||
from typing import Optional, List
|
||||
from datetime import datetime, timedelta, UTC
|
||||
|
||||
from PIL.ImageChops import offset
|
||||
from bson import ObjectId
|
||||
@@ -77,3 +78,28 @@ class GenerationRepo:
|
||||
|
||||
async def update_generation(self, generation: Generation, ):
|
||||
res = await self.collection.update_one({"_id": ObjectId(generation.id)}, {"$set": generation.model_dump()})
|
||||
|
||||
async def get_generations_by_group(self, group_id: str) -> List[Generation]:
|
||||
res = await self.collection.find({"generation_group_id": group_id, "is_deleted": False}).sort("created_at", 1).to_list(None)
|
||||
generations: List[Generation] = []
|
||||
for generation in res:
|
||||
generation["id"] = str(generation.pop("_id"))
|
||||
generations.append(Generation(**generation))
|
||||
return generations
|
||||
|
||||
async def cancel_stale_generations(self, timeout_minutes: int = 60) -> int:
|
||||
cutoff_time = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
|
||||
res = await self.collection.update_many(
|
||||
{
|
||||
"status": GenerationStatus.RUNNING,
|
||||
"created_at": {"$lt": cutoff_time}
|
||||
},
|
||||
{
|
||||
"$set": {
|
||||
"status": GenerationStatus.FAILED,
|
||||
"failed_reason": "Timeout: Execution time limit exceeded",
|
||||
"updated_at": datetime.now(UTC)
|
||||
}
|
||||
}
|
||||
)
|
||||
return res.modified_count
|
||||
|
||||
@@ -51,4 +51,3 @@ python-jose[cryptography]==3.3.0
|
||||
python-multipart==0.0.22
|
||||
email-validator
|
||||
prometheus-fastapi-instrumentator
|
||||
PyJWT
|
||||
|
||||
BIN
routers/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
routers/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
routers/__pycache__/assets_router.cpython-313.pyc
Normal file
BIN
routers/__pycache__/assets_router.cpython-313.pyc
Normal file
Binary file not shown.
BIN
routers/__pycache__/auth_router.cpython-313.pyc
Normal file
BIN
routers/__pycache__/auth_router.cpython-313.pyc
Normal file
Binary file not shown.
BIN
routers/__pycache__/char_router.cpython-313.pyc
Normal file
BIN
routers/__pycache__/char_router.cpython-313.pyc
Normal file
Binary file not shown.
BIN
routers/__pycache__/gen_router.cpython-313.pyc
Normal file
BIN
routers/__pycache__/gen_router.cpython-313.pyc
Normal file
Binary file not shown.
Binary file not shown.
BIN
tests/__pycache__/test_auth_flow.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/__pycache__/test_auth_flow.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
52
tests/test_scheduler.py
Normal file
52
tests/test_scheduler.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
from models.Generation import Generation, GenerationStatus
|
||||
from repos.generation_repo import GenerationRepo
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Mock configs if not present in env
|
||||
MONGO_HOST = os.getenv("MONGO_HOST", "mongodb://localhost:27017")
|
||||
DB_NAME = os.getenv("DB_NAME", "bot_db")
|
||||
|
||||
print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}")
|
||||
|
||||
async def test_scheduler():
|
||||
client = AsyncIOMotorClient(MONGO_HOST)
|
||||
repo = GenerationRepo(client, db_name=DB_NAME)
|
||||
|
||||
# 1. Create a "stale" generation (2 hours ago)
|
||||
stale_gen = Generation(
|
||||
prompt="stale test",
|
||||
status=GenerationStatus.RUNNING,
|
||||
created_at=datetime.now(UTC) - timedelta(minutes=120),
|
||||
assets_list=[],
|
||||
aspect_ratio="NINESIXTEEN",
|
||||
quality="ONEK"
|
||||
)
|
||||
gen_id = await repo.create_generation(stale_gen)
|
||||
print(f"Created stale generation: {gen_id}")
|
||||
|
||||
# 2. Run cleanup
|
||||
print("Running cleanup...")
|
||||
count = await repo.cancel_stale_generations(timeout_minutes=60)
|
||||
print(f"Cleaned up {count} generations")
|
||||
|
||||
# 3. Verify status
|
||||
updated_gen = await repo.get_generation(gen_id)
|
||||
print(f"Generation status: {updated_gen.status}")
|
||||
print(f"Failed reason: {updated_gen.failed_reason}")
|
||||
|
||||
if updated_gen.status == GenerationStatus.FAILED:
|
||||
print("✅ SUCCESS: Generation marked as FAILED")
|
||||
else:
|
||||
print("❌ FAILURE: Generation status not updated")
|
||||
|
||||
# Cleanup
|
||||
await repo.collection.delete_one({"_id": updated_gen.id}) # Remove test data
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(test_scheduler())
|
||||
BIN
utils/__pycache__/image_utils.cpython-313.pyc
Normal file
BIN
utils/__pycache__/image_utils.cpython-313.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/security.cpython-313.pyc
Normal file
BIN
utils/__pycache__/security.cpython-313.pyc
Normal file
Binary file not shown.
Reference in New Issue
Block a user