This commit is contained in:
xds
2026-02-02 16:15:17 +03:00
commit e6aad48e72
21 changed files with 631 additions and 0 deletions

4
.env Normal file
View File

@@ -0,0 +1,4 @@
BOT_TOKEN=8495170789:AAHyjjhHwwVtd9_ROnjHqPHRdnmyVr1aeaY
GEMINI_API_KEY=AIzaSyAHzDYhgjOqZZnvOnOFRGaSkKu4OAN3kZE
MONGO_HOST=mongodb://admin:super_secure_password@31.59.58.220:27017/
ADMIN_ID=567047

14
Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.13-slim
# Рабочая директория внутри контейнера
WORKDIR /app
# Копируем зависимости (если есть requirements.txt)
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Копируем код
COPY . .
# Запуск приложения (замени app.py на свой файл)
CMD ["python", "main.py"]

0
adapters/__init__.py Normal file
View File

View File

@@ -0,0 +1,99 @@
import os
import io
import logging
from datetime import datetime
from typing import List, Union, Dict, Any
from PIL import Image
# Импортируем из нового SDK
from google import genai
from google.genai import types
# Для настройки логгера
logger = logging.getLogger(__name__)
class GoogleAdapter:
def __init__(self, api_key: str):
if not api_key:
raise ValueError("API Key for Gemini is missing")
self.client = genai.Client(api_key=api_key)
# Укажите актуальную модель.
# Если gemini-3-pro-image-preview недоступна, используйте gemini-2.0-flash-exp
self.model_name = "gemini-3-pro-preview"
def generate(
self,
prompt: str,
image_bytes: bytes = None,
generate_image: bool = False
) -> Dict[str, Any]:
"""
Универсальный метод:
- Если generate_image=True: просим модель вернуть картинку (Image Generation).
- Если image_bytes переданы + generate_image=False: это Vision (описание фото).
- Если image_bytes + generate_image=True: это Image-to-Image (редактирование).
"""
if generate_image:
self.model_name = "gemini-3-pro-image-preview"
else :
self.model_name = "gemini-3-pro-preview"
contents = [prompt]
# Если есть входное изображение (для Vision или для редактирования)
if image_bytes:
try:
image = Image.open(io.BytesIO(image_bytes))
contents.append(image)
except Exception as e:
logger.error(f"Error processing input image: {e}")
return {"error": "Не удалось обработать входящее изображение."}
# Настраиваем конфигурацию
# Для генерации картинок добавляем 'IMAGE' в response_modalities
modalities = ['TEXT']
if generate_image:
modalities.append('IMAGE')
try:
# Вызов API (синхронный метод в обертке, но aiogram вызывает его в треде,
# либо используйте client.aio для асинхронности если поддерживается версией SDK)
# В google-genai v0.3+ есть асинхронный клиент, но для простоты здесь стандартный вызов.
# Чтобы не блокировать event loop, в main.py мы обернем это в to_thread при необходимости,
# но пока используем стандартный вызов.
response = self.client.models.generate_content(
model=self.model_name,
contents=contents,
config=types.GenerateContentConfig(
response_modalities=modalities,
temperature=0.7 if not generate_image else 1.0,
)
)
result = {"text": "", "images": []}
# Парсим ответ (Text или Inline Data)
if response.parts:
for part in response.parts:
if part.text:
result["text"] += part.text
# Проверяем наличие сгенерированного изображения
if part.inline_data:
# ИСПРАВЛЕНИЕ: Берем "сырые" байты напрямую из ответа
# Это работает быстрее и не вызывает ошибку с PIL
# part.inline_data.data — это уже bytes
byte_arr = io.BytesIO(part.inline_data.data)
now = datetime.now()
# Имя файла для телеграма (формально)
byte_arr.name = f'{now.timestamp()}.png'
result["images"].append(byte_arr)
return result
except Exception as e:
logger.error(f"Gemini API Error: {e}")
return {"error": f"Ошибка API: {str(e)}"}

9
docker-compose.yml Normal file
View File

@@ -0,0 +1,9 @@
services:
tot-bot:
image: tot-bot:latest
container_name: tot-bot
build:
context: .
network: host
network_mode: host
restart: unless-stopped

15
keyboards.py Normal file
View File

@@ -0,0 +1,15 @@
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
def get_request_kb():
return InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔐 Запросить доступ", callback_data="req_access")]
])
def get_admin_decision_kb(user_id: int):
"""Кнопки для админа с ID пользователя в callback_data"""
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="✅ Разрешить", callback_data=f"access_allow_{user_id}"),
InlineKeyboardButton(text="🚫 Запретить", callback_data=f"access_deny_{user_id}")
]
])

102
main.py Normal file
View File

@@ -0,0 +1,102 @@
import asyncio
import logging
import os
from aiogram import Bot, Dispatcher, Router, F
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command, CommandObject
from aiogram.types import Message, BufferedInputFile
from aiogram.fsm.storage.mongo import MongoStorage
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient
# Импорты
from adapters.google_adapter import GoogleAdapter
from middlewares.auth import AuthMiddleware
from middlewares.dao import DaoMiddleware
from repos.char_repo import CharacterRepo
from repos.dao import DAO
from repos.user_repo import UsersRepo
from routers import char_router
# ВАЖНО: Импортируем роутер с логикой кнопок, а не создаем пустой
from routers.auth_router import router as auth_router
from routers.gen_router import router as gen_router
from routers.char_router import router as char_router
load_dotenv()
# Настройки
BOT_TOKEN = os.getenv("BOT_TOKEN")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
MONGO_HOST = os.getenv("MONGO_HOST")
ADMIN_ID = int(os.getenv("ADMIN_ID")) # Сразу преобразуем в int
# Инициализация
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
# БД
mongo_client = AsyncIOMotorClient(MONGO_HOST)
users_repo = UsersRepo(mongo_client)
char_repo = CharacterRepo(mongo_client)
# Dispatcher
# Если MongoStorage пока не настроен на authSource=admin, можно временно убрать storage=...
dp = Dispatcher(storage=MongoStorage(mongo_client))
# ВНЕДРЕНИЕ ЗАВИСИМОСТЕЙ (чтобы они были доступны в хендлерах)
dp["repo"] = users_repo
dp["admin_id"] = ADMIN_ID
dp["gemini"] = GoogleAdapter(api_key=GEMINI_API_KEY) # Инициализируем тут
# РОУТИНГ
# 1. Роутер авторизации (кнопки) - ПОДКЛЮЧАЕМ ПЕРВЫМ И БЕЗ МИДЛВАРИ
dp.include_router(auth_router)
main_router = Router()
dp.include_router(main_router)
dp.include_router(char_router)
dp.include_router(gen_router)
# 2. Основной роутер (чат с ботом)
# Вешаем защиту ТОЛЬКО на основной роутер
main_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_ID))
gen_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_ID))
char_router.message.middleware(DaoMiddleware(dao=DAO(client=mongo_client)))
def setup_logging() -> None:
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
# --- ХЕНДЛЕРЫ ОСНОВНОГО РОУТЕРА ---
# Переносим их прямо сюда или в отдельный файл routers/chat_router.py
@main_router.message(Command("help"))
async def show_help(message: Message) -> None:
await message.answer("Для того, чтобы обратиться для текстовой генерации - просто отправь промпт.\n\n"
"Для генерации фото - /image {prompt}\n\n"
"Можно отправить фото и команду /image {prompt}\n\n"
"Диалоги не поддерживаются!!!! <b>Каждое новое сообщение - новый диалог</b>")
@main_router.message(CommandStart())
async def cmd_start(message: Message):
await message.answer("👋 Привет! Я готов к работе.\n\n"
"Для того, чтобы обратиться для текстовой генерации - просто отправь промпт.\n\n"
"Для генерации фото - /image {prompt}\n\n"
"Можно отправить фото и команду /image {prompt}\n\n"
"Диалоги не поддерживаются!!!! <b>Каждое новое сообщение - новый диалог</b>"
)
# --- ЗАПУСК ---
if __name__ == "__main__":
setup_logging()
try:
asyncio.run(dp.start_polling(bot))
except KeyboardInterrupt:
print("Bot stopped")

0
middlewares/__init__.py Normal file
View File

50
middlewares/auth.py Normal file
View File

@@ -0,0 +1,50 @@
from typing import Callable, Dict, Any, Awaitable
from aiogram import BaseMiddleware
from aiogram.types import Message
from repos.user_repo import UsersRepo, UserStatus
from keyboards import get_request_kb
class AuthMiddleware(BaseMiddleware):
def __init__(self, repo: UsersRepo, admin_id: int):
self.repo = repo
self.admin_id = admin_id
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
user = event.from_user
# Админа пускаем всегда
# if user.id == self.admin_id:
# return await handler(event, data)
# Получаем данные из БД
db_user = await self.repo.get_user(user.id)
status = db_user.get("status") if db_user else UserStatus.NONE
# 1. Если доступ уже разрешен — пропускаем к боту
if status == UserStatus.ALLOWED:
return await handler(event, data)
# 2. Если статус PENDING (ждет решения)
if status == UserStatus.PENDING:
await event.answer("⏳ Ваша заявка находится на рассмотрении администратора.")
return
# 3. Если нет в базе или ЗАПРЕЩЕН — проверяем тайминг 24 часа
can_request = await self.repo.can_request_access(user.id)
if can_request:
await event.answer(
"⛔️ У вас нет доступа к этому боту.\nВы можете отправить запрос администратору.",
reply_markup=get_request_kb()
)
else:
# Если 24 часа еще не прошло
await event.answer("⛔️ Доступ запрещен.\nПовторный запрос можно отправить через 24 часа.")
return # Прерываем обработку, хендлеры бота не сработают

13
middlewares/dao.py Normal file
View File

@@ -0,0 +1,13 @@
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject
from repos.dao import DAO
class DaoMiddleware(BaseMiddleware):
def __init__(self, dao: DAO):
self._dao = dao
async def __call__(self, handler, event: TelegramObject, data: dict):
data["dao"] = self._dao
return await handler(event, data)

8
models/Character.py Normal file
View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
class Character(BaseModel):
id: int | None
name: str
character_image: bytes
character_bio: str

0
models/__init__.py Normal file
View File

0
repos/__init__.py Normal file
View File

16
repos/char_repo.py Normal file
View File

@@ -0,0 +1,16 @@
from motor.motor_asyncio import AsyncIOMotorClient
from models.Character import Character
class CharacterRepo:
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
self.collection = client[db_name]["characters"]
async def add_character(self, character: Character) -> Character:
op = await self.collection.insert_one(character.model_dump())
character.id = op.inserted_id
return character
async def get_character(self, character_id: int) -> Character:
return await self.collection.find_one({"id": character_id})

9
repos/dao.py Normal file
View File

@@ -0,0 +1,9 @@
from motor.motor_asyncio import AsyncIOMotorClient
from repos.char_repo import CharacterRepo
from repos.user_repo import UsersRepo
class DAO:
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
self.chars = CharacterRepo(client, db_name)

65
repos/user_repo.py Normal file
View File

@@ -0,0 +1,65 @@
from datetime import datetime, timedelta
from enum import Enum
from aiogram.types import User
from motor.motor_asyncio import AsyncIOMotorClient
class UserStatus:
ALLOWED = "allowed"
DENIED = "denied"
PENDING = "pending"
NONE = "none" # Пользователя нет в базе
class UsersRepo:
def __init__(self, client: AsyncIOMotorClient, db_name="bot_db"):
self.collection = client[db_name]["users"]
async def get_user(self, user_id: int):
return await self.collection.find_one({"user_id": user_id})
async def create_or_update_request(self, user: User):
"""
Обновляет дату последнего запроса и ставит статус PENDING.
Сохраняет всю инфу о юзере.
"""
now = datetime.now()
data = {
"user_id": user.id,
"username": user.username,
"full_name": user.full_name,
"status": UserStatus.PENDING,
"last_request_date": now
}
await self.collection.update_one(
{"user_id": user.id},
{"$set": data},
upsert=True
)
async def set_status(self, user_id: int, status: str):
"""Меняет статус (разрешен/запрещен)"""
await self.collection.update_one(
{"user_id": user_id},
{"$set": {"status": status}}
)
async def can_request_access(self, user_id: int) -> bool:
"""
Проверяет, можно ли отправить запрос (прошло ли 24 часа).
Возвращает True, если пользователя нет или прошло > 24ч.
"""
user = await self.get_user(user_id)
if not user:
return True
last_date = user.get("last_request_date")
if not last_date:
return True
# Проверка на 24 часа
if datetime.now() - last_date > timedelta(hours=24):
return True
return False

41
requirements.txt Normal file
View File

@@ -0,0 +1,41 @@
aiofiles==24.1.0
aiogram==3.24.0
aiohappyeyeballs==2.6.1
aiohttp==3.11.18
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.12.1
attrs==25.4.0
certifi==2026.1.4
cffi==2.0.0
charset-normalizer==3.4.4
cryptography==46.0.4
distro==1.9.0
dnspython==2.8.0
frozenlist==1.8.0
google-auth==2.48.0
google-genai==1.61.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
magic-filter==1.0.12
motor==3.7.1
multidict==6.7.1
pillow==12.1.0
propcache==0.4.1
pyasn1==0.6.2
pyasn1_modules==0.4.2
pycparser==3.0
pydantic==2.10.6
pydantic_core==2.27.2
pymongo==4.16.0
python-dotenv==1.2.1
requests==2.32.5
rsa==4.9.1
sniffio==1.3.1
tenacity==9.1.2
typing_extensions==4.15.0
urllib3==2.6.3
websockets==15.0.1
yarl==1.22.0

0
routers/__init__.py Normal file
View File

83
routers/auth_router.py Normal file
View File

@@ -0,0 +1,83 @@
from aiogram import Router, F, Bot
from aiogram.types import CallbackQuery, Message
from repos.user_repo import UsersRepo, UserStatus
from keyboards import get_admin_decision_kb
router = Router()
# Чтобы IDE не ругалась, переменные инициализируются в main,
# но здесь мы ожидаем, что они будут переданы или доступны через DI (workflow_data)
# В этом примере я буду доставать их из data, переданной диспетчером, или глобально (для простоты примера - глобально в рамках архитектуры aiogram лучше через middleware DI).
# Для простоты доступа предположим, что repo и admin_id прокинуты.
@router.callback_query(F.data == "req_access")
async def user_request_access(callback: CallbackQuery, repo: UsersRepo, bot: Bot, admin_id: int):
"""Пользователь нажал 'Запросить доступ'"""
user = callback.from_user
# Двойная проверка на 24 часа (на случай если пользователь не обновлял сообщение)
if not await repo.can_request_access(user.id):
await callback.answer("⏳ Вы уже отправляли запрос недавно. Повторите через сутки.", show_alert=True)
return
# 1. Записываем в БД статус PENDING
await repo.create_or_update_request(user)
# 2. Уведомляем пользователя
await callback.message.edit_text("✅ Заявка отправлена администратору. Ожидайте решения.")
# 3. Отправляем уведомление Админу
# Формируем красивый текст
info = (
f"🔔 <b>Новый запрос доступа!</b>\n"
f"👤 Имя: {user.full_name}\n"
f"🆔 ID: <code>{user.id}</code>\n"
f"🔗 Username: @{user.username if user.username else 'нет'}"
)
await bot.send_message(
chat_id=admin_id,
text=info,
reply_markup=get_admin_decision_kb(user.id)
)
await callback.answer()
@router.callback_query(F.data.startswith("access_"))
async def admin_decision(callback: CallbackQuery, repo: UsersRepo, bot: Bot):
"""Админ нажал Разрешить или Запретить"""
action, user_id_str = callback.data.split("_")[1], callback.data.split("_")[2]
target_user_id = int(user_id_str)
if action == "allow":
# Обновляем БД
await repo.set_status(target_user_id, UserStatus.ALLOWED)
# Ответ админу
await callback.message.edit_text(f"✅ Доступ для {target_user_id} <b>РАЗРЕШЕН</b>.")
# Уведомление пользователю
try:
await bot.send_message(target_user_id, "🎉 <b>Вам предоставлен доступ к боту!</b>\nНажмите /start")
except:
await callback.message.answer(
f"⚠️ Не удалось уведомить пользователя {target_user_id} (он заблокировал бота?)")
elif action == "deny":
# Обновляем БД (статус DENIED, дата запроса остается старой - то есть через сутки он сможет снова попросить)
# Если хотите, чтобы при отказе таймер 24ч сбрасывался на "сейчас", нужно обновить last_request_date.
# В текущей реализации repo, мы просто меняем статус. Таймер тикает от момента подачи заявки пользователем.
await repo.set_status(target_user_id, UserStatus.DENIED)
await callback.message.edit_text(f"🚫 Доступ для {target_user_id} <b>ЗАПРЕЩЕН</b>.")
try:
await bot.send_message(target_user_id, "🚫 Администратор отклонил ваш запрос доступа.")
except:
pass
await callback.answer()

48
routers/char_router.py Normal file
View File

@@ -0,0 +1,48 @@
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import *
from aiogram import Router, F
from models.Character import Character
from repos.dao import DAO
router = Router()
class States(StatesGroup):
char_wait_name = State()
char_wait_bio = State()
@router.message(F.document, Command("add_char"))
async def add_char(message: Message, state: FSMContext, dao: DAO):
await state.set_data({"photo": bot.download(file=message.document.file_id)})
await state.set_state(States.char_wait_name)
await message.answer("Кайф, теперь напиши ее имя")
@router.callback_query(States.char_wait_name)
async def new_char_name(message: Message, state: FSMContext, dao: DAO):
await state.set_data({"name": message.text})
await state.set_state(States.char_wait_bio)
await message.answer("А теперь напиши био. Хоть чуть чуть.")
@router.callback_query(States.char_wait_bio)
async def new_char_bio(message: Message, state: FSMContext, dao: DAO):
data = await state.get_data()
photo = data["photo"]
name = data["name"]
char = Character(id=None, name=name, character_image=photo, character_bio=message.text)
await dao.chars.add_character(char)
await message.answer_photo(photo=BufferedInputFile(char.character_image, "img.png"), caption="Персонаж создан!\n"
f"Имя:{char.name}\n"
f"Био: {char.character_bio}\n"
)
@router.message(Command("add_char"))
async def add_char_cmd(message: Message):
await message.answer(
"Добавление персонажа производится через отправку документа-фото исходного изображения персонажа.")

55
routers/gen_router.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
from aiogram import Router, Bot, F
from aiogram.enums import ParseMode
from aiogram.filters import *
from aiogram.types import *
from adapters.google_adapter import GoogleAdapter
router = Router()
@router.message(Command("image"))
async def cmd_image_gen(message: Message, command: CommandObject, gemini: GoogleAdapter, bot: Bot):
# ... ваш код ...
# Обратите внимание: gemini теперь прилетает аргументом, так как мы сделали dp["gemini"]
prompt = command.args
if not prompt:
await message.answer("⚠️ Напиши промпт.")
return
wait_msg = await message.answer("🎨 Генерирую...")
# Получение байтов фото (логика та же)
image_bytes = None
if message.photo:
file_io = await bot.download(message.photo[-1].file_id)
image_bytes = file_io.getvalue()
elif message.reply_to_message and message.reply_to_message.photo:
file_io = await bot.download(message.reply_to_message.photo[-1].file_id)
image_bytes = file_io.getvalue()
result = await asyncio.to_thread(
gemini.generate, prompt=prompt, image_bytes=image_bytes, generate_image=True
)
await wait_msg.delete()
if result.get("images"):
for img in result["images"]:
await message.answer_document(BufferedInputFile(img.read(), "img.png"))
elif result.get("text"):
await message.answer(result["text"])
else:
await message.answer(f"Ошибка: {result.get('error', 'Unknown')}")
@router.message(F.text)
async def handle_text(message: Message, gemini: GoogleAdapter, bot: Bot):
await bot.send_chat_action(message.chat.id, "typing")
result = await asyncio.to_thread(gemini.generate, prompt=message.text)
if result.get("text"):
await message.answer(result["text"], parse_mode=ParseMode.MARKDOWN)
else:
await message.answer("Ошибка генерации")