This commit is contained in:
xds
2026-03-16 14:46:20 +03:00
parent 00db55720c
commit de8c2472e2
45 changed files with 3714 additions and 140 deletions

View File

@@ -0,0 +1,62 @@
"""Generate AI activity summaries using Gemini."""
from backend.app.models.activity import Activity, ActivityMetrics
from backend.app.services.gemini_client import chat_async
SYSTEM_PROMPT = """You are VeloBrain, an AI cycling coach.
Analyze the cycling activity data and provide a concise, insightful summary in 3-5 sentences.
Focus on: performance highlights, pacing strategy, areas for improvement, and training effect.
Be specific with numbers. Use a friendly, coaching tone.
Respond in Russian.
Use a HTML text formatting."""
def _build_activity_prompt(activity: Activity, rider_ftp: float | None = None) -> str:
m: ActivityMetrics | None = activity.metrics
lines = [
f"Activity: {activity.name or 'Ride'}",
f"Type: {activity.activity_type}",
f"Duration: {activity.duration // 3600}h {(activity.duration % 3600) // 60}m",
]
if activity.distance:
lines.append(f"Distance: {activity.distance / 1000:.1f} km")
if activity.elevation_gain:
lines.append(f"Elevation: {activity.elevation_gain:.0f} m")
if m:
if m.avg_power:
lines.append(f"Avg Power: {m.avg_power:.0f} W")
if m.normalized_power:
lines.append(f"Normalized Power: {m.normalized_power:.0f} W")
if m.tss:
lines.append(f"TSS: {m.tss:.0f}")
if m.intensity_factor:
lines.append(f"IF: {m.intensity_factor:.2f}")
if m.variability_index:
lines.append(f"VI: {m.variability_index:.2f}")
if m.avg_hr:
lines.append(f"Avg HR: {m.avg_hr} bpm")
if m.max_hr:
lines.append(f"Max HR: {m.max_hr} bpm")
if m.avg_cadence:
lines.append(f"Avg Cadence: {m.avg_cadence} rpm")
if rider_ftp:
lines.append(f"Rider FTP: {rider_ftp:.0f} W")
intervals = activity.intervals or []
work_intervals = [i for i in intervals if i.interval_type == "work"]
if work_intervals:
lines.append(f"Work intervals: {len(work_intervals)}")
powers = [i.avg_power for i in work_intervals if i.avg_power]
if powers:
lines.append(f"Interval avg powers: {', '.join(f'{p:.0f}W' for p in powers)}")
return "\n".join(lines)
async def generate_summary(activity: Activity, rider_ftp: float | None = None) -> str:
prompt = _build_activity_prompt(activity, rider_ftp)
messages = [{"role": "user", "text": prompt}]
return await chat_async(messages, system_instruction=SYSTEM_PROMPT, temperature=0.5)

View File

@@ -0,0 +1,461 @@
"""AI Coaching service — onboarding, plan generation, chat."""
import json
import re
from datetime import date, datetime, timedelta
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.models.activity import Activity, ActivityMetrics
from backend.app.models.coaching import CoachingChat
from backend.app.models.fitness import FitnessHistory, PowerCurve
from backend.app.models.rider import Rider
from backend.app.models.training import TrainingPlan
from backend.app.services.gemini_client import chat_async
ONBOARDING_SYSTEM = """You are VeloBrain AI Coach — a professional cycling coach.
You are conducting an onboarding interview with a new athlete.
Ask questions ONE AT A TIME, in a friendly conversational tone.
Keep responses short (2-3 sentences + question).
Questions to cover (in rough order):
1. Main cycling goal (fitness, racing, gran fondo, weight loss, etc.)
2. Target event/race (if any) and its date
3. Current weekly training volume (hours/week)
4. How many days per week they can train
5. Which days are available for training
6. Indoor trainer availability (smart trainer, basic, none)
7. Power meter availability
8. Any injuries or health concerns
9. Previous coaching or structured training experience
10. What they enjoy most about cycling
When ALL questions are answered, respond with your summary and then output EXACTLY this marker on a new line:
[ONBOARDING_COMPLETE]
Followed by a JSON block with the structured data:
```json
{
"goal": "...",
"target_event": "...",
"target_event_date": "...",
"hours_per_week": N,
"days_per_week": N,
"available_days": ["monday", ...],
"has_indoor_trainer": true/false,
"trainer_type": "smart/basic/none",
"has_power_meter": true/false,
"injuries": "...",
"coaching_experience": "...",
"enjoys": "..."
}
```
Respond in Russian."""
PLAN_GENERATION_SYSTEM = """You are VeloBrain AI Coach generating a structured training plan.
Based on the rider's profile, current fitness, and goals, create a detailed multi-week training plan.
Output ONLY a valid JSON block with this structure:
```json
{
"goal": "short goal description",
"description": "plan overview in 2-3 sentences",
"phase": "base/build/peak/recovery",
"duration_weeks": N,
"weeks": [
{
"week_number": 1,
"focus": "week focus description",
"target_tss": 300,
"target_hours": 8,
"days": [
{
"day": "monday",
"workout_type": "rest",
"title": "Rest Day",
"description": "",
"duration_minutes": 0,
"target_tss": 0,
"target_if": 0
}
]
}
]
}
```
workout_type options: rest, endurance, tempo, sweetspot, threshold, vo2max, sprint, recovery, race
Plan duration: 4-8 weeks based on goal.
Include progressive overload with recovery weeks every 3-4 weeks.
Adjust intensity based on rider's FTP and experience level.
All descriptions in Russian."""
ADJUSTMENT_SYSTEM = """You are VeloBrain AI Coach reviewing a training plan.
The rider's plan needs adjustment based on their recent performance, compliance, and fatigue.
Analyze the data provided and suggest specific changes to upcoming weeks.
When you've decided on adjustments, output:
[PLAN_ADJUSTED]
Followed by the updated weeks JSON.
Respond in Russian."""
GENERAL_CHAT_SYSTEM = """You are VeloBrain AI Coach — a knowledgeable and supportive cycling coach.
You have access to the rider's training data and can answer questions about:
- Training methodology and periodization
- Nutrition and recovery
- Equipment and bike fit
- Race strategy and pacing
- Interpreting their power/HR data
Be concise, specific, and actionable. Use the rider's actual data when relevant.
Respond in Russian."""
async def build_rider_context(rider: Rider, session: AsyncSession) -> str:
"""Build a concise context string with rider's current state."""
lines = [
f"Rider: {rider.name}",
f"FTP: {rider.ftp or 'not set'} W",
f"Weight: {rider.weight or 'not set'} kg",
f"LTHR: {rider.lthr or 'not set'} bpm",
f"Experience: {rider.experience_level or 'not set'}",
f"Goals: {rider.goals or 'not set'}",
]
if rider.ftp and rider.weight:
lines.append(f"W/kg: {rider.ftp / rider.weight:.2f}")
# Coaching profile
if rider.coaching_profile:
cp = rider.coaching_profile
lines.append(f"\nCoaching Profile:")
for k, v in cp.items():
lines.append(f" {k}: {v}")
# Fitness (latest CTL/ATL/TSB)
fh_query = (
select(FitnessHistory)
.where(FitnessHistory.rider_id == rider.id)
.order_by(FitnessHistory.date.desc())
.limit(1)
)
fh_result = await session.execute(fh_query)
fh = fh_result.scalar_one_or_none()
if fh:
lines.append(f"\nFitness: CTL={fh.ctl:.0f} ATL={fh.atl:.0f} TSB={fh.tsb:.0f}")
# Recent 4 weeks volume
four_weeks_ago = date.today() - timedelta(weeks=4)
vol_query = (
select(
Activity.date,
Activity.duration,
ActivityMetrics.tss,
)
.outerjoin(ActivityMetrics, ActivityMetrics.activity_id == Activity.id)
.where(Activity.rider_id == rider.id)
.where(Activity.date >= four_weeks_ago)
.order_by(Activity.date.desc())
)
vol_result = await session.execute(vol_query)
rides = list(vol_result.all())
if rides:
total_hours = sum(r.duration for r in rides) / 3600
total_tss = sum(float(r.tss or 0) for r in rides)
lines.append(f"\nLast 4 weeks: {len(rides)} rides, {total_hours:.1f}h, TSS={total_tss:.0f}")
lines.append(f"Avg/week: {total_hours / 4:.1f}h, TSS={total_tss / 4:.0f}")
# Personal records (from power curves)
pc_query = (
select(PowerCurve.curve_data)
.join(Activity, Activity.id == PowerCurve.activity_id)
.where(Activity.rider_id == rider.id)
)
pc_result = await session.execute(pc_query)
best: dict[int, int] = {}
for row in pc_result:
for dur_str, power in row.curve_data.items():
dur = int(dur_str)
if dur not in best or power > best[dur]:
best[dur] = power
if best:
pr_strs = []
for dur in sorted(best.keys()):
if dur < 60:
pr_strs.append(f"{dur}s={best[dur]}W")
elif dur < 3600:
pr_strs.append(f"{dur // 60}m={best[dur]}W")
else:
pr_strs.append(f"{dur // 3600}h={best[dur]}W")
lines.append(f"\nPower PRs: {', '.join(pr_strs)}")
# Active plan status
plan_query = (
select(TrainingPlan)
.where(TrainingPlan.rider_id == rider.id)
.where(TrainingPlan.status == "active")
.order_by(TrainingPlan.created_at.desc())
.limit(1)
)
plan_result = await session.execute(plan_query)
plan = plan_result.scalar_one_or_none()
if plan:
lines.append(f"\nActive plan: '{plan.goal}' ({plan.start_date} to {plan.end_date}), phase: {plan.phase}")
return "\n".join(lines)
async def process_chat_message(
rider: Rider,
chat_id,
user_message: str,
session: AsyncSession,
) -> str:
"""Process a user message in a coaching chat and return AI response."""
chat = await session.get(CoachingChat, chat_id)
if not chat or chat.rider_id != rider.id:
raise ValueError("Chat not found")
# Build context
rider_context = await build_rider_context(rider, session)
# Select system prompt
system_prompts = {
"onboarding": ONBOARDING_SYSTEM,
"general": GENERAL_CHAT_SYSTEM,
"adjustment": ADJUSTMENT_SYSTEM,
}
system = system_prompts.get(chat.chat_type, GENERAL_CHAT_SYSTEM)
system = f"{system}\n\n--- Rider Data ---\n{rider_context}"
# Build message history
messages = list(chat.messages_json or [])
messages.append({"role": "user", "text": user_message})
# Call Gemini
gemini_messages = [{"role": m["role"], "text": m["text"]} for m in messages]
response = await chat_async(gemini_messages, system_instruction=system, temperature=0.7)
# Save messages
now = datetime.utcnow().isoformat()
messages_to_save = list(chat.messages_json or [])
messages_to_save.append({"role": "user", "text": user_message, "timestamp": now})
messages_to_save.append({"role": "model", "text": response, "timestamp": now})
chat.messages_json = messages_to_save
# Check for onboarding completion
if chat.chat_type == "onboarding" and "[ONBOARDING_COMPLETE]" in response:
chat.status = "completed"
profile_data = _extract_json(response)
if profile_data:
rider.coaching_profile = profile_data
rider.onboarding_completed = True
if profile_data.get("goal"):
rider.goals = profile_data["goal"]
# Check for plan adjustment
if chat.chat_type == "adjustment" and "[PLAN_ADJUSTED]" in response:
chat.status = "completed"
plan_data = _extract_json(response)
if plan_data:
plan_query = (
select(TrainingPlan)
.where(TrainingPlan.rider_id == rider.id)
.where(TrainingPlan.status == "active")
.order_by(TrainingPlan.created_at.desc())
.limit(1)
)
plan_result = await session.execute(plan_query)
plan = plan_result.scalar_one_or_none()
if plan and "weeks" in plan_data:
current = plan.weeks_json or {}
current["weeks"] = plan_data["weeks"]
plan.weeks_json = current
await session.commit()
return response
async def generate_plan(rider: Rider, session: AsyncSession) -> TrainingPlan:
"""Generate a new training plan using AI."""
rider_context = await build_rider_context(rider, session)
prompt = f"Generate a training plan for this rider.\n\n{rider_context}"
messages = [{"role": "user", "text": prompt}]
response = await chat_async(
messages,
system_instruction=PLAN_GENERATION_SYSTEM,
temperature=0.5,
)
plan_data = _extract_json(response)
if not plan_data or "weeks" not in plan_data:
raise ValueError("Failed to parse plan from AI response")
# Cancel existing active plans
existing_query = (
select(TrainingPlan)
.where(TrainingPlan.rider_id == rider.id)
.where(TrainingPlan.status == "active")
)
existing_result = await session.execute(existing_query)
for old_plan in existing_result.scalars().all():
old_plan.status = "cancelled"
duration_weeks = plan_data.get("duration_weeks", len(plan_data["weeks"]))
start = date.today()
# Align to next Monday
days_until_monday = (7 - start.weekday()) % 7
if days_until_monday == 0:
days_until_monday = 0
start = start + timedelta(days=days_until_monday)
plan = TrainingPlan(
rider_id=rider.id,
goal=plan_data.get("goal", rider.goals or "General fitness"),
start_date=start,
end_date=start + timedelta(weeks=duration_weeks),
phase=plan_data.get("phase", "base"),
weeks_json=plan_data,
description=plan_data.get("description", ""),
status="active",
onboarding_data=rider.coaching_profile,
)
session.add(plan)
await session.commit()
await session.refresh(plan)
return plan
async def get_today_workout(rider: Rider, session: AsyncSession) -> dict | None:
"""Get today's planned workout from the active plan."""
plan_query = (
select(TrainingPlan)
.where(TrainingPlan.rider_id == rider.id)
.where(TrainingPlan.status == "active")
.order_by(TrainingPlan.created_at.desc())
.limit(1)
)
result = await session.execute(plan_query)
plan = result.scalar_one_or_none()
if not plan or not plan.weeks_json:
return None
today = date.today()
if today < plan.start_date or today > plan.end_date:
return None
week_num = (today - plan.start_date).days // 7 + 1
day_name = today.strftime("%A").lower()
weeks = plan.weeks_json.get("weeks", [])
for week in weeks:
if week.get("week_number") == week_num:
for day in week.get("days", []):
if day.get("day") == day_name:
return {
"plan_id": str(plan.id),
"plan_goal": plan.goal,
"week_number": week_num,
"week_focus": week.get("focus", ""),
**day,
}
return None
async def calculate_compliance(plan: TrainingPlan, session: AsyncSession) -> list[dict]:
"""Compare planned vs actual per week."""
if not plan.weeks_json:
return []
weeks = plan.weeks_json.get("weeks", [])
results = []
for week in weeks:
week_num = week.get("week_number", 0)
week_start = plan.start_date + timedelta(weeks=week_num - 1)
week_end = week_start + timedelta(days=7)
# Skip future weeks
if week_start > date.today():
results.append({
"week_number": week_num,
"focus": week.get("focus", ""),
"planned_tss": week.get("target_tss", 0),
"actual_tss": 0,
"planned_hours": week.get("target_hours", 0),
"actual_hours": 0,
"planned_rides": sum(1 for d in week.get("days", []) if d.get("workout_type") != "rest"),
"actual_rides": 0,
"adherence_pct": 0,
"status": "upcoming",
})
continue
# Get actual activities in this week
act_query = (
select(Activity, ActivityMetrics.tss)
.outerjoin(ActivityMetrics, ActivityMetrics.activity_id == Activity.id)
.where(Activity.rider_id == plan.rider_id)
.where(Activity.date >= week_start)
.where(Activity.date < week_end)
)
act_result = await session.execute(act_query)
acts = list(act_result.all())
actual_tss = sum(float(r.tss or 0) for r in acts)
actual_hours = sum(r[0].duration for r in acts) / 3600
actual_rides = len(acts)
planned_rides = sum(1 for d in week.get("days", []) if d.get("workout_type") != "rest")
planned_tss = week.get("target_tss", 0)
adherence = 0
if planned_rides > 0:
adherence = min(100, round(actual_rides / planned_rides * 100))
is_current = week_start <= date.today() < week_end
results.append({
"week_number": week_num,
"focus": week.get("focus", ""),
"planned_tss": planned_tss,
"actual_tss": round(actual_tss, 0),
"planned_hours": week.get("target_hours", 0),
"actual_hours": round(actual_hours, 1),
"planned_rides": planned_rides,
"actual_rides": actual_rides,
"adherence_pct": adherence,
"status": "current" if is_current else "completed",
})
return results
def _extract_json(text: str) -> dict | None:
"""Extract JSON from AI response text."""
# Try to find JSON in code blocks
match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
pass
# Try to find raw JSON object
match = re.search(r"\{[\s\S]*\}", text)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
return None

View File

@@ -0,0 +1,107 @@
"""CTL / ATL / TSB (Fitness / Fatigue / Form) calculation service."""
from datetime import date, timedelta
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.models.activity import Activity, ActivityMetrics
from backend.app.models.fitness import FitnessHistory
CTL_DAYS = 42 # Chronic Training Load time constant
ATL_DAYS = 7 # Acute Training Load time constant
async def rebuild_fitness_history(
rider_id,
session: AsyncSession,
days_back: int = 365,
) -> list[FitnessHistory]:
"""Rebuild CTL/ATL/TSB for a rider from scratch using exponential moving averages."""
end_date = date.today()
start_date = end_date - timedelta(days=days_back)
# Get all activities with TSS in the date range
query = (
select(
func.date(Activity.date).label("activity_date"),
func.sum(ActivityMetrics.tss).label("daily_tss"),
)
.join(ActivityMetrics, ActivityMetrics.activity_id == Activity.id)
.where(Activity.rider_id == rider_id)
.where(Activity.date >= start_date)
.where(ActivityMetrics.tss.isnot(None))
.group_by(func.date(Activity.date))
.order_by(func.date(Activity.date))
)
result = await session.execute(query)
tss_by_date: dict[date, float] = {}
for row in result:
tss_by_date[row.activity_date] = float(row.daily_tss)
# Delete existing history for this rider
existing = await session.execute(
select(FitnessHistory).where(FitnessHistory.rider_id == rider_id)
)
for entry in existing.scalars().all():
await session.delete(entry)
# Calculate EMA-based CTL/ATL/TSB
ctl = 0.0
atl = 0.0
prev_ctl = 0.0
entries: list[FitnessHistory] = []
current = start_date
while current <= end_date:
daily_tss = tss_by_date.get(current, 0.0)
ctl = ctl + (daily_tss - ctl) / CTL_DAYS
atl = atl + (daily_tss - atl) / ATL_DAYS
tsb = ctl - atl
ramp_rate = ctl - prev_ctl
entry = FitnessHistory(
rider_id=rider_id,
date=current,
ctl=round(ctl, 1),
atl=round(atl, 1),
tsb=round(tsb, 1),
ramp_rate=round(ramp_rate, 2),
)
entries.append(entry)
prev_ctl = ctl
current += timedelta(days=1)
session.add_all(entries)
await session.flush()
return entries
async def get_fitness_history(
rider_id,
session: AsyncSession,
days: int = 90,
) -> list[FitnessHistory]:
"""Get fitness history for a rider, rebuilding if needed."""
cutoff = date.today() - timedelta(days=days)
query = (
select(FitnessHistory)
.where(FitnessHistory.rider_id == rider_id)
.where(FitnessHistory.date >= cutoff)
.order_by(FitnessHistory.date)
)
result = await session.execute(query)
entries = list(result.scalars().all())
# If no data or stale, rebuild
if not entries or entries[-1].date < date.today() - timedelta(days=1):
all_entries = await rebuild_fitness_history(rider_id, session)
entries = [e for e in all_entries if e.date >= cutoff]
return entries

View File

@@ -0,0 +1,88 @@
import numpy as np
from backend.app.models.activity import DataPoint, Interval
def detect_intervals(
data_points: list[DataPoint],
ftp: float | None = None,
min_duration: int = 30,
) -> list[Interval]:
"""
Auto-detect work/rest intervals based on power thresholds.
Work = power >= 88% FTP (sweetspot and above)
Rest = power < 55% FTP (active recovery)
"""
powers = [dp.power for dp in data_points]
if not any(p is not None for p in powers):
return []
if not ftp or ftp <= 0:
# Without FTP, use median power as threshold
valid_powers = [p for p in powers if p is not None and p > 0]
if not valid_powers:
return []
threshold_high = np.median(valid_powers) * 1.15
threshold_low = np.median(valid_powers) * 0.65
else:
threshold_high = ftp * 0.88
threshold_low = ftp * 0.55
intervals: list[Interval] = []
current_type: str | None = None
start_idx: int = 0
for i, dp in enumerate(data_points):
p = dp.power if dp.power is not None else 0
if p >= threshold_high:
new_type = "work"
elif p < threshold_low:
new_type = "rest"
else:
continue # tempo zone — don't break interval
if current_type is None:
current_type = new_type
start_idx = i
elif new_type != current_type:
interval = _build_interval(data_points, start_idx, i - 1, current_type, min_duration)
if interval:
intervals.append(interval)
current_type = new_type
start_idx = i
# Close last interval
if current_type is not None:
interval = _build_interval(data_points, start_idx, len(data_points) - 1, current_type, min_duration)
if interval:
intervals.append(interval)
return intervals
def _build_interval(
data_points: list[DataPoint],
start_idx: int,
end_idx: int,
interval_type: str,
min_duration: int,
) -> Interval | None:
segment = data_points[start_idx:end_idx + 1]
if len(segment) < min_duration:
return None
powers = [dp.power for dp in segment if dp.power is not None]
hrs = [dp.heart_rate for dp in segment if dp.heart_rate is not None]
duration = int((segment[-1].timestamp - segment[0].timestamp).total_seconds())
if duration < min_duration:
return None
return Interval(
start_ts=segment[0].timestamp,
end_ts=segment[-1].timestamp,
interval_type=interval_type,
avg_power=round(sum(powers) / len(powers), 1) if powers else None,
avg_hr=round(sum(hrs) / len(hrs)) if hrs else None,
duration=duration,
)

View File

@@ -1,20 +1,14 @@
import uuid
import numpy as np
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.models.activity import Activity, ActivityMetrics, DataPoint
from backend.app.models.rider import Rider
def calculate_metrics(
data_points: list[DataPoint],
activity: Activity,
rider_id: uuid.UUID,
session: AsyncSession,
ftp: float | None = None,
) -> ActivityMetrics | None:
"""Calculate power-based metrics for an activity."""
"""Calculate all power/HR-based metrics for an activity."""
if not data_points:
return None
@@ -23,61 +17,60 @@ def calculate_metrics(
cadences = np.array([dp.cadence for dp in data_points if dp.cadence is not None], dtype=float)
speeds = np.array([dp.speed for dp in data_points if dp.speed is not None], dtype=float)
avg_power = float(np.mean(powers)) if len(powers) > 0 else None
max_power = int(np.max(powers)) if len(powers) > 0 else None
has_power = len(powers) > 0
has_hr = len(hrs) > 0
avg_power = float(np.mean(powers)) if has_power else None
max_power = int(np.max(powers)) if has_power else None
np_value = _normalized_power(powers) if len(powers) >= 30 else avg_power
avg_hr = int(np.mean(hrs)) if len(hrs) > 0 else None
max_hr = int(np.max(hrs)) if len(hrs) > 0 else None
avg_hr = int(np.mean(hrs)) if has_hr else None
max_hr = int(np.max(hrs)) if has_hr else None
avg_cadence = int(np.mean(cadences)) if len(cadences) > 0 else None
avg_speed = float(np.mean(speeds)) if len(speeds) > 0 else None
# IF, VI, TSS require FTP — will be None if no FTP set
intensity_factor = None
# Variability Index
variability_index = None
tss = None
if np_value and avg_power and avg_power > 0:
variability_index = np_value / avg_power
variability_index = round(np_value / avg_power, 2)
# FTP-dependent metrics
intensity_factor = None
tss = None
if np_value and ftp and ftp > 0:
intensity_factor = round(np_value / ftp, 2)
tss = round(
(activity.duration * np_value * (np_value / ftp))
/ (ftp * 3600)
* 100,
1,
)
# Efficiency Factor: NP / avg HR (aerobic decoupling indicator)
calories = None
if has_power:
# Rough estimate: 1 kJ ≈ 1 kcal, power in watts * seconds / 1000
calories = int(np.sum(powers) / 1000)
return ActivityMetrics(
activity_id=activity.id,
tss=tss,
normalized_power=round(np_value, 1) if np_value else None,
intensity_factor=intensity_factor,
variability_index=round(variability_index, 2) if variability_index else None,
variability_index=variability_index,
avg_power=round(avg_power, 1) if avg_power else None,
max_power=max_power,
avg_hr=avg_hr,
max_hr=max_hr,
avg_cadence=avg_cadence,
avg_speed=round(avg_speed, 2) if avg_speed else None,
calories=calories,
)
def calculate_metrics_with_ftp(
metrics: ActivityMetrics,
ftp: float,
duration_seconds: int,
) -> ActivityMetrics:
"""Enrich metrics with FTP-dependent values (IF, TSS)."""
if metrics.normalized_power and ftp > 0:
metrics.intensity_factor = round(metrics.normalized_power / ftp, 2)
metrics.tss = round(
(duration_seconds * metrics.normalized_power * metrics.intensity_factor)
/ (ftp * 3600)
* 100,
1,
)
return metrics
def _normalized_power(powers: np.ndarray) -> float:
"""
NP = 4th root of mean of 4th powers of 30s rolling average.
"""
"""NP = 4th root of mean of (30s rolling average)^4."""
if len(powers) < 30:
return float(np.mean(powers))
rolling = np.convolve(powers, np.ones(30) / 30, mode="valid")
return float(np.power(np.mean(np.power(rolling, 4)), 0.25))

View File

@@ -0,0 +1,30 @@
import numpy as np
from backend.app.models.activity import DataPoint
# Standard durations for the power duration curve
DURATIONS = [1, 5, 10, 15, 30, 60, 120, 300, 600, 1200, 1800, 3600]
def calculate_power_curve(data_points: list[DataPoint]) -> dict[int, int]:
"""
Calculate max average power for standard durations.
Returns {duration_seconds: max_avg_power}.
"""
powers = np.array([dp.power for dp in data_points if dp.power is not None], dtype=float)
if len(powers) == 0:
return {}
result = {}
for dur in DURATIONS:
if dur > len(powers):
break
if dur == 1:
result[dur] = int(np.max(powers))
else:
# Rolling mean via cumsum for efficiency
cumsum = np.cumsum(np.insert(powers, 0, 0))
rolling_avg = (cumsum[dur:] - cumsum[:-dur]) / dur
result[dur] = int(np.max(rolling_avg))
return result

View File

@@ -0,0 +1,75 @@
import numpy as np
from backend.app.models.activity import DataPoint
# Coggan 7-zone power model (% of FTP)
POWER_ZONES = [
{"zone": 1, "name": "Active Recovery", "min_pct": 0, "max_pct": 55},
{"zone": 2, "name": "Endurance", "min_pct": 55, "max_pct": 75},
{"zone": 3, "name": "Tempo", "min_pct": 75, "max_pct": 90},
{"zone": 4, "name": "Threshold", "min_pct": 90, "max_pct": 105},
{"zone": 5, "name": "VO2max", "min_pct": 105, "max_pct": 120},
{"zone": 6, "name": "Anaerobic", "min_pct": 120, "max_pct": 150},
{"zone": 7, "name": "Neuromuscular", "min_pct": 150, "max_pct": 10000},
]
# 5-zone HR model (% of LTHR)
HR_ZONES = [
{"zone": 1, "name": "Recovery", "min_pct": 0, "max_pct": 81},
{"zone": 2, "name": "Aerobic", "min_pct": 81, "max_pct": 90},
{"zone": 3, "name": "Tempo", "min_pct": 90, "max_pct": 95},
{"zone": 4, "name": "Threshold", "min_pct": 95, "max_pct": 100},
{"zone": 5, "name": "Anaerobic", "min_pct": 100, "max_pct": 10000},
]
def calculate_power_zones(
data_points: list[DataPoint],
ftp: float,
) -> list[dict]:
"""Calculate time-in-zone distribution for power."""
powers = np.array([dp.power for dp in data_points if dp.power is not None], dtype=float)
if len(powers) == 0 or ftp <= 0:
return []
total = len(powers)
result = []
for z in POWER_ZONES:
low = ftp * z["min_pct"] / 100
high = ftp * z["max_pct"] / 100
seconds = int(np.sum((powers >= low) & (powers < high)))
result.append({
"zone": z["zone"],
"name": z["name"],
"min_watts": round(low),
"max_watts": round(high) if z["max_pct"] < 10000 else None,
"seconds": seconds,
"percentage": round(seconds / total * 100, 1) if total > 0 else 0,
})
return result
def calculate_hr_zones(
data_points: list[DataPoint],
lthr: int,
) -> list[dict]:
"""Calculate time-in-zone distribution for heart rate."""
hrs = np.array([dp.heart_rate for dp in data_points if dp.heart_rate is not None], dtype=float)
if len(hrs) == 0 or lthr <= 0:
return []
total = len(hrs)
result = []
for z in HR_ZONES:
low = lthr * z["min_pct"] / 100
high = lthr * z["max_pct"] / 100
seconds = int(np.sum((hrs >= low) & (hrs < high)))
result.append({
"zone": z["zone"],
"name": z["name"],
"min_bpm": round(low),
"max_bpm": round(high) if z["max_pct"] < 10000 else None,
"seconds": seconds,
"percentage": round(seconds / total * 100, 1) if total > 0 else 0,
})
return result