import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Dict, Optional

import httpx
import jwt

logger = logging.getLogger(__name__)

KLING_API_BASE = "https://api.klingai.com"

# Lifetime of a generated JWT, in seconds (30 minutes).
_TOKEN_TTL_SECONDS = 1800
# "iat"/"nbf" are back-dated by this many seconds as a small safety margin.
_TOKEN_BACKDATE_SECONDS = 5
# Per-request HTTP timeout, in seconds.
_HTTP_TIMEOUT_SECONDS = 30.0
# Progress estimation: assume a typical generation takes ~120s and never
# report more than 90% before the task is actually finished.
_TYPICAL_GENERATION_SECONDS = 120
_MAX_ESTIMATED_PROGRESS = 90


class KlingApiException(Exception):
    """Raised when the Kling API reports an error or returns an unusable response."""


class KlingAdapter:
    """Thin async client for the Kling image-to-video endpoints."""

    def __init__(self, access_key: str, secret_key: str):
        """
        Store the API credentials.

        Raises:
            ValueError: if either credential is empty or None.
        """
        if not access_key or not secret_key:
            raise ValueError("Kling API credentials are missing")
        self.access_key = access_key
        self.secret_key = secret_key

    def _generate_token(self) -> str:
        """Generate a JWT token for Kling API authentication."""
        now = int(time.time())
        payload = {
            "iss": self.access_key,
            "exp": now + _TOKEN_TTL_SECONDS,
            "iat": now - _TOKEN_BACKDATE_SECONDS,  # small backward margin
            "nbf": now - _TOKEN_BACKDATE_SECONDS,
        }
        token = jwt.encode(
            payload,
            self.secret_key,
            algorithm="HS256",
            headers={"typ": "JWT", "alg": "HS256"},
        )
        # PyJWT < 2.0 returns bytes; normalise so the Authorization header
        # never ends up as "Bearer b'...'".
        if isinstance(token, bytes):
            token = token.decode("ascii")
        return token

    def _headers(self) -> dict:
        """Build request headers with a freshly signed bearer token."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self._generate_token()}",
        }

    @staticmethod
    def _decode_json(response: "httpx.Response", action: str) -> Dict[str, Any]:
        """
        Decode a response body as a JSON object.

        Raises:
            KlingApiException: if the body is not valid JSON or is not a JSON
                object (e.g. an HTML error page from a proxy).
        """
        try:
            data = response.json()
        except ValueError as exc:
            raise KlingApiException(
                f"Failed to {action}: non-JSON response (HTTP {response.status_code})"
            ) from exc
        if not isinstance(data, dict):
            raise KlingApiException(
                f"Failed to {action}: unexpected response payload (HTTP {response.status_code})"
            )
        return data

    async def create_video_task(
        self,
        image_url: str,
        prompt: str = "",
        negative_prompt: str = "",
        model_name: str = "kling-v2-6",
        duration: int = 5,
        mode: str = "std",
        cfg_scale: float = 0.5,
        aspect_ratio: str = "16:9",
        callback_url: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Create an image-to-video generation task.

        The duration is sent to the API as a string. ``callback_url`` is only
        included in the request body when it is non-empty.

        Returns:
            The full task data dict including ``task_id``.

        Raises:
            KlingApiException: if the HTTP status is not 200, the API ``code``
                is not 0, the body is not a JSON object, or no ``task_id`` is
                returned.
        """
        body: Dict[str, Any] = {
            "model_name": model_name,
            "image": image_url,
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "duration": str(duration),
            "mode": mode,
            "cfg_scale": cfg_scale,
            "aspect_ratio": aspect_ratio,
        }
        if callback_url:
            body["callback_url"] = callback_url

        logger.info(
            "Creating Kling video task. Model: %s, Duration: %ss, Mode: %s",
            model_name,
            duration,
            mode,
        )

        async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS) as client:
            response = await client.post(
                f"{KLING_API_BASE}/v1/videos/image2video",
                headers=self._headers(),
                json=body,
            )

        data = self._decode_json(response, "create video task")
        logger.info(
            "Kling create task response: code=%s, message=%s",
            data.get("code"),
            data.get("message"),
        )

        if response.status_code != 200 or data.get("code") != 0:
            error_msg = data.get("message", "Unknown Kling API error")
            raise KlingApiException(
                f"Failed to create video task: {error_msg} (code={data.get('code')})"
            )

        # "data" may be present but null; treat that the same as missing.
        task_data = data.get("data") or {}
        task_id = task_data.get("task_id")
        if not task_id:
            raise KlingApiException("No task_id returned from Kling API")

        logger.info("Kling video task created: task_id=%s", task_id)
        return task_data

    async def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """
        Query the status of a video generation task.

        Returns:
            The full task data dict (empty if the API returned no ``data``).

        Raises:
            KlingApiException: if the HTTP status is not 200, the API ``code``
                is not 0, or the body is not a JSON object.
        """
        async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS) as client:
            response = await client.get(
                f"{KLING_API_BASE}/v1/videos/image2video/{task_id}",
                headers=self._headers(),
            )

        data = self._decode_json(response, f"query task {task_id}")

        if response.status_code != 200 or data.get("code") != 0:
            error_msg = data.get("message", "Unknown error")
            raise KlingApiException(f"Failed to query task {task_id}: {error_msg}")

        # "data" may be present but null; always hand callers a dict.
        return data.get("data") or {}

    async def wait_for_completion(
        self,
        task_id: str,
        poll_interval: int = 10,
        timeout: int = 600,
        progress_callback: Optional[Callable[[int], Awaitable[Any]]] = None,
    ) -> Dict[str, Any]:
        """
        Poll the task status until completion.

        Args:
            task_id: Kling task ID
            poll_interval: seconds between polls
            timeout: max seconds to wait
            progress_callback: async callable(progress_pct: int) to report progress

        Returns:
            Final task data dict once the task status is "succeed".

        Raises:
            KlingApiException on failure or timeout.
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        start = time.monotonic()

        while True:
            elapsed = time.monotonic() - start
            if elapsed > timeout:
                raise KlingApiException(
                    f"Video generation timed out after {timeout}s for task {task_id}"
                )

            task_data = await self.get_task_status(task_id)
            status = task_data.get("task_status")
            logger.info(
                "Kling task %s: status=%s, elapsed=%.0fs", task_id, status, elapsed
            )

            if status == "succeed":
                logger.info("Kling task %s completed successfully", task_id)
                return task_data

            if status == "failed":
                fail_reason = task_data.get("task_status_msg", "Unknown failure")
                raise KlingApiException(f"Video generation failed: {fail_reason}")

            # Report a progress estimate (linear approximation based on the
            # typical generation time, capped below 100%).
            if progress_callback:
                estimated_progress = min(
                    int((elapsed / _TYPICAL_GENERATION_SECONDS) * _MAX_ESTIMATED_PROGRESS),
                    _MAX_ESTIMATED_PROGRESS,
                )
                await progress_callback(estimated_progress)

            await asyncio.sleep(poll_interval)