"""AI Coaching service — onboarding, plan generation, chat.""" import json import re from datetime import date, datetime, timedelta from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from backend.app.models.activity import Activity, ActivityMetrics from backend.app.models.coaching import CoachingChat from backend.app.models.fitness import FitnessHistory, PowerCurve from backend.app.models.rider import Rider from backend.app.models.training import TrainingPlan from backend.app.services.gemini_client import chat_async ONBOARDING_SYSTEM = """You are VeloBrain AI Coach — a professional cycling coach. You are conducting an onboarding interview with a new athlete. Ask questions ONE AT A TIME, in a friendly conversational tone. Keep responses short (2-3 sentences + question). Questions to cover (in rough order): 1. Main cycling goal (fitness, racing, gran fondo, weight loss, etc.) 2. Target event/race (if any) and its date 3. Current weekly training volume (hours/week) 4. How many days per week they can train 5. Which days are available for training 6. Indoor trainer availability (smart trainer, basic, none) 7. Power meter availability 8. Any injuries or health concerns 9. Previous coaching or structured training experience 10. What they enjoy most about cycling When ALL questions are answered, respond with your summary and then output EXACTLY this marker on a new line: [ONBOARDING_COMPLETE] Followed by a JSON block with the structured data: ```json { "goal": "...", "target_event": "...", "target_event_date": "...", "hours_per_week": N, "days_per_week": N, "available_days": ["monday", ...], "has_indoor_trainer": true/false, "trainer_type": "smart/basic/none", "has_power_meter": true/false, "injuries": "...", "coaching_experience": "...", "enjoys": "..." } ``` Respond in Russian.""" PLAN_GENERATION_SYSTEM = """You are VeloBrain AI Coach generating a structured training plan. Based on the rider's profile, current fitness, and goals, create a detailed multi-week training plan. Output ONLY a valid JSON block with this structure: ```json { "goal": "short goal description", "description": "plan overview in 2-3 sentences", "phase": "base/build/peak/recovery", "duration_weeks": N, "weeks": [ { "week_number": 1, "focus": "week focus description", "target_tss": 300, "target_hours": 8, "days": [ { "day": "monday", "workout_type": "rest", "title": "Rest Day", "description": "", "duration_minutes": 0, "target_tss": 0, "target_if": 0 } ] } ] } ``` workout_type options: rest, endurance, tempo, sweetspot, threshold, vo2max, sprint, recovery, race Plan duration: 4-8 weeks based on goal. Include progressive overload with recovery weeks every 3-4 weeks. Adjust intensity based on rider's FTP and experience level. All descriptions in Russian.""" ADJUSTMENT_SYSTEM = """You are VeloBrain AI Coach reviewing a training plan. The rider's plan needs adjustment based on their recent performance, compliance, and fatigue. Analyze the data provided and suggest specific changes to upcoming weeks. When you've decided on adjustments, output: [PLAN_ADJUSTED] Followed by the updated weeks JSON. Respond in Russian.""" GENERAL_CHAT_SYSTEM = """You are VeloBrain AI Coach — a knowledgeable and supportive cycling coach. You have access to the rider's training data and can answer questions about: - Training methodology and periodization - Nutrition and recovery - Equipment and bike fit - Race strategy and pacing - Interpreting their power/HR data Be concise, specific, and actionable. Use the rider's actual data when relevant. Respond in Russian.""" async def build_rider_context(rider: Rider, session: AsyncSession) -> str: """Build a concise context string with rider's current state.""" lines = [ f"Rider: {rider.name}", f"FTP: {rider.ftp or 'not set'} W", f"Weight: {rider.weight or 'not set'} kg", f"LTHR: {rider.lthr or 'not set'} bpm", f"Experience: {rider.experience_level or 'not set'}", f"Goals: {rider.goals or 'not set'}", ] if rider.ftp and rider.weight: lines.append(f"W/kg: {rider.ftp / rider.weight:.2f}") # Coaching profile if rider.coaching_profile: cp = rider.coaching_profile lines.append(f"\nCoaching Profile:") for k, v in cp.items(): lines.append(f" {k}: {v}") # Fitness (latest CTL/ATL/TSB) fh_query = ( select(FitnessHistory) .where(FitnessHistory.rider_id == rider.id) .order_by(FitnessHistory.date.desc()) .limit(1) ) fh_result = await session.execute(fh_query) fh = fh_result.scalar_one_or_none() if fh: lines.append(f"\nFitness: CTL={fh.ctl:.0f} ATL={fh.atl:.0f} TSB={fh.tsb:.0f}") # Recent 4 weeks volume four_weeks_ago = date.today() - timedelta(weeks=4) vol_query = ( select( Activity.date, Activity.duration, ActivityMetrics.tss, ) .outerjoin(ActivityMetrics, ActivityMetrics.activity_id == Activity.id) .where(Activity.rider_id == rider.id) .where(Activity.date >= four_weeks_ago) .order_by(Activity.date.desc()) ) vol_result = await session.execute(vol_query) rides = list(vol_result.all()) if rides: total_hours = sum(r.duration for r in rides) / 3600 total_tss = sum(float(r.tss or 0) for r in rides) lines.append(f"\nLast 4 weeks: {len(rides)} rides, {total_hours:.1f}h, TSS={total_tss:.0f}") lines.append(f"Avg/week: {total_hours / 4:.1f}h, TSS={total_tss / 4:.0f}") # Personal records (from power curves) pc_query = ( select(PowerCurve.curve_data) .join(Activity, Activity.id == PowerCurve.activity_id) .where(Activity.rider_id == rider.id) ) pc_result = await session.execute(pc_query) best: dict[int, int] = {} for row in pc_result: for dur_str, power in row.curve_data.items(): dur = int(dur_str) if dur not in best or power > best[dur]: best[dur] = power if best: pr_strs = [] for dur in sorted(best.keys()): if dur < 60: pr_strs.append(f"{dur}s={best[dur]}W") elif dur < 3600: pr_strs.append(f"{dur // 60}m={best[dur]}W") else: pr_strs.append(f"{dur // 3600}h={best[dur]}W") lines.append(f"\nPower PRs: {', '.join(pr_strs)}") # Active plan status plan_query = ( select(TrainingPlan) .where(TrainingPlan.rider_id == rider.id) .where(TrainingPlan.status == "active") .order_by(TrainingPlan.created_at.desc()) .limit(1) ) plan_result = await session.execute(plan_query) plan = plan_result.scalar_one_or_none() if plan: lines.append(f"\nActive plan: '{plan.goal}' ({plan.start_date} to {plan.end_date}), phase: {plan.phase}") return "\n".join(lines) async def process_chat_message( rider: Rider, chat_id, user_message: str, session: AsyncSession, ) -> str: """Process a user message in a coaching chat and return AI response.""" chat = await session.get(CoachingChat, chat_id) if not chat or chat.rider_id != rider.id: raise ValueError("Chat not found") # Build context rider_context = await build_rider_context(rider, session) # Select system prompt system_prompts = { "onboarding": ONBOARDING_SYSTEM, "general": GENERAL_CHAT_SYSTEM, "adjustment": ADJUSTMENT_SYSTEM, } system = system_prompts.get(chat.chat_type, GENERAL_CHAT_SYSTEM) system = f"{system}\n\n--- Rider Data ---\n{rider_context}" # Build message history messages = list(chat.messages_json or []) messages.append({"role": "user", "text": user_message}) # Call Gemini gemini_messages = [{"role": m["role"], "text": m["text"]} for m in messages] response = await chat_async(gemini_messages, system_instruction=system, temperature=0.7) # Save messages now = datetime.utcnow().isoformat() messages_to_save = list(chat.messages_json or []) messages_to_save.append({"role": "user", "text": user_message, "timestamp": now}) messages_to_save.append({"role": "model", "text": response, "timestamp": now}) chat.messages_json = messages_to_save # Check for onboarding completion if chat.chat_type == "onboarding" and "[ONBOARDING_COMPLETE]" in response: chat.status = "completed" profile_data = _extract_json(response) if profile_data: rider.coaching_profile = profile_data rider.onboarding_completed = True if profile_data.get("goal"): rider.goals = profile_data["goal"] # Check for plan adjustment if chat.chat_type == "adjustment" and "[PLAN_ADJUSTED]" in response: chat.status = "completed" plan_data = _extract_json(response) if plan_data: plan_query = ( select(TrainingPlan) .where(TrainingPlan.rider_id == rider.id) .where(TrainingPlan.status == "active") .order_by(TrainingPlan.created_at.desc()) .limit(1) ) plan_result = await session.execute(plan_query) plan = plan_result.scalar_one_or_none() if plan and "weeks" in plan_data: current = plan.weeks_json or {} current["weeks"] = plan_data["weeks"] plan.weeks_json = current await session.commit() return response async def generate_plan(rider: Rider, session: AsyncSession) -> TrainingPlan: """Generate a new training plan using AI.""" rider_context = await build_rider_context(rider, session) prompt = f"Generate a training plan for this rider.\n\n{rider_context}" messages = [{"role": "user", "text": prompt}] response = await chat_async( messages, system_instruction=PLAN_GENERATION_SYSTEM, temperature=0.5, ) plan_data = _extract_json(response) if not plan_data or "weeks" not in plan_data: raise ValueError("Failed to parse plan from AI response") # Cancel existing active plans existing_query = ( select(TrainingPlan) .where(TrainingPlan.rider_id == rider.id) .where(TrainingPlan.status == "active") ) existing_result = await session.execute(existing_query) for old_plan in existing_result.scalars().all(): old_plan.status = "cancelled" duration_weeks = plan_data.get("duration_weeks", len(plan_data["weeks"])) start = date.today() # Align to next Monday days_until_monday = (7 - start.weekday()) % 7 if days_until_monday == 0: days_until_monday = 0 start = start + timedelta(days=days_until_monday) plan = TrainingPlan( rider_id=rider.id, goal=plan_data.get("goal", rider.goals or "General fitness"), start_date=start, end_date=start + timedelta(weeks=duration_weeks), phase=plan_data.get("phase", "base"), weeks_json=plan_data, description=plan_data.get("description", ""), status="active", onboarding_data=rider.coaching_profile, ) session.add(plan) await session.commit() await session.refresh(plan) return plan async def get_today_workout(rider: Rider, session: AsyncSession) -> dict | None: """Get today's planned workout from the active plan.""" plan_query = ( select(TrainingPlan) .where(TrainingPlan.rider_id == rider.id) .where(TrainingPlan.status == "active") .order_by(TrainingPlan.created_at.desc()) .limit(1) ) result = await session.execute(plan_query) plan = result.scalar_one_or_none() if not plan or not plan.weeks_json: return None today = date.today() if today < plan.start_date or today > plan.end_date: return None week_num = (today - plan.start_date).days // 7 + 1 day_name = today.strftime("%A").lower() weeks = plan.weeks_json.get("weeks", []) for week in weeks: if week.get("week_number") == week_num: for day in week.get("days", []): if day.get("day") == day_name: return { "plan_id": str(plan.id), "plan_goal": plan.goal, "week_number": week_num, "week_focus": week.get("focus", ""), **day, } return None async def calculate_compliance(plan: TrainingPlan, session: AsyncSession) -> list[dict]: """Compare planned vs actual per week, matching linked activities to specific days.""" if not plan.weeks_json: return [] weeks = plan.weeks_json.get("weeks", []) results = [] for week in weeks: week_num = week.get("week_number", 0) week_start = plan.start_date + timedelta(weeks=week_num - 1) week_end = week_start + timedelta(days=7) planned_days = [d for d in week.get("days", []) if d.get("workout_type") != "rest"] planned_rides = len(planned_days) planned_tss = week.get("target_tss", 0) # Skip future weeks if week_start > date.today(): results.append({ "week_number": week_num, "focus": week.get("focus", ""), "planned_tss": planned_tss, "actual_tss": 0, "planned_hours": week.get("target_hours", 0), "actual_hours": 0, "planned_rides": planned_rides, "actual_rides": 0, "adherence_pct": 0, "status": "upcoming", "days": [ { "day": d.get("day"), "planned": d.get("title", d.get("workout_type")), "workout_type": d.get("workout_type"), "activity_id": None, "completed": False, } for d in week.get("days", []) ], }) continue # Get actual activities in this week act_query = ( select(Activity, ActivityMetrics.tss) .outerjoin(ActivityMetrics, ActivityMetrics.activity_id == Activity.id) .where(Activity.rider_id == plan.rider_id) .where(Activity.date >= week_start) .where(Activity.date < week_end) ) act_result = await session.execute(act_query) acts = list(act_result.all()) actual_tss = sum(float(r.tss or 0) for r in acts) actual_hours = sum(r[0].duration for r in acts) / 3600 actual_rides = len(acts) # Build per-day status: match linked activities to planned days linked_activities = { a[0].plan_day: a[0] for a in acts if a[0].training_plan_id == plan.id and a[0].plan_week == week_num and a[0].plan_day } day_statuses = [] completed_workouts = 0 for d in week.get("days", []): day_name = d.get("day") is_rest = d.get("workout_type") == "rest" linked = linked_activities.get(day_name) completed = linked is not None and not is_rest if completed: completed_workouts += 1 day_statuses.append({ "day": day_name, "planned": d.get("title", d.get("workout_type")), "workout_type": d.get("workout_type"), "activity_id": str(linked.id) if linked else None, "completed": completed, }) adherence = 0 if planned_rides > 0: adherence = min(100, round(completed_workouts / planned_rides * 100)) is_current = week_start <= date.today() < week_end results.append({ "week_number": week_num, "focus": week.get("focus", ""), "planned_tss": planned_tss, "actual_tss": round(actual_tss, 0), "planned_hours": week.get("target_hours", 0), "actual_hours": round(actual_hours, 1), "planned_rides": planned_rides, "actual_rides": actual_rides, "adherence_pct": adherence, "status": "current" if is_current else "completed", "days": day_statuses, }) return results async def link_activity_to_plan( activity, rider_id, session: AsyncSession, ) -> None: """Auto-link an activity to the active training plan based on date. Finds the planned workout for the activity's date and sets training_plan_id, plan_week, plan_day on the activity. """ plan_query = ( select(TrainingPlan) .where(TrainingPlan.rider_id == rider_id) .where(TrainingPlan.status == "active") .order_by(TrainingPlan.created_at.desc()) .limit(1) ) result = await session.execute(plan_query) plan = result.scalar_one_or_none() if not plan or not plan.weeks_json: return activity_date = activity.date.date() if hasattr(activity.date, "date") else activity.date if activity_date < plan.start_date or activity_date > plan.end_date: return week_num = (activity_date - plan.start_date).days // 7 + 1 day_name = activity_date.strftime("%A").lower() weeks = plan.weeks_json.get("weeks", []) for week in weeks: if week.get("week_number") == week_num: for day in week.get("days", []): if day.get("day") == day_name and day.get("workout_type") != "rest": activity.training_plan_id = plan.id activity.plan_week = week_num activity.plan_day = day_name return def _extract_json(text: str) -> dict | None: """Extract JSON from AI response text.""" # Try to find JSON in code blocks match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass # Try to find raw JSON object match = re.search(r"\{[\s\S]*\}", text) if match: try: return json.loads(match.group(0)) except json.JSONDecodeError: pass return None