diff --git a/.env b/.env index 11d172a..7c576ff 100644 --- a/.env +++ b/.env @@ -8,4 +8,6 @@ MINIO_ACCESS_KEY=admin MINIO_SECRET_KEY=SuperSecretPassword123! MINIO_BUCKET=ai-char MODE=production -EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW \ No newline at end of file +EXTERNAL_API_SECRET=Gt9TyQ8OAYhcELh2YCbKjdHLflZGufKHJZcG338MQDW +KLING_ACCESS_KEY=AngRfYYeLhPQB3pmr9CpHfgHPCrmeeM4 +KLING_SECRET_KEY=ndJfyayfQgbg4bMnE49yHnkACPChKMp4 \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 4af4fe0..953e81d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,7 +7,7 @@ "request": "launch", "module": "uvicorn", "args": [ - "main:app", + "aiws:app", "--reload", "--port", "8090", diff --git a/adapters/kling_adapter.py b/adapters/kling_adapter.py new file mode 100644 index 0000000..adc423c --- /dev/null +++ b/adapters/kling_adapter.py @@ -0,0 +1,165 @@ +import logging +import time +import asyncio +from typing import Optional, Dict, Any + +import httpx +import jwt + +logger = logging.getLogger(__name__) + +KLING_API_BASE = "https://api.klingai.com" + + +class KlingApiException(Exception): + pass + + +class KlingAdapter: + def __init__(self, access_key: str, secret_key: str): + if not access_key or not secret_key: + raise ValueError("Kling API credentials are missing") + self.access_key = access_key + self.secret_key = secret_key + + def _generate_token(self) -> str: + """Generate a JWT token for Kling API authentication.""" + now = int(time.time()) + payload = { + "iss": self.access_key, + "exp": now + 1800, # 30 minutes + "iat": now - 5, # небольшой запас назад + "nbf": now - 5, + } + return jwt.encode(payload, self.secret_key, algorithm="HS256", + headers={"typ": "JWT", "alg": "HS256"}) + + def _headers(self) -> dict: + return { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._generate_token()}" + } + + async def create_video_task( + self, + image_url: str, + prompt: str = "", + negative_prompt: str = "", + model_name: str = "kling-v2-6", + duration: int = 5, + mode: str = "std", + cfg_scale: float = 0.5, + aspect_ratio: str = "16:9", + callback_url: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Create an image-to-video generation task. + Returns the full task data dict including task_id. + """ + body: Dict[str, Any] = { + "model_name": model_name, + "image": image_url, + "prompt": prompt, + "negative_prompt": negative_prompt, + "duration": str(duration), + "mode": mode, + "cfg_scale": cfg_scale, + "aspect_ratio": aspect_ratio, + } + if callback_url: + body["callback_url"] = callback_url + + logger.info(f"Creating Kling video task. Model: {model_name}, Duration: {duration}s, Mode: {mode}") + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{KLING_API_BASE}/v1/videos/image2video", + headers=self._headers(), + json=body, + ) + + data = response.json() + logger.info(f"Kling create task response: code={data.get('code')}, message={data.get('message')}") + + if response.status_code != 200 or data.get("code") != 0: + error_msg = data.get("message", "Unknown Kling API error") + raise KlingApiException(f"Failed to create video task: {error_msg} (code={data.get('code')})") + + task_data = data.get("data", {}) + task_id = task_data.get("task_id") + if not task_id: + raise KlingApiException("No task_id returned from Kling API") + + logger.info(f"Kling video task created: task_id={task_id}") + return task_data + + async def get_task_status(self, task_id: str) -> Dict[str, Any]: + """ + Query the status of a video generation task. + Returns the full task data dict. + """ + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get( + f"{KLING_API_BASE}/v1/videos/image2video/{task_id}", + headers=self._headers(), + ) + + data = response.json() + + if response.status_code != 200 or data.get("code") != 0: + error_msg = data.get("message", "Unknown error") + raise KlingApiException(f"Failed to query task {task_id}: {error_msg}") + + return data.get("data", {}) + + async def wait_for_completion( + self, + task_id: str, + poll_interval: int = 10, + timeout: int = 600, + progress_callback=None, + ) -> Dict[str, Any]: + """ + Poll the task status until completion. + + Args: + task_id: Kling task ID + poll_interval: seconds between polls + timeout: max seconds to wait + progress_callback: async callable(progress_pct: int) to report progress + + Returns: + Final task data dict with video URL on success. + + Raises: + KlingApiException on failure or timeout. + """ + start = time.time() + attempt = 0 + + while True: + elapsed = time.time() - start + if elapsed > timeout: + raise KlingApiException(f"Video generation timed out after {timeout}s for task {task_id}") + + task_data = await self.get_task_status(task_id) + status = task_data.get("task_status") + + logger.info(f"Kling task {task_id}: status={status}, elapsed={elapsed:.0f}s") + + if status == "succeed": + logger.info(f"Kling task {task_id} completed successfully") + return task_data + + if status == "failed": + fail_reason = task_data.get("task_status_msg", "Unknown failure") + raise KlingApiException(f"Video generation failed: {fail_reason}") + + # Report progress estimate (linear approximation based on typical time) + if progress_callback: + # Estimate: typical gen is ~120s, cap at 90% + estimated_progress = min(int((elapsed / 120) * 90), 90) + attempt += 1 + await progress_callback(estimated_progress) + + await asyncio.sleep(poll_interval) diff --git a/aiws.py b/aiws.py index 57a1c7e..e47b1f3 100644 --- a/aiws.py +++ b/aiws.py @@ -18,6 +18,7 @@ from prometheus_fastapi_instrumentator import Instrumentator # --- ИМПОРТЫ ПРОЕКТА --- from adapters.google_adapter import GoogleAdapter +from adapters.kling_adapter import KlingAdapter from adapters.s3_adapter import S3Adapter from api.service.generation_service import GenerationService from api.service.album_service import AlbumService @@ -83,7 +84,18 @@ s3_adapter = S3Adapter( dao = DAO(mongo_client, s3_adapter) # Главный DAO для бота gemini = GoogleAdapter(api_key=GEMINI_API_KEY) -generation_service = GenerationService(dao, gemini, bot) + +# Kling Adapter (optional, for video generation) +kling_access_key = os.getenv("KLING_ACCESS_KEY", "") +kling_secret_key = os.getenv("KLING_SECRET_KEY", "") +kling_adapter = None +if kling_access_key and kling_secret_key: + kling_adapter = KlingAdapter(access_key=kling_access_key, secret_key=kling_secret_key) + logger.info("Kling adapter initialized") +else: + logger.warning("KLING_ACCESS_KEY / KLING_SECRET_KEY not set — video generation disabled") + +generation_service = GenerationService(dao, gemini, s3_adapter, bot, kling_adapter) album_service = AlbumService(dao) # Dispatcher @@ -138,6 +150,7 @@ async def lifespan(app: FastAPI): app.state.gemini_client = gemini app.state.bot = bot app.state.s3_adapter = s3_adapter + app.state.kling_adapter = kling_adapter app.state.album_service = album_service app.state.users_repo = users_repo # Добавляем репозиторий в state diff --git a/api/dependency.py b/api/dependency.py index 7dc90eb..58b0403 100644 --- a/api/dependency.py +++ b/api/dependency.py @@ -3,6 +3,7 @@ from fastapi import Request, Depends from motor.motor_asyncio import AsyncIOMotorClient from adapters.google_adapter import GoogleAdapter +from adapters.kling_adapter import KlingAdapter from api.service.generation_service import GenerationService from repos.dao import DAO @@ -36,14 +37,18 @@ def get_dao( # так что DAO создастся один раз за запрос. return DAO(mongo_client, s3_adapter) +def get_kling_adapter(request: Request) -> Optional[KlingAdapter]: + return request.app.state.kling_adapter + # Провайдер сервиса (собирается из DAO и Gemini) def get_generation_service( dao: DAO = Depends(get_dao), gemini: GoogleAdapter = Depends(get_gemini_client), s3_adapter: S3Adapter = Depends(get_s3_adapter), bot: Bot = Depends(get_bot_client), + kling_adapter: Optional[KlingAdapter] = Depends(get_kling_adapter), ) -> GenerationService: - return GenerationService(dao, gemini, s3_adapter, bot) + return GenerationService(dao, gemini, s3_adapter, bot, kling_adapter=kling_adapter) from fastapi import Header diff --git a/api/endpoints/generation_router.py b/api/endpoints/generation_router.py index 85c4f61..5ec7f25 100644 --- a/api/endpoints/generation_router.py +++ b/api/endpoints/generation_router.py @@ -9,6 +9,7 @@ from api.dependency import get_generation_service, get_project_id, get_dao from repos.dao import DAO from api.models.GenerationRequest import GenerationResponse, GenerationRequest, GenerationsResponse, PromptResponse, PromptRequest +from api.models.VideoGenerationRequest import VideoGenerationRequest from api.service.generation_service import GenerationService from models.Generation import Generation @@ -113,6 +114,25 @@ async def get_running_generations(request: Request, return await generation_service.get_running_generations(user_id=user_id_filter, project_id=project_id) +@router.post("/video/_run", response_model=GenerationResponse) +async def post_video_generation( + video_request: VideoGenerationRequest, + request: Request, + generation_service: GenerationService = Depends(get_generation_service), + current_user: dict = Depends(get_current_user), + project_id: Optional[str] = Depends(get_project_id), + dao: DAO = Depends(get_dao), +) -> GenerationResponse: + """Start image-to-video generation using Kling AI.""" + logger.info(f"post_video_generation called. AssetId: {video_request.image_asset_id}, Duration: {video_request.duration}s, Mode: {video_request.mode}") + + if project_id: + project = await dao.projects.get_project(project_id) + if not project or str(current_user["_id"]) not in project.members: + raise HTTPException(status_code=403, detail="Project access denied") + video_request.project_id = project_id + + return await generation_service.create_video_generation_task(video_request, user_id=str(current_user.get("_id"))) @router.post("/import", response_model=GenerationResponse) diff --git a/api/models/GenerationRequest.py b/api/models/GenerationRequest.py index 40e9d18..55d6a8c 100644 --- a/api/models/GenerationRequest.py +++ b/api/models/GenerationRequest.py @@ -27,6 +27,7 @@ class GenerationsResponse(BaseModel): class GenerationResponse(BaseModel): id: str status: GenerationStatus + gen_type: GenType = GenType.IMAGE failed_reason: Optional[str] = None linked_character_id: Optional[str] = None @@ -45,6 +46,10 @@ class GenerationResponse(BaseModel): progress: int = 0 cost: Optional[float] = None created_by: Optional[str] = None + # Video-specific + kling_task_id: Optional[str] = None + video_duration: Optional[int] = None + video_mode: Optional[str] = None created_at: datetime = datetime.now(UTC) updated_at: datetime = datetime.now(UTC) diff --git a/api/models/VideoGenerationRequest.py b/api/models/VideoGenerationRequest.py new file mode 100644 index 0000000..ab2e94a --- /dev/null +++ b/api/models/VideoGenerationRequest.py @@ -0,0 +1,16 @@ +from typing import Optional + +from pydantic import BaseModel + + +class VideoGenerationRequest(BaseModel): + prompt: str = "" + negative_prompt: Optional[str] = "" + image_asset_id: str # ID ассета-картинки для source image + duration: int = 5 # 5 or 10 seconds + mode: str = "std" # "std" or "pro" + model_name: str = "kling-v2-1" + cfg_scale: float = 0.5 + aspect_ratio: str = "16:9" + linked_character_id: Optional[str] = None + project_id: Optional[str] = None diff --git a/api/service/generation_service.py b/api/service/generation_service.py index b433eac..ca86269 100644 --- a/api/service/generation_service.py +++ b/api/service/generation_service.py @@ -11,7 +11,9 @@ from aiogram import Bot from aiogram.types import BufferedInputFile from adapters.Exception import GoogleGenerationException from adapters.google_adapter import GoogleAdapter +from adapters.kling_adapter import KlingAdapter, KlingApiException from api.models.GenerationRequest import GenerationRequest, GenerationResponse, GenerationsResponse +from api.models.VideoGenerationRequest import VideoGenerationRequest # Импортируйте ваши модели DAO, Asset, Generation корректно from models.Asset import Asset, AssetType, AssetContentType from models.Generation import Generation, GenerationStatus @@ -64,11 +66,12 @@ async def generate_image_task( return images_bytes, metrics class GenerationService: - def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None): + def __init__(self, dao: DAO, gemini: GoogleAdapter, s3_adapter: S3Adapter, bot: Optional[Bot] = None, kling_adapter: Optional[KlingAdapter] = None): self.dao = dao self.gemini = gemini self.s3_adapter = s3_adapter self.bot = bot + self.kling_adapter = kling_adapter async def ask_prompt_assistant(self, prompt: str, assets: List[str] = None) -> str: @@ -428,6 +431,168 @@ class GenerationService: return generation + # === VIDEO GENERATION (Kling) === + + async def create_video_generation_task(self, request: VideoGenerationRequest, user_id: Optional[str] = None) -> GenerationResponse: + """Create a video generation task (async, returns immediately).""" + if not self.kling_adapter: + raise Exception("Kling adapter is not configured") + + generation = Generation( + status=GenerationStatus.RUNNING, + gen_type=GenType.VIDEO, + linked_character_id=request.linked_character_id, + aspect_ratio=AspectRatios.SIXTEENNINE, # default for video + quality=Quality.ONEK, + prompt=request.prompt, + assets_list=[request.image_asset_id], + video_duration=request.duration, + video_mode=request.mode, + project_id=request.project_id, + ) + if user_id: + generation.created_by = user_id + + gen_id = await self.dao.generations.create_generation(generation) + generation.id = gen_id + + async def runner(gen, req): + logger.info(f"Starting background video generation task for ID: {gen.id}") + try: + await self.create_video_generation(gen, req) + logger.info(f"Background video generation task finished for ID: {gen.id}") + except Exception: + try: + db_gen = await self.dao.generations.get_generation(gen.id) + if db_gen and db_gen.status != GenerationStatus.FAILED: + db_gen.status = GenerationStatus.FAILED + db_gen.updated_at = datetime.now(UTC) + await self.dao.generations.update_generation(db_gen) + except Exception: + logger.exception("Failed to mark video generation as FAILED") + logger.exception("create_video_generation task failed") + + asyncio.create_task(runner(generation, request)) + return GenerationResponse(**generation.model_dump()) + + async def create_video_generation(self, generation: Generation, request: VideoGenerationRequest): + """Background video generation: call Kling API, poll, download result, save asset.""" + start_time = datetime.now() + + try: + # 1. Get source image presigned URL + asset = await self.dao.assets.get_asset(request.image_asset_id) + if not asset: + raise Exception(f"Asset {request.image_asset_id} not found") + + if not asset.minio_object_name: + raise Exception(f"Asset {request.image_asset_id} has no S3 object") + + presigned_url = await self.s3_adapter.get_presigned_url(asset.minio_object_name, expiration=3600) + if not presigned_url: + raise Exception("Failed to generate presigned URL for source image") + + logger.info(f"Video gen {generation.id}: got presigned URL for asset {request.image_asset_id}") + + # 2. Create Kling task + task_data = await self.kling_adapter.create_video_task( + image_url=presigned_url, + prompt=request.prompt, + negative_prompt=request.negative_prompt or "", + model_name=request.model_name, + duration=request.duration, + mode=request.mode, + cfg_scale=request.cfg_scale, + aspect_ratio=request.aspect_ratio, + ) + + task_id = task_data.get("task_id") + generation.kling_task_id = task_id + await self.dao.generations.update_generation(generation) + + logger.info(f"Video gen {generation.id}: Kling task created, task_id={task_id}") + + # 3. Poll for completion with progress updates + async def progress_callback(progress_pct: int): + generation.progress = progress_pct + generation.updated_at = datetime.now(UTC) + await self.dao.generations.update_generation(generation) + + result = await self.kling_adapter.wait_for_completion( + task_id=task_id, + poll_interval=10, + timeout=600, + progress_callback=progress_callback, + ) + + # 4. Extract video URL and download + works = result.get("task_result", {}).get("videos", []) + if not works: + raise Exception("No video in Kling result") + + video_url = works[0].get("url") + video_duration = works[0].get("duration", request.duration) + if not video_url: + raise Exception("No video URL in Kling result") + + logger.info(f"Video gen {generation.id}: downloading video from {video_url}") + + async with httpx.AsyncClient(timeout=120.0) as client: + video_response = await client.get(video_url) + video_response.raise_for_status() + video_bytes = video_response.content + + logger.info(f"Video gen {generation.id}: downloaded {len(video_bytes)} bytes") + + # 5. Upload to S3 + filename = f"generated_video/{generation.linked_character_id or 'no_char'}/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{random.randint(1000, 9999)}.mp4" + await self.s3_adapter.upload_file(filename, video_bytes, content_type="video/mp4") + + # 6. Create Asset + new_asset = Asset( + name=f"Video_{generation.linked_character_id or 'gen'}", + type=AssetType.GENERATED, + content_type=AssetContentType.VIDEO, + linked_char_id=generation.linked_character_id, + data=None, + minio_object_name=filename, + minio_bucket=self.s3_adapter.bucket_name, + thumbnail=None, # видео thumbnails можно добавить позже + created_by=generation.created_by, + project_id=generation.project_id, + ) + + asset_id = await self.dao.assets.create_asset(new_asset) + new_asset.id = str(asset_id) + + # 7. Finalize generation + end_time = datetime.now() + generation.result_list = [new_asset.id] + generation.result = new_asset.id + generation.status = GenerationStatus.DONE + generation.progress = 100 + generation.video_duration = video_duration + generation.execution_time_seconds = (end_time - start_time).total_seconds() + generation.updated_at = datetime.now(UTC) + await self.dao.generations.update_generation(generation) + + logger.info(f"Video generation {generation.id} completed. Asset: {new_asset.id}, Time: {generation.execution_time_seconds:.1f}s") + + except KlingApiException as e: + logger.error(f"Kling API error for generation {generation.id}: {e}") + generation.status = GenerationStatus.FAILED + generation.failed_reason = str(e) + generation.updated_at = datetime.now(UTC) + await self.dao.generations.update_generation(generation) + raise + except Exception as e: + logger.error(f"Video generation {generation.id} failed: {e}") + generation.status = GenerationStatus.FAILED + generation.failed_reason = str(e) + generation.updated_at = datetime.now(UTC) + await self.dao.generations.update_generation(generation) + raise + async def delete_generation(self, generation_id: str) -> bool: """ Soft delete generation by marking it as deleted. diff --git a/models/Asset.py b/models/Asset.py index ff4eeef..8ae084f 100644 --- a/models/Asset.py +++ b/models/Asset.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, computed_field, Field, model_validator class AssetContentType(str, Enum): IMAGE = 'image' + VIDEO = 'video' PROMPT = 'prompt' class AssetType(str, Enum): diff --git a/models/Generation.py b/models/Generation.py index 6c74100..3d71b12 100644 --- a/models/Generation.py +++ b/models/Generation.py @@ -16,6 +16,7 @@ class GenerationStatus(str, Enum): class Generation(BaseModel): id: Optional[str] = None status: GenerationStatus = GenerationStatus.RUNNING + gen_type: GenType = GenType.IMAGE failed_reason: Optional[str] = None linked_character_id: Optional[str] = None telegram_id: Optional[int] = None @@ -37,6 +38,10 @@ class Generation(BaseModel): album_id: Optional[str] = None created_by: Optional[str] = None # Stores User ID (Telegram ID or Web User ObjectId) project_id: Optional[str] = None + # Video-specific fields + kling_task_id: Optional[str] = None + video_duration: Optional[int] = None # 5 or 10 seconds + video_mode: Optional[str] = None # "std" or "pro" created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) diff --git a/models/enums.py b/models/enums.py index a0fd856..ddb5789 100644 --- a/models/enums.py +++ b/models/enums.py @@ -34,10 +34,12 @@ class Quality(str, Enum): class GenType(str, Enum): TEXT = 'Text' IMAGE = 'Image' + VIDEO = 'Video' @property def value_type(self) -> str: return { GenType.TEXT: 'Text', GenType.IMAGE: 'Image', + GenType.VIDEO: 'Video', }[self] diff --git a/requirements.txt b/requirements.txt index 2450e80..9b55360 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,3 +51,4 @@ python-jose[cryptography]==3.3.0 python-multipart==0.0.22 email-validator prometheus-fastapi-instrumentator +PyJWT