3 Commits
posts ... video

Author SHA1 Message Date
xds
32ff77e04b feat: Implement video generation functionality and integrate with Kling API. 2026-02-12 10:27:07 +03:00
xds
d1f67c773f 123 2026-02-12 00:25:08 +03:00
xds
c63b51ef75 123
er the commit message for your changes. Lines starting
2026-02-12 00:24:43 +03:00
58 changed files with 405 additions and 5 deletions

4
.env
View File

@@ -8,4 +8,6 @@ MINIO_ACCESS_KEY=admin
MINIO_SECRET_KEY=SuperSecretPassword123! MINIO_SECRET_KEY=SuperSecretPassword123!
MINIO_BUCKET=ai-char MINIO_BUCKET=ai-char
MODE=production MODE=production
EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW
KLING_ACCESS_KEY=AngRfYYeLhPQB3pmr9CpHfgHPCrmeeM4
KLING_SECRET_KEY=ndJfyayfQgbg4bMnE49yHnkACPChKMp4

2
.vscode/launch.json vendored
View File

@@ -7,7 +7,7 @@
"request": "launch", "request": "launch",
"module": "uvicorn", "module": "uvicorn",
"args": [ "args": [
"main:app", "aiws:app",
"--reload", "--reload",
"--port", "--port",
"8090", "8090",

Binary file not shown.

Binary file not shown.

165
adapters/kling_adapter.py Normal file
View File

@@ -0,0 +1,165 @@
import logging
import time
import asyncio
from typing import Optional, Dict, Any
import httpx
import jwt
logger = logging.getLogger(__name__)
KLING_API_BASE = "https://api.klingai.com"
class KlingApiException(Exception):
pass
class KlingAdapter:
def __init__(self, access_key: str, secret_key: str):
if not access_key or not secret_key:
raise ValueError("Kling API credentials are missing")
self.access_key = access_key
self.secret_key = secret_key
def _generate_token(self) -> str:
"""Generate a JWT token for Kling API authentication."""
now = int(time.time())
payload = {
"iss": self.access_key,
"exp": now + 1800, # 30 minutes
"iat": now - 5, # небольшой запас назад
"nbf": now - 5,
}
return jwt.encode(payload, self.secret_key, algorithm="HS256",
headers={"typ": "JWT", "alg": "HS256"})
def _headers(self) -> dict:
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self._generate_token()}"
}
async def create_video_task(
self,
image_url: str,
prompt: str = "",
negative_prompt: str = "",
model_name: str = "kling-v2-6",
duration: int = 5,
mode: str = "std",
cfg_scale: float = 0.5,
aspect_ratio: str = "16:9",
callback_url: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create an image-to-video generation task.
Returns the full task data dict including task_id.
"""
body: Dict[str, Any] = {
"model_name": model_name,
"image": image_url,
"prompt": prompt,
"negative_prompt": negative_prompt,
"duration": str(duration),
"mode": mode,
"cfg_scale": cfg_scale,
"aspect_ratio": aspect_ratio,
}
if callback_url:
body["callback_url"] = callback_url
logger.info(f"Creating Kling video task. Model: {model_name}, Duration: {duration}s, Mode: {mode}")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{KLING_API_BASE}/v1/videos/image2video",
headers=self._headers(),
json=body,
)
data = response.json()
logger.info(f"Kling create task response: code={data.get('code')}, message={data.get('message')}")
if response.status_code != 200 or data.get("code") != 0:
error_msg = data.get("message", "Unknown Kling API error")
raise KlingApiException(f"Failed to create video task: {error_msg} (code={data.get('code')})")
task_data = data.get("data", {})
task_id = task_data.get("task_id")
if not task_id:
raise KlingApiException("No task_id returned from Kling API")
logger.info(f"Kling video task created: task_id={task_id}")
return task_data
async def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""
Query the status of a video generation task.
Returns the full task data dict.
"""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{KLING_API_BASE}/v1/videos/image2video/{task_id}",
headers=self._headers(),
)
data = response.json()
if response.status_code != 200 or data.get("code") != 0:
error_msg = data.get("message", "Unknown error")
raise KlingApiException(f"Failed to query task {task_id}: {error_msg}")
return data.get("data", {})
async def wait_for_completion(
self,
task_id: str,
poll_interval: int = 10,
timeout: int = 600,
progress_callback=None,
) -> Dict[str, Any]:
"""
Poll the task status until completion.
Args:
task_id: Kling task ID
poll_interval: seconds between polls
timeout: max seconds to wait
progress_callback: async callable(progress_pct: int) to report progress
Returns:
Final task data dict with video URL on success.
Raises:
KlingApiException on failure or timeout.
"""
start = time.time()
attempt = 0
while True:
elapsed = time.time() - start
if elapsed > timeout:
raise KlingApiException(f"Video generation timed out after {timeout}s for task {task_id}")
task_data = await self.get_task_status(task_id)
status = task_data.get("task_status")
logger.info(f"Kling task {task_id}: status={status}, elapsed={elapsed:.0f}s")
if status == "succeed":
logger.info(f"Kling task {task_id} completed successfully")
return task_data
if status == "failed":
fail_reason = task_data.get("task_status_msg", "Unknown failure")
raise KlingApiException(f"Video generation failed: {fail_reason}")
# Report progress estimate (linear approximation based on typical time)
if progress_callback:
# Estimate: typical gen is ~120s, cap at 90%
estimated_progress = min(int((elapsed / 120) * 90), 90)
attempt += 1
await progress_callback(estimated_progress)
await asyncio.sleep(poll_interval)

15
aiws.py
View File

@@ -18,6 +18,7 @@ from prometheus_fastapi_instrumentator import Instrumentator
# --- ИМПОРТЫ ПРОЕКТА --- # --- ИМПОРТЫ ПРОЕКТА ---
from adapters.google_adapter import GoogleAdapter from adapters.google_adapter import GoogleAdapter
from adapters.kling_adapter import KlingAdapter
from adapters.s3_adapter import S3Adapter from adapters.s3_adapter import S3Adapter
from api.service.generation_service import GenerationService from api.service.generation_service import GenerationService
from api.service.album_service import AlbumService from api.service.album_service import AlbumService
@@ -83,7 +84,18 @@ s3_adapter = S3Adapter(
dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота
gemini = GoogleAdapter(api_key=GEMINI_API_KEY) gemini = GoogleAdapter(api_key=GEMINI_API_KEY)
generation_service = GenerationService(dao, gemini, bot)
# Kling Adapter (optional, for video generation)
kling_access_key = os.getenv("KLING_ACCESS_KEY", "")
kling_secret_key = os.getenv("KLING_SECRET_KEY", "")
kling_adapter = None
if kling_access_key and kling_secret_key:
kling_adapter = KlingAdapter(access_key=kling_access_key, secret_key=kling_secret_key)
logger.info("Kling adapter initialized")
else:
logger.warning("KLING_ACCESS_KEY / KLING_SECRET_KEY not set — video generation disabled")
generation_service = GenerationService(dao, gemini, s3_adapter, bot, kling_adapter)
album_service = AlbumService(dao) album_service = AlbumService(dao)
# Dispatcher # Dispatcher
@@ -138,6 +150,7 @@ async def lifespan(app: FastAPI):
app.state.gemini_client = gemini app.state.gemini_client = gemini
app.state.bot = bot app.state.bot = bot
app.state.s3_adapter = s3_adapter app.state.s3_adapter = s3_adapter
app.state.kling_adapter = kling_adapter
app.state.album_service = album_service app.state.album_service = album_service
app.state.users_repo = users_repo # Добавляем репозиторий в state app.state.users_repo = users_repo # Добавляем репозиторий в state

View File

@@ -3,6 +3,7 @@ from fastapi import Request, Depends
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from adapters.google_adapter import GoogleAdapter from adapters.google_adapter import GoogleAdapter
from adapters.kling_adapter import KlingAdapter
from api.service.generation_service import GenerationService from api.service.generation_service import GenerationService
from repos.dao import DAO from repos.dao import DAO
@@ -36,14 +37,18 @@ def get_dao(
# так что DAO создастся один раз за запрос. # так что DAO создастся один раз за запрос.
return DAO(mongo_client, s3_adapter) return DAO(mongo_client, s3_adapter)
def get_kling_adapter(request: Request) -> Optional[KlingAdapter]:
return request.app.state.kling_adapter
# Провайдер сервиса (собирается из DAO и Gemini) # Провайдер сервиса (собирается из DAO и Gemini)
def get_generation_service( def get_generation_service(
dao: DAO = Depends(get_dao), dao: DAO = Depends(get_dao),
gemini: GoogleAdapter = Depends(get_gemini_client), gemini: GoogleAdapter = Depends(get_gemini_client),
s3_adapter: S3Adapter = Depends(get_s3_adapter), s3_adapter: S3Adapter = Depends(get_s3_adapter),
bot: Bot = Depends(get_bot_client), bot: Bot = Depends(get_bot_client),
kling_adapter: Optional[KlingAdapter] = Depends(get_kling_adapter),
) -> GenerationService: ) -> GenerationService:
return GenerationService(dao, gemini, s3_adapter, bot) return GenerationService(dao, gemini, s3_adapter, bot, kling_adapter=kling_adapter)
from fastapi import Header from fastapi import Header

View File

@@ -9,6 +9,7 @@ from api.dependency import get_generation_service, get_project_id, get_dao
from repos.dao import DAO from repos.dao import DAO
from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest
from api.models.VideoGenerationRequest import VideoGenerationRequest
from api.service.generation_service import GenerationService from api.service.generation_service import GenerationService
from models.Generation import Generation from models.Generation import Generation
@@ -113,6 +114,25 @@ async def get_running_generations(request: Request,
return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id) return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id)
@router.post("/video/_run", response_model=GenerationResponse)
async def post_video_generation(
video_request: VideoGenerationRequest,
request: Request,
generation_service: GenerationService = Depends(get_generation_service),
current_user: dict = Depends(get_current_user),
project_id: Optional[str] = Depends(get_project_id),
dao: DAO = Depends(get_dao),
) -> GenerationResponse:
"""Start image-to-video generation using Kling AI."""
logger.info(f"post_video_generation called. AssetId: {video_request.image_asset_id}, Duration: {video_request.duration}s, Mode: {video_request.mode}")
if project_id:
project = await dao.projects.get_project(project_id)
if not project or str(current_user["_id"]) not in project.members:
raise HTTPException(status_code=403, detail="Project access denied")
video_request.project_id = project_id
return await generation_service.create_video_generation_task(video_request, user_id=str(current_user.get("_id")))
@router.post("/import", response_model=GenerationResponse) @router.post("/import", response_model=GenerationResponse)

View File

@@ -27,6 +27,7 @@ class GenerationsResponse(BaseModel):
class GenerationResponse(BaseModel): class GenerationResponse(BaseModel):
id: str id: str
status: GenerationStatus status: GenerationStatus
gen_type: GenType = GenType.IMAGE
failed_reason: Optional[str] = None failed_reason: Optional[str] = None
linked_character_id: Optional[str] = None linked_character_id: Optional[str] = None
@@ -45,6 +46,10 @@ class GenerationResponse(BaseModel):
progress: int = 0 progress: int = 0
cost: Optional[float] = None cost: Optional[float] = None
created_by: Optional[str] = None created_by: Optional[str] = None
# Video-specific
kling_task_id: Optional[str] = None
video_duration: Optional[int] = None
video_mode: Optional[str] = None
created_at: datetime = datetime.now(UTC) created_at: datetime = datetime.now(UTC)
updated_at: datetime = datetime.now(UTC) updated_at: datetime = datetime.now(UTC)

View File

@@ -0,0 +1,16 @@
from typing import Optional
from pydantic import BaseModel
class VideoGenerationRequest(BaseModel):
prompt: str = ""
negative_prompt: Optional[str] = ""
image_asset_id: str # ID ассета-картинки для source image
duration: int = 5 # 5 or 10 seconds
mode: str = "std" # "std" or "pro"
model_name: str = "kling-v2-1"
cfg_scale: float = 0.5
aspect_ratio: str = "16:9"
linked_character_id: Optional[str] = None
project_id: Optional[str] = None

View File

@@ -11,7 +11,9 @@ from aiogram import Bot
from aiogram.types import BufferedInputFile from aiogram.types import BufferedInputFile
from adapters.Exception import GoogleGenerationException from adapters.Exception import GoogleGenerationException
from adapters.google_adapter import GoogleAdapter from adapters.google_adapter import GoogleAdapter
from adapters.kling_adapter import KlingAdapter, KlingApiException
from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse
from api.models.VideoGenerationRequest import VideoGenerationRequest
# Импортируйте ваши модели DAO, Asset, Generation корректно # Импортируйте ваши модели DAO, Asset, Generation корректно
from models.Asset import Asset, AssetType, AssetContentType from models.Asset import Asset, AssetType, AssetContentType
from models.Generation import Generation, GenerationStatus from models.Generation import Generation, GenerationStatus
@@ -64,11 +66,12 @@ async def generate_image_task(
return images_bytes, metrics return images_bytes, metrics
class GenerationService: class GenerationService:
def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None): def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None, kling_adapter: Optional[KlingAdapter] = None):
self.dao = dao self.dao = dao
self.gemini = gemini self.gemini = gemini
self.s3_adapter = s3_adapter self.s3_adapter = s3_adapter
self.bot = bot self.bot = bot
self.kling_adapter = kling_adapter
async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str: async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str:
@@ -428,6 +431,168 @@ class GenerationService:
return generation return generation
# === VIDEO GENERATION (Kling) ===
async def create_video_generation_task(self, request: VideoGenerationRequest, user_id: Optional[str] = None) -> GenerationResponse:
"""Create a video generation task (async, returns immediately)."""
if not self.kling_adapter:
raise Exception("Kling adapter is not configured")
generation = Generation(
status=GenerationStatus.RUNNING,
gen_type=GenType.VIDEO,
linked_character_id=request.linked_character_id,
aspect_ratio=AspectRatios.SIXTEENNINE, # default for video
quality=Quality.ONEK,
prompt=request.prompt,
assets_list=[request.image_asset_id],
video_duration=request.duration,
video_mode=request.mode,
project_id=request.project_id,
)
if user_id:
generation.created_by = user_id
gen_id = await self.dao.generations.create_generation(generation)
generation.id = gen_id
async def runner(gen, req):
logger.info(f"Starting background video generation task for ID: {gen.id}")
try:
await self.create_video_generation(gen, req)
logger.info(f"Background video generation task finished for ID: {gen.id}")
except Exception:
try:
db_gen = await self.dao.generations.get_generation(gen.id)
if db_gen and db_gen.status != GenerationStatus.FAILED:
db_gen.status = GenerationStatus.FAILED
db_gen.updated_at = datetime.now(UTC)
await self.dao.generations.update_generation(db_gen)
except Exception:
logger.exception("Failed to mark video generation as FAILED")
logger.exception("create_video_generation task failed")
asyncio.create_task(runner(generation, request))
return GenerationResponse(**generation.model_dump())
async def create_video_generation(self, generation: Generation, request: VideoGenerationRequest):
"""Background video generation: call Kling API, poll, download result, save asset."""
start_time = datetime.now()
try:
# 1. Get source image presigned URL
asset = await self.dao.assets.get_asset(request.image_asset_id)
if not asset:
raise Exception(f"Asset {request.image_asset_id} not found")
if not asset.minio_object_name:
raise Exception(f"Asset {request.image_asset_id} has no S3 object")
presigned_url = await self.s3_adapter.get_presigned_url(asset.minio_object_name, expiration=3600)
if not presigned_url:
raise Exception("Failed to generate presigned URL for source image")
logger.info(f"Video gen {generation.id}: got presigned URL for asset {request.image_asset_id}")
# 2. Create Kling task
task_data = await self.kling_adapter.create_video_task(
image_url=presigned_url,
prompt=request.prompt,
negative_prompt=request.negative_prompt or "",
model_name=request.model_name,
duration=request.duration,
mode=request.mode,
cfg_scale=request.cfg_scale,
aspect_ratio=request.aspect_ratio,
)
task_id = task_data.get("task_id")
generation.kling_task_id = task_id
await self.dao.generations.update_generation(generation)
logger.info(f"Video gen {generation.id}: Kling task created, task_id={task_id}")
# 3. Poll for completion with progress updates
async def progress_callback(progress_pct: int):
generation.progress = progress_pct
generation.updated_at = datetime.now(UTC)
await self.dao.generations.update_generation(generation)
result = await self.kling_adapter.wait_for_completion(
task_id=task_id,
poll_interval=10,
timeout=600,
progress_callback=progress_callback,
)
# 4. Extract video URL and download
works = result.get("task_result", {}).get("videos", [])
if not works:
raise Exception("No video in Kling result")
video_url = works[0].get("url")
video_duration = works[0].get("duration", request.duration)
if not video_url:
raise Exception("No video URL in Kling result")
logger.info(f"Video gen {generation.id}: downloading video from {video_url}")
async with httpx.AsyncClient(timeout=120.0) as client:
video_response = await client.get(video_url)
video_response.raise_for_status()
video_bytes = video_response.content
logger.info(f"Video gen {generation.id}: downloaded {len(video_bytes)} bytes")
# 5. Upload to S3
filename = f"generated_video/{generation.linked_character_id or 'no_char'}/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{random.randint(1000, 9999)}.mp4"
await self.s3_adapter.upload_file(filename, video_bytes, content_type="video/mp4")
# 6. Create Asset
new_asset = Asset(
name=f"Video_{generation.linked_character_id or 'gen'}",
type=AssetType.GENERATED,
content_type=AssetContentType.VIDEO,
linked_char_id=generation.linked_character_id,
data=None,
minio_object_name=filename,
minio_bucket=self.s3_adapter.bucket_name,
thumbnail=None, # видео thumbnails можно добавить позже
created_by=generation.created_by,
project_id=generation.project_id,
)
asset_id = await self.dao.assets.create_asset(new_asset)
new_asset.id = str(asset_id)
# 7. Finalize generation
end_time = datetime.now()
generation.result_list = [new_asset.id]
generation.result = new_asset.id
generation.status = GenerationStatus.DONE
generation.progress = 100
generation.video_duration = video_duration
generation.execution_time_seconds = (end_time - start_time).total_seconds()
generation.updated_at = datetime.now(UTC)
await self.dao.generations.update_generation(generation)
logger.info(f"Video generation {generation.id} completed. Asset: {new_asset.id}, Time: {generation.execution_time_seconds:.1f}s")
except KlingApiException as e:
logger.error(f"Kling API error for generation {generation.id}: {e}")
generation.status = GenerationStatus.FAILED
generation.failed_reason = str(e)
generation.updated_at = datetime.now(UTC)
await self.dao.generations.update_generation(generation)
raise
except Exception as e:
logger.error(f"Video generation {generation.id} failed: {e}")
generation.status = GenerationStatus.FAILED
generation.failed_reason = str(e)
generation.updated_at = datetime.now(UTC)
await self.dao.generations.update_generation(generation)
raise
async def delete_generation(self, generation_id: str) -> bool: async def delete_generation(self, generation_id: str) -> bool:
""" """
Soft delete generation by marking it as deleted. Soft delete generation by marking it as deleted.

View File

@@ -7,6 +7,7 @@ from pydantic import BaseModel, computed_field, Field, model_validator
class AssetContentType(str, Enum): class AssetContentType(str, Enum):
IMAGE = 'image' IMAGE = 'image'
VIDEO = 'video'
PROMPT = 'prompt' PROMPT = 'prompt'
class AssetType(str, Enum): class AssetType(str, Enum):

View File

@@ -16,6 +16,7 @@ class GenerationStatus(str, Enum):
class Generation(BaseModel): class Generation(BaseModel):
id: Optional[str] = None id: Optional[str] = None
status: GenerationStatus = GenerationStatus.RUNNING status: GenerationStatus = GenerationStatus.RUNNING
gen_type: GenType = GenType.IMAGE
failed_reason: Optional[str] = None failed_reason: Optional[str] = None
linked_character_id: Optional[str] = None linked_character_id: Optional[str] = None
telegram_id: Optional[int] = None telegram_id: Optional[int] = None
@@ -37,6 +38,10 @@ class Generation(BaseModel):
album_id: Optional[str] = None album_id: Optional[str] = None
created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId) created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId)
project_id: Optional[str] = None project_id: Optional[str] = None
# Video-specific fields
kling_task_id: Optional[str] = None
video_duration: Optional[int] = None # 5 or 10 seconds
video_mode: Optional[str] = None # "std" or "pro"
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

View File

@@ -34,10 +34,12 @@ class Quality(str, Enum):
class GenType(str, Enum): class GenType(str, Enum):
TEXT = 'Text' TEXT = 'Text'
IMAGE = 'Image' IMAGE = 'Image'
VIDEO = 'Video'
@property @property
def value_type(self) -> str: def value_type(self) -> str:
return { return {
GenType.TEXT: 'Text', GenType.TEXT: 'Text',
GenType.IMAGE: 'Image', GenType.IMAGE: 'Image',
GenType.VIDEO: 'Video',
}[self] }[self]

View File

@@ -51,3 +51,4 @@ python-jose[cryptography]==3.3.0
python-multipart==0.0.22 python-multipart==0.0.22
email-validator email-validator
prometheus-fastapi-instrumentator prometheus-fastapi-instrumentator
PyJWT