89 lines
2.7 KiB
Python
89 lines
2.7 KiB
Python
import numpy as np
|
|
|
|
from backend.app.models.activity import DataPoint, Interval
|
|
|
|
|
|
def detect_intervals(
|
|
data_points: list[DataPoint],
|
|
ftp: float | None = None,
|
|
min_duration: int = 30,
|
|
) -> list[Interval]:
|
|
"""
|
|
Auto-detect work/rest intervals based on power thresholds.
|
|
Work = power >= 88% FTP (sweetspot and above)
|
|
Rest = power < 55% FTP (active recovery)
|
|
"""
|
|
powers = [dp.power for dp in data_points]
|
|
if not any(p is not None for p in powers):
|
|
return []
|
|
|
|
if not ftp or ftp <= 0:
|
|
# Without FTP, use median power as threshold
|
|
valid_powers = [p for p in powers if p is not None and p > 0]
|
|
if not valid_powers:
|
|
return []
|
|
threshold_high = np.median(valid_powers) * 1.15
|
|
threshold_low = np.median(valid_powers) * 0.65
|
|
else:
|
|
threshold_high = ftp * 0.88
|
|
threshold_low = ftp * 0.55
|
|
|
|
intervals: list[Interval] = []
|
|
current_type: str | None = None
|
|
start_idx: int = 0
|
|
|
|
for i, dp in enumerate(data_points):
|
|
p = dp.power if dp.power is not None else 0
|
|
if p >= threshold_high:
|
|
new_type = "work"
|
|
elif p < threshold_low:
|
|
new_type = "rest"
|
|
else:
|
|
continue # tempo zone — don't break interval
|
|
|
|
if current_type is None:
|
|
current_type = new_type
|
|
start_idx = i
|
|
elif new_type != current_type:
|
|
interval = _build_interval(data_points, start_idx, i - 1, current_type, min_duration)
|
|
if interval:
|
|
intervals.append(interval)
|
|
current_type = new_type
|
|
start_idx = i
|
|
|
|
# Close last interval
|
|
if current_type is not None:
|
|
interval = _build_interval(data_points, start_idx, len(data_points) - 1, current_type, min_duration)
|
|
if interval:
|
|
intervals.append(interval)
|
|
|
|
return intervals
|
|
|
|
|
|
def _build_interval(
|
|
data_points: list[DataPoint],
|
|
start_idx: int,
|
|
end_idx: int,
|
|
interval_type: str,
|
|
min_duration: int,
|
|
) -> Interval | None:
|
|
segment = data_points[start_idx:end_idx + 1]
|
|
if len(segment) < min_duration:
|
|
return None
|
|
|
|
powers = [dp.power for dp in segment if dp.power is not None]
|
|
hrs = [dp.heart_rate for dp in segment if dp.heart_rate is not None]
|
|
|
|
duration = int((segment[-1].timestamp - segment[0].timestamp).total_seconds())
|
|
if duration < min_duration:
|
|
return None
|
|
|
|
return Interval(
|
|
start_ts=segment[0].timestamp,
|
|
end_ts=segment[-1].timestamp,
|
|
interval_type=interval_type,
|
|
avg_power=round(sum(powers) / len(powers), 1) if powers else None,
|
|
avg_hr=round(sum(hrs) / len(hrs)) if hrs else None,
|
|
duration=duration,
|
|
)
|