Compare commits
3 Commits
e2c050515d
...
video
| Author | SHA1 | Date | |
|---|---|---|---|
| 32ff77e04b | |||
| d1f67c773f | |||
| c63b51ef75 |
2
.env
2
.env
@@ -9,3 +9,5 @@ MINIO_SECRET_KEY=SuperSecretPassword123!
|
|||||||
MINIO_BUCKET=ai-char
|
MINIO_BUCKET=ai-char
|
||||||
MODE=production
|
MODE=production
|
||||||
EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW
|
EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW
|
||||||
|
KLING_ACCESS_KEY=AngRfYYeLhPQB3pmr9CpHfgHPCrmeeM4
|
||||||
|
KLING_SECRET_KEY=ndJfyayfQgbg4bMnE49yHnkACPChKMp4
|
||||||
14
.gitignore
vendored
14
.gitignore
vendored
@@ -9,17 +9,3 @@ minio_backup.tar.gz
|
|||||||
.idea
|
.idea
|
||||||
.venv
|
.venv
|
||||||
.vscode
|
.vscode
|
||||||
.vscode/launch.json
|
|
||||||
middlewares/__pycache__/
|
|
||||||
middlewares/*.pyc
|
|
||||||
api/__pycache__/
|
|
||||||
api/*.pyc
|
|
||||||
repos/__pycache__/
|
|
||||||
repos/*.pyc
|
|
||||||
adapters/__pycache__/
|
|
||||||
adapters/*.pyc
|
|
||||||
services/__pycache__/
|
|
||||||
services/*.pyc
|
|
||||||
utils/__pycache__/
|
|
||||||
utils/*.pyc
|
|
||||||
.vscode/launch.json
|
|
||||||
|
|||||||
25
.vscode/launch.json
vendored
25
.vscode/launch.json
vendored
@@ -16,6 +16,31 @@
|
|||||||
],
|
],
|
||||||
"jinja": true,
|
"jinja": true,
|
||||||
"justMyCode": true
|
"justMyCode": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Python: Current File",
|
||||||
|
"type": "debugpy",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${file}",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"justMyCode": true,
|
||||||
|
"env": {
|
||||||
|
"PYTHONPATH": "${workspaceFolder}"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Debug Tests: Current File",
|
||||||
|
"type": "debugpy",
|
||||||
|
"request": "launch",
|
||||||
|
"module": "pytest",
|
||||||
|
"args": [
|
||||||
|
"${file}"
|
||||||
|
],
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"justMyCode": true,
|
||||||
|
"env": {
|
||||||
|
"PYTHONPATH": "${workspaceFolder}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -23,30 +23,28 @@ class GoogleAdapter:
|
|||||||
self.TEXT_MODEL = "gemini-3-pro-preview"
|
self.TEXT_MODEL = "gemini-3-pro-preview"
|
||||||
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
||||||
|
|
||||||
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> tuple:
|
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> list:
|
||||||
"""Вспомогательный метод для подготовки контента (текст + картинки).
|
"""Вспомогательный метод для подготовки контента (текст + картинки)"""
|
||||||
Returns (contents, opened_images) — caller MUST close opened_images after use."""
|
|
||||||
contents = [prompt]
|
contents = [prompt]
|
||||||
opened_images = []
|
|
||||||
if images_list:
|
if images_list:
|
||||||
logger.info(f"Preparing content with {len(images_list)} images")
|
logger.info(f"Preparing content with {len(images_list)} images")
|
||||||
for img_bytes in images_list:
|
for img_bytes in images_list:
|
||||||
try:
|
try:
|
||||||
|
# Gemini API требует PIL Image на входе
|
||||||
image = Image.open(io.BytesIO(img_bytes))
|
image = Image.open(io.BytesIO(img_bytes))
|
||||||
contents.append(image)
|
contents.append(image)
|
||||||
opened_images.append(image)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing input image: {e}")
|
logger.error(f"Error processing input image: {e}")
|
||||||
else:
|
else:
|
||||||
logger.info("Preparing content with no images")
|
logger.info("Preparing content with no images")
|
||||||
return contents, opened_images
|
return contents
|
||||||
|
|
||||||
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
|
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
|
||||||
"""
|
"""
|
||||||
Генерация текста (Чат или Vision).
|
Генерация текста (Чат или Vision).
|
||||||
Возвращает строку с ответом.
|
Возвращает строку с ответом.
|
||||||
"""
|
"""
|
||||||
contents, opened_images = self._prepare_contents(prompt, images_list)
|
contents = self._prepare_contents(prompt, images_list)
|
||||||
logger.info(f"Generating text: {prompt}")
|
logger.info(f"Generating text: {prompt}")
|
||||||
try:
|
try:
|
||||||
response = self.client.models.generate_content(
|
response = self.client.models.generate_content(
|
||||||
@@ -70,9 +68,6 @@ class GoogleAdapter:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Gemini Text API Error: {e}")
|
logger.error(f"Gemini Text API Error: {e}")
|
||||||
raise GoogleGenerationException(f"Gemini Text API Error: {e}")
|
raise GoogleGenerationException(f"Gemini Text API Error: {e}")
|
||||||
finally:
|
|
||||||
for img in opened_images:
|
|
||||||
img.close()
|
|
||||||
|
|
||||||
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
|
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
@@ -80,7 +75,7 @@ class GoogleAdapter:
|
|||||||
Возвращает список байтовых потоков (готовых к отправке).
|
Возвращает список байтовых потоков (готовых к отправке).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
contents, opened_images = self._prepare_contents(prompt, images_list)
|
contents = self._prepare_contents(prompt, images_list)
|
||||||
logger.info(f"Generating image. Prompt length: {len(prompt)}, Ratio: {aspect_ratio}, Quality: {quality}")
|
logger.info(f"Generating image. Prompt length: {len(prompt)}, Ratio: {aspect_ratio}, Quality: {quality}")
|
||||||
|
|
||||||
start_time = datetime.now()
|
start_time = datetime.now()
|
||||||
@@ -106,20 +101,8 @@ class GoogleAdapter:
|
|||||||
if response.usage_metadata:
|
if response.usage_metadata:
|
||||||
token_usage = response.usage_metadata.total_token_count
|
token_usage = response.usage_metadata.total_token_count
|
||||||
|
|
||||||
# Check prompt-level block (e.g. PROHIBITED_CONTENT) — no candidates in this case
|
if response.parts is None and response.candidates[0].finish_reason is not None:
|
||||||
if response.prompt_feedback and response.prompt_feedback.block_reason:
|
raise GoogleGenerationException(f"Generation blocked in cause of {response.candidates[0].finish_reason.value}")
|
||||||
raise GoogleGenerationException(
|
|
||||||
f"Generation blocked at prompt level: {response.prompt_feedback.block_reason.value}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check candidate-level block
|
|
||||||
if response.parts is None:
|
|
||||||
response_reason = (
|
|
||||||
response.candidates[0].finish_reason
|
|
||||||
if response.candidates and len(response.candidates) > 0
|
|
||||||
else "Unknown"
|
|
||||||
)
|
|
||||||
raise GoogleGenerationException(f"Generation blocked: {response_reason}")
|
|
||||||
|
|
||||||
generated_images = []
|
generated_images = []
|
||||||
|
|
||||||
@@ -165,7 +148,3 @@ class GoogleAdapter:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Gemini Image API Error: {e}")
|
logger.error(f"Gemini Image API Error: {e}")
|
||||||
raise GoogleGenerationException(f"Gemini Image API Error: {e}")
|
raise GoogleGenerationException(f"Gemini Image API Error: {e}")
|
||||||
finally:
|
|
||||||
for img in opened_images:
|
|
||||||
img.close()
|
|
||||||
del contents
|
|
||||||
165
adapters/kling_adapter.py
Normal file
165
adapters/kling_adapter.py
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from typing import Optional, Dict, Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import jwt
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
KLING_API_BASE = "https://api.klingai.com"
|
||||||
|
|
||||||
|
|
||||||
|
class KlingApiException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KlingAdapter:
|
||||||
|
def __init__(self, access_key: str, secret_key: str):
|
||||||
|
if not access_key or not secret_key:
|
||||||
|
raise ValueError("Kling API credentials are missing")
|
||||||
|
self.access_key = access_key
|
||||||
|
self.secret_key = secret_key
|
||||||
|
|
||||||
|
def _generate_token(self) -> str:
|
||||||
|
"""Generate a JWT token for Kling API authentication."""
|
||||||
|
now = int(time.time())
|
||||||
|
payload = {
|
||||||
|
"iss": self.access_key,
|
||||||
|
"exp": now + 1800, # 30 minutes
|
||||||
|
"iat": now - 5, # небольшой запас назад
|
||||||
|
"nbf": now - 5,
|
||||||
|
}
|
||||||
|
return jwt.encode(payload, self.secret_key, algorithm="HS256",
|
||||||
|
headers={"typ": "JWT", "alg": "HS256"})
|
||||||
|
|
||||||
|
def _headers(self) -> dict:
|
||||||
|
return {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": f"Bearer {self._generate_token()}"
|
||||||
|
}
|
||||||
|
|
||||||
|
async def create_video_task(
|
||||||
|
self,
|
||||||
|
image_url: str,
|
||||||
|
prompt: str = "",
|
||||||
|
negative_prompt: str = "",
|
||||||
|
model_name: str = "kling-v2-6",
|
||||||
|
duration: int = 5,
|
||||||
|
mode: str = "std",
|
||||||
|
cfg_scale: float = 0.5,
|
||||||
|
aspect_ratio: str = "16:9",
|
||||||
|
callback_url: Optional[str] = None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Create an image-to-video generation task.
|
||||||
|
Returns the full task data dict including task_id.
|
||||||
|
"""
|
||||||
|
body: Dict[str, Any] = {
|
||||||
|
"model_name": model_name,
|
||||||
|
"image": image_url,
|
||||||
|
"prompt": prompt,
|
||||||
|
"negative_prompt": negative_prompt,
|
||||||
|
"duration": str(duration),
|
||||||
|
"mode": mode,
|
||||||
|
"cfg_scale": cfg_scale,
|
||||||
|
"aspect_ratio": aspect_ratio,
|
||||||
|
}
|
||||||
|
if callback_url:
|
||||||
|
body["callback_url"] = callback_url
|
||||||
|
|
||||||
|
logger.info(f"Creating Kling video task. Model: {model_name}, Duration: {duration}s, Mode: {mode}")
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||||
|
response = await client.post(
|
||||||
|
f"{KLING_API_BASE}/v1/videos/image2video",
|
||||||
|
headers=self._headers(),
|
||||||
|
json=body,
|
||||||
|
)
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
logger.info(f"Kling create task response: code={data.get('code')}, message={data.get('message')}")
|
||||||
|
|
||||||
|
if response.status_code != 200 or data.get("code") != 0:
|
||||||
|
error_msg = data.get("message", "Unknown Kling API error")
|
||||||
|
raise KlingApiException(f"Failed to create video task: {error_msg} (code={data.get('code')})")
|
||||||
|
|
||||||
|
task_data = data.get("data", {})
|
||||||
|
task_id = task_data.get("task_id")
|
||||||
|
if not task_id:
|
||||||
|
raise KlingApiException("No task_id returned from Kling API")
|
||||||
|
|
||||||
|
logger.info(f"Kling video task created: task_id={task_id}")
|
||||||
|
return task_data
|
||||||
|
|
||||||
|
async def get_task_status(self, task_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Query the status of a video generation task.
|
||||||
|
Returns the full task data dict.
|
||||||
|
"""
|
||||||
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||||
|
response = await client.get(
|
||||||
|
f"{KLING_API_BASE}/v1/videos/image2video/{task_id}",
|
||||||
|
headers=self._headers(),
|
||||||
|
)
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
if response.status_code != 200 or data.get("code") != 0:
|
||||||
|
error_msg = data.get("message", "Unknown error")
|
||||||
|
raise KlingApiException(f"Failed to query task {task_id}: {error_msg}")
|
||||||
|
|
||||||
|
return data.get("data", {})
|
||||||
|
|
||||||
|
async def wait_for_completion(
|
||||||
|
self,
|
||||||
|
task_id: str,
|
||||||
|
poll_interval: int = 10,
|
||||||
|
timeout: int = 600,
|
||||||
|
progress_callback=None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Poll the task status until completion.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: Kling task ID
|
||||||
|
poll_interval: seconds between polls
|
||||||
|
timeout: max seconds to wait
|
||||||
|
progress_callback: async callable(progress_pct: int) to report progress
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Final task data dict with video URL on success.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KlingApiException on failure or timeout.
|
||||||
|
"""
|
||||||
|
start = time.time()
|
||||||
|
attempt = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
elapsed = time.time() - start
|
||||||
|
if elapsed > timeout:
|
||||||
|
raise KlingApiException(f"Video generation timed out after {timeout}s for task {task_id}")
|
||||||
|
|
||||||
|
task_data = await self.get_task_status(task_id)
|
||||||
|
status = task_data.get("task_status")
|
||||||
|
|
||||||
|
logger.info(f"Kling task {task_id}: status={status}, elapsed={elapsed:.0f}s")
|
||||||
|
|
||||||
|
if status == "succeed":
|
||||||
|
logger.info(f"Kling task {task_id} completed successfully")
|
||||||
|
return task_data
|
||||||
|
|
||||||
|
if status == "failed":
|
||||||
|
fail_reason = task_data.get("task_status_msg", "Unknown failure")
|
||||||
|
raise KlingApiException(f"Video generation failed: {fail_reason}")
|
||||||
|
|
||||||
|
# Report progress estimate (linear approximation based on typical time)
|
||||||
|
if progress_callback:
|
||||||
|
# Estimate: typical gen is ~120s, cap at 90%
|
||||||
|
estimated_progress = min(int((elapsed / 120) * 90), 90)
|
||||||
|
attempt += 1
|
||||||
|
await progress_callback(estimated_progress)
|
||||||
|
|
||||||
|
await asyncio.sleep(poll_interval)
|
||||||
@@ -56,21 +56,6 @@ class S3Adapter:
|
|||||||
print(f"Error downloading from S3: {e}")
|
print(f"Error downloading from S3: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def stream_file(self, object_name: str, chunk_size: int = 65536):
|
|
||||||
"""Streams a file from S3 yielding chunks. Memory-efficient for large files."""
|
|
||||||
try:
|
|
||||||
async with self._get_client() as client:
|
|
||||||
response = await client.get_object(Bucket=self.bucket_name, Key=object_name)
|
|
||||||
# aioboto3 Body is an aiohttp StreamReader wrapper
|
|
||||||
body = response['Body']
|
|
||||||
data = await body.read()
|
|
||||||
# Yield in chunks to avoid holding entire response in StreamingResponse buffer
|
|
||||||
for i in range(0, len(data), chunk_size):
|
|
||||||
yield data[i:i + chunk_size]
|
|
||||||
except ClientError as e:
|
|
||||||
print(f"Error streaming from S3: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
async def delete_file(self, object_name: str):
|
async def delete_file(self, object_name: str):
|
||||||
"""Deletes a file from S3."""
|
"""Deletes a file from S3."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
47
aiws.py
47
aiws.py
@@ -18,6 +18,7 @@ from prometheus_fastapi_instrumentator import Instrumentator
|
|||||||
|
|
||||||
# --- ИМПОРТЫ ПРОЕКТА ---
|
# --- ИМПОРТЫ ПРОЕКТА ---
|
||||||
from adapters.google_adapter import GoogleAdapter
|
from adapters.google_adapter import GoogleAdapter
|
||||||
|
from adapters.kling_adapter import KlingAdapter
|
||||||
from adapters.s3_adapter import S3Adapter
|
from adapters.s3_adapter import S3Adapter
|
||||||
from api.service.generation_service import GenerationService
|
from api.service.generation_service import GenerationService
|
||||||
from api.service.album_service import AlbumService
|
from api.service.album_service import AlbumService
|
||||||
@@ -83,7 +84,18 @@ s3_adapter = S3Adapter(
|
|||||||
|
|
||||||
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
|
||||||
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
|
||||||
generation_service = GenerationService(dao, gemini, bot)
|
|
||||||
|
# Kling Adapter (optional, for video generation)
|
||||||
|
kling_access_key = os.getenv("KLING_ACCESS_KEY", "")
|
||||||
|
kling_secret_key = os.getenv("KLING_SECRET_KEY", "")
|
||||||
|
kling_adapter = None
|
||||||
|
if kling_access_key and kling_secret_key:
|
||||||
|
kling_adapter = KlingAdapter(access_key=kling_access_key, secret_key=kling_secret_key)
|
||||||
|
logger.info("Kling adapter initialized")
|
||||||
|
else:
|
||||||
|
logger.warning("KLING_ACCESS_KEY / KLING_SECRET_KEY not set — video generation disabled")
|
||||||
|
|
||||||
|
generation_service = GenerationService(dao, gemini, s3_adapter, bot, kling_adapter)
|
||||||
album_service = AlbumService(dao)
|
album_service = AlbumService(dao)
|
||||||
|
|
||||||
# Dispatcher
|
# Dispatcher
|
||||||
@@ -120,17 +132,6 @@ assets_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_
|
|||||||
gen_router.message.middleware(AlbumMiddleware(latency=0.8))
|
gen_router.message.middleware(AlbumMiddleware(latency=0.8))
|
||||||
|
|
||||||
|
|
||||||
async def start_scheduler(service: GenerationService):
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
logger.info("Running scheduler for stacked generation killing")
|
|
||||||
await service.cleanup_stale_generations()
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Scheduler error: {e}")
|
|
||||||
await asyncio.sleep(600) # Check every 10 minutes
|
|
||||||
|
|
||||||
# --- LIFESPAN (Запуск FastAPI + Bot) ---
|
# --- LIFESPAN (Запуск FastAPI + Bot) ---
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
@@ -149,6 +150,7 @@ async def lifespan(app: FastAPI):
|
|||||||
app.state.gemini_client = gemini
|
app.state.gemini_client = gemini
|
||||||
app.state.bot = bot
|
app.state.bot = bot
|
||||||
app.state.s3_adapter = s3_adapter
|
app.state.s3_adapter = s3_adapter
|
||||||
|
app.state.kling_adapter = kling_adapter
|
||||||
app.state.album_service = album_service
|
app.state.album_service = album_service
|
||||||
app.state.users_repo = users_repo # Добавляем репозиторий в state
|
app.state.users_repo = users_repo # Добавляем репозиторий в state
|
||||||
|
|
||||||
@@ -162,28 +164,17 @@ async def lifespan(app: FastAPI):
|
|||||||
# )
|
# )
|
||||||
# print("🤖 Bot polling started")
|
# print("🤖 Bot polling started")
|
||||||
|
|
||||||
# 3. ЗАПУСК ШЕДУЛЕРА
|
|
||||||
scheduler_task = asyncio.create_task(start_scheduler(generation_service))
|
|
||||||
print("⏰ Scheduler started")
|
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# --- SHUTDOWN ---
|
# --- SHUTDOWN ---
|
||||||
print("🛑 Shutting down...")
|
print("🛑 Shutting down...")
|
||||||
|
|
||||||
# 4. Остановка шедулера
|
|
||||||
scheduler_task.cancel()
|
|
||||||
try:
|
|
||||||
await scheduler_task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
print("⏰ Scheduler stopped")
|
|
||||||
|
|
||||||
# 3. Остановка бота
|
# 3. Остановка бота
|
||||||
# polling_task.cancel()
|
polling_task.cancel()
|
||||||
# try:
|
try:
|
||||||
# await polling_task
|
await polling_task
|
||||||
# except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# print("🤖 Bot polling stopped")
|
print("🤖 Bot polling stopped")
|
||||||
|
|
||||||
# 4. Отключение БД
|
# 4. Отключение БД
|
||||||
# Обычно Motor закрывать не обязательно при выходе, но хорошим тоном считается
|
# Обычно Motor закрывать не обязательно при выходе, но хорошим тоном считается
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
@@ -3,6 +3,7 @@ from fastapi import Request, Depends
|
|||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
|
|
||||||
from adapters.google_adapter import GoogleAdapter
|
from adapters.google_adapter import GoogleAdapter
|
||||||
|
from adapters.kling_adapter import KlingAdapter
|
||||||
from api.service.generation_service import GenerationService
|
from api.service.generation_service import GenerationService
|
||||||
from repos.dao import DAO
|
from repos.dao import DAO
|
||||||
|
|
||||||
@@ -36,14 +37,18 @@ def get_dao(
|
|||||||
# так что DAO создастся один раз за запрос.
|
# так что DAO создастся один раз за запрос.
|
||||||
return DAO(mongo_client, s3_adapter)
|
return DAO(mongo_client, s3_adapter)
|
||||||
|
|
||||||
|
def get_kling_adapter(request: Request) -> Optional[KlingAdapter]:
|
||||||
|
return request.app.state.kling_adapter
|
||||||
|
|
||||||
# Провайдер сервиса (собирается из DAO и Gemini)
|
# Провайдер сервиса (собирается из DAO и Gemini)
|
||||||
def get_generation_service(
|
def get_generation_service(
|
||||||
dao: DAO = Depends(get_dao),
|
dao: DAO = Depends(get_dao),
|
||||||
gemini: GoogleAdapter = Depends(get_gemini_client),
|
gemini: GoogleAdapter = Depends(get_gemini_client),
|
||||||
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
||||||
bot: Bot = Depends(get_bot_client),
|
bot: Bot = Depends(get_bot_client),
|
||||||
|
kling_adapter: Optional[KlingAdapter] = Depends(get_kling_adapter),
|
||||||
) -> GenerationService:
|
) -> GenerationService:
|
||||||
return GenerationService(dao, gemini, s3_adapter, bot)
|
return GenerationService(dao, gemini, s3_adapter, bot, kling_adapter=kling_adapter)
|
||||||
|
|
||||||
from fastapi import Header
|
from fastapi import Header
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -9,7 +9,7 @@ from pymongo import MongoClient
|
|||||||
from starlette import status
|
from starlette import status
|
||||||
from starlette.exceptions import HTTPException
|
from starlette.exceptions import HTTPException
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
from starlette.responses import Response, JSONResponse, StreamingResponse
|
from starlette.responses import Response, JSONResponse
|
||||||
|
|
||||||
from adapters.s3_adapter import S3Adapter
|
from adapters.s3_adapter import S3Adapter
|
||||||
from api.models.AssetDTO import AssetsResponse, AssetResponse
|
from api.models.AssetDTO import AssetsResponse, AssetResponse
|
||||||
@@ -33,46 +33,27 @@ async def get_asset(
|
|||||||
asset_id: str,
|
asset_id: str,
|
||||||
request: Request,
|
request: Request,
|
||||||
thumbnail: bool = False,
|
thumbnail: bool = False,
|
||||||
dao: DAO = Depends(get_dao),
|
dao: DAO = Depends(get_dao)
|
||||||
s3_adapter: S3Adapter = Depends(get_s3_adapter),
|
|
||||||
) -> Response:
|
) -> Response:
|
||||||
logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
|
logger.debug(f"get_asset called for ID: {asset_id}, thumbnail={thumbnail}")
|
||||||
# Загружаем только метаданные (без data/thumbnail bytes)
|
asset = await dao.assets.get_asset(asset_id)
|
||||||
asset = await dao.assets.get_asset(asset_id, with_data=False)
|
# 2. Проверка на существование
|
||||||
if not asset:
|
if not asset:
|
||||||
raise HTTPException(status_code=404, detail="Asset not found")
|
raise HTTPException(status_code=404, detail="Asset not found")
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
|
# Кэшировать на 1 год (31536000 сек)
|
||||||
"Cache-Control": "public, max-age=31536000, immutable"
|
"Cache-Control": "public, max-age=31536000, immutable"
|
||||||
}
|
}
|
||||||
|
|
||||||
# Thumbnail: маленький, можно грузить в RAM
|
content = asset.data
|
||||||
if thumbnail:
|
media_type = "image/png" # Default, or detect
|
||||||
if asset.minio_thumbnail_object_name and s3_adapter:
|
|
||||||
thumb_bytes = await s3_adapter.get_file(asset.minio_thumbnail_object_name)
|
|
||||||
if thumb_bytes:
|
|
||||||
return Response(content=thumb_bytes, media_type="image/jpeg", headers=headers)
|
|
||||||
# Fallback: thumbnail in DB
|
|
||||||
if asset.thumbnail:
|
|
||||||
return Response(content=asset.thumbnail, media_type="image/jpeg", headers=headers)
|
|
||||||
# No thumbnail available — fall through to main content
|
|
||||||
|
|
||||||
# Main content: стримим из S3 без загрузки в RAM
|
if thumbnail and asset.thumbnail:
|
||||||
if asset.minio_object_name and s3_adapter:
|
content = asset.thumbnail
|
||||||
content_type = "image/png"
|
media_type = "image/jpeg"
|
||||||
# if asset.content_type == AssetContentType.VIDEO:
|
|
||||||
# content_type = "video/mp4"
|
|
||||||
return StreamingResponse(
|
|
||||||
s3_adapter.stream_file(asset.minio_object_name),
|
|
||||||
media_type=content_type,
|
|
||||||
headers=headers,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fallback: data stored in DB (legacy)
|
return Response(content=content, media_type=media_type, headers=headers)
|
||||||
if asset.data:
|
|
||||||
return Response(content=asset.data, media_type="image/png", headers=headers)
|
|
||||||
|
|
||||||
raise HTTPException(status_code=404, detail="Asset data not found")
|
|
||||||
|
|
||||||
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
|
@router.delete("/orphans", dependencies=[Depends(get_current_user)])
|
||||||
async def delete_orphan_assets_from_minio(
|
async def delete_orphan_assets_from_minio(
|
||||||
|
|||||||
@@ -8,7 +8,8 @@ from api import service
|
|||||||
from api.dependency import get_generation_service, get_project_id, get_dao
|
from api.dependency import get_generation_service, get_project_id, get_dao
|
||||||
from repos.dao import DAO
|
from repos.dao import DAO
|
||||||
|
|
||||||
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest, GenerationGroupResponse
|
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest
|
||||||
|
from api.models.VideoGenerationRequest import VideoGenerationRequest
|
||||||
from api.service.generation_service import GenerationService
|
from api.service.generation_service import GenerationService
|
||||||
from models.Generation import Generation
|
from models.Generation import Generation
|
||||||
|
|
||||||
@@ -68,12 +69,12 @@ async def get_generations(character_id: Optional[str] = None, limit: int = 10, o
|
|||||||
return await generation_service.get_generations(character_id, limit=limit, offset=offset, user_id=user_id_filter, project_id=project_id)
|
return await generation_service.get_generations(character_id, limit=limit, offset=offset, user_id=user_id_filter, project_id=project_id)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/_run", response_model=GenerationGroupResponse)
|
@router.post("/_run", response_model=GenerationResponse)
|
||||||
async def post_generation(generation: GenerationRequest, request: Request,
|
async def post_generation(generation: GenerationRequest, request: Request,
|
||||||
generation_service: GenerationService = Depends(get_generation_service),
|
generation_service: GenerationService = Depends(get_generation_service),
|
||||||
current_user: dict = Depends(get_current_user),
|
current_user: dict = Depends(get_current_user),
|
||||||
project_id: Optional[str] = Depends(get_project_id),
|
project_id: Optional[str] = Depends(get_project_id),
|
||||||
dao: DAO = Depends(get_dao)) -> GenerationGroupResponse:
|
dao: DAO = Depends(get_dao)) -> GenerationResponse:
|
||||||
logger.info(f"post_generation (run) called. LinkedCharId: {generation.linked_character_id}, PromptLength: {len(generation.prompt)}")
|
logger.info(f"post_generation (run) called. LinkedCharId: {generation.linked_character_id}, PromptLength: {len(generation.prompt)}")
|
||||||
|
|
||||||
if project_id:
|
if project_id:
|
||||||
@@ -85,6 +86,16 @@ async def post_generation(generation: GenerationRequest, request: Request,
|
|||||||
return await generation_service.create_generation_task(generation, user_id=str(current_user.get("_id")))
|
return await generation_service.create_generation_task(generation, user_id=str(current_user.get("_id")))
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{generation_id}", response_model=GenerationResponse)
|
||||||
|
async def get_generation(generation_id: str,
|
||||||
|
generation_service: GenerationService = Depends(get_generation_service),
|
||||||
|
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
|
||||||
|
logger.debug(f"get_generation called for ID: {generation_id}")
|
||||||
|
gen = await generation_service.get_generation(generation_id)
|
||||||
|
if gen and gen.created_by != str(current_user["_id"]):
|
||||||
|
raise HTTPException(status_code=403, detail="Access denied")
|
||||||
|
return gen
|
||||||
|
|
||||||
|
|
||||||
@router.get("/running")
|
@router.get("/running")
|
||||||
async def get_running_generations(request: Request,
|
async def get_running_generations(request: Request,
|
||||||
@@ -103,27 +114,25 @@ async def get_running_generations(request: Request,
|
|||||||
return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id)
|
return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/group/{group_id}", response_model=GenerationGroupResponse)
|
@router.post("/video/_run", response_model=GenerationResponse)
|
||||||
async def get_generation_group(group_id: str,
|
async def post_video_generation(
|
||||||
|
video_request: VideoGenerationRequest,
|
||||||
|
request: Request,
|
||||||
generation_service: GenerationService = Depends(get_generation_service),
|
generation_service: GenerationService = Depends(get_generation_service),
|
||||||
current_user: dict = Depends(get_current_user)):
|
current_user: dict = Depends(get_current_user),
|
||||||
logger.info(f"get_generation_group called for group_id: {group_id}")
|
project_id: Optional[str] = Depends(get_project_id),
|
||||||
generations = await generation_service.dao.generations.get_generations_by_group(group_id)
|
dao: DAO = Depends(get_dao),
|
||||||
gen_responses = [GenerationResponse(**gen.model_dump()) for gen in generations]
|
) -> GenerationResponse:
|
||||||
return GenerationGroupResponse(generation_group_id=group_id, generations=gen_responses)
|
"""Start image-to-video generation using Kling AI."""
|
||||||
|
logger.info(f"post_video_generation called. AssetId: {video_request.image_asset_id}, Duration: {video_request.duration}s, Mode: {video_request.mode}")
|
||||||
|
|
||||||
@router.get("/{generation_id}", response_model=GenerationResponse)
|
|
||||||
async def get_generation(generation_id: str,
|
|
||||||
generation_service: GenerationService = Depends(get_generation_service),
|
|
||||||
current_user: dict = Depends(get_current_user)) -> GenerationResponse:
|
|
||||||
logger.debug(f"get_generation called for ID: {generation_id}")
|
|
||||||
gen = await generation_service.get_generation(generation_id)
|
|
||||||
if gen and gen.created_by != str(current_user["_id"]):
|
|
||||||
raise HTTPException(status_code=403, detail="Access denied")
|
|
||||||
return gen
|
|
||||||
|
|
||||||
|
if project_id:
|
||||||
|
project = await dao.projects.get_project(project_id)
|
||||||
|
if not project or str(current_user["_id"]) not in project.members:
|
||||||
|
raise HTTPException(status_code=403, detail="Project access denied")
|
||||||
|
video_request.project_id = project_id
|
||||||
|
|
||||||
|
return await generation_service.create_video_generation_task(video_request, user_id=str(current_user.get("_id")))
|
||||||
|
|
||||||
|
|
||||||
@router.post("/import", response_model=GenerationResponse)
|
@router.post("/import", response_model=GenerationResponse)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
from datetime import datetime, UTC
|
from datetime import datetime, UTC
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from models.Asset import Asset
|
from models.Asset import Asset
|
||||||
from models.Generation import GenerationStatus
|
from models.Generation import GenerationStatus
|
||||||
@@ -17,7 +17,6 @@ class GenerationRequest(BaseModel):
|
|||||||
use_profile_image: bool = True
|
use_profile_image: bool = True
|
||||||
assets_list: List[str]
|
assets_list: List[str]
|
||||||
project_id: Optional[str] = None
|
project_id: Optional[str] = None
|
||||||
count: int = Field(default=1, ge=1, le=10)
|
|
||||||
|
|
||||||
|
|
||||||
class GenerationsResponse(BaseModel):
|
class GenerationsResponse(BaseModel):
|
||||||
@@ -28,6 +27,7 @@ class GenerationsResponse(BaseModel):
|
|||||||
class GenerationResponse(BaseModel):
|
class GenerationResponse(BaseModel):
|
||||||
id: str
|
id: str
|
||||||
status: GenerationStatus
|
status: GenerationStatus
|
||||||
|
gen_type: GenType = GenType.IMAGE
|
||||||
failed_reason: Optional[str] = None
|
failed_reason: Optional[str] = None
|
||||||
|
|
||||||
linked_character_id: Optional[str] = None
|
linked_character_id: Optional[str] = None
|
||||||
@@ -46,15 +46,14 @@ class GenerationResponse(BaseModel):
|
|||||||
progress: int = 0
|
progress: int = 0
|
||||||
cost: Optional[float] = None
|
cost: Optional[float] = None
|
||||||
created_by: Optional[str] = None
|
created_by: Optional[str] = None
|
||||||
generation_group_id: Optional[str] = None
|
# Video-specific
|
||||||
|
kling_task_id: Optional[str] = None
|
||||||
|
video_duration: Optional[int] = None
|
||||||
|
video_mode: Optional[str] = None
|
||||||
created_at: datetime = datetime.now(UTC)
|
created_at: datetime = datetime.now(UTC)
|
||||||
updated_at: datetime = datetime.now(UTC)
|
updated_at: datetime = datetime.now(UTC)
|
||||||
|
|
||||||
|
|
||||||
class GenerationGroupResponse(BaseModel):
|
|
||||||
generation_group_id: str
|
|
||||||
generations: List[GenerationResponse]
|
|
||||||
|
|
||||||
|
|
||||||
class PromptRequest(BaseModel):
|
class PromptRequest(BaseModel):
|
||||||
prompt: str
|
prompt: str
|
||||||
|
|||||||
16
api/models/VideoGenerationRequest.py
Normal file
16
api/models/VideoGenerationRequest.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class VideoGenerationRequest(BaseModel):
|
||||||
|
prompt: str = ""
|
||||||
|
negative_prompt: Optional[str] = ""
|
||||||
|
image_asset_id: str # ID ассета-картинки для source image
|
||||||
|
duration: int = 5 # 5 or 10 seconds
|
||||||
|
mode: str = "std" # "std" or "pro"
|
||||||
|
model_name: str = "kling-v2-1"
|
||||||
|
cfg_scale: float = 0.5
|
||||||
|
aspect_ratio: str = "16:9"
|
||||||
|
linked_character_id: Optional[str] = None
|
||||||
|
project_id: Optional[str] = None
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -5,14 +5,15 @@ import base64
|
|||||||
from datetime import datetime, UTC
|
from datetime import datetime, UTC
|
||||||
from typing import List, Optional, Tuple, Any, Dict
|
from typing import List, Optional, Tuple, Any, Dict
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from uuid import uuid4
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from aiogram import Bot
|
from aiogram import Bot
|
||||||
from aiogram.types import BufferedInputFile
|
from aiogram.types import BufferedInputFile
|
||||||
from adapters.Exception import GoogleGenerationException
|
from adapters.Exception import GoogleGenerationException
|
||||||
from adapters.google_adapter import GoogleAdapter
|
from adapters.google_adapter import GoogleAdapter
|
||||||
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse, GenerationGroupResponse
|
from adapters.kling_adapter import KlingAdapter, KlingApiException
|
||||||
|
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse
|
||||||
|
from api.models.VideoGenerationRequest import VideoGenerationRequest
|
||||||
# Импортируйте ваши модели DAO, Asset, Generation корректно
|
# Импортируйте ваши модели DAO, Asset, Generation корректно
|
||||||
from models.Asset import Asset, AssetType, AssetContentType
|
from models.Asset import Asset, AssetType, AssetContentType
|
||||||
from models.Generation import Generation, GenerationStatus
|
from models.Generation import Generation, GenerationStatus
|
||||||
@@ -22,9 +23,6 @@ from adapters.s3_adapter import S3Adapter
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Limit concurrent generations to 4
|
|
||||||
generation_semaphore = asyncio.Semaphore(4)
|
|
||||||
|
|
||||||
|
|
||||||
# --- Вспомогательная функция генерации ---
|
# --- Вспомогательная функция генерации ---
|
||||||
async def generate_image_task(
|
async def generate_image_task(
|
||||||
@@ -54,27 +52,26 @@ async def generate_image_task(
|
|||||||
logger.info(f"generate_image_task completed, received {len(generated_images_io) if generated_images_io else 0} images")
|
logger.info(f"generate_image_task completed, received {len(generated_images_io) if generated_images_io else 0} images")
|
||||||
except GoogleGenerationException as e:
|
except GoogleGenerationException as e:
|
||||||
raise e
|
raise e
|
||||||
finally:
|
|
||||||
# Освобождаем входные данные — они больше не нужны
|
|
||||||
del media_group_bytes
|
|
||||||
|
|
||||||
images_bytes = []
|
images_bytes = []
|
||||||
if generated_images_io:
|
if generated_images_io:
|
||||||
for img_io in generated_images_io:
|
for img_io in generated_images_io:
|
||||||
|
# Читаем байты из BytesIO
|
||||||
img_io.seek(0)
|
img_io.seek(0)
|
||||||
images_bytes.append(img_io.read())
|
content = img_io.read()
|
||||||
|
images_bytes.append(content)
|
||||||
|
|
||||||
|
# Закрываем поток
|
||||||
img_io.close()
|
img_io.close()
|
||||||
# Освобождаем список BytesIO сразу
|
|
||||||
del generated_images_io
|
|
||||||
|
|
||||||
return images_bytes, metrics
|
return images_bytes, metrics
|
||||||
|
|
||||||
class GenerationService:
|
class GenerationService:
|
||||||
def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None):
|
def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None, kling_adapter: Optional[KlingAdapter] = None):
|
||||||
self.dao = dao
|
self.dao = dao
|
||||||
self.gemini = gemini
|
self.gemini = gemini
|
||||||
self.s3_adapter = s3_adapter
|
self.s3_adapter = s3_adapter
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
|
self.kling_adapter = kling_adapter
|
||||||
|
|
||||||
|
|
||||||
async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str:
|
async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str:
|
||||||
@@ -117,37 +114,21 @@ class GenerationService:
|
|||||||
async def get_running_generations(self, user_id: Optional[str] = None, project_id: Optional[str] = None) -> List[Generation]:
|
async def get_running_generations(self, user_id: Optional[str] = None, project_id: Optional[str] = None) -> List[Generation]:
|
||||||
return await self.dao.generations.get_generations(status=GenerationStatus.RUNNING, created_by=user_id, project_id=project_id)
|
return await self.dao.generations.get_generations(status=GenerationStatus.RUNNING, created_by=user_id, project_id=project_id)
|
||||||
|
|
||||||
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationGroupResponse:
|
async def create_generation_task(self, generation_request: GenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
|
||||||
count = generation_request.count
|
|
||||||
|
|
||||||
if generation_group_id is None:
|
|
||||||
generation_group_id = str(uuid4())
|
|
||||||
|
|
||||||
results = []
|
|
||||||
for _ in range(count):
|
|
||||||
gen_response = await self._create_single_generation(generation_request, user_id, generation_group_id)
|
|
||||||
results.append(gen_response)
|
|
||||||
return GenerationGroupResponse(generation_group_id=generation_group_id, generations=results)
|
|
||||||
|
|
||||||
async def _create_single_generation(self, generation_request: GenerationRequest, user_id: Optional[str] = None, generation_group_id: Optional[str] = None) -> GenerationResponse:
|
|
||||||
gen_id = None
|
gen_id = None
|
||||||
generation_model = None
|
generation_model = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
generation_model = Generation(**generation_request.model_dump(exclude={'count'}))
|
generation_model = Generation(**generation_request.model_dump())
|
||||||
if user_id:
|
if user_id:
|
||||||
generation_model.created_by = user_id
|
generation_model.created_by = user_id
|
||||||
if generation_group_id:
|
|
||||||
generation_model.generation_group_id = generation_group_id
|
|
||||||
|
|
||||||
gen_id = await self.dao.generations.create_generation(generation_model)
|
gen_id = await self.dao.generations.create_generation(generation_model)
|
||||||
generation_model.id = gen_id
|
generation_model.id = gen_id
|
||||||
|
|
||||||
async def runner(gen):
|
async def runner(gen):
|
||||||
logger.info(f"Generation {gen.id} entered queue (waiting for slot)...")
|
|
||||||
try:
|
|
||||||
async with generation_semaphore:
|
|
||||||
logger.info(f"Starting background generation task for ID: {gen.id}")
|
logger.info(f"Starting background generation task for ID: {gen.id}")
|
||||||
|
try:
|
||||||
await self.create_generation(gen)
|
await self.create_generation(gen)
|
||||||
logger.info(f"Background generation task finished for ID: {gen.id}")
|
logger.info(f"Background generation task finished for ID: {gen.id}")
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -450,6 +431,168 @@ class GenerationService:
|
|||||||
|
|
||||||
return generation
|
return generation
|
||||||
|
|
||||||
|
# === VIDEO GENERATION (Kling) ===
|
||||||
|
|
||||||
|
async def create_video_generation_task(self, request: VideoGenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
|
||||||
|
"""Create a video generation task (async, returns immediately)."""
|
||||||
|
if not self.kling_adapter:
|
||||||
|
raise Exception("Kling adapter is not configured")
|
||||||
|
|
||||||
|
generation = Generation(
|
||||||
|
status=GenerationStatus.RUNNING,
|
||||||
|
gen_type=GenType.VIDEO,
|
||||||
|
linked_character_id=request.linked_character_id,
|
||||||
|
aspect_ratio=AspectRatios.SIXTEENNINE, # default for video
|
||||||
|
quality=Quality.ONEK,
|
||||||
|
prompt=request.prompt,
|
||||||
|
assets_list=[request.image_asset_id],
|
||||||
|
video_duration=request.duration,
|
||||||
|
video_mode=request.mode,
|
||||||
|
project_id=request.project_id,
|
||||||
|
)
|
||||||
|
if user_id:
|
||||||
|
generation.created_by = user_id
|
||||||
|
|
||||||
|
gen_id = await self.dao.generations.create_generation(generation)
|
||||||
|
generation.id = gen_id
|
||||||
|
|
||||||
|
async def runner(gen, req):
|
||||||
|
logger.info(f"Starting background video generation task for ID: {gen.id}")
|
||||||
|
try:
|
||||||
|
await self.create_video_generation(gen, req)
|
||||||
|
logger.info(f"Background video generation task finished for ID: {gen.id}")
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
db_gen = await self.dao.generations.get_generation(gen.id)
|
||||||
|
if db_gen and db_gen.status != GenerationStatus.FAILED:
|
||||||
|
db_gen.status = GenerationStatus.FAILED
|
||||||
|
db_gen.updated_at = datetime.now(UTC)
|
||||||
|
await self.dao.generations.update_generation(db_gen)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to mark video generation as FAILED")
|
||||||
|
logger.exception("create_video_generation task failed")
|
||||||
|
|
||||||
|
asyncio.create_task(runner(generation, request))
|
||||||
|
return GenerationResponse(**generation.model_dump())
|
||||||
|
|
||||||
|
async def create_video_generation(self, generation: Generation, request: VideoGenerationRequest):
|
||||||
|
"""Background video generation: call Kling API, poll, download result, save asset."""
|
||||||
|
start_time = datetime.now()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 1. Get source image presigned URL
|
||||||
|
asset = await self.dao.assets.get_asset(request.image_asset_id)
|
||||||
|
if not asset:
|
||||||
|
raise Exception(f"Asset {request.image_asset_id} not found")
|
||||||
|
|
||||||
|
if not asset.minio_object_name:
|
||||||
|
raise Exception(f"Asset {request.image_asset_id} has no S3 object")
|
||||||
|
|
||||||
|
presigned_url = await self.s3_adapter.get_presigned_url(asset.minio_object_name, expiration=3600)
|
||||||
|
if not presigned_url:
|
||||||
|
raise Exception("Failed to generate presigned URL for source image")
|
||||||
|
|
||||||
|
logger.info(f"Video gen {generation.id}: got presigned URL for asset {request.image_asset_id}")
|
||||||
|
|
||||||
|
# 2. Create Kling task
|
||||||
|
task_data = await self.kling_adapter.create_video_task(
|
||||||
|
image_url=presigned_url,
|
||||||
|
prompt=request.prompt,
|
||||||
|
negative_prompt=request.negative_prompt or "",
|
||||||
|
model_name=request.model_name,
|
||||||
|
duration=request.duration,
|
||||||
|
mode=request.mode,
|
||||||
|
cfg_scale=request.cfg_scale,
|
||||||
|
aspect_ratio=request.aspect_ratio,
|
||||||
|
)
|
||||||
|
|
||||||
|
task_id = task_data.get("task_id")
|
||||||
|
generation.kling_task_id = task_id
|
||||||
|
await self.dao.generations.update_generation(generation)
|
||||||
|
|
||||||
|
logger.info(f"Video gen {generation.id}: Kling task created, task_id={task_id}")
|
||||||
|
|
||||||
|
# 3. Poll for completion with progress updates
|
||||||
|
async def progress_callback(progress_pct: int):
|
||||||
|
generation.progress = progress_pct
|
||||||
|
generation.updated_at = datetime.now(UTC)
|
||||||
|
await self.dao.generations.update_generation(generation)
|
||||||
|
|
||||||
|
result = await self.kling_adapter.wait_for_completion(
|
||||||
|
task_id=task_id,
|
||||||
|
poll_interval=10,
|
||||||
|
timeout=600,
|
||||||
|
progress_callback=progress_callback,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 4. Extract video URL and download
|
||||||
|
works = result.get("task_result", {}).get("videos", [])
|
||||||
|
if not works:
|
||||||
|
raise Exception("No video in Kling result")
|
||||||
|
|
||||||
|
video_url = works[0].get("url")
|
||||||
|
video_duration = works[0].get("duration", request.duration)
|
||||||
|
if not video_url:
|
||||||
|
raise Exception("No video URL in Kling result")
|
||||||
|
|
||||||
|
logger.info(f"Video gen {generation.id}: downloading video from {video_url}")
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||||
|
video_response = await client.get(video_url)
|
||||||
|
video_response.raise_for_status()
|
||||||
|
video_bytes = video_response.content
|
||||||
|
|
||||||
|
logger.info(f"Video gen {generation.id}: downloaded {len(video_bytes)} bytes")
|
||||||
|
|
||||||
|
# 5. Upload to S3
|
||||||
|
filename = f"generated_video/{generation.linked_character_id or 'no_char'}/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{random.randint(1000, 9999)}.mp4"
|
||||||
|
await self.s3_adapter.upload_file(filename, video_bytes, content_type="video/mp4")
|
||||||
|
|
||||||
|
# 6. Create Asset
|
||||||
|
new_asset = Asset(
|
||||||
|
name=f"Video_{generation.linked_character_id or 'gen'}",
|
||||||
|
type=AssetType.GENERATED,
|
||||||
|
content_type=AssetContentType.VIDEO,
|
||||||
|
linked_char_id=generation.linked_character_id,
|
||||||
|
data=None,
|
||||||
|
minio_object_name=filename,
|
||||||
|
minio_bucket=self.s3_adapter.bucket_name,
|
||||||
|
thumbnail=None, # видео thumbnails можно добавить позже
|
||||||
|
created_by=generation.created_by,
|
||||||
|
project_id=generation.project_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
asset_id = await self.dao.assets.create_asset(new_asset)
|
||||||
|
new_asset.id = str(asset_id)
|
||||||
|
|
||||||
|
# 7. Finalize generation
|
||||||
|
end_time = datetime.now()
|
||||||
|
generation.result_list = [new_asset.id]
|
||||||
|
generation.result = new_asset.id
|
||||||
|
generation.status = GenerationStatus.DONE
|
||||||
|
generation.progress = 100
|
||||||
|
generation.video_duration = video_duration
|
||||||
|
generation.execution_time_seconds = (end_time - start_time).total_seconds()
|
||||||
|
generation.updated_at = datetime.now(UTC)
|
||||||
|
await self.dao.generations.update_generation(generation)
|
||||||
|
|
||||||
|
logger.info(f"Video generation {generation.id} completed. Asset: {new_asset.id}, Time: {generation.execution_time_seconds:.1f}s")
|
||||||
|
|
||||||
|
except KlingApiException as e:
|
||||||
|
logger.error(f"Kling API error for generation {generation.id}: {e}")
|
||||||
|
generation.status = GenerationStatus.FAILED
|
||||||
|
generation.failed_reason = str(e)
|
||||||
|
generation.updated_at = datetime.now(UTC)
|
||||||
|
await self.dao.generations.update_generation(generation)
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Video generation {generation.id} failed: {e}")
|
||||||
|
generation.status = GenerationStatus.FAILED
|
||||||
|
generation.failed_reason = str(e)
|
||||||
|
generation.updated_at = datetime.now(UTC)
|
||||||
|
await self.dao.generations.update_generation(generation)
|
||||||
|
raise
|
||||||
|
|
||||||
async def delete_generation(self, generation_id: str) -> bool:
|
async def delete_generation(self, generation_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Soft delete generation by marking it as deleted.
|
Soft delete generation by marking it as deleted.
|
||||||
@@ -466,14 +609,3 @@ class GenerationService:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error deleting generation {generation_id}: {e}")
|
logger.error(f"Error deleting generation {generation_id}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def cleanup_stale_generations(self):
|
|
||||||
"""
|
|
||||||
Cancels generations that have been running for more than 1 hour.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
count = await self.dao.generations.cancel_stale_generations(timeout_minutes=60)
|
|
||||||
if count > 0:
|
|
||||||
logger.info(f"Cleaned up {count} stale generations (timeout)")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error cleaning up stale generations: {e}")
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -7,6 +7,7 @@ from pydantic import BaseModel, computed_field, Field, model_validator
|
|||||||
|
|
||||||
class AssetContentType(str, Enum):
|
class AssetContentType(str, Enum):
|
||||||
IMAGE = 'image'
|
IMAGE = 'image'
|
||||||
|
VIDEO = 'video'
|
||||||
PROMPT = 'prompt'
|
PROMPT = 'prompt'
|
||||||
|
|
||||||
class AssetType(str, Enum):
|
class AssetType(str, Enum):
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ class GenerationStatus(str, Enum):
|
|||||||
class Generation(BaseModel):
|
class Generation(BaseModel):
|
||||||
id: Optional[str] = None
|
id: Optional[str] = None
|
||||||
status: GenerationStatus = GenerationStatus.RUNNING
|
status: GenerationStatus = GenerationStatus.RUNNING
|
||||||
|
gen_type: GenType = GenType.IMAGE
|
||||||
failed_reason: Optional[str] = None
|
failed_reason: Optional[str] = None
|
||||||
linked_character_id: Optional[str] = None
|
linked_character_id: Optional[str] = None
|
||||||
telegram_id: Optional[int] = None
|
telegram_id: Optional[int] = None
|
||||||
@@ -35,9 +36,12 @@ class Generation(BaseModel):
|
|||||||
output_token_usage: Optional[int] = None
|
output_token_usage: Optional[int] = None
|
||||||
is_deleted: bool = False
|
is_deleted: bool = False
|
||||||
album_id: Optional[str] = None
|
album_id: Optional[str] = None
|
||||||
generation_group_id: Optional[str] = None
|
|
||||||
created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId)
|
created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId)
|
||||||
project_id: Optional[str] = None
|
project_id: Optional[str] = None
|
||||||
|
# Video-specific fields
|
||||||
|
kling_task_id: Optional[str] = None
|
||||||
|
video_duration: Optional[int] = None # 5 or 10 seconds
|
||||||
|
video_mode: Optional[str] = None # "std" or "pro"
|
||||||
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -34,10 +34,12 @@ class Quality(str, Enum):
|
|||||||
class GenType(str, Enum):
|
class GenType(str, Enum):
|
||||||
TEXT = 'Text'
|
TEXT = 'Text'
|
||||||
IMAGE = 'Image'
|
IMAGE = 'Image'
|
||||||
|
VIDEO = 'Video'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def value_type(self) -> str:
|
def value_type(self) -> str:
|
||||||
return {
|
return {
|
||||||
GenType.TEXT: 'Text',
|
GenType.TEXT: 'Text',
|
||||||
GenType.IMAGE: 'Image',
|
GenType.IMAGE: 'Image',
|
||||||
|
GenType.VIDEO: 'Video',
|
||||||
}[self]
|
}[self]
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,5 +1,4 @@
|
|||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
from datetime import datetime, timedelta, UTC
|
|
||||||
|
|
||||||
from PIL.ImageChops import offset
|
from PIL.ImageChops import offset
|
||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
@@ -78,28 +77,3 @@ class GenerationRepo:
|
|||||||
|
|
||||||
async def update_generation(self, generation: Generation, ):
|
async def update_generation(self, generation: Generation, ):
|
||||||
res = await self.collection.update_one({"_id": ObjectId(generation.id)}, {"$set": generation.model_dump()})
|
res = await self.collection.update_one({"_id": ObjectId(generation.id)}, {"$set": generation.model_dump()})
|
||||||
|
|
||||||
async def get_generations_by_group(self, group_id: str) -> List[Generation]:
|
|
||||||
res = await self.collection.find({"generation_group_id": group_id, "is_deleted": False}).sort("created_at", 1).to_list(None)
|
|
||||||
generations: List[Generation] = []
|
|
||||||
for generation in res:
|
|
||||||
generation["id"] = str(generation.pop("_id"))
|
|
||||||
generations.append(Generation(**generation))
|
|
||||||
return generations
|
|
||||||
|
|
||||||
async def cancel_stale_generations(self, timeout_minutes: int = 60) -> int:
|
|
||||||
cutoff_time = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
|
|
||||||
res = await self.collection.update_many(
|
|
||||||
{
|
|
||||||
"status": GenerationStatus.RUNNING,
|
|
||||||
"created_at": {"$lt": cutoff_time}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"$set": {
|
|
||||||
"status": GenerationStatus.FAILED,
|
|
||||||
"failed_reason": "Timeout: Execution time limit exceeded",
|
|
||||||
"updated_at": datetime.now(UTC)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return res.modified_count
|
|
||||||
|
|||||||
@@ -51,3 +51,4 @@ python-jose[cryptography]==3.3.0
|
|||||||
python-multipart==0.0.22
|
python-multipart==0.0.22
|
||||||
email-validator
|
email-validator
|
||||||
prometheus-fastapi-instrumentator
|
prometheus-fastapi-instrumentator
|
||||||
|
PyJWT
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,52 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import os
|
|
||||||
from datetime import datetime, timedelta, UTC
|
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
|
||||||
from models.Generation import Generation, GenerationStatus
|
|
||||||
from repos.generation_repo import GenerationRepo
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
# Mock configs if not present in env
|
|
||||||
MONGO_HOST = os.getenv("MONGO_HOST", "mongodb://localhost:27017")
|
|
||||||
DB_NAME = os.getenv("DB_NAME", "bot_db")
|
|
||||||
|
|
||||||
print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}")
|
|
||||||
|
|
||||||
async def test_scheduler():
|
|
||||||
client = AsyncIOMotorClient(MONGO_HOST)
|
|
||||||
repo = GenerationRepo(client, db_name=DB_NAME)
|
|
||||||
|
|
||||||
# 1. Create a "stale" generation (2 hours ago)
|
|
||||||
stale_gen = Generation(
|
|
||||||
prompt="stale test",
|
|
||||||
status=GenerationStatus.RUNNING,
|
|
||||||
created_at=datetime.now(UTC) - timedelta(minutes=120),
|
|
||||||
assets_list=[],
|
|
||||||
aspect_ratio="NINESIXTEEN",
|
|
||||||
quality="ONEK"
|
|
||||||
)
|
|
||||||
gen_id = await repo.create_generation(stale_gen)
|
|
||||||
print(f"Created stale generation: {gen_id}")
|
|
||||||
|
|
||||||
# 2. Run cleanup
|
|
||||||
print("Running cleanup...")
|
|
||||||
count = await repo.cancel_stale_generations(timeout_minutes=60)
|
|
||||||
print(f"Cleaned up {count} generations")
|
|
||||||
|
|
||||||
# 3. Verify status
|
|
||||||
updated_gen = await repo.get_generation(gen_id)
|
|
||||||
print(f"Generation status: {updated_gen.status}")
|
|
||||||
print(f"Failed reason: {updated_gen.failed_reason}")
|
|
||||||
|
|
||||||
if updated_gen.status == GenerationStatus.FAILED:
|
|
||||||
print("✅ SUCCESS: Generation marked as FAILED")
|
|
||||||
else:
|
|
||||||
print("❌ FAILURE: Generation status not updated")
|
|
||||||
|
|
||||||
# Cleanup
|
|
||||||
await repo.collection.delete_one({"_id": updated_gen.id}) # Remove test data
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(test_scheduler())
|
|
||||||
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user