Files
ai-char-bot/scheduler/daily_scheduler.py
2026-03-17 18:02:22 +03:00

457 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import html
import logging
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional, Tuple

from aiogram import Bot
from aiogram.types import BufferedInputFile, InlineKeyboardButton, InlineKeyboardMarkup

from adapters.google_adapter import GoogleAdapter
from adapters.ai_proxy_adapter import AIProxyAdapter, AIProxyException
from adapters.s3_adapter import S3Adapter
from api.service.generation_service import GenerationService
from models.Asset import Asset
from models.Generation import Generation, GenerationStatus
from models.enums import AspectRatios, ImageModel, Quality, TextModel
from repos.dao import DAO
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Fixed UTC+3 offset used for all scheduling arithmetic (labelled "MSK" in logs).
MSK_TZ = timezone(timedelta(hours=3))
# Wall-clock time, expressed in MSK_TZ, at which the daily generation fires.
SCHEDULE_HOUR_MSK = 11
SCHEDULE_MINUTE_MSK = 0
# Callback data prefixes for inline keyboard buttons.
# Each button's callback_data is built as "<prefix>:<generation_id>".
CB_POST = "daily_post"
CB_REGEN_ALL = "daily_regen_all"
CB_REGEN_IMG = "daily_regen_img"
CB_REGEN_MORE = "daily_regen_more"
CB_CANCEL = "daily_cancel"
def make_admin_keyboard(generation_id: str) -> InlineKeyboardMarkup:
    """Build the inline keyboard shown under a generated preview.

    Every button carries ``"<prefix>:<generation_id>"`` as its callback data,
    where the prefix is one of the module-level ``CB_*`` constants.

    Args:
        generation_id: Identifier embedded into each button's callback data.

    Returns:
        A three-row keyboard: publish/cancel, the two regeneration options,
        and a single "generate 2 more" button.
    """

    def button(label: str, prefix: str) -> InlineKeyboardButton:
        # All buttons share the same callback-data layout.
        return InlineKeyboardButton(text=label, callback_data=f"{prefix}:{generation_id}")

    decision_row = [
        button("✅ Выложить", CB_POST),
        button("❌ Отмена", CB_CANCEL),
    ]
    regenerate_row = [
        button("🔄 Перегенерить с нуля", CB_REGEN_ALL),
        button("🖼 Перегенерить изображение", CB_REGEN_IMG),
    ]
    more_row = [button(" Сгенерировать ещё 2", CB_REGEN_MORE)]

    return InlineKeyboardMarkup(inline_keyboard=[decision_row, regenerate_row, more_row])
class DailyScheduler:
    """Orchestrates the daily AI-character content generation pipeline.

    Flow:
        1. Generate image prompt + social caption via LLM (with character avatar).
        2. Generate image via GenerationService.create_generation() (reuses existing pipeline).
        3. Send to Telegram admin with action buttons.

    Admin actions (inline keyboard):
        - "Выложить" (publish): post to Instagram feed + story via Meta API.
        - "Перегенерить с нуля" (regenerate from scratch): restart from step 1.
        - "Перегенерить изображение" (regenerate image): restart from step 2
          with the same prompt/caption.
        - "Сгенерировать ещё 2" (generate 2 more): generate 2 pose-varied images.
        - "Отмена" (cancel): dismiss (no action).
    """

    def __init__(
        self,
        dao: DAO,
        gemini: GoogleAdapter,
        s3_adapter: S3Adapter,
        generation_service: GenerationService,
        bot: Bot,
        admin_id: int,
        character_id: str,
        meta_adapter=None,  # Optional[MetaAdapter]
    ):
        """Store collaborators and initialise empty in-memory state.

        Args:
            dao: Data-access object (characters, assets, generations, settings).
            gemini: Direct Gemini adapter, used when the AI proxy is disabled.
            s3_adapter: Object-storage adapter for image bytes and presigned URLs.
            generation_service: Existing image generation pipeline.
            bot: Telegram bot used to talk to the admin.
            admin_id: Telegram chat id that receives previews and errors.
            character_id: Character the daily content is generated for.
            meta_adapter: Optional publishing adapter; publishing is refused
                when it is ``None``.
        """
        self.dao = dao
        self.gemini = gemini
        self.ai_proxy = AIProxyAdapter()
        self.s3_adapter = s3_adapter
        self.generation_service = generation_service
        self.bot = bot
        self.admin_id = admin_id
        self.character_id = character_id
        self.meta_adapter = meta_adapter
        # Stores session state keyed by generation_id.
        # Each value: {prompt, caption, asset_id, message_id, chat_id}
        # (plus feed_id once the feed post has been published).
        self.pending_sessions: Dict[str, Dict[str, Any]] = {}
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._background_tasks: set[asyncio.Task] = set()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    # ------------------------------------------------------------------
    # Scheduler loop
    # ------------------------------------------------------------------
    async def run_loop(self):
        """Run indefinitely, triggering daily generation at 11:00 MSK."""
        logger.info("Daily scheduler loop started")
        while True:
            try:
                await self._wait_until_next_run()
                logger.info("Daily scheduler: triggering daily generation")
                await self.run_daily_generation()
            except asyncio.CancelledError:
                # Cancellation ends the loop and lets the coroutine return normally.
                break
            except Exception as e:
                logger.error(f"Daily scheduler loop error: {e}", exc_info=True)

    async def _wait_until_next_run(self):
        """Sleep until the next occurrence of the scheduled time in MSK_TZ."""
        now = datetime.now(MSK_TZ)
        next_run = now.replace(
            hour=SCHEDULE_HOUR_MSK,
            minute=SCHEDULE_MINUTE_MSK,
            second=0,
            microsecond=0,
        )
        # Today's slot has already passed (or is exactly now): use tomorrow's.
        if now >= next_run:
            next_run += timedelta(days=1)
        wait_seconds = (next_run - now).total_seconds()
        logger.info(
            f"Next daily generation at {next_run.strftime('%Y-%m-%d %H:%M MSK')} "
            f"(in {wait_seconds / 3600:.1f}h)"
        )
        await asyncio.sleep(wait_seconds)

    # ------------------------------------------------------------------
    # Main generation pipeline
    # ------------------------------------------------------------------
    async def run_daily_generation(self):
        """Full pipeline: prompt → image → send to admin.

        Never raises for ordinary errors: failures are logged and reported to
        the admin chat on a best-effort basis.
        """
        try:
            prompt, caption = await self._generate_prompt_and_caption()
            logger.info(f"Prompt generated ({len(prompt)} chars), caption ({len(caption)} chars)")
            generation, asset = await self._generate_image_and_save(prompt)
            logger.info(f"Generation done: id={generation.id}, asset={asset.id}")
            await self._send_to_admin(generation, asset, prompt, caption)
        except Exception as e:
            logger.error(f"Daily generation pipeline failed: {e}", exc_info=True)
            try:
                await self.bot.send_message(
                    chat_id=self.admin_id,
                    # Escape the error text: a raw "<" or "&" would break the HTML markup.
                    text=(
                        f"❌ <b>Ежедневная генерация провалилась:</b>\n"
                        f"<code>{html.escape(str(e))}</code>"
                    ),
                )
            except Exception as notify_error:
                # Still best-effort, but leave a trace instead of failing silently.
                logger.warning(
                    f"Could not notify admin about pipeline failure: {notify_error}",
                    exc_info=True,
                )

    # ------------------------------------------------------------------
    # Step 1 — Generate prompt + caption via LLM
    # ------------------------------------------------------------------
    async def _generate_prompt_and_caption(self) -> Tuple[str, str]:
        """Ask the LLM to produce an image prompt and social caption.

        Passes the character's avatar to the model (as an object-storage
        reference in AI-proxy mode, as raw bytes otherwise) so the prompt can
        stay faithful to the character's appearance.

        Returns:
            ``(prompt, caption)`` as parsed by ``_parse_prompt_and_caption``.

        Raises:
            ValueError: If the character is missing or the reply has no prompt.
        """
        char = await self.dao.chars.get_character(self.character_id)
        if not char:
            raise ValueError(f"Character {self.character_id} not found in DB")

        # NOTE: char.character_bio is currently not included in the prompt.
        system_prompt = (
            f"You are a creative director for the social media account of an AI character named '{char.name}'.\n"
            "I'm attaching the character's avatar photo. Based on it, produce TWO things:\n\n"
            "1. IMAGE_PROMPT: A detailed, vivid image generation prompt in English. "
            "Describe the pose, environment, lighting, color palette, and artistic style. It must look amateur. "
            "Make it unique and suitable for a social media post.\n\n"
            "2. SOCIAL_CAPTION: An engaging caption in English for Instagram and TikTok. "
            "Include 5-10 relevant hashtags at the end.\n\n"
            "Reply in EXACTLY this format (two lines, no extra text before IMAGE_PROMPT):\n"
            "IMAGE_PROMPT: <prompt here>\n"
            "SOCIAL_CAPTION: <caption here>"
        )

        settings = await self.dao.settings.get_settings()
        if settings.use_ai_proxy:
            asset_urls = (
                await self._prepare_asset_urls([char.avatar_asset_id])
                if char.avatar_asset_id
                else None
            )
            raw = await self.ai_proxy.generate_text(
                system_prompt,
                TextModel.GEMINI_3_1_PRO_PREVIEW.value,
                asset_urls,
            )
        else:
            # Avatar bytes are only needed on the direct path, so load them here.
            avatar_bytes_list: list[bytes] = []
            if char.avatar_asset_id:
                avatar_asset = await self.dao.assets.get_asset(char.avatar_asset_id, with_data=True)
                if avatar_asset and avatar_asset.data:
                    avatar_bytes_list.append(avatar_asset.data)
            raw = await asyncio.to_thread(
                self.gemini.generate_text,
                system_prompt,
                TextModel.GEMINI_3_1_PRO_PREVIEW.value,
                avatar_bytes_list or None,
            )

        # Treat a missing reply as empty text so parsing raises a clear ValueError.
        raw = raw or ""
        logger.debug(f"LLM raw response: {raw[:500]}")
        return self._parse_prompt_and_caption(raw, char.name)

    async def _prepare_asset_urls(self, asset_ids: list[str]) -> list[str]:
        """Return ``"<bucket>/<object name>"`` references for the given assets.

        Assets without an object name are skipped; assets without their own
        bucket fall back to the S3 adapter's bucket.
        """
        assets = await self.dao.assets.get_assets_by_ids(asset_ids)
        return [
            f"{asset.minio_bucket or self.s3_adapter.bucket_name}/{asset.minio_object_name}"
            for asset in assets
            if asset.minio_object_name
        ]

    @staticmethod
    def _parse_prompt_and_caption(raw: str, char_name: str) -> Tuple[str, str]:
        """Extract the image prompt and caption from the raw LLM reply.

        Labels may appear in either order. Without an ``IMAGE_PROMPT:`` label
        the whole reply is used as the prompt. A missing caption is replaced
        by a default one mentioning *char_name*.

        Args:
            raw: Raw text returned by the LLM.
            char_name: Character name used in the fallback caption.

        Returns:
            ``(prompt, caption)``, both stripped of surrounding whitespace.

        Raises:
            ValueError: If no non-empty prompt could be extracted.
        """
        prompt_label = "IMAGE_PROMPT:"
        caption_label = "SOCIAL_CAPTION:"

        caption = ""
        prompt_pos = raw.find(prompt_label)
        if prompt_pos == -1:
            # No label at all: fall back to treating the whole reply as the prompt.
            prompt = raw.strip()
        else:
            prompt_start = prompt_pos + len(prompt_label)
            caption_after = raw.find(caption_label, prompt_start)
            if caption_after != -1:
                # Expected layout: prompt first, caption second.
                prompt = raw[prompt_start:caption_after].strip()
                caption = raw[caption_after + len(caption_label):].strip()
            else:
                # Caption may precede the prompt; previously this raised IndexError.
                caption_before = raw.find(caption_label, 0, prompt_pos)
                if caption_before != -1:
                    caption = raw[caption_before + len(caption_label):prompt_pos].strip()
                prompt = raw[prompt_start:].strip()

        if not prompt:
            raise ValueError(f"LLM did not produce IMAGE_PROMPT. Raw snippet: {raw[:300]}")
        if not caption:
            caption = f"✨ Новый контент от {char_name}"
        return prompt, caption

    # ------------------------------------------------------------------
    # Step 2 — Generate image via GenerationService
    # ------------------------------------------------------------------
    async def _generate_image_and_save(
        self,
        prompt: str,
        variation_hint: Optional[str] = None,
    ) -> Tuple[Generation, Asset]:
        """Create a Generation record and delegate execution to GenerationService.

        No telegram_id is set on the record; per the original design note this
        keeps the service from sending its own notification, since
        ``_send_to_admin`` sends the preview with action buttons instead.

        Args:
            prompt: Base image prompt.
            variation_hint: Optional sentence appended to the prompt.

        Returns:
            The generation and its first result asset (metadata only).

        Raises:
            ValueError: If generation fails, yields no assets, or the asset
                cannot be loaded afterwards.
        """
        actual_prompt = f"{prompt}. {variation_hint}" if variation_hint else prompt

        # Create Generation record (GenerationService.create_generation expects it pre-saved)
        generation = Generation(
            status=GenerationStatus.RUNNING,
            linked_character_id=self.character_id,
            aspect_ratio=AspectRatios.NINESIXTEEN,
            quality=Quality.ONEK,
            prompt=actual_prompt,
            model=ImageModel.GEMINI_3_PRO_IMAGE_PREVIEW.value,
            use_profile_image=True,
            # No telegram_id → service won't send its own notification
        )
        gen_id = await self.dao.generations.create_generation(generation)
        generation.id = gen_id

        try:
            # Delegate all heavy lifting to the existing service
            await self.generation_service.create_generation(generation)
        except AIProxyException as e:
            # error code is already saved by create_generation
            raise ValueError(f"AI Proxy Error: {e.error_code or str(e)}") from e
        except Exception as e:
            raise ValueError(f"Image generation failed: {str(e)}") from e

        # After create_generation, generation.result_list is populated
        if not generation.result_list:
            raise ValueError("Generation completed but produced no assets")
        first_asset_id = generation.result_list[0]
        asset = await self.dao.assets.get_asset(first_asset_id, with_data=False)
        if not asset:
            raise ValueError(f"Asset {first_asset_id} not found after generation")
        return generation, asset

    # ------------------------------------------------------------------
    # Step 3 — Send to admin
    # ------------------------------------------------------------------
    async def _send_to_admin(
        self,
        generation: Generation,
        asset: Asset,
        prompt: str,
        caption: str,
    ):
        """Send the generated image to the admin with the action keyboard.

        Registers a pending session for the generation before sending, and
        removes it again if the send fails.

        Raises:
            ValueError: If the image bytes cannot be loaded from storage.
        """
        img_data = await self.s3_adapter.get_file(asset.minio_object_name)
        if not img_data:
            raise ValueError(f"Cannot load image from S3: {asset.minio_object_name}")

        self.pending_sessions[generation.id] = {
            "prompt": prompt,
            "caption": caption,
            "asset_id": asset.id,
        }
        try:
            msg = await self.bot.send_photo(
                chat_id=self.admin_id,
                photo=BufferedInputFile(img_data, filename="daily.png"),
                # LLM text is escaped so stray "<" / "&" cannot break the HTML markup;
                # the prompt is truncated before escaping so no entity is cut in half.
                caption=(
                    f"📸 <b>Ежедневная генерация</b>\n\n"
                    f"<b>Подпись для соцсетей:</b>\n{html.escape(caption)}\n\n"
                    f"<b>Промпт:</b>\n<code>{html.escape(prompt[:300])}</code>"
                ),
                reply_markup=make_admin_keyboard(generation.id),
            )
        except BaseException:
            # No message reached the admin, so nothing can reference this session.
            self.pending_sessions.pop(generation.id, None)
            raise

        # The session may already have been consumed by a fast callback.
        session = self.pending_sessions.get(generation.id)
        if session is not None:
            session["message_id"] = msg.message_id
            session["chat_id"] = msg.chat.id

    # ------------------------------------------------------------------
    # Admin action handlers (called from Telegram callback router)
    # ------------------------------------------------------------------
    async def handle_post(self, generation_id: str, message_id: int, chat_id: int):
        """Post to Instagram feed + story.

        On failure the error is shown with the keyboard re-attached so the
        admin can retry; a feed post that already succeeded is not repeated.

        Args:
            generation_id: Key of the pending session.
            message_id: Telegram message whose caption is edited with the result.
            chat_id: Chat containing that message.
        """
        session = self.pending_sessions.get(generation_id)
        if not session:
            logger.warning(f"handle_post: no pending session for generation {generation_id}")
            return
        if not self.meta_adapter:
            await self.bot.edit_message_caption(
                chat_id=chat_id,
                message_id=message_id,
                caption="⚠️ Meta API не настроен (META_ACCESS_TOKEN не задан). Публикация недоступна.",
            )
            return
        try:
            asset = await self.dao.assets.get_asset(session["asset_id"], with_data=False)
            if not asset or not asset.minio_object_name:
                raise ValueError("Asset not found in DB")
            image_url = await self.s3_adapter.get_presigned_url(
                asset.minio_object_name, expiration=3600
            )
            if not image_url:
                raise ValueError("Could not generate presigned URL for image")

            # Remember a successful feed post so a retry after a story failure
            # does not publish the same image to the feed twice.
            feed_id = session.get("feed_id")
            if feed_id is None:
                feed_id = await self.meta_adapter.post_to_feed(image_url, session["caption"])
                session["feed_id"] = feed_id
            story_id = await self.meta_adapter.post_to_story(image_url)

            self.pending_sessions.pop(generation_id, None)
            await self.bot.edit_message_caption(
                chat_id=chat_id,
                message_id=message_id,
                caption=(
                    f"✅ <b>Опубликовано!</b>\n\n"
                    f"📰 Feed ID: <code>{html.escape(str(feed_id))}</code>\n"
                    f"📖 Story ID: <code>{html.escape(str(story_id))}</code>"
                ),
            )
        except Exception as e:
            logger.error(f"Meta publish failed for generation {generation_id}: {e}", exc_info=True)
            await self.bot.edit_message_caption(
                chat_id=chat_id,
                message_id=message_id,
                caption=f"❌ <b>Ошибка публикации:</b>\n<code>{html.escape(str(e))}</code>",
                reply_markup=make_admin_keyboard(generation_id),
            )

    async def handle_regen_all(self, generation_id: str, message_id: int, chat_id: int):
        """Restart from step 1: generate new prompt, caption, and image."""
        self.pending_sessions.pop(generation_id, None)
        await self.bot.edit_message_caption(
            chat_id=chat_id,
            message_id=message_id,
            caption="🔄 <b>Перегенерация с нуля...</b>",
        )
        # Run in the background so the callback handler returns immediately.
        self._spawn(self._run_regen_all(chat_id))

    async def _run_regen_all(self, chat_id: int):
        """Background body of ``handle_regen_all``; reports failures to *chat_id*."""
        try:
            await self.run_daily_generation()
        except Exception as e:
            logger.error(f"Regen-all failed: {e}", exc_info=True)
            await self.bot.send_message(
                chat_id=chat_id,
                text=f"❌ Ошибка перегенерации:\n<code>{html.escape(str(e))}</code>",
            )

    async def handle_regen_image(self, generation_id: str, message_id: int, chat_id: int):
        """Restart from step 2: generate new image using existing prompt/caption."""
        session = self.pending_sessions.pop(generation_id, None)
        if not session:
            logger.warning(f"handle_regen_image: no pending session for generation {generation_id}")
            return
        prompt = session["prompt"]
        caption = session["caption"]
        await self.bot.edit_message_caption(
            chat_id=chat_id,
            message_id=message_id,
            caption="🖼 <b>Перегенерация изображения...</b>",
        )
        self._spawn(self._run_regen_image(prompt, caption, chat_id))

    async def _run_regen_image(self, prompt: str, caption: str, chat_id: int):
        """Background body of ``handle_regen_image``; reports failures to *chat_id*."""
        try:
            generation, asset = await self._generate_image_and_save(prompt)
            await self._send_to_admin(generation, asset, prompt, caption)
        except Exception as e:
            logger.error(f"Regen-image failed: {e}", exc_info=True)
            await self.bot.send_message(
                chat_id=chat_id,
                text=f"❌ Ошибка генерации:\n<code>{html.escape(str(e))}</code>",
            )

    async def handle_regen_more(self, generation_id: str, message_id: int, chat_id: int):
        """Generate 2 more pose-varied images using the existing prompt/caption.

        The original session is read, not removed.
        """
        session = self.pending_sessions.get(generation_id)
        if not session:
            logger.warning(f"handle_regen_more: no pending session for generation {generation_id}")
            return
        prompt = session["prompt"]
        caption = session["caption"]
        await self.bot.edit_message_caption(
            chat_id=chat_id,
            message_id=message_id,
            caption=" <b>Генерирую ещё 2 варианта...</b>",
        )
        self._spawn(self._run_regen_more(prompt, caption, chat_id))

    async def _run_regen_more(self, prompt: str, caption: str, chat_id: int):
        """Generate one image per variation hint; a failed variant does not stop the next."""
        variation_hints = [
            "Slightly vary the pose and camera angle while keeping the same scene, environment and lighting.",
            "Try a different subtle pose or expression, same background and setting as described.",
        ]
        for variant_no, hint in enumerate(variation_hints, start=1):
            try:
                generation, asset = await self._generate_image_and_save(prompt, variation_hint=hint)
                await self._send_to_admin(generation, asset, prompt, caption)
            except Exception as e:
                logger.error(f"Regen-more variant {variant_no} failed: {e}", exc_info=True)
                await self.bot.send_message(
                    chat_id=chat_id,
                    text=f"❌ Ошибка варианта {variant_no}:\n<code>{html.escape(str(e))}</code>",
                )

    async def handle_cancel(self, generation_id: str, message_id: int, chat_id: int):
        """Dismiss: drop the pending session and replace the caption, nothing else."""
        self.pending_sessions.pop(generation_id, None)
        await self.bot.edit_message_caption(
            chat_id=chat_id,
            message_id=message_id,
            caption="🚫 Отменено.",
        )