84 lines
2.9 KiB
Python
84 lines
2.9 KiB
Python
import uuid
|
|
|
|
import numpy as np
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from backend.app.models.activity import Activity, ActivityMetrics, DataPoint
|
|
from backend.app.models.rider import Rider
|
|
|
|
|
|
def calculate_metrics(
    data_points: list[DataPoint],
    activity: Activity,
    rider_id: uuid.UUID,
    session: AsyncSession,
) -> ActivityMetrics | None:
    """Calculate summary metrics (power, heart rate, cadence, speed) for an activity.

    Samples whose value is ``None`` are ignored per channel, so each channel
    is averaged over only the points that actually carry a reading.

    Args:
        data_points: Recorded samples of the activity.
        activity: The activity the metrics belong to; only ``activity.id`` is read.
        rider_id: Not used by this function; kept for interface compatibility.
        session: Not used by this function; kept for interface compatibility.

    Returns:
        A new ``ActivityMetrics`` instance, or ``None`` when ``data_points`` is
        empty. ``tss`` and ``intensity_factor`` are always ``None`` here because
        they need an FTP value, which this function does not have. A channel
        with no readings yields ``None`` for its metrics; a channel whose
        readings are all zero yields ``0`` (not ``None``).
    """
    if not data_points:
        return None

    powers = np.array([dp.power for dp in data_points if dp.power is not None], dtype=float)
    hrs = np.array([dp.heart_rate for dp in data_points if dp.heart_rate is not None], dtype=float)
    cadences = np.array([dp.cadence for dp in data_points if dp.cadence is not None], dtype=float)
    speeds = np.array([dp.speed for dp in data_points if dp.speed is not None], dtype=float)

    avg_power = None
    max_power = None
    np_value = None
    if powers.size:
        avg_power = float(np.mean(powers))
        max_power = int(np.max(powers))
        # Fewer than 30 samples cannot fill one rolling window, so fall back
        # to the plain average.
        np_value = _normalized_power(powers) if powers.size >= 30 else avg_power

    # int() truncates the fractional part of the averages (existing behaviour).
    avg_hr = int(np.mean(hrs)) if hrs.size else None
    max_hr = int(np.max(hrs)) if hrs.size else None
    avg_cadence = int(np.mean(cadences)) if cadences.size else None
    avg_speed = float(np.mean(speeds)) if speeds.size else None

    # VI only needs NP and average power. It is undefined when the average
    # power is zero (division by zero), in which case it stays None.
    variability_index = None
    if np_value is not None and avg_power is not None and avg_power > 0:
        variability_index = np_value / avg_power

    # Compare against None explicitly: a computed 0.0 is a valid measurement
    # (e.g. a ride with no pedalling) and must not be reported as "missing".
    return ActivityMetrics(
        activity_id=activity.id,
        tss=None,  # requires FTP
        normalized_power=round(np_value, 1) if np_value is not None else None,
        intensity_factor=None,  # requires FTP
        variability_index=(
            round(variability_index, 2) if variability_index is not None else None
        ),
        avg_power=round(avg_power, 1) if avg_power is not None else None,
        max_power=max_power,
        avg_hr=avg_hr,
        max_hr=max_hr,
        avg_cadence=avg_cadence,
        avg_speed=round(avg_speed, 2) if avg_speed is not None else None,
    )
|
|
|
|
|
|
def calculate_metrics_with_ftp(
    metrics: ActivityMetrics,
    ftp: float,
    duration_seconds: int,
) -> ActivityMetrics:
    """Enrich metrics with FTP-dependent values (IF, TSS).

    Computes, in place on ``metrics``:

    * ``intensity_factor = normalized_power / ftp`` (stored rounded to 2 decimals)
    * ``tss = duration_seconds * NP * IF / (ftp * 3600) * 100`` (stored rounded
      to 1 decimal)

    Args:
        metrics: Metrics object to update; ``normalized_power`` is read,
            ``intensity_factor`` and ``tss`` are written.
        ftp: Functional threshold power. Must be positive for anything to be
            computed.
        duration_seconds: Activity duration in seconds.

    Returns:
        The same ``metrics`` object. It is returned unchanged when
        ``normalized_power`` is missing/zero or ``ftp`` is ``None`` or not
        positive.
    """
    if not metrics.normalized_power or ftp is None or ftp <= 0:
        return metrics

    # Use the full-precision ratio for TSS. Feeding the 2-decimal rounded IF
    # into the formula skews TSS (e.g. NP=200, FTP=300, 1 h: 44.7 vs 44.4).
    intensity = metrics.normalized_power / ftp
    metrics.intensity_factor = round(intensity, 2)
    metrics.tss = round(
        (duration_seconds * metrics.normalized_power * intensity) / (ftp * 3600) * 100,
        1,
    )
    return metrics
|
|
|
|
|
|
def _normalized_power(powers: np.ndarray, window: int = 30) -> float:
    """Return the normalized power of a series of power samples.

    NP = 4th root of the mean of the 4th powers of the ``window``-sample
    rolling average. The default window of 30 samples corresponds to 30 s
    only if the samples are recorded at 1 Hz — NOTE(review): the sampling
    rate is not visible here; confirm against the caller.

    Args:
        powers: One-dimensional sequence of power samples.
        window: Number of samples in the rolling average (default 30).

    Returns:
        The normalized power. When there are fewer than ``window`` samples the
        plain mean is returned instead; for empty input the result is NaN.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")

    samples = np.asarray(powers, dtype=float)
    if samples.size == 0:
        # The mean of nothing is undefined. Return NaN explicitly rather than
        # relying on np.mean([]), which yields NaN plus a RuntimeWarning.
        return float("nan")
    if samples.size < window:
        return float(np.mean(samples))

    # mode="valid" keeps only positions where the window fully overlaps the
    # data, so partially filled windows at the edges do not dilute the result.
    rolling = np.convolve(samples, np.ones(window) / window, mode="valid")
    return float(np.power(np.mean(np.power(rolling, 4)), 0.25))
|