import hashlib import json import logging import time import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.models.app_settings import AppSettings logger = logging.getLogger("app.services.ai_advisor") SYSTEM_PROMPT = """ Ты — эксперт по 3D-печати из инженерных пластиков по технологии FDM. Твоя задача — рекомендовать оптимальный материал для печати на основе описания задачи клиента. Доступные материалы: {materials_json} Правила: 1. Всегда рекомендуй один основной материал и 1-2 альтернативы. 2. Учитывай: температурный режим, механические нагрузки, химическое воздействие, UV, влажность. 3. Если задача не подходит для FDM-печати (слишком мелкие детали, высокая точность) — честно скажи об этом. 4. Отвечай кратко, по делу, на русском языке. 5. Если клиент не указал критичные параметры — задай уточняющие вопросы. Формат ответа — строго JSON: {{ "recommended_material_id": , "recommended_material_name": "", "reasoning": "<обоснование на русском>", "alternatives": [{{"material_id": , "name": "", "why": "<причина>"}}], "questions": ["<вопрос, если нужна доп. информация>"] }} """ async def _get_setting(db: AsyncSession, key: str, default: str = "") -> str: result = await db.execute(select(AppSettings.value).where(AppSettings.key == key)) row = result.scalar_one_or_none() return row if row is not None else default def _make_proxy_signature(salt: str) -> tuple[str, int]: timestamp = int(time.time()) hash_input = f"{timestamp}{salt}".encode() signature = hashlib.sha256(hash_input).hexdigest() return signature, timestamp async def _call_via_proxy(proxy_url: str, proxy_salt: str, messages: list[dict]) -> str: """Send request through AI proxy.""" signature, timestamp = _make_proxy_signature(proxy_salt) headers = { "x-signature": signature, "x-timestamp": str(timestamp), } async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post( f"{proxy_url.rstrip('/')}/generate_text", json={"messages": messages}, headers=headers, ) if resp.status_code != 200: logger.error("AI proxy error %d: %s", resp.status_code, resp.text[:500]) raise ValueError(f"AI proxy вернул ошибку: {resp.status_code}") data = resp.json() text = data.get("response", "") if not text: raise ValueError(f"AI proxy вернул пустой ответ (finish_reason={data.get('finish_reason')})") return text async def _call_direct(api_key: str, system: str, user_message: str) -> str: """Call Qwen via OpenAI-compatible DashScope API.""" url = f"{settings.QWEN_BASE_URL.rstrip('/')}/chat/completions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } payload = { "model": settings.QWEN_MODEL, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user_message}, ], "temperature": 0.3, "max_tokens": 1024, } async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post(url, json=payload, headers=headers) if resp.status_code != 200: logger.error("Qwen API error %d: %s", resp.status_code, resp.text[:500]) raise ValueError(f"Qwen API ошибка: {resp.status_code}") data = resp.json() choices = data.get("choices", []) if not choices: raise ValueError("Qwen API вернул пустой ответ") return choices[0].get("message", {}).get("content", "") def _parse_json_response(text: str) -> dict: try: return json.loads(text) except json.JSONDecodeError: start = text.find("{") end = text.rfind("}") + 1 if start != -1 and end > start: return json.loads(text[start:end]) raise ValueError("AI вернул невалидный JSON") async def get_material_recommendation( task_description: str, materials_data: list[dict], budget_preference: str = "optimal", file_info: dict | None = None, db: AsyncSession | None = None, ) -> dict: """Get material recommendation — via proxy or direct, based on DB settings.""" logger.info("=== AI Advisor request ===") logger.info("Task: %s", task_description) # Read settings from DB, fallback to config use_proxy = True proxy_url = settings.AI_PROXY_URL proxy_salt = settings.AI_PROXY_SALT direct_api_key = settings.QWEN_API_KEY if db: use_proxy_str = await _get_setting(db, "ai_use_proxy", "true") use_proxy = use_proxy_str.lower() in ("true", "1", "yes") db_proxy_url = await _get_setting(db, "ai_proxy_url") db_proxy_salt = await _get_setting(db, "ai_proxy_salt") db_api_key = await _get_setting(db, "ai_direct_api_key") if db_proxy_url: proxy_url = db_proxy_url if db_proxy_salt: proxy_salt = db_proxy_salt if db_api_key: direct_api_key = db_api_key materials_json = json.dumps(materials_data, ensure_ascii=False, indent=2) system = SYSTEM_PROMPT.format(materials_json=materials_json) user_message = f"Описание задачи: {task_description}\nПредпочтение по бюджету: {budget_preference}" if file_info: user_message += f"\nИнформация о модели: {json.dumps(file_info, ensure_ascii=False)}" start_time = time.time() if use_proxy: logger.info("Mode: PROXY (%s)", proxy_url) if not proxy_url: raise ValueError("AI proxy URL не настроен") messages = [{"role": "user", "content": system + "\n\n" + user_message}] response_text = await _call_via_proxy(proxy_url, proxy_salt, messages) else: logger.info("Mode: DIRECT (Qwen API, model=%s)", settings.QWEN_MODEL) if not direct_api_key: raise ValueError("Qwen API Key не настроен (QWEN_API_KEY)") response_text = await _call_direct(direct_api_key, system, user_message) elapsed = time.time() - start_time logger.info("AI responded in %.2f seconds", elapsed) logger.debug("Raw response (%d chars): %s", len(response_text), response_text[:500]) result = _parse_json_response(response_text) logger.info("Recommended: id=%s, name=%s", result.get("recommended_material_id"), result.get("recommended_material_name")) logger.info("=== AI Advisor complete ===") return result