Files
sport-platform/backend/app/services/intervals.py
2026-03-16 14:46:20 +03:00

89 lines
2.7 KiB
Python

import numpy as np
from backend.app.models.activity import DataPoint, Interval
def detect_intervals(
    data_points: list[DataPoint],
    ftp: float | None = None,
    min_duration: int = 30,
) -> list[Interval]:
    """Auto-detect work/rest intervals based on power thresholds.

    Every sample is classified against two thresholds:

    * work -- power >= 88% of FTP (sweetspot and above)
    * rest -- power < 55% of FTP (active recovery)

    Samples between the two thresholds (tempo zone) neither start nor end
    an interval. When ``ftp`` is missing or not positive, the thresholds are
    derived from the median of the strictly positive power samples instead
    (115% of the median for work, 65% for rest).

    Args:
        data_points: Samples in recording order; each must expose ``power``.
            A ``power`` of ``None`` is classified as 0 W.
        ftp: Functional threshold power used to derive the thresholds.
        min_duration: Minimum interval length, forwarded to
            ``_build_interval``; candidates it rejects are dropped.

    Returns:
        Detected intervals in chronological order. Empty when no sample
        carries power, or when FTP is unavailable and no sample has
        positive power.
    """
    powers = [dp.power for dp in data_points]
    if all(p is None for p in powers):
        return []

    if not ftp or ftp <= 0:
        # Without FTP, use median power as threshold
        valid_powers = [p for p in powers if p is not None and p > 0]
        if not valid_powers:
            return []
        # Compute the median once and keep the thresholds plain floats.
        median_power = float(np.median(valid_powers))
        threshold_high = median_power * 1.15
        threshold_low = median_power * 0.65
    else:
        threshold_high = ftp * 0.88
        threshold_low = ftp * 0.55

    intervals: list[Interval] = []
    current_type: str | None = None
    start_idx = 0

    for i, dp in enumerate(data_points):
        power = dp.power if dp.power is not None else 0
        if power >= threshold_high:
            new_type = "work"
        elif power < threshold_low:
            new_type = "rest"
        else:
            continue  # tempo zone — don't break interval

        if new_type == current_type:
            continue

        if current_type is not None:
            # The running interval ends on the sample just before the first
            # sample of the opposite type, so trailing tempo samples stay
            # attached to it.
            interval = _build_interval(
                data_points, start_idx, i - 1, current_type, min_duration
            )
            if interval:
                intervals.append(interval)
        current_type = new_type
        start_idx = i

    # Close last interval: it runs through the final sample.
    if current_type is not None:
        interval = _build_interval(
            data_points, start_idx, len(data_points) - 1, current_type, min_duration
        )
        if interval:
            intervals.append(interval)
    return intervals
def _build_interval(
data_points: list[DataPoint],
start_idx: int,
end_idx: int,
interval_type: str,
min_duration: int,
) -> Interval | None:
segment = data_points[start_idx:end_idx + 1]
if len(segment) < min_duration:
return None
powers = [dp.power for dp in segment if dp.power is not None]
hrs = [dp.heart_rate for dp in segment if dp.heart_rate is not None]
duration = int((segment[-1].timestamp - segment[0].timestamp).total_seconds())
if duration < min_duration:
return None
return Interval(
start_ts=segment[0].timestamp,
end_ts=segment[-1].timestamp,
interval_type=interval_type,
avg_power=round(sum(powers) / len(powers), 1) if powers else None,
avg_hr=round(sum(hrs) / len(hrs)) if hrs else None,
duration=duration,
)