diff --git a/opensoar/thermals/flight_phases.py b/opensoar/thermals/flight_phases.py index 051beba..59753fa 100644 --- a/opensoar/thermals/flight_phases.py +++ b/opensoar/thermals/flight_phases.py @@ -1,9 +1,26 @@ -from collections import namedtuple -from typing import Union, List +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Optional, Union +from opensoar.task.trip import Trip from opensoar.thermals.pysoar_thermal_detector import PySoarThermalDetector +from opensoar.utilities.helper_functions import ( + calculate_distance_bearing, + total_distance_travelled, +) -Phase = namedtuple('Phase', 'is_cruise fixes') + +@dataclass +class Phase: + is_cruise: bool + fixes: list[dict[str, Any]] + duration: Optional[float] = None # Duration in seconds + gps_altitude_gain: Optional[float] = None # Gain using GPS altitude + pressure_altitude_gain: Optional[float] = None # Gain using pressure altitude + average_climb_rate_gps: Optional[float] = None # Climb rate from GPS + average_climb_rate_pressure: Optional[float] = None # Climb rate from pressure + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None class FlightPhases: @@ -11,42 +28,51 @@ class FlightPhases: Container to combine the different flight phases (thermal and cruise) with helper methods for easy access. """ - def __init__(self, classification_method: str, trace: list, trip=None): + def __init__( + self, + classification_method: str, + trace: list[dict[str, Any]], + trip: Optional[Trip] = None, + ): """ :param classification_method: currently only 'pysoar' supported - :param trace: + :param trace: :param trip: optional parameter for obtain thermals per leg """ - if classification_method == 'pysoar': + if classification_method == "pysoar": self._thermal_detector = PySoarThermalDetector() else: - raise ValueError('Classification method {} not supported'.format(classification_method)) + raise ValueError( + "Classification method {} not supported".format(classification_method) + ) self._trip = trip self._phases = self._thermal_detector.analyse(trace) - def thermals(self, leg: Union[int, str]=None) -> List[Phase]: + def thermals(self, leg: Optional[Union[int, str]] = None) -> list[Phase]: """ Obtain only thermal phases. :param leg: can be 0, 1, 2 or 'all'. Obtain only thermals within specified leg or all legs. - :return: + :return: list of thermal Phase objects """ if leg is not None: - self._check_leg(leg) - thermals = list() + thermals = [] for phase in self._phases: if phase.is_cruise: continue - if leg == 'all': + if isinstance(leg, str) and leg == "all": thermal = self._get_phase_within_trip(phase) - else: + elif isinstance(leg, int): thermal = self._get_phase_within_leg(phase, leg) + else: + # This should never happen due to _check_leg, but it's safe to guard + continue if thermal is not None: thermals.append(thermal) @@ -55,27 +81,28 @@ def thermals(self, leg: Union[int, str]=None) -> List[Phase]: else: return [phase for phase in self._phases if not phase.is_cruise] - def cruises(self, leg: Union[int, str]=None) -> List[Phase]: + def cruises(self, leg: Optional[Union[int, str]] = None) -> list[Phase]: """ Obtain only cruise phases. - :param leg:can be 0, 1, ... or 'all'. Obtain only cruises within specified leg or all legs. - :return: + :param leg: can be 0, 1, ... or 'all'. Obtain only cruises within specified leg or all legs. + :return: list of cruise Phase objects """ if leg is not None: self._check_leg(leg) - cruises = list() + cruises = [] for phase in self._phases: - if not phase.is_cruise: continue - if leg == 'all': + if isinstance(leg, str) and leg == "all": cruise = self._get_phase_within_trip(phase) - else: + elif isinstance(leg, int): cruise = self._get_phase_within_leg(phase, leg) + else: + continue # Shouldn't happen due to _check_leg if cruise is not None: cruises.append(cruise) @@ -84,26 +111,26 @@ def cruises(self, leg: Union[int, str]=None) -> List[Phase]: else: return [phase for phase in self._phases if phase.is_cruise] - def all_phases(self, leg: Union[int, str]=None) -> List[Phase]: + def all_phases(self, leg: Optional[Union[int, str]] = None) -> list[Phase]: """ Obtain all phases (cruise and thermal). :param leg: obtain only phases within specified leg (using int for leg), or obtain only phases within trip (using leg='all') - :return: + :return: list of Phase objects """ if leg is not None: - self._check_leg(leg) - phases = list() + phases: list[Phase] = [] for phase in self._phases: - - if leg == 'all': + if isinstance(leg, str) and leg == "all": phase_ = self._get_phase_within_trip(phase) - else: + elif isinstance(leg, int): phase_ = self._get_phase_within_leg(phase, leg) + else: + continue # Shouldn't happen due to _check_leg if phase_ is not None: phases.append(phase_) @@ -112,27 +139,29 @@ def all_phases(self, leg: Union[int, str]=None) -> List[Phase]: else: return self._phases - def _check_leg(self, leg): + def _check_leg(self, leg: Union[int, str]) -> None: if self._trip is None: - raise ValueError('No trip specified') + raise ValueError("No trip specified") + + if isinstance(leg, str): + if leg != "all": + raise NotImplementedError( + "Only 'all' is supported as a string leg identifier" + ) + elif isinstance(leg, int): + if leg > self._trip.started_legs() - 1: + raise ValueError(f"Trip only contains {self._trip.started_legs()} legs") else: + raise NotImplementedError("Leg must be an int or 'all'") - if type(leg) == str: - if leg != 'all': - raise NotImplementedError - elif type(leg) == int: - if leg > self._trip.started_legs() - 1: - raise ValueError('Trip only contains {} legs'.format(self._trip.started_legs())) - else: - raise NotImplementedError - - def _get_phase_within_leg(self, phase: Phase, leg: int) -> Phase: - + def _get_phase_within_leg(self, phase: Phase, leg: int) -> Optional[Phase]: """ Get part of phase that falls within a specified leg - :param leg: - :return: + :param leg: + :return: Phase with metadata, or None if outside leg """ + if self._trip is None: + raise ValueError("No trip specified") phase_start_in_leg = self._trip.fix_on_leg(phase.fixes[0], leg) phase_end_in_leg = self._trip.fix_on_leg(phase.fixes[-1], leg) @@ -165,19 +194,55 @@ def _get_phase_within_leg(self, phase: Phase, leg: int) -> Phase: else: end_fix = phase.fixes[-1] - phase_start_index = phase.fixes.index(start_fix) - phase_end_index = phase.fixes.index(end_fix) - return Phase(phase.is_cruise, phase.fixes[phase_start_index:phase_end_index + 1]) - - def _get_phase_within_trip(self, phase): + try: + duration = (end_fix["datetime"] - start_fix["datetime"]).total_seconds() + except (KeyError, TypeError): + duration = None + + try: + gps_gain = end_fix["gps_alt"] - start_fix["gps_alt"] + pressure_gain = end_fix["pressure_alt"] - start_fix["pressure_alt"] + except (KeyError, TypeError): + gps_gain = None + pressure_gain = None + + climb_rate_gps = ( + gps_gain / duration if duration and gps_gain is not None else None + ) + climb_rate_pressure = ( + pressure_gain / duration if duration and pressure_gain is not None else None + ) + + try: + phase_start_index = phase.fixes.index(start_fix) + phase_end_index = phase.fixes.index(end_fix) + fix_subset = phase.fixes[phase_start_index : phase_end_index + 1] + except ValueError: + return None + return Phase( + is_cruise=phase.is_cruise, + fixes=fix_subset, + duration=duration, + gps_altitude_gain=gps_gain, + pressure_altitude_gain=pressure_gain, + average_climb_rate_gps=climb_rate_gps, + average_climb_rate_pressure=climb_rate_pressure, + start_time=start_fix.get("datetime"), + end_time=end_fix.get("datetime"), + ) + + def _get_phase_within_trip(self, phase: Phase) -> Optional[Phase]: """ Get part of phase that falls within the trip. :param phase: - :return: phase. None if completely outside trip + :return: Phase with metadata, or None if outside trip """ + if self._trip is None: + raise ValueError("No trip specified") + first_leg = 0 last_leg = self._trip.started_legs() - 1 @@ -203,7 +268,11 @@ def _get_phase_within_trip(self, phase): if phase_start_after_trip: return None - start_fix = self._trip.fixes[first_leg] if use_trip_start_fix else phase.fixes[first_leg] + start_fix = ( + self._trip.fixes[first_leg] + if use_trip_start_fix + else phase.fixes[first_leg] + ) if use_trip_end_fix: if self._trip.outlanded() and last_leg == self._trip.outlanding_leg(): @@ -213,6 +282,119 @@ def _get_phase_within_trip(self, phase): else: end_fix = phase.fixes[-1] - phase_start_index = phase.fixes.index(start_fix) - phase_end_index = phase.fixes.index(end_fix) - return Phase(phase.is_cruise, phase.fixes[phase_start_index:phase_end_index + 1]) + try: + duration = (end_fix["datetime"] - start_fix["datetime"]).total_seconds() + except (KeyError, TypeError): + duration = None + + try: + gps_gain = end_fix["gps_alt"] - start_fix["gps_alt"] + pressure_gain = end_fix["pressure_alt"] - start_fix["pressure_alt"] + except (KeyError, TypeError): + gps_gain = None + pressure_gain = None + + climb_rate_gps = ( + gps_gain / duration if duration and gps_gain is not None else None + ) + climb_rate_pressure = ( + pressure_gain / duration if duration and pressure_gain is not None else None + ) + + try: + phase_start_index = phase.fixes.index(start_fix) + phase_end_index = phase.fixes.index(end_fix) + fix_subset = phase.fixes[phase_start_index : phase_end_index + 1] + except ValueError: + return None # Fix not found in phase + + return Phase( + is_cruise=phase.is_cruise, + fixes=fix_subset, + duration=duration, + gps_altitude_gain=gps_gain, + pressure_altitude_gain=pressure_gain, + average_climb_rate_gps=climb_rate_gps, + average_climb_rate_pressure=climb_rate_pressure, + start_time=start_fix.get("datetime"), + end_time=end_fix.get("datetime"), + ) + + def filter_by_time_range(self, start: datetime, end: datetime) -> list[Phase]: + """ + Return all phases that overlap with the given time range. + + :param start: Start of the time window + :param end: End of the time window + :return: List of Phase objects that intersect with the time range + """ + return [ + phase + for phase in self._phases + if phase.start_time + and phase.end_time + and not (phase.end_time < start or phase.start_time > end) + ] + + def filter_by_location( + self, center_lat: float, center_lon: float, radius_km: float + ) -> list[Phase]: + """ + Return all phases that pass within a given radius of a geographic point. + + :param center_lat: Latitude of the center point + :param center_lon: Longitude of the center point + :param radius_km: Radius in kilometers + :return: List of Phase objects that intersect the area + """ + + def within_radius(fix) -> bool: + try: + dist, _ = calculate_distance_bearing( + fix, {"lat": center_lat, "lon": center_lon} + ) + return dist <= radius_km * 1000 # convert km to meters + except KeyError: + return False + + return [ + phase + for phase in self._phases + if any(within_radius(fix) for fix in phase.fixes) + ] + + def summary_stats(self) -> dict: + """ + Return summary statistics for the flight phases. + + Includes: + - Total number of thermals + - Total number of cruises + - Average thermal duration (in seconds) + - Total cruise distance (in meters) + """ + thermals = [p for p in self._phases if not p.is_cruise] + cruises = [p for p in self._phases if p.is_cruise] + + total_thermals = len(thermals) + total_cruises = len(cruises) + + avg_thermal_duration = None + if thermals: + durations = [p.duration for p in thermals if p.duration is not None] + if durations: + avg_thermal_duration = sum(durations) / len(durations) + + total_cruise_distance = 0 + for cruise in cruises: + try: + total_cruise_distance += total_distance_travelled(cruise.fixes) + except Exception: + continue # Skip if fix data is malformed + + return { + "total_thermals": total_thermals, + "total_cruises": total_cruises, + "average_thermal_duration_sec": avg_thermal_duration, + "total_cruise_distance_m": total_cruise_distance, + } diff --git a/opensoar/thermals/pysoar_thermal_detector.py b/opensoar/thermals/pysoar_thermal_detector.py index 693e51d..b2a9587 100644 --- a/opensoar/thermals/pysoar_thermal_detector.py +++ b/opensoar/thermals/pysoar_thermal_detector.py @@ -1,4 +1,13 @@ -from opensoar.utilities.helper_functions import triple_iterator, calculate_bearing_change, calculate_distance_bearing +from typing import TYPE_CHECKING + +from opensoar.utilities.helper_functions import ( + calculate_bearing_change, + calculate_distance_bearing, + triple_iterator, +) + +if TYPE_CHECKING: + from opensoar.thermals.flight_phases import Phase class PySoarThermalDetector: @@ -13,112 +22,134 @@ class PySoarThermalDetector: THERMAL_THRESHOLD_BEARINGRATE_AVG = 2 # deg/s THERMAL_THRESHOLD_BEARINGRATE = 4 # deg/s - def __init__(self): + def __init__(self) -> None: pass - def analyse(self, trace): - - # To prevent circular import with flight_phases + def _create_phase(self, is_cruise: bool, fixes: list[dict]) -> "Phase": from opensoar.thermals.flight_phases import Phase + start = fixes[0] + end = fixes[-1] + duration = (end["datetime"] - start["datetime"]).total_seconds() + gps_gain = end["gps_alt"] - start["gps_alt"] + pressure_gain = end["pressure_alt"] - start["pressure_alt"] + climb_rate_gps = gps_gain / duration if duration > 0 else None + climb_rate_pressure = pressure_gain / duration if duration > 0 else None + + return Phase( + is_cruise=is_cruise, + fixes=fixes, + duration=duration, + gps_altitude_gain=gps_gain, + pressure_altitude_gain=pressure_gain, + average_climb_rate_gps=climb_rate_gps, + average_climb_rate_pressure=climb_rate_pressure, + start_time=start["datetime"], + end_time=end["datetime"], + ) + + def analyse(self, trace: list[dict]) -> list["Phase"]: cruise = True - possible_thermal_fixes = list() - possible_cruise_fixes = list() + possible_thermal_fixes = [] + possible_cruise_fixes = [] sharp_thermal_entry_found = False turning_left = True total_bearing_change = 0 - # Start with first phase - phases = [Phase(cruise, trace[0:2])] + phases = [self._create_phase(cruise, trace[0:2])] for fix_minus2, fix_minus1, fix in triple_iterator(trace): - - time_minus2 = fix_minus2['datetime'] - time_minus1 = fix_minus1['datetime'] - time = fix['datetime'] + time_minus2 = fix_minus2["datetime"] + time_minus1 = fix_minus1["datetime"] + time = fix["datetime"] bearing_change = calculate_bearing_change(fix_minus2, fix_minus1, fix) - delta_t = (0.5 * (time - time_minus1).total_seconds() + - 0.5 * (time - time_minus2).total_seconds()) + delta_t = ( + 0.5 * (time - time_minus1).total_seconds() + + 0.5 * (time - time_minus2).total_seconds() + ) bearing_change_rate = bearing_change / delta_t if cruise: - - continuing_left = turning_left and bearing_change_rate < self.MINIMUM_BEARING_CHANGE_RATE - continuing_right = not turning_left and bearing_change_rate > -self.MINIMUM_BEARING_CHANGE_RATE + continuing_left = ( + turning_left + and bearing_change_rate < self.MINIMUM_BEARING_CHANGE_RATE + ) + continuing_right = ( + not turning_left + and bearing_change_rate > -self.MINIMUM_BEARING_CHANGE_RATE + ) if continuing_left or continuing_right: - total_bearing_change += bearing_change - - if len(possible_thermal_fixes) == 0: + if not possible_thermal_fixes: + possible_thermal_fixes = [fix] + elif ( + not sharp_thermal_entry_found + and abs(bearing_change_rate) > self.CRUISE_THRESHOLD_BEARINGRATE + ): + sharp_thermal_entry_found = True + phases[-1].fixes.extend(possible_thermal_fixes) possible_thermal_fixes = [fix] else: - if not sharp_thermal_entry_found and abs(bearing_change_rate) > self.CRUISE_THRESHOLD_BEARINGRATE: - sharp_thermal_entry_found = True - phases[-1].fixes.extend(possible_thermal_fixes) - possible_thermal_fixes = [fix] - else: - possible_thermal_fixes.append(fix) - - else: # sign change + possible_thermal_fixes.append(fix) + else: total_bearing_change = bearing_change sharp_thermal_entry_found = False - - if len(possible_thermal_fixes) == 0: + if not possible_thermal_fixes: phases[-1].fixes.append(fix) else: phases[-1].fixes.extend([*possible_thermal_fixes, fix]) - possible_thermal_fixes = list() - + possible_thermal_fixes = [] turning_left = bearing_change_rate < 0 if abs(total_bearing_change) > self.CRUISE_THRESHOLD_BEARINGTOT: cruise = False phases[-1].fixes.append(possible_thermal_fixes[0]) - phases.append(Phase(cruise, possible_thermal_fixes)) - - possible_thermal_fixes = list() + phases.append(self._create_phase(cruise, possible_thermal_fixes)) + possible_thermal_fixes = [] sharp_thermal_entry_found = False total_bearing_change = 0 - else: # thermal - + else: if abs(bearing_change_rate) > self.THERMAL_THRESHOLD_BEARINGRATE: - if len(possible_cruise_fixes) != 0: + if possible_cruise_fixes: phases[-1].fixes.extend([*possible_cruise_fixes, fix]) - possible_cruise_fixes = list() + possible_cruise_fixes = [] else: phases[-1].fixes.append(fix) - - else: # possible cruise - - if len(possible_cruise_fixes) == 0: + else: + if not possible_cruise_fixes: possible_cruise_fixes = [fix] total_bearing_change = bearing_change else: possible_cruise_fixes.append(fix) total_bearing_change += bearing_change - delta_t = (time - possible_cruise_fixes[0]['datetime']).total_seconds() - cruise_distance, _ = calculate_distance_bearing(possible_cruise_fixes[0], fix) - temp_bearing_rate_avg = 0 if delta_t == 0 else total_bearing_change / delta_t - - if (cruise_distance > self.THERMAL_THRESHOLD_DISTANCE and - abs(temp_bearing_rate_avg) < self.THERMAL_THRESHOLD_BEARINGRATE_AVG): - + delta_t = ( + time - possible_cruise_fixes[0]["datetime"] + ).total_seconds() + cruise_distance, _ = calculate_distance_bearing( + possible_cruise_fixes[0], fix + ) + temp_bearing_rate_avg = ( + 0 if delta_t == 0 else total_bearing_change / delta_t + ) + + if ( + cruise_distance > self.THERMAL_THRESHOLD_DISTANCE + and abs(temp_bearing_rate_avg) + < self.THERMAL_THRESHOLD_BEARINGRATE_AVG + ): cruise = True phases[-1].fixes.append(possible_cruise_fixes[0]) - phases.append(Phase(cruise, possible_cruise_fixes)) - possible_cruise_fixes = list() + phases.append(self._create_phase(cruise, possible_cruise_fixes)) + possible_cruise_fixes = [] total_bearing_change = 0 - # add possible fixes at the end - if cruise: - if len(possible_thermal_fixes) != 0: - phases[-1].fixes.extend(possible_thermal_fixes) - else: - if len(possible_cruise_fixes) != 0: - phases[-1].fixes.extend(possible_cruise_fixes) + if cruise and possible_thermal_fixes: + phases[-1].fixes.extend(possible_thermal_fixes) + elif not cruise and possible_cruise_fixes: + phases[-1].fixes.extend(possible_cruise_fixes) return phases diff --git a/tests/thermals/test_flight_phases.py b/tests/thermals/test_flight_phases.py index c68cfa0..6c9c767 100644 --- a/tests/thermals/test_flight_phases.py +++ b/tests/thermals/test_flight_phases.py @@ -1,79 +1,94 @@ +import datetime import os import unittest -import datetime from copy import deepcopy from opensoar.task.trip import Trip from opensoar.thermals.flight_phases import FlightPhases from opensoar.utilities.helper_functions import double_iterator - -from tests.task.helper_functions import get_trace, get_task +from tests.task.helper_functions import get_task, get_trace class TestFlightPhases(unittest.TestCase): - pysoar_phase_start_times = [ - datetime.datetime(2014, 6, 21, 12, 12, 52, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 20, 22, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 24, 14, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 29, 22, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 33, 6, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 34, 50, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 37, 42, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 47, 14, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 12, 52, 42, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 13, 1, 0, tzinfo=datetime.timezone.utc), - datetime.datetime(2014, 6, 21, 13, 4, 52, tzinfo=datetime.timezone.utc), - ] - - cwd = os.path.dirname(__file__) - igc_path = os.path.join(cwd, '..', 'igc_files', 'race_task_completed.igc') - - trace = get_trace(igc_path) - race_task = get_task(igc_path) - trip = Trip(race_task, trace) - phases = FlightPhases('pysoar', trace, trip) + @classmethod + def setUpClass(cls): + cls.cwd = os.path.dirname(__file__) + cls.igc_path = os.path.join( + cls.cwd, "..", "igc_files", "race_task_completed.igc" + ) + cls.trace = get_trace(cls.igc_path) + cls.race_task = get_task(cls.igc_path) + cls.trip = Trip(cls.race_task, cls.trace) + cls.phases = FlightPhases("pysoar", cls.trace, cls.trip) + + # Reference start times for each phase detected by PySoar + cls.pysoar_phase_start_times = [ + datetime.datetime(2014, 6, 21, 12, 12, 52, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 20, 22, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 24, 14, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 29, 22, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 33, 6, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 34, 50, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 37, 42, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 47, 14, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 12, 52, 42, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 13, 1, 0, tzinfo=datetime.timezone.utc), + datetime.datetime(2014, 6, 21, 13, 4, 52, tzinfo=datetime.timezone.utc), + ] def test_all_phases(self): - all_phases = self.phases.all_phases(leg='all') + all_phases = self.phases.all_phases(leg="all") # Check if end fixes are the same as the start fixes of next phase for phase, next_phase in double_iterator(all_phases): - self.assertEqual(phase[1][-1], next_phase[1][0]) + self.assertEqual(phase.fixes[-1], next_phase.fixes[0]) # check same number of phases self.assertEqual(len(all_phases), len(self.pysoar_phase_start_times)) # check if start times of phases are within 2 seconds - for phase, pysoar_phase_start_time in zip(all_phases, self.pysoar_phase_start_times): - time_diff = (pysoar_phase_start_time - phase.fixes[0]['datetime']).total_seconds() + for phase, pysoar_phase_start_time in zip( + all_phases, self.pysoar_phase_start_times + ): + time_diff = ( + pysoar_phase_start_time - phase.fixes[0]["datetime"] + ).total_seconds() self.assertLessEqual(abs(time_diff), 2) def test_thermals(self): - thermals = self.phases.thermals(leg='all') + thermals = self.phases.thermals(leg="all") # check if indeed only thermals for thermal in thermals: self.assertFalse(thermal.is_cruise) # check if correct phases are classified as thermals - for thermal, pysoar_start_time in zip(thermals, self.pysoar_phase_start_times[1::2]): - time_diff = (pysoar_start_time - thermal.fixes[0]['datetime']).total_seconds() + for thermal, pysoar_start_time in zip( + thermals, self.pysoar_phase_start_times[1::2] + ): + time_diff = ( + pysoar_start_time - thermal.fixes[0]["datetime"] + ).total_seconds() self.assertLessEqual(abs(time_diff), 2) def test_cruises(self): - cruises = self.phases.cruises(leg='all') + cruises = self.phases.cruises(leg="all") # check if indeed only cruises for cruise in cruises: self.assertTrue(cruise.is_cruise) # check if correct phases are classified as cruises - for cruise, pysoar_start_time in zip(cruises, self.pysoar_phase_start_times[0::2]): - time_diff = (pysoar_start_time - cruise.fixes[0]['datetime']).total_seconds() + for cruise, pysoar_start_time in zip( + cruises, self.pysoar_phase_start_times[0::2] + ): + time_diff = ( + pysoar_start_time - cruise.fixes[0]["datetime"] + ).total_seconds() self.assertLessEqual(abs(time_diff), 2) def test_thermals_on_leg(self): @@ -87,24 +102,22 @@ def test_thermals_on_leg(self): for thermal in thermals_leg2: self.assertFalse(thermal.is_cruise) - leg_start_time = self.trip.fixes[1]['datetime'] - leg_end_time = self.trip.fixes[2]['datetime'] + leg_start_time = self.trip.fixes[1]["datetime"] + leg_end_time = self.trip.fixes[2]["datetime"] # check start-time of first thermal - start_time = thermals_leg2[0].fixes[0]['datetime'] - diff = (leg_start_time - start_time).total_seconds() - self.assertEqual(diff, 0) + start_time = thermals_leg2[0].fixes[0]["datetime"] + self.assertEqual((leg_start_time - start_time).total_seconds(), 0) - # check endtime of last thermal - end_time = thermals_leg2[-1].fixes[-1]['datetime'] - diff = (leg_end_time - end_time).total_seconds() - self.assertEqual(diff, 0) + # check end-time of last thermal + end_time = thermals_leg2[-1].fixes[-1]["datetime"] + self.assertEqual((leg_end_time - end_time).total_seconds(), 0) def test_cruises_on_leg(self): cruises_leg2 = self.phases.cruises(leg=1) - # check indeed subset of all thermals + # check indeed subset of all cruises self.assertTrue(len(cruises_leg2) < len(self.phases.cruises())) # check all cruises @@ -116,18 +129,78 @@ def test_phases_on_leg_spanning_complete_leg(self): the end of the leg.""" trace = [ - {'datetime': datetime.datetime(2012, 5, 26, 11, 33, 26, tzinfo=datetime.timezone.utc), 'lat': 52.468183333333336, 'lon': 6.3402, 'validity': 'A', - 'pressure_alt': -37, 'gps_alt': 47, 'FXA': 2, 'SIU': 1}, - {'datetime': datetime.datetime(2012, 5, 26, 11, 33, 34, tzinfo=datetime.timezone.utc), 'lat': 52.468183333333336, 'lon': 6.3402, 'validity': 'A', - 'pressure_alt': -37, 'gps_alt': 47, 'FXA': 2, 'SIU': 1}, - {'datetime': datetime.datetime(2012, 5, 26, 11, 33, 42, tzinfo=datetime.timezone.utc), 'lat': 52.468183333333336, 'lon': 6.3402, 'validity': 'A', - 'pressure_alt': -37, 'gps_alt': 47, 'FXA': 2, 'SIU': 1}, - {'datetime': datetime.datetime(2012, 5, 26, 11, 33, 50, tzinfo=datetime.timezone.utc), 'lat': 52.468183333333336, 'lon': 6.3402, 'validity': 'A', - 'pressure_alt': -37, 'gps_alt': 48, 'FXA': 1, 'SIU': 1}, - {'datetime': datetime.datetime(2012, 5, 26, 11, 33, 58, tzinfo=datetime.timezone.utc), 'lat': 52.468183333333336, 'lon': 6.340216666666667, 'validity': 'A', - 'pressure_alt': -37, 'gps_alt': 48, 'FXA': 1, 'SIU': 1}, - {'datetime': datetime.datetime(2012, 5, 26, 11, 34, 6, tzinfo=datetime.timezone.utc), 'lat': 52.46816666666667, 'lon': 6.339666666666667, 'validity': 'A', - 'pressure_alt': -38, 'gps_alt': 49, 'FXA': 1, 'SIU': 1}, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 33, 26, tzinfo=datetime.timezone.utc + ), + "lat": 52.468183333333336, + "lon": 6.3402, + "validity": "A", + "pressure_alt": -37, + "gps_alt": 47, + "FXA": 2, + "SIU": 1, + }, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 33, 34, tzinfo=datetime.timezone.utc + ), + "lat": 52.468183333333336, + "lon": 6.3402, + "validity": "A", + "pressure_alt": -37, + "gps_alt": 47, + "FXA": 2, + "SIU": 1, + }, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 33, 42, tzinfo=datetime.timezone.utc + ), + "lat": 52.468183333333336, + "lon": 6.3402, + "validity": "A", + "pressure_alt": -37, + "gps_alt": 47, + "FXA": 2, + "SIU": 1, + }, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 33, 50, tzinfo=datetime.timezone.utc + ), + "lat": 52.468183333333336, + "lon": 6.3402, + "validity": "A", + "pressure_alt": -37, + "gps_alt": 48, + "FXA": 1, + "SIU": 1, + }, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 33, 58, tzinfo=datetime.timezone.utc + ), + "lat": 52.468183333333336, + "lon": 6.340216666666667, + "validity": "A", + "pressure_alt": -37, + "gps_alt": 48, + "FXA": 1, + "SIU": 1, + }, + { + "datetime": datetime.datetime( + 2012, 5, 26, 11, 34, 6, tzinfo=datetime.timezone.utc + ), + "lat": 52.46816666666667, + "lon": 6.339666666666667, + "validity": "A", + "pressure_alt": -38, + "gps_alt": 49, + "FXA": 1, + "SIU": 1, + }, ] # originally this did trip = deepcopy(self._trip) @@ -136,18 +209,14 @@ def test_phases_on_leg_spanning_complete_leg(self): race_task = get_task(self.igc_path) trip = Trip(race_task, _trace) - trip.fixes = [ - trace[1], - trace[4] - ] + trip.fixes = [trace[1], trace[4]] - phases = FlightPhases('pysoar', trace, trip) + phases = FlightPhases("pysoar", trace, trip) - # there should only be one phase: starting at first fix and ending at last fix of trace - # these are conditions to a correct test setup, therefore no actual tests - assert len(phases._phases) == 1 - assert phases._phases[0].fixes[0]['datetime'] == trace[0]['datetime'] - assert phases._phases[0].fixes[-1]['datetime'] == trace[-1]['datetime'] + # There should only be one phase: starting at first fix and ending at last fix of trace + self.assertEqual(len(phases._phases), 1) + self.assertEqual(phases._phases[0].fixes[0]["datetime"], trace[0]["datetime"]) + self.assertEqual(phases._phases[0].fixes[-1]["datetime"], trace[-1]["datetime"]) all_phases_leg0 = phases.all_phases(leg=0) @@ -157,5 +226,93 @@ def test_phases_on_leg_spanning_complete_leg(self): # check if phase correctly starts and ends at the trip fixes and not the trace fixes first_phase_fix = all_phases_leg0[0].fixes[0] last_phase_fix = all_phases_leg0[0].fixes[-1] - self.assertEqual(first_phase_fix['datetime'], trip.fixes[0]['datetime']) - self.assertEqual(last_phase_fix['datetime'], trip.fixes[-1]['datetime']) + self.assertEqual(first_phase_fix["datetime"], trip.fixes[0]["datetime"]) + self.assertEqual(last_phase_fix["datetime"], trip.fixes[-1]["datetime"]) + + def test_invalid_leg_index(self): + with self.assertRaises(ValueError): + self.phases.thermals(leg=99) + + def test_leg_filter_without_trip(self): + phases_no_trip = FlightPhases("pysoar", self.trace) + with self.assertRaises(ValueError): + phases_no_trip.thermals(leg=0) + + def test_invalid_classification_method(self): + with self.assertRaises(ValueError): + FlightPhases("unsupported_method", self.trace, self.trip) + + def test_all_phases_no_leg(self): + all_phases = self.phases.all_phases(leg="all") + self.assertEqual(len(all_phases), len(self.pysoar_phase_start_times)) + + def test_phase_boundaries_are_continuous(self): + all_phases = self.phases.all_phases(leg="all") + for phase, next_phase in double_iterator(all_phases): + self.assertEqual( + phase.fixes[-1]["datetime"], next_phase.fixes[0]["datetime"] + ) + + def test_phase_metadata_fields(self): + phase = self.phases.all_phases(leg="all")[0] + + self.assertIsNotNone(phase.duration) + self.assertIsInstance(phase.duration, float) + + self.assertIsNotNone(phase.gps_altitude_gain) + self.assertIsInstance(phase.gps_altitude_gain, (int, float)) + + self.assertIsNotNone(phase.pressure_altitude_gain) + self.assertIsInstance(phase.pressure_altitude_gain, (int, float)) + + self.assertIsNotNone(phase.average_climb_rate_gps) + self.assertIsInstance(phase.average_climb_rate_gps, float) + + self.assertIsNotNone(phase.start_time) + self.assertIsInstance(phase.start_time, datetime.datetime) + + self.assertIsNotNone(phase.end_time) + self.assertIsInstance(phase.end_time, datetime.datetime) + + def test_filter_by_time_range(self): + start = datetime.datetime(2014, 6, 21, 12, 20, 0, tzinfo=datetime.timezone.utc) + end = datetime.datetime(2014, 6, 21, 12, 40, 0, tzinfo=datetime.timezone.utc) + + filtered = self.phases.filter_by_time_range(start, end) + + for phase in filtered: + self.assertLessEqual(phase.start_time, end) + self.assertGreaterEqual(phase.end_time, start) + + def test_filter_by_location(self): + center_lat = 52.33 + center_lon = 6.24 + radius_km = 5 + + filtered = self.phases.filter_by_location(center_lat, center_lon, radius_km) + + self.assertGreater(len(filtered), 0) + for phase in filtered: + self.assertTrue( + any( + abs(fix["lat"] - center_lat) < 0.1 + and abs(fix["lon"] - center_lon) < 0.1 + for fix in phase.fixes + ) + ) + + def test_summary_stats(self): + stats = self.phases.summary_stats() + + self.assertIn("total_thermals", stats) + self.assertIn("total_cruises", stats) + self.assertIn("average_thermal_duration_sec", stats) + self.assertIn("total_cruise_distance_m", stats) + + self.assertIsInstance(stats["total_thermals"], int) + self.assertIsInstance(stats["total_cruises"], int) + self.assertTrue( + stats["average_thermal_duration_sec"] is None + or isinstance(stats["average_thermal_duration_sec"], float) + ) + self.assertIsInstance(stats["total_cruise_distance_m"], (int, float))