127 lines
4.5 KiB
Python
127 lines
4.5 KiB
Python
import logging
|
|
import io
|
|
import httpx
|
|
import hashlib
|
|
import time
|
|
from typing import Any, Dict, List, NoReturn, Optional, Tuple
|
|
from datetime import datetime
|
|
from models.enums import AspectRatios, Quality
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AIProxyException(Exception):
|
|
def __init__(self, message: str, error_code: str | None = None):
|
|
super().__init__(message)
|
|
self.error_code = error_code
|
|
|
|
class AIProxyAdapter:
    """Async HTTP client for the AI Proxy service.

    Every request is authenticated with a timestamp-based signature: the
    proxy receives ``X-Timestamp`` and ``X-Signature`` headers, where the
    signature is the hex SHA-256 of the timestamp concatenated with a
    shared secret salt.
    """

    def __init__(self, base_url: str | None = None, salt: str | None = None):
        """
        Args:
            base_url: Proxy root URL; falls back to ``settings.PROXY_URL``.
                A trailing slash is stripped so endpoint paths join cleanly.
            salt: Shared signing secret; falls back to
                ``settings.PROXY_SECRET_SALT``.
        """
        self.base_url = (base_url or settings.PROXY_URL).rstrip("/")
        self.salt = salt or settings.PROXY_SECRET_SALT

    def _generate_headers(self) -> Dict[str, str]:
        """Build the signed authentication headers for a proxy request.

        Returns:
            Headers with the current Unix timestamp and the hex SHA-256
            digest of ``f"{timestamp}{salt}"``, which the proxy is expected
            to recompute and compare server-side.
        """
        timestamp = int(time.time())
        signature = hashlib.sha256(f"{timestamp}{self.salt}".encode()).hexdigest()
        return {
            "X-Timestamp": str(timestamp),
            "X-Signature": signature,
        }

    def _handle_http_error(self, e: httpx.HTTPStatusError, context: str) -> NoReturn:
        """Translate an HTTP error response into an :class:`AIProxyException`.

        Attempts to extract a structured ``detail`` payload (FastAPI-style)
        from the response body; falls back to the raw exception text when the
        body is not JSON or not the expected shape.

        Args:
            e: The HTTP status error raised by httpx.
            context: Short label (e.g. "AI Proxy Text") used in the log line.

        Raises:
            AIProxyException: always, carrying any error code the proxy sent.
        """
        error_code: Optional[str] = None
        message = str(e)
        try:
            detail = e.response.json().get("detail")
            if isinstance(detail, dict):
                error_code = detail.get("error_code")
                message = detail.get("message", message)
            elif isinstance(detail, str):
                message = detail
        except Exception:
            # Body was not parseable JSON; keep the generic httpx message.
            pass

        # Lazy %-args: only rendered if the record is actually emitted.
        logger.error("%s Error: %s (code: %s)", context, message, error_code)
        raise AIProxyException(message, error_code=error_code) from e

    async def generate_text(self, prompt: str, model: str = "gemini-3.1-pro-preview", asset_urls: List[str] | None = None) -> str:
        """Generate text using the AI Proxy with signature verification.

        Args:
            prompt: User prompt, sent as a single-message chat payload.
            model: Accepted for interface parity.
                NOTE(review): not forwarded in the request payload — confirm
                whether the proxy selects the model server-side.
            asset_urls: Optional asset URLs forwarded to the proxy.

        Returns:
            The generated text, or ``""`` when the proxy returns no response.

        Raises:
            AIProxyException: on any HTTP or transport failure.
        """
        url = f"{self.base_url}/generate_text"

        payload = {
            "messages": [{"role": "user", "content": prompt}],
            "asset_urls": asset_urls,
        }
        headers = self._generate_headers()

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, json=payload, headers=headers, timeout=60.0)
                response.raise_for_status()
                data = response.json()

                if data.get("finish_reason") != "STOP":
                    logger.warning("AI Proxy generation finished with reason: %s", data.get("finish_reason"))

                # Normalize a missing/None response field to an empty string.
                return data.get("response") or ""
            except httpx.HTTPStatusError as e:
                self._handle_http_error(e, "AI Proxy Text")
            except Exception as e:
                logger.error("AI Proxy Text General Error: %s", e)
                raise AIProxyException(f"AI Proxy Text Error: {e}") from e

    async def generate_image(
        self,
        prompt: str,
        aspect_ratio: AspectRatios,
        quality: Quality,
        model: str = "gemini-3-pro-image-preview",
        asset_urls: List[str] | None = None
    ) -> Tuple[List[io.BytesIO], Dict[str, Any]]:
        """Generate an image using the AI Proxy with signature verification.

        Args:
            prompt: Image prompt sent to the proxy.
            aspect_ratio: Accepted for interface parity.
                NOTE(review): not forwarded in the payload — confirm whether
                the proxy supports/requires it.
            quality: Accepted for interface parity (see note above for
                ``aspect_ratio``; also not forwarded).
            model: Accepted but not forwarded; confirm server-side selection.
            asset_urls: Optional asset URLs forwarded to the proxy.

        Returns:
            A one-element list with the image bytes wrapped in a named
            ``BytesIO`` (``<unix-timestamp>.png``), plus a metrics dict.
            Token-usage metrics are hardcoded to 0 because the proxy returns
            raw image bytes with no usage accounting.

        Raises:
            AIProxyException: on any HTTP or transport failure.
        """
        url = f"{self.base_url}/generate_image"

        payload = {
            "prompt": prompt,
            "asset_urls": asset_urls,
        }
        headers = self._generate_headers()

        start_time = datetime.now()
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, json=payload, headers=headers, timeout=120.0)
                response.raise_for_status()

                byte_arr = io.BytesIO(response.content)
                # Name matters for downstream consumers that infer file type.
                byte_arr.name = f"{datetime.now().timestamp()}.png"
                byte_arr.seek(0)

                api_duration = (datetime.now() - start_time).total_seconds()
                metrics = {
                    "api_execution_time_seconds": api_duration,
                    "token_usage": 0,
                    "input_token_usage": 0,
                    "output_token_usage": 0,
                }

                return [byte_arr], metrics
            except httpx.HTTPStatusError as e:
                self._handle_http_error(e, "AI Proxy Image")
            except Exception as e:
                logger.error("AI Proxy Image General Error: %s", e)
                raise AIProxyException(f"AI Proxy Image Error: {e}") from e
|