This commit is contained in:
xds
2026-03-22 12:40:33 +03:00
commit 28a5d51389
61 changed files with 6085 additions and 0 deletions

View File

View File

@@ -0,0 +1,104 @@
import json
import logging
import time
from google import genai
from google.genai import types
from app.config import settings
logger = logging.getLogger("app.services.ai_advisor")
SYSTEM_PROMPT = """
Ты — эксперт по 3D-печати из инженерных пластиков по технологии FDM.
Твоя задача — рекомендовать оптимальный материал для печати на основе описания задачи клиента.
Доступные материалы:
{materials_json}
Правила:
1. Всегда рекомендуй один основной материал и 1-2 альтернативы.
2. Учитывай: температурный режим, механические нагрузки, химическое воздействие, UV, влажность.
3. Если задача не подходит для FDM-печати (слишком мелкие детали, высокая точность) — честно скажи об этом.
4. Отвечай кратко, по делу, на русском языке.
5. Если клиент не указал критичные параметры — задай уточняющие вопросы.
Формат ответа — строго JSON:
{{
"recommended_material_id": <int>,
"recommended_material_name": "<str>",
"reasoning": "<обоснование на русском>",
"alternatives": [{{"material_id": <int>, "name": "<str>", "why": "<причина>"}}],
"questions": ["<вопрос, если нужна доп. информация>"]
}}
"""
async def get_material_recommendation(
task_description: str,
materials_data: list[dict],
budget_preference: str = "optimal",
file_info: dict | None = None,
) -> dict:
"""Get material recommendation from Google Gemini API."""
logger.info("=== AI Advisor request ===")
logger.info("Task: %s", task_description)
logger.info("Budget preference: %s", budget_preference)
logger.info("File info: %s", file_info)
logger.info("Materials count: %d", len(materials_data))
if not settings.GOOGLE_API_KEY:
logger.error("GOOGLE_API_KEY is not configured")
raise ValueError("GOOGLE_API_KEY not configured")
materials_json = json.dumps(materials_data, ensure_ascii=False, indent=2)
system = SYSTEM_PROMPT.format(materials_json=materials_json)
logger.debug("System prompt length: %d chars", len(system))
user_message = f"Описание задачи: {task_description}\nПредпочтение по бюджету: {budget_preference}"
if file_info:
user_message += f"\nИнформация о модели: {json.dumps(file_info, ensure_ascii=False)}"
logger.debug("User message: %s", user_message)
logger.info("Sending request to Gemini API (model: gemini-2.0-flash)...")
start_time = time.time()
client = genai.Client(api_key=settings.GOOGLE_API_KEY)
response = await client.aio.models.generate_content(
model="gemini-3-flash-preview",
contents=user_message,
config=types.GenerateContentConfig(
system_instruction=system,
max_output_tokens=1024,
temperature=0.3,
),
)
elapsed = time.time() - start_time
logger.info("Gemini API responded in %.2f seconds", elapsed)
response_text = response.text
logger.debug("Raw response (%d chars): %s", len(response_text), response_text[:500])
try:
result = json.loads(response_text)
logger.info("Response parsed as JSON successfully")
logger.info("Recommended material: id=%s, name=%s",
result.get("recommended_material_id"), result.get("recommended_material_name"))
logger.info("Alternatives: %d, Questions: %d",
len(result.get("alternatives", [])), len(result.get("questions", [])))
logger.info("=== AI Advisor complete ===")
return result
except json.JSONDecodeError:
logger.warning("Direct JSON parse failed, trying to extract JSON from response...")
start = response_text.find("{")
end = response_text.rfind("}") + 1
if start != -1 and end > start:
extracted = response_text[start:end]
logger.debug("Extracted JSON substring [%d:%d]: %s", start, end, extracted[:300])
result = json.loads(extracted)
logger.info("Extracted JSON parsed successfully")
logger.info("=== AI Advisor complete ===")
return result
logger.error("Failed to extract JSON from AI response: %s", response_text[:200])
raise ValueError("AI вернул невалидный JSON")

View File

@@ -0,0 +1,62 @@
import logging
from dataclasses import dataclass
import trimesh
logger = logging.getLogger("app.services.file_parser")
@dataclass
class FileInfo:
volume_cm3: float
surface_area_cm2: float
bounding_box_mm: dict[str, float]
is_watertight: bool
triangle_count: int
SUPPORTED_EXTENSIONS = {".stl", ".3mf", ".obj"}
def parse_3d_file(file_path: str, file_extension: str) -> FileInfo:
"""Parse a 3D file and return geometric properties."""
ext = file_extension.lower().lstrip(".")
logger.info("Parsing 3D file: path=%s, extension=%s", file_path, ext)
logger.debug("Loading mesh with trimesh (file_type=%s)...", ext)
mesh = trimesh.load(file_path, file_type=ext)
logger.debug("Trimesh loaded object type: %s", type(mesh).__name__)
if isinstance(mesh, trimesh.Scene):
meshes = list(mesh.dump())
logger.info("File is a Scene with %d geometries, concatenating...", len(meshes))
if not meshes:
logger.error("Scene contains no geometries")
raise ValueError("Файл не содержит 3D-геометрии")
mesh = trimesh.util.concatenate(meshes)
logger.debug("Concatenated into single Trimesh")
if not isinstance(mesh, trimesh.Trimesh):
logger.error("Could not extract Trimesh object, got: %s", type(mesh).__name__)
raise ValueError("Не удалось извлечь 3D-геометрию из файла")
volume_cm3 = abs(mesh.volume) / 1000.0
surface_area_cm2 = mesh.area / 100.0
bbox = {
"x": round(float(mesh.bounding_box.extents[0]), 2),
"y": round(float(mesh.bounding_box.extents[1]), 2),
"z": round(float(mesh.bounding_box.extents[2]), 2),
}
is_watertight = bool(mesh.is_watertight)
triangle_count = len(mesh.faces)
logger.info("Parse result: volume=%.2f cm3, area=%.2f cm2, bbox=%s, watertight=%s, triangles=%d",
volume_cm3, surface_area_cm2, bbox, is_watertight, triangle_count)
return FileInfo(
volume_cm3=volume_cm3,
surface_area_cm2=surface_area_cm2,
bounding_box_mm=bbox,
is_watertight=is_watertight,
triangle_count=triangle_count,
)

View File

@@ -0,0 +1,140 @@
import logging
from dataclasses import dataclass
from app.services.file_parser import FileInfo
logger = logging.getLogger("app.services.price_engine")
TIME_RATE_PER_HOUR = 200.0 # руб/час
SETUP_TIME_MIN = 15.0 # минуты
TRAVEL_TIME_PER_LAYER_MIN = 0.3
POST_PROCESSING_COSTS = {
"sanding": 300.0,
"painting": 500.0,
"threading": 200.0,
"acetone_smoothing": 400.0,
}
QUANTITY_DISCOUNTS = [
(1, 0),
(2, 5),
(6, 10),
(21, 15),
(101, 20),
]
@dataclass
class PriceResult:
weight_grams: float
material_cost_rub: float
print_time_hours: float
time_cost_rub: float
post_processing_cost_rub: float
subtotal_rub: float
quantity: int
quantity_discount_percent: int
total_rub: float
estimated_days: int
def get_quantity_discount(quantity: int) -> int:
discount = 0
for min_qty, disc in QUANTITY_DISCOUNTS:
if quantity >= min_qty:
discount = disc
logger.debug("Quantity discount for %d pcs: %d%%", quantity, discount)
return discount
def estimate_print_time(file_info: FileInfo, layer_height_mm: float, flow_rate_mm3_s: float) -> float:
"""Estimate print time in hours."""
z_height = file_info.bounding_box_mm.get("z", 10.0)
layers = max(z_height / layer_height_mm, 1)
volume_mm3 = file_info.volume_cm3 * 1000.0
volume_per_layer = volume_mm3 / layers
time_per_layer_min = volume_per_layer / flow_rate_mm3_s / 60.0
total_min = layers * (time_per_layer_min + TRAVEL_TIME_PER_LAYER_MIN) + SETUP_TIME_MIN
hours = round(total_min / 60.0, 1)
logger.debug("Print time estimate: z=%.1fmm, layers=%.0f, vol_per_layer=%.1fmm3, "
"time_per_layer=%.2fmin, total=%.1fmin (%.1fh)",
z_height, layers, volume_per_layer, time_per_layer_min, total_min, hours)
return hours
def calculate_price(
file_info: FileInfo,
density_g_cm3: float,
price_per_gram: float,
flow_rate_mm3_s: float,
infill_percent: int = 30,
layer_height_mm: float = 0.2,
quantity: int = 1,
post_processing: list[str] | None = None,
) -> PriceResult:
post_processing = post_processing or []
logger.info("=== Price calculation start ===")
logger.info("Input: volume=%.2f cm3, density=%.2f g/cm3, price_per_gram=%.1f RUB",
file_info.volume_cm3, density_g_cm3, price_per_gram)
logger.info("Params: infill=%d%%, layer=%.2fmm, qty=%d, post_processing=%s",
infill_percent, layer_height_mm, quantity, post_processing)
effective_volume = file_info.volume_cm3 * (infill_percent / 100.0) * 0.7 + file_info.volume_cm3 * 0.3
logger.debug("Effective volume: %.2f cm3 (infill-scaled: %.2f + walls: %.2f)",
effective_volume,
file_info.volume_cm3 * (infill_percent / 100.0) * 0.7,
file_info.volume_cm3 * 0.3)
weight_g = round(effective_volume * density_g_cm3, 1)
material_cost = round(weight_g * price_per_gram, 2)
logger.debug("Weight: %.1f g, material cost: %.2f RUB", weight_g, material_cost)
print_time_h = estimate_print_time(file_info, layer_height_mm, flow_rate_mm3_s)
time_cost = round(print_time_h * TIME_RATE_PER_HOUR, 2)
logger.debug("Print time: %.1f h, time cost: %.2f RUB (rate: %.0f RUB/h)", print_time_h, time_cost, TIME_RATE_PER_HOUR)
pp_cost = 0.0
for pp in post_processing:
cost = POST_PROCESSING_COSTS.get(pp, 0)
logger.debug("Post-processing '%s': %.0f RUB", pp, cost)
pp_cost += cost
pp_cost = round(pp_cost, 2)
logger.debug("Total post-processing cost: %.2f RUB", pp_cost)
subtotal = round(material_cost + time_cost + pp_cost, 2)
logger.debug("Subtotal (1 pc): %.2f RUB = material(%.2f) + time(%.2f) + pp(%.2f)",
subtotal, material_cost, time_cost, pp_cost)
discount_pct = get_quantity_discount(quantity)
total = round(subtotal * quantity * (1 - discount_pct / 100.0), 2)
logger.info("Total: %.2f RUB (qty=%d, discount=%d%%, subtotal_per_unit=%.2f)",
total, quantity, discount_pct, subtotal)
if print_time_h <= 2:
estimated_days = 2
elif print_time_h <= 8:
estimated_days = 3
else:
estimated_days = 5
if quantity > 10:
estimated_days += 2
if quantity > 50:
estimated_days += 3
logger.info("Estimated days: %d", estimated_days)
logger.info("=== Price calculation complete ===")
return PriceResult(
weight_grams=weight_g,
material_cost_rub=material_cost,
print_time_hours=print_time_h,
time_cost_rub=time_cost,
post_processing_cost_rub=pp_cost,
subtotal_rub=subtotal,
quantity=quantity,
quantity_discount_percent=discount_pct,
total_rub=total,
estimated_days=estimated_days,
)

View File

@@ -0,0 +1,67 @@
import io
import logging
from minio import Minio
from minio.error import S3Error
from app.config import settings
logger = logging.getLogger("app.services.storage")
_client: Minio | None = None
def get_minio_client() -> Minio:
global _client
if _client is None:
logger.info("Initializing MinIO client: endpoint=%s, secure=%s", settings.MINIO_ENDPOINT, settings.MINIO_SECURE)
_client = Minio(
endpoint=settings.MINIO_ENDPOINT,
access_key=settings.MINIO_ACCESS_KEY,
secret_key=settings.MINIO_SECRET_KEY,
secure=settings.MINIO_SECURE,
)
logger.info("MinIO client created, checking bucket '%s'...", settings.MINIO_BUCKET)
if not _client.bucket_exists(settings.MINIO_BUCKET):
_client.make_bucket(settings.MINIO_BUCKET)
logger.info("Created MinIO bucket: %s", settings.MINIO_BUCKET)
else:
logger.info("MinIO bucket '%s' already exists", settings.MINIO_BUCKET)
return _client
def upload_file(object_name: str, data: bytes, content_type: str = "application/octet-stream") -> str:
"""Upload file to MinIO. Returns the object name (key)."""
logger.info("Uploading to MinIO: object=%s, size=%d bytes, content_type=%s", object_name, len(data), content_type)
client = get_minio_client()
client.put_object(
bucket_name=settings.MINIO_BUCKET,
object_name=object_name,
data=io.BytesIO(data),
length=len(data),
content_type=content_type,
)
logger.info("Upload complete: %s/%s", settings.MINIO_BUCKET, object_name)
return object_name
def download_file(object_name: str) -> bytes:
"""Download file from MinIO."""
logger.info("Downloading from MinIO: %s/%s", settings.MINIO_BUCKET, object_name)
client = get_minio_client()
response = client.get_object(settings.MINIO_BUCKET, object_name)
try:
data = response.read()
logger.info("Download complete: %s, size=%d bytes", object_name, len(data))
return data
finally:
response.close()
response.release_conn()
def delete_file(object_name: str) -> None:
"""Delete file from MinIO."""
logger.info("Deleting from MinIO: %s/%s", settings.MINIO_BUCKET, object_name)
client = get_minio_client()
client.remove_object(settings.MINIO_BUCKET, object_name)
logger.info("Deleted: %s", object_name)

View File

@@ -0,0 +1,48 @@
import logging
import httpx
from app.config import settings
logger = logging.getLogger("app.services.telegram")
async def notify_new_order(
order_id: str,
client_name: str,
client_phone: str,
material_name: str,
total_rub: float,
comment: str | None = None,
) -> None:
"""Send a notification about a new order to Telegram."""
logger.info("=== Telegram notification ===")
logger.info("Order: %s, client: %s, phone: %s, material: %s, total: %.2f RUB",
order_id, client_name, client_phone, material_name, total_rub)
if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID:
logger.warning("Telegram credentials not configured (token=%s, chat_id=%s), skipping",
"set" if settings.TELEGRAM_BOT_TOKEN else "empty",
"set" if settings.TELEGRAM_CHAT_ID else "empty")
return
text = (
f"\U0001f195 Новый заказ #{order_id}\n"
f"Клиент: {client_name}\n"
f"Телефон: {client_phone}\n"
f"Материал: {material_name}\n"
f"Сумма: {total_rub} \u20bd\n"
f"Комментарий: {comment or '\u2014'}"
)
url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage"
logger.debug("Sending to Telegram: chat_id=%s, text_length=%d", settings.TELEGRAM_CHAT_ID, len(text))
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"chat_id": settings.TELEGRAM_CHAT_ID, "text": text})
logger.info("Telegram response: status=%d, body=%s", resp.status_code, resp.text[:200])
if resp.status_code != 200:
logger.error("Telegram API error: %s", resp.text)
except Exception:
logger.exception("Failed to send Telegram notification")