import uuid
from collections.abc import Iterable

import numpy as np
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.models.activity import Activity, ActivityMetrics, DataPoint
from backend.app.models.rider import Rider

# Number of samples in the rolling window used for Normalized Power.
# NOTE(review): this corresponds to 30 seconds only if data points are
# recorded once per second — confirm against the ingestion code.
_NP_WINDOW = 30


def _present(values: Iterable[float | None]) -> np.ndarray:
    """Return the non-None entries of *values* as a float array."""
    return np.array([v for v in values if v is not None], dtype=float)


def _round_or_none(value: float | None, ndigits: int) -> float | None:
    """Round *value* to *ndigits*, passing ``None`` through unchanged.

    Uses an explicit ``None`` check so that a legitimate ``0.0`` is kept
    instead of being collapsed to ``None`` by a truthiness test.
    """
    return None if value is None else round(value, ndigits)


def calculate_metrics(
    data_points: list[DataPoint],
    activity: Activity,
    rider_id: uuid.UUID,
    session: AsyncSession,
) -> ActivityMetrics | None:
    """Calculate the FTP-independent metrics for an activity.

    Each stream (power, heart rate, cadence, speed) is summarised over the
    data points where that field is not ``None``; a stream with no values
    yields ``None`` for its metrics.

    Args:
        data_points: Samples of the activity.
        activity: The activity the metrics belong to; only ``activity.id``
            is read.
        rider_id: Accepted for interface compatibility; not used here.
        session: Accepted for interface compatibility; not used here.

    Returns:
        A new ``ActivityMetrics`` with ``tss`` and ``intensity_factor`` left
        as ``None`` (see ``calculate_metrics_with_ftp``), or ``None`` when
        ``data_points`` is empty.
    """
    if not data_points:
        return None

    powers = _present(dp.power for dp in data_points)
    hrs = _present(dp.heart_rate for dp in data_points)
    cadences = _present(dp.cadence for dp in data_points)
    speeds = _present(dp.speed for dp in data_points)

    has_power = powers.size > 0
    avg_power = float(np.mean(powers)) if has_power else None
    max_power = int(np.max(powers)) if has_power else None

    # With fewer samples than one full window there is no rolling average
    # to take, so NP degrades to the plain average (None without power data).
    if powers.size >= _NP_WINDOW:
        np_value = _normalized_power(powers)
    else:
        np_value = avg_power

    # int() truncates the fractional part of the averages below.
    avg_hr = int(np.mean(hrs)) if hrs.size > 0 else None
    max_hr = int(np.max(hrs)) if hrs.size > 0 else None
    avg_cadence = int(np.mean(cadences)) if cadences.size > 0 else None
    avg_speed = float(np.mean(speeds)) if speeds.size > 0 else None

    # VI = NP / average power needs no FTP; it is undefined when the
    # average power is not positive.
    variability_index = None
    if np_value is not None and avg_power is not None and avg_power > 0:
        variability_index = np_value / avg_power

    return ActivityMetrics(
        activity_id=activity.id,
        # IF and TSS require the rider's FTP and are filled in separately.
        tss=None,
        normalized_power=_round_or_none(np_value, 1),
        intensity_factor=None,
        variability_index=_round_or_none(variability_index, 2),
        avg_power=_round_or_none(avg_power, 1),
        max_power=max_power,
        avg_hr=avg_hr,
        max_hr=max_hr,
        avg_cadence=avg_cadence,
        avg_speed=_round_or_none(avg_speed, 2),
    )


def calculate_metrics_with_ftp(
    metrics: ActivityMetrics,
    ftp: float,
    duration_seconds: int,
) -> ActivityMetrics:
    """Enrich metrics with FTP-dependent values (IF, TSS).

    ``metrics`` is modified in place and also returned. Nothing is changed
    when ``metrics.normalized_power`` is ``None`` or ``ftp`` is not positive.

    Args:
        metrics: Metrics object whose ``normalized_power`` is read and whose
            ``intensity_factor`` and ``tss`` are written.
        ftp: Functional threshold power, in the same unit as
            ``metrics.normalized_power``.
        duration_seconds: Activity duration in seconds.

    Returns:
        The same ``metrics`` object.
    """
    np_value = metrics.normalized_power
    if np_value is not None and ftp > 0:
        # Keep the unrounded ratio for the TSS formula so the 2-decimal
        # rounding of the stored IF does not leak into TSS.
        intensity = np_value / ftp
        metrics.intensity_factor = round(intensity, 2)
        metrics.tss = round(
            (duration_seconds * np_value * intensity) / (ftp * 3600) * 100,
            1,
        )
    return metrics


def _normalized_power(powers: np.ndarray, window: int = _NP_WINDOW) -> float:
    """Return the Normalized Power of a series of power samples.

    NP = 4th root of the mean of the 4th powers of the rolling average taken
    over ``window`` consecutive samples.

    Args:
        powers: One-dimensional array of power samples.
        window: Rolling-average length in samples (default 30).

    Returns:
        The Normalized Power. When there are fewer than ``window`` samples
        the plain mean is returned instead; for an empty array that mean is
        NaN, so callers should guard against empty input.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")
    if len(powers) < window:
        return float(np.mean(powers))
    # mode="valid" keeps only positions where the window fully overlaps the
    # data, so partial windows at the edges do not distort the average.
    rolling = np.convolve(powers, np.ones(window) / window, mode="valid")
    return float(np.power(np.mean(np.power(rolling, 4)), 0.25))