import numpy as np from backend.app.models.activity import DataPoint, Interval def detect_intervals( data_points: list[DataPoint], ftp: float | None = None, min_duration: int = 30, ) -> list[Interval]: """ Auto-detect work/rest intervals based on power thresholds. Work = power >= 88% FTP (sweetspot and above) Rest = power < 55% FTP (active recovery) """ powers = [dp.power for dp in data_points] if not any(p is not None for p in powers): return [] if not ftp or ftp <= 0: # Without FTP, use median power as threshold valid_powers = [p for p in powers if p is not None and p > 0] if not valid_powers: return [] threshold_high = np.median(valid_powers) * 1.15 threshold_low = np.median(valid_powers) * 0.65 else: threshold_high = ftp * 0.88 threshold_low = ftp * 0.55 intervals: list[Interval] = [] current_type: str | None = None start_idx: int = 0 for i, dp in enumerate(data_points): p = dp.power if dp.power is not None else 0 if p >= threshold_high: new_type = "work" elif p < threshold_low: new_type = "rest" else: continue # tempo zone — don't break interval if current_type is None: current_type = new_type start_idx = i elif new_type != current_type: interval = _build_interval(data_points, start_idx, i - 1, current_type, min_duration) if interval: intervals.append(interval) current_type = new_type start_idx = i # Close last interval if current_type is not None: interval = _build_interval(data_points, start_idx, len(data_points) - 1, current_type, min_duration) if interval: intervals.append(interval) return intervals def _build_interval( data_points: list[DataPoint], start_idx: int, end_idx: int, interval_type: str, min_duration: int, ) -> Interval | None: segment = data_points[start_idx:end_idx + 1] if len(segment) < min_duration: return None powers = [dp.power for dp in segment if dp.power is not None] hrs = [dp.heart_rate for dp in segment if dp.heart_rate is not None] duration = int((segment[-1].timestamp - segment[0].timestamp).total_seconds()) if duration < min_duration: return None return Interval( start_ts=segment[0].timestamp, end_ts=segment[-1].timestamp, interval_type=interval_type, avg_power=round(sum(powers) / len(powers), 1) if powers else None, avg_hr=round(sum(hrs) / len(hrs)) if hrs else None, duration=duration, )