116 lines
4.5 KiB
Python
116 lines
4.5 KiB
Python
import io
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import List, Union
|
||
|
||
from PIL import Image
|
||
from google import genai
|
||
from google.genai import types
|
||
|
||
from models.enums import AspectRatios, Quality
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class GoogleAdapter:
|
||
def __init__(self, api_key: str):
|
||
if not api_key:
|
||
raise ValueError("API Key for Gemini is missing")
|
||
self.client = genai.Client(api_key=api_key)
|
||
|
||
# Константы моделей
|
||
self.TEXT_MODEL = "gemini-3-pro-preview"
|
||
self.IMAGE_MODEL = "gemini-3-pro-image-preview"
|
||
|
||
def _prepare_contents(self, prompt: str, images_list: List[bytes] = None) -> list:
|
||
"""Вспомогательный метод для подготовки контента (текст + картинки)"""
|
||
contents = [prompt]
|
||
if images_list:
|
||
for img_bytes in images_list:
|
||
try:
|
||
# Gemini API требует PIL Image на входе
|
||
image = Image.open(io.BytesIO(img_bytes))
|
||
contents.append(image)
|
||
except Exception as e:
|
||
logger.error(f"Error processing input image: {e}")
|
||
return contents
|
||
|
||
def generate_text(self, prompt: str, images_list: List[bytes] = None) -> str:
|
||
"""
|
||
Генерация текста (Чат или Vision).
|
||
Возвращает строку с ответом.
|
||
"""
|
||
contents = self._prepare_contents(prompt, images_list)
|
||
logger.info(f"Generating text: {prompt}")
|
||
try:
|
||
response = self.client.models.generate_content(
|
||
model=self.TEXT_MODEL,
|
||
contents=contents,
|
||
config=types.GenerateContentConfig(
|
||
response_modalities=['TEXT'],
|
||
temperature=0.7,
|
||
)
|
||
)
|
||
|
||
# Собираем текст из всех частей ответа
|
||
result_text = ""
|
||
if response.parts:
|
||
for part in response.parts:
|
||
if part.text:
|
||
result_text += part.text
|
||
logger.error(f"Generated text: {result_text}")
|
||
return result_text
|
||
|
||
except Exception as e:
|
||
logger.error(f"Gemini Text API Error: {e}")
|
||
return f"Ошибка генерации текста: {e}"
|
||
|
||
def generate_image(self, prompt: str, aspect_ratio: AspectRatios, quality: Quality, images_list: List[bytes] = None, ) -> List[io.BytesIO]:
|
||
"""
|
||
Генерация изображений (Text-to-Image или Image-to-Image).
|
||
Возвращает список байтовых потоков (готовых к отправке).
|
||
"""
|
||
contents = self._prepare_contents(prompt, images_list)
|
||
|
||
try:
|
||
response = self.client.models.generate_content(
|
||
model=self.IMAGE_MODEL,
|
||
contents=contents,
|
||
config=types.GenerateContentConfig(
|
||
response_modalities=['IMAGE'],
|
||
temperature=1.0,
|
||
image_config=types.ImageConfig(
|
||
aspect_ratio=aspect_ratio.value_ratio,
|
||
image_size=quality.value_quality
|
||
),
|
||
)
|
||
)
|
||
|
||
generated_images = []
|
||
|
||
if response.parts:
|
||
for part in response.parts:
|
||
# Ищем картинки (inline_data)
|
||
if part.inline_data:
|
||
try:
|
||
# 1. Берем сырые байты
|
||
raw_data = part.inline_data.data
|
||
byte_arr = io.BytesIO(raw_data)
|
||
|
||
# 2. Нейминг (формально, для TG)
|
||
timestamp = datetime.now().timestamp()
|
||
byte_arr.name = f'{timestamp}.png'
|
||
|
||
# 3. Важно: сбросить курсор в начало
|
||
byte_arr.seek(0)
|
||
|
||
generated_images.append(byte_arr)
|
||
except Exception as e:
|
||
logger.error(f"Error processing output image: {e}")
|
||
|
||
return generated_images
|
||
|
||
except Exception as e:
|
||
logger.error(f"Gemini Image API Error: {e}")
|
||
# В случае ошибки возвращаем пустой список (или можно рейзить исключение)
|
||
return [] |