diff --git a/opensoar/competition/strepla.py b/opensoar/competition/strepla.py index 6c3f4d5..409e66a 100644 --- a/opensoar/competition/strepla.py +++ b/opensoar/competition/strepla.py @@ -2,8 +2,11 @@ Helper functions for Strepla competitions. The files from Strepla always contain task information, which can be used for competition analysis. """ + import datetime -from typing import List, Tuple +import logging +from dataclasses import dataclass, field +from typing import Optional, Union from urllib.error import URLError from aerofiles.igc import Reader @@ -16,73 +19,131 @@ from opensoar.task.waypoint import Waypoint from opensoar.utilities.helper_functions import dm2dd +logger = logging.getLogger(__name__) + + +@dataclass +class TaskInfo: + tp: list = field(default_factory=list) + s_line_rad: Optional[int] = None + tp_key: bool = False + tp_key_dim: Optional[list[int]] = None + tp_cyl: bool = False + tp_cyl_rad: Optional[int] = None + f_line: bool = False + f_line_rad: Optional[int] = None + f_cyl: bool = False + f_cyl_rad: Optional[int] = None + tp_aat_rad: list[int] = field(default_factory=list) + tp_aat_angle: list[int] = field(default_factory=list) + aat: bool = False + time_window: Optional[datetime.timedelta] = None + gate_open: Optional[datetime.time] = None + + +@dataclass +class CompetitorInformation: + pilot_name: Optional[str] = None + competition_id: Optional[str] = None + -def get_task_and_competitor_info(lscsd_lines: List[str], lscsr_lines: List[str], lscsa_lines: List[str]) -> Tuple[dict, dict]: - task_info = { - 'tp': [], - 's_line_rad': None, - 'tp_key': False, - 'tp_key_dim': None, - 'tp_cyl': False, - 'tp_cyl_rad': None, - 'f_line': False, - 'f_line_rad': None, - 'f_cyl': False, - 'f_cyl_rad': None, - 'tp_aat_rad': [], - 'tp_aat_angle': [], - 'aat': False, - 'time_window': None, - 'gate_open': None, - } - - competitor_information = { - 'pilot_name': None, - 'competition_id': None, - } +def get_task_and_competitor_info( + lscsd_lines: list[str], lscsr_lines: list[str], lscsa_lines: list[str] +) -> tuple[TaskInfo, CompetitorInformation]: + + task_info = TaskInfo() + competitor_info = CompetitorInformation() for line in [*lscsd_lines, *lscsr_lines, *lscsa_lines]: - if line.startswith('LSCSRSLINE'): - task_info['s_line_rad'] = int((line.split(':'))[1]) / 2 - elif line.startswith('LSCSRFLINE'): - task_info['f_line'] = True - task_info['f_line_rad'] = int((line.split(':'))[1]) - elif line.startswith('LSCSRTKEYHOLE'): - task_info['tp_key'] = True - task_info['tp_key_dim'] = [int(part) for part in line.split(':')[1::]] - elif line.startswith('LSCSRTCYLINDER'): - task_info['tp_cyl'] = True - task_info['tp_cyl_rad'] = int((line.split(':'))[1]) - elif line.startswith('LSCSRFCYLINDER'): - task_info['f_cyl'] = True - task_info['f_cyl_rad'] = int((line.split(':'))[1]) - elif line.startswith('LSCSA0'): - task_info['tp_aat_rad'].append(int((line.split(':'))[1])) - if int(line.split(':')[3]) == 0: - task_info['tp_aat_angle'].append(360) - else: - task_info['tp_aat_angle'].append(int(line.split(':')[3])) - task_info['aat'] = True - elif line.startswith('LSCSDTime window'): - _, hours, minutes = line.split(':') - task_info['time_window'] = datetime.timedelta(hours=int(hours), minutes=int(minutes)) - elif line.startswith('LSCSDGate open'): - _, hours, minutes = line.split(':') - task_info['gate_open'] = datetime.time(int(hours), int(minutes)) - elif line.startswith('LSCSDName'): - competitor_information['pilot_name'] = line.split(':')[1] - elif line.startswith('LSCSDCID'): - competitor_information['competition_id'] = line.split(':')[1] - - return task_info, competitor_information - - -def get_waypoint_name_lat_long(lscs_line_tp: str) -> Tuple[str, float, float]: + try: + if line.startswith("LSCSRSLINE"): + parts = line.split(":") + if len(parts) > 1: + task_info.s_line_rad = int(parts[1]) / 2 + logger.info(f"Start line radius set to {task_info.s_line_rad}") + elif line.startswith("LSCSRFLINE"): + parts = line.split(":") + if len(parts) > 1: + task_info.f_line = True + task_info.f_line_rad = int(parts[1]) + logger.info(f"Finish line radius set to {task_info.f_line_rad}") + elif line.startswith("LSCSRTKEYHOLE"): + parts = line.split(":") + if len(parts) >= 4: + task_info.tp_key = True + task_info.tp_key_dim = [int(p) for p in parts[1:4]] + logger.info( + f"Keyhole sector dimensions set to {task_info.tp_key_dim}" + ) + elif line.startswith("LSCSRTCYLINDER"): + parts = line.split(":") + if len(parts) > 1: + task_info.tp_cyl = True + task_info.tp_cyl_rad = int(parts[1]) + logger.info( + f"Turnpoint cylinder radius set to {task_info.tp_cyl_rad}" + ) + elif line.startswith("LSCSRFCYLINDER"): + parts = line.split(":") + if len(parts) > 1: + task_info.f_cyl = True + task_info.f_cyl_rad = int(parts[1]) + logger.info(f"Finish cylinder radius set to {task_info.f_cyl_rad}") + elif line.startswith("LSCSA0"): + parts = line.split(":") + if len(parts) >= 4: + task_info.tp_aat_rad.append(int(parts[1])) + angle = int(parts[3]) + task_info.tp_aat_angle.append(360 if angle == 0 else angle) + task_info.aat = True + logger.info( + f"AAT sector radius {task_info.tp_aat_rad[-1]}, angle {task_info.tp_aat_angle[-1]}" + ) + elif line.startswith("LSCSDTime window"): + parts = line.split(":") + if len(parts) >= 3: + task_info.time_window = datetime.timedelta( + hours=int(parts[1]), minutes=int(parts[2]) + ) + logger.info(f"Time window set to {task_info.time_window}") + elif line.startswith("LSCSDGate open"): + parts = line.split(":") + if len(parts) >= 3: + task_info.gate_open = datetime.time(int(parts[1]), int(parts[2])) + logger.info(f"Gate open time set to {task_info.gate_open}") + elif line.startswith("LSCSDName"): + parts = line.split(":") + if len(parts) > 1: + competitor_info.pilot_name = parts[1] + logger.info(f"Pilot name set to {competitor_info.pilot_name}") + elif line.startswith("LSCSDCID"): + parts = line.split(":") + if len(parts) > 1: + competitor_info.competition_id = parts[1] + logger.info( + f"Competition ID set to {competitor_info.competition_id}" + ) + except (ValueError, IndexError) as e: + logger.warning(f"Skipping malformed line: {line} — Error: {e}") + continue + + return task_info, competitor_info + + +def get_waypoint_name_lat_long(lscs_line_tp: str) -> tuple[str, float, float]: """Parse LSCSCT line (LSCSCT:074 Main Lohr-M:N4959700:E00934900)""" - _, name, lat, lon = lscs_line_tp.split(':') - - lat_cardinal, lat_degrees, lat_minutes = lat[0], float(lat[1:3]), float(lat[3:5]) + float(lat[5:8]) / 1000 - lon_cardinal, lon_degrees, lon_minutes = lon[0], float(lon[1:4]), float(lon[4:6]) + float(lon[6:9]) / 1000 + _, name, lat, lon = lscs_line_tp.split(":") + + lat_cardinal, lat_degrees, lat_minutes = ( + lat[0], + float(lat[1:3]), + float(lat[3:5]) + float(lat[5:8]) / 1000, + ) + lon_cardinal, lon_degrees, lon_minutes = ( + lon[0], + float(lon[1:4]), + float(lon[4:6]) + float(lon[6:9]) / 1000, + ) lat = dm2dd(lat_degrees, lat_minutes, lat_cardinal) lon = dm2dd(lon_degrees, lon_minutes, lon_cardinal) @@ -90,102 +151,121 @@ def get_waypoint_name_lat_long(lscs_line_tp: str) -> Tuple[str, float, float]: return name, lat, lon -def get_waypoint(lscs_line_tp: str, task_info: dict, n: int, n_tp: int) -> Waypoint: - +def get_waypoint(lscs_line_tp: str, task_info: TaskInfo, n: int, n_tp: int) -> Waypoint: name, lat, lon = get_waypoint_name_lat_long(lscs_line_tp) + logger.debug(f"Waypoint {n+1}: {name} at ({lat}, {lon})") - r_min = None - r_max = None - angle_min = None - angle_max = None - orientation_angle = None + r_min = r_max = angle_min = angle_max = orientation_angle = None line = False - sector_orientation = None - distance_correction = None + sector_orientation = distance_correction = None if n == 0: line = True sector_orientation = "next" - r_max = task_info['s_line_rad'] + r_max = task_info.s_line_rad angle_max = 90 elif 0 < n < (n_tp - 1): sector_orientation = "symmetrical" - if task_info['aat']: - angle_max = (task_info['tp_aat_angle'])[n - 1] / 2 - r_max = (task_info['tp_aat_rad'])[n - 1] + if task_info.aat: + # AAT tasks use variable sectors; set radius and angle from LSCSA lines + angle_max = task_info.tp_aat_angle[n - 1] / 2 + r_max = task_info.tp_aat_rad[n - 1] sector_orientation = "previous" - else: - # turnpoint is DAEC keyhole - if task_info['tp_key']: - r_max = (task_info['tp_key_dim'])[1] - angle_max = ((task_info['tp_key_dim'])[2]) / 2 - r_min = (task_info['tp_key_dim'])[0] - angle_min = 180 - - # turnpoint is cylinder - elif task_info['tp_cyl']: - r_max = task_info['tp_cyl_rad'] - angle_max = 180 - + elif task_info.tp_key: + # Keyhole sector: use dimensions from LSCSRTKEYHOLE line + r_max = task_info.tp_key_dim[1] + angle_max = task_info.tp_key_dim[2] / 2 + r_min = task_info.tp_key_dim[0] + angle_min = 180 + elif task_info.tp_cyl: + r_max = task_info.tp_cyl_rad + angle_max = 180 elif n == n_tp - 1: sector_orientation = "previous" - - # finish is cylinder - if task_info['f_cyl']: - r_max = task_info['f_cyl_rad'] + if task_info.f_cyl: + r_max = task_info.f_cyl_rad distance_correction = "shorten_legs" angle_max = 180 - - # finish is line - elif task_info['f_line']: - r_max = task_info['f_line_rad'] + elif task_info.f_line: + r_max = task_info.f_line_rad angle_max = 90 line = True - return Waypoint(name, lat, lon, r_min, angle_min, r_max, angle_max, line, sector_orientation, distance_correction, - orientation_angle) - - -def get_waypoints(lscsc_lines: List[str], task_info: dict) -> List[Waypoint]: - waypoints = list() - for n, lscsc_line in enumerate(lscsc_lines): - waypoint = get_waypoint(lscsc_line, task_info, n, len(lscsc_lines)) - waypoints.append(waypoint) - - return waypoints - - -def get_info_from_comment_lines(parsed_igc_file: dict, start_time_buffer: int=0): - - lscsd_lines = list() - lscsr_lines = list() - lscsc_lines = list() - lscsa_lines = list() - - for comment_record in parsed_igc_file['comment_records'][1]: - line = 'L{}{}'.format(comment_record['source'], comment_record['comment']) - - if line.startswith('LSCSD'): + logger.info( + f"Waypoint {name} sector type: {'AAT' if task_info.aat else 'Keyhole' if task_info.tp_key else 'Cylinder' if task_info.tp_cyl else 'Line' if line else 'Unknown'}" + ) + + return Waypoint( + name, + lat, + lon, + r_min, + angle_min, + r_max, + angle_max, + line, + sector_orientation, + distance_correction, + orientation_angle, + ) + + +def get_waypoints(lscsc_lines: list[str], task_info: TaskInfo) -> list[Waypoint]: + return [ + get_waypoint(line, task_info, n, len(lscsc_lines)) + for n, line in enumerate(lscsc_lines) + ] + + +def get_info_from_comment_lines( + parsed_igc_file: dict, start_time_buffer: int = 0 +) -> tuple[Union[RaceTask, AAT], TaskInfo, CompetitorInformation]: + + lscsd_lines = [] + lscsr_lines = [] + lscsc_lines = [] + lscsa_lines = [] + + for comment_record in parsed_igc_file["comment_records"][1]: + line = f"L{comment_record['source']}{comment_record['comment']}" + + if line.startswith("LSCSD"): lscsd_lines.append(line) - elif line.startswith('LSCSC'): + elif line.startswith("LSCSC"): lscsc_lines.append(line) - elif line.startswith('LSCSR'): + elif line.startswith("LSCSR"): lscsr_lines.append(line) - elif line.startswith('LSCSA'): + elif line.startswith("LSCSA"): lscsa_lines.append(line) - task_information, competitor_information = get_task_and_competitor_info(lscsd_lines, lscsr_lines, lscsa_lines) - waypoints = get_waypoints(lscsc_lines, task_information) - - aat = task_information['aat'] - t_min = task_information.get('time_window', None) - start_opening = task_information.get('gate_open', None) - timezone = None # unclear where to get timezone information from strepla igc file - - if aat: - task = AAT(waypoints, t_min, timezone, start_opening, start_time_buffer) - else: - task = RaceTask(waypoints, timezone, start_opening, start_time_buffer) - - return task, task_information, competitor_information + logger.debug( + f"Collected {len(lscsd_lines)} LSCSD lines, {len(lscsr_lines)} LSCSR lines, {len(lscsc_lines)} LSCSC lines, {len(lscsa_lines)} LSCSA lines" + ) + + task_info, competitor_info = get_task_and_competitor_info( + lscsd_lines, lscsr_lines, lscsa_lines + ) + waypoints = get_waypoints(lscsc_lines, task_info) + + # Strepla IGC files do not include timezone info; may need external source + timezone = None + + task = ( + AAT( + waypoints, + task_info.time_window, + timezone, + task_info.gate_open, + start_time_buffer, + ) + if task_info.aat + else RaceTask(waypoints, timezone, task_info.gate_open, start_time_buffer) + ) + logger.info( + f"Created {'AAT' if task_info.aat else 'RaceTask'} with {len(waypoints)} waypoints" + ) + logger.info( + f"Parsed task for pilot {competitor_info.pilot_name} (ID: {competitor_info.competition_id})" + ) + return task, task_info, competitor_info diff --git a/tests/competition/test_strepla.py b/tests/competition/test_strepla.py index 5d09c47..4603e5b 100644 --- a/tests/competition/test_strepla.py +++ b/tests/competition/test_strepla.py @@ -1,116 +1,159 @@ -import unittest - import datetime - import os +import unittest from aerofiles.igc import Reader from opensoar.competition.competitor import Competitor -from opensoar.competition.strepla import get_waypoint_name_lat_long, get_waypoints, get_waypoint, get_task_and_competitor_info, get_info_from_comment_lines +from opensoar.competition.strepla import ( + CompetitorInformation, + TaskInfo, + get_info_from_comment_lines, + get_task_and_competitor_info, + get_waypoint, + get_waypoint_name_lat_long, + get_waypoints, +) from opensoar.task.aat import AAT class TestStrepla(unittest.TestCase): lscsc_lines = [ - 'LSCSCS:AP4 Fronhofen Strassen-T:N4942358:E00851490', - 'LSCSCT:074 Main Lohr-M:N4959700:E00934900', - 'LSCSCT:050 Herbstein Kirche:N5033733:E00920800', - 'LSCSCT:120 St Goar Bf:N5009067:E00742850', - 'LSCSCT:079 Meisenheim Station:N4942550:E00739767', - 'LSCSCT:010 Bensheim Lindenfels Krehberg TV:N4941150:E00843883', - 'LSCSCF:ZP Reinheim (Darmstadt Dieburg):N4950433:E00851050', + "LSCSCS:AP4 Fronhofen Strassen-T:N4942358:E00851490", + "LSCSCT:074 Main Lohr-M:N4959700:E00934900", + "LSCSCT:050 Herbstein Kirche:N5033733:E00920800", + "LSCSCT:120 St Goar Bf:N5009067:E00742850", + "LSCSCT:079 Meisenheim Station:N4942550:E00739767", + "LSCSCT:010 Bensheim Lindenfels Krehberg TV:N4941150:E00843883", + "LSCSCF:ZP Reinheim (Darmstadt Dieburg):N4950433:E00851050", ] lscsr_lines = [ - 'LSCSRSLINE:20000', - 'LSCSRTKEYHOLE:500:10000:90', - 'LSCSRFCYLINDER:2500', + "LSCSRSLINE:20000", + "LSCSRTKEYHOLE:500:10000:90", + "LSCSRFCYLINDER:2500", ] lscsd_lines = [ - 'LSCSDCID:IBG', - 'LSCSDName:Leip, Dennis', - 'LSCSDGate open:10:44', - 'LSCSDGate close:12:14', - 'LSCSDTime window:03:30', - 'LSCSDmax Elevation start:1200', - 'LSCSDmax Elevation:3000', - 'LSCSDQNH:1021', - 'LSCSDElevation start:155', + "LSCSDCID:IBG", + "LSCSDName:Leip, Dennis", + "LSCSDGate open:10:44", + "LSCSDGate close:12:14", + "LSCSDTime window:03:30", + "LSCSDmax Elevation start:1200", + "LSCSDmax Elevation:3000", + "LSCSDQNH:1021", + "LSCSDElevation start:155", ] lscsa_lines = [] - def test_waypoint_info_parsing(self): - """test whether name and coordinates are correctly read from line in igc file""" + def setUp(self): + self.task_info, self.competitor_info = get_task_and_competitor_info( + self.lscsd_lines, self.lscsr_lines, self.lscsa_lines + ) - lscs_line_tp = 'LSCSCT:074 Main Lohr-M:N4959700:E00934900' + def test_waypoint_info_parsing(self): + """Test whether name and coordinates are correctly read from line in igc file""" + lscs_line_tp = "LSCSCT:074 Main Lohr-M:N4959700:E00934900" name, lat, lon = get_waypoint_name_lat_long(lscs_line_tp) - self.assertEqual(name, '074 Main Lohr-M') + self.assertEqual(name, "074 Main Lohr-M") self.assertAlmostEqual(lat, 49.9950, places=4) self.assertAlmostEqual(lon, 9.5817, places=4) def test_get_waypoints(self): - task_info, competitor_information = get_task_and_competitor_info(self.lscsd_lines, self.lscsr_lines, []) - waypoints = get_waypoints(self.lscsc_lines, task_info) + waypoints = get_waypoints(self.lscsc_lines, self.task_info) self.assertEqual(len(waypoints), 7) def test_get_waypoint(self): + """Test parsing of a single waypoint and its sector type""" + lscsc_line = self.lscsc_lines[0] + waypoint = get_waypoint(lscsc_line, self.task_info, n=0, n_tp=7) - lscsc_line = 'LSCSCS:AP4 Fronhofen Strassen-T:N4942358:E00851490' - - task_info, competitor_information = get_task_and_competitor_info(self.lscsd_lines, self.lscsr_lines, []) - waypoint = get_waypoint(lscsc_line, task_info, n=0, n_tp=7) - - self.assertEqual(waypoint.name, 'AP4 Fronhofen Strassen-T') + self.assertEqual(waypoint.name, "AP4 Fronhofen Strassen-T") self.assertTrue(waypoint.is_line) + self.assertEqual(waypoint.sector_orientation, "next") + self.assertEqual(waypoint.angle_max, 90) + + def test_task_info_fields(self): + """Test that TaskInfo dataclass fields are correctly populated""" + self.assertIsInstance(self.task_info, TaskInfo) + self.assertTrue(self.task_info.tp_key) + self.assertEqual(self.task_info.tp_key_dim, [500, 10000, 90]) + self.assertEqual(self.task_info.s_line_rad, 10000) + self.assertEqual(self.task_info.f_cyl_rad, 2500) + + def test_competitor_info_fields(self): + """Test that CompetitorInformation dataclass fields are correctly populated""" + self.assertIsInstance(self.competitor_info, CompetitorInformation) + self.assertEqual(self.competitor_info.pilot_name, "Leip, Dennis") + self.assertEqual(self.competitor_info.competition_id, "IBG") def test_aat_from_file(self): """ - Test if aat is correctly recognised and waypoint are correct - file from: https://www.strepla.de/scs/Public/scoreDay.aspx?cId=451&idDay=7912, competitor 1 CX + Test if AAT is correctly recognised and waypoints are correct. + File from: https://www.strepla.de/scs/Public/scoreDay.aspx?cId=451&idDay=7912, competitor 1 CX """ + file_path = os.path.join( + os.path.dirname(__file__), "..", "igc_files", "aat_strepla.igc" + ) - file_path = os.path.join(os.path.dirname(__file__), '..', 'igc_files', 'aat_strepla.igc') - - with open(file_path, 'r', encoding='utf-8') as f: + with open(file_path, "r", encoding="utf-8") as f: parsed_igc_file = Reader().read(f) - trace_errors, trace = parsed_igc_file['fix_records'] - + trace_errors, trace = parsed_igc_file["fix_records"] self.assertEqual(len(trace_errors), 0) - task, _, _ = get_info_from_comment_lines(parsed_igc_file) + task, task_info, competitor_info = get_info_from_comment_lines(parsed_igc_file) self.assertIsInstance(task, AAT) self.assertEqual(task.t_min, datetime.timedelta(hours=2, minutes=30)) expected_waypoints = [ - ('AP3 Muellhalde', None), - ('Loreley', 20000), - ('Kusel', 40000), - ('Loreley', 20000), - ('ZP Anspach/Taunus', None), + ("AP3 Muellhalde", None), + ("Loreley", 20000), + ("Kusel", 40000), + ("Loreley", 20000), + ("ZP Anspach/Taunus", None), ] self.assertEqual(len(task.waypoints), len(expected_waypoints)) for i, waypoint in enumerate(task.waypoints): - expected_name, expected_r_max = expected_waypoints[i] - self.assertEqual(waypoint.name, expected_name) - if 0 < i < len(expected_waypoints) - 1: - self.assertEqual(waypoint.r_max, expected_r_max) + with self.subTest(i=i): + expected_name, expected_r_max = expected_waypoints[i] + self.assertEqual(waypoint.name, expected_name) + if 0 < i < len(expected_waypoints) - 1: + self.assertEqual(waypoint.r_max, expected_r_max) - competitor = Competitor(trace, 'CX', 'Discus2b', 1, 'Karsten Leucker') - competitor.analyse(task, 'pysoar') + competitor = Competitor(trace, "CX", "Discus2b", 1, "Karsten Leucker") + competitor.analyse(task, "pysoar") self.assertEqual(competitor.trip.refined_start_time.hour, 13) self.assertEqual(competitor.trip.refined_start_time.minute, 22) - seconds = competitor.trip.refined_start_time.second + self.assertLessEqual(abs(competitor.trip.refined_start_time.second - 40), 1) dist_diff = sum(competitor.trip.distances) - 283500 - self.assertLessEqual(abs(seconds-40), 1) self.assertEqual(len(competitor.trip.fixes), len(expected_waypoints)) self.assertLessEqual(abs(dist_diff), 1000) + + def test_empty_inputs(self): + """Test behavior when input lists are empty""" + task_info, competitor_info = get_task_and_competitor_info([], [], []) + self.assertIsInstance(task_info, TaskInfo) + self.assertIsInstance(competitor_info, CompetitorInformation) + self.assertFalse(task_info.aat) + self.assertIsNone(competitor_info.pilot_name) + + def test_malformed_line_handling(self): + """Test robustness against malformed input lines""" + malformed_lines = [ + "LSCSRSLINE", + "LSCSRTKEYHOLE:bad:data", + "LSCSDGate open:xx:yy", + ] + try: + get_task_and_competitor_info(malformed_lines, [], []) + except Exception as e: + self.fail(f"Function raised an exception on malformed input: {e}")