From 22c65dde8962524849b8d7142c232074db786f7a Mon Sep 17 00:00:00 2001 From: Orinks Date: Sun, 8 Feb 2026 02:13:52 +0000 Subject: [PATCH] test: add unit tests for temperature_utils and alert_aggregator - 51 tests for temperature_utils (conversions, formatting, edge cases) - 36 tests for alert_aggregator (grouping, dedup, severity ranking) - All 87 new tests passing, full suite green (912 passed) --- tests/test_alert_aggregator.py | 254 ++++++++++++++++++++++++++++++++ tests/test_temperature_utils.py | 199 +++++++++++++++++++++++++ 2 files changed, 453 insertions(+) create mode 100644 tests/test_alert_aggregator.py create mode 100644 tests/test_temperature_utils.py diff --git a/tests/test_alert_aggregator.py b/tests/test_alert_aggregator.py new file mode 100644 index 00000000..ec642863 --- /dev/null +++ b/tests/test_alert_aggregator.py @@ -0,0 +1,254 @@ +"""Tests for accessiweather.weather_client_alerts.AlertAggregator.""" + +from datetime import UTC, datetime, timedelta + +from accessiweather.models.alerts import WeatherAlert, WeatherAlerts +from accessiweather.weather_client_alerts import AlertAggregator + + +def _make_alert( + event="Winter Storm Warning", + areas=None, + onset=None, + source=None, + description="Test alert", + severity="Severe", + urgency="Immediate", + certainty="Likely", + headline=None, + instruction=None, +): + return WeatherAlert( + title=f"Test: {event}", + description=description, + event=event, + areas=areas or ["Test County"], + onset=onset, + source=source, + severity=severity, + urgency=urgency, + certainty=certainty, + headline=headline, + instruction=instruction, + ) + + +class TestAlertAggregatorInit: + def test_default_window(self): + agg = AlertAggregator() + assert agg.dedup_time_window == timedelta(minutes=60) + + def test_custom_window(self): + agg = AlertAggregator(dedup_time_window_minutes=30) + assert agg.dedup_time_window == timedelta(minutes=30) + + +class TestAggregateAlerts: + def setup_method(self): + self.agg = AlertAggregator() + + def test_both_none(self): + result = self.agg.aggregate_alerts(None, None) + assert result.alerts == [] + + def test_nws_only(self): + nws = WeatherAlerts(alerts=[_make_alert(source=None)]) + result = self.agg.aggregate_alerts(nws, None) + assert len(result.alerts) == 1 + assert result.alerts[0].source == "nws" + + def test_vc_only(self): + vc = WeatherAlerts(alerts=[_make_alert(source=None)]) + result = self.agg.aggregate_alerts(None, vc) + assert len(result.alerts) == 1 + assert result.alerts[0].source == "visualcrossing" + + def test_both_sources_no_duplicates(self): + nws = WeatherAlerts(alerts=[_make_alert(event="Tornado Warning")]) + vc = WeatherAlerts(alerts=[_make_alert(event="Flood Watch")]) + result = self.agg.aggregate_alerts(nws, vc) + assert len(result.alerts) == 2 + + def test_both_sources_with_duplicate(self): + now = datetime.now(UTC) + nws = WeatherAlerts(alerts=[_make_alert(onset=now, source="nws")]) + vc = WeatherAlerts(alerts=[_make_alert(onset=now, source="visualcrossing")]) + result = self.agg.aggregate_alerts(nws, vc) + assert len(result.alerts) == 1 # deduplicated + + def test_empty_alert_lists(self): + nws = WeatherAlerts(alerts=[]) + vc = WeatherAlerts(alerts=[]) + result = self.agg.aggregate_alerts(nws, vc) + assert result.alerts == [] + + def test_source_preserved_if_already_set(self): + nws = WeatherAlerts(alerts=[_make_alert(source="custom-nws")]) + result = self.agg.aggregate_alerts(nws, None) + assert result.alerts[0].source == "custom-nws" + + +class TestDeduplicateAlerts: + def setup_method(self): + self.agg = AlertAggregator() + + def test_empty_list(self): + assert self.agg._deduplicate_alerts([]) == [] + + def test_no_duplicates(self): + alerts = [ + _make_alert(event="Tornado Warning"), + _make_alert(event="Flood Watch"), + ] + result = self.agg._deduplicate_alerts(alerts) + assert len(result) == 2 + + def test_duplicates_merged(self): + now = datetime.now(UTC) + alerts = [ + _make_alert(onset=now, source="nws", description="short"), + _make_alert(onset=now, source="vc", description="a much longer description here"), + ] + result = self.agg._deduplicate_alerts(alerts) + assert len(result) == 1 + + +class TestIsDuplicate: + def setup_method(self): + self.agg = AlertAggregator() + + def test_different_events(self): + a1 = _make_alert(event="Tornado Warning") + a2 = _make_alert(event="Flood Watch") + assert not self.agg._is_duplicate(a1, a2) + + def test_same_event_same_area(self): + a1 = _make_alert() + a2 = _make_alert() + assert self.agg._is_duplicate(a1, a2) + + def test_non_overlapping_areas(self): + a1 = _make_alert(areas=["County A"]) + a2 = _make_alert(areas=["County B"]) + assert not self.agg._is_duplicate(a1, a2) + + def test_onset_within_window(self): + now = datetime.now(UTC) + a1 = _make_alert(onset=now) + a2 = _make_alert(onset=now + timedelta(minutes=30)) + assert self.agg._is_duplicate(a1, a2) + + def test_onset_outside_window(self): + now = datetime.now(UTC) + a1 = _make_alert(onset=now) + a2 = _make_alert(onset=now + timedelta(hours=2)) + assert not self.agg._is_duplicate(a1, a2) + + def test_none_onsets_still_match(self): + a1 = _make_alert(onset=None) + a2 = _make_alert(onset=None) + assert self.agg._is_duplicate(a1, a2) + + def test_mixed_tz_aware_naive(self): + aware = datetime(2025, 1, 1, 12, 0, tzinfo=UTC) + naive = datetime(2025, 1, 1, 12, 0) + a1 = _make_alert(onset=aware) + a2 = _make_alert(onset=naive) + # Should not crash; timezone normalization handles it + result = self.agg._is_duplicate(a1, a2) + assert isinstance(result, bool) + + def test_one_onset_none(self): + a1 = _make_alert(onset=datetime.now(UTC)) + a2 = _make_alert(onset=None) + # If one onset is None, time check is skipped, so they match on event+area + assert self.agg._is_duplicate(a1, a2) + + +class TestAreasOverlap: + def setup_method(self): + self.agg = AlertAggregator() + + def test_empty_first(self): + assert self.agg._areas_overlap([], ["County A"]) is True + + def test_empty_second(self): + assert self.agg._areas_overlap(["County A"], []) is True + + def test_both_empty(self): + assert self.agg._areas_overlap([], []) is True + + def test_overlap(self): + assert self.agg._areas_overlap(["County A", "County B"], ["County B", "County C"]) is True + + def test_no_overlap(self): + assert self.agg._areas_overlap(["County A"], ["County B"]) is False + + def test_case_insensitive(self): + assert self.agg._areas_overlap(["county a"], ["COUNTY A"]) is True + + def test_whitespace_stripped(self): + assert self.agg._areas_overlap([" County A "], ["county a"]) is True + + +class TestMergeDuplicateAlerts: + def setup_method(self): + self.agg = AlertAggregator() + + def test_single_alert(self): + alert = _make_alert() + result = self.agg._merge_duplicate_alerts([alert]) + assert result is alert + + def test_nws_preferred_as_base(self): + nws = _make_alert(source="nws", severity="Severe") + vc = _make_alert(source="visualcrossing", severity="Unknown") + result = self.agg._merge_duplicate_alerts([vc, nws]) + # NWS should be base (sorted first) + assert result.severity == "Severe" + + def test_longer_description_wins(self): + nws = _make_alert(source="nws", description="short") + vc = _make_alert(source="visualcrossing", description="a much longer and more detailed description") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert result.description == "a much longer and more detailed description" + + def test_longer_headline_wins(self): + nws = _make_alert(source="nws", headline="brief") + vc = _make_alert(source="visualcrossing", headline="a much longer headline text") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert result.headline == "a much longer headline text" + + def test_longer_instruction_wins(self): + nws = _make_alert(source="nws", instruction="go") + vc = _make_alert(source="visualcrossing", instruction="take shelter immediately and seek cover") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert result.instruction == "take shelter immediately and seek cover" + + def test_unknown_metadata_replaced(self): + nws = _make_alert(source="nws", severity="Unknown", urgency="Unknown", certainty="Unknown") + vc = _make_alert(source="visualcrossing", severity="Moderate", urgency="Future", certainty="Possible") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert result.severity == "Moderate" + assert result.urgency == "Future" + assert result.certainty == "Possible" + + def test_source_merged(self): + nws = _make_alert(source="nws") + vc = _make_alert(source="visualcrossing") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert "nws" in result.source + assert "visualcrossing" in result.source + + def test_areas_unioned(self): + nws = _make_alert(source="nws", areas=["County A"]) + vc = _make_alert(source="visualcrossing", areas=["County B"]) + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert set(result.areas) == {"County A", "County B"} + + def test_non_unknown_severity_kept(self): + # If base already has good severity, VC Unknown shouldn't replace it + nws = _make_alert(source="nws", severity="Extreme") + vc = _make_alert(source="visualcrossing", severity="Unknown") + result = self.agg._merge_duplicate_alerts([nws, vc]) + assert result.severity == "Extreme" diff --git a/tests/test_temperature_utils.py b/tests/test_temperature_utils.py new file mode 100644 index 00000000..2cd9a864 --- /dev/null +++ b/tests/test_temperature_utils.py @@ -0,0 +1,199 @@ +"""Tests for accessiweather.utils.temperature_utils module.""" + + +import pytest + +from accessiweather.utils.temperature_utils import ( + TemperatureUnit, + _normalize_dewpoint_unit, + calculate_dewpoint, + celsius_to_fahrenheit, + fahrenheit_to_celsius, + format_temperature, + get_temperature_values, +) + +# --- celsius_to_fahrenheit / fahrenheit_to_celsius --- + +class TestConversions: + def test_celsius_to_fahrenheit_freezing(self): + assert celsius_to_fahrenheit(0) == 32 + + def test_celsius_to_fahrenheit_boiling(self): + assert celsius_to_fahrenheit(100) == 212 + + def test_fahrenheit_to_celsius_freezing(self): + assert fahrenheit_to_celsius(32) == 0 + + def test_fahrenheit_to_celsius_boiling(self): + assert fahrenheit_to_celsius(212) == 100 + + def test_roundtrip(self): + assert abs(fahrenheit_to_celsius(celsius_to_fahrenheit(25)) - 25) < 1e-9 + + +# --- _normalize_dewpoint_unit --- + +class TestNormalizeDewpointUnit: + @pytest.mark.parametrize("val", ["c", "celsius", "°c", "degc", "wmounit:degc"]) + def test_celsius_strings(self, val): + assert _normalize_dewpoint_unit(val) == TemperatureUnit.CELSIUS + + @pytest.mark.parametrize("val", ["f", "fahrenheit", "°f", "degf", "wmounit:degf"]) + def test_fahrenheit_strings(self, val): + assert _normalize_dewpoint_unit(val) == TemperatureUnit.FAHRENHEIT + + def test_celsius_enum(self): + assert _normalize_dewpoint_unit(TemperatureUnit.CELSIUS) == TemperatureUnit.CELSIUS + + def test_fahrenheit_enum(self): + assert _normalize_dewpoint_unit(TemperatureUnit.FAHRENHEIT) == TemperatureUnit.FAHRENHEIT + + def test_both_enum_maps_to_fahrenheit(self): + assert _normalize_dewpoint_unit(TemperatureUnit.BOTH) == TemperatureUnit.FAHRENHEIT + + def test_none_defaults_to_fahrenheit(self): + assert _normalize_dewpoint_unit(None) == TemperatureUnit.FAHRENHEIT + + def test_invalid_string_defaults_to_fahrenheit(self): + assert _normalize_dewpoint_unit("kelvin") == TemperatureUnit.FAHRENHEIT + + def test_whitespace_stripped(self): + assert _normalize_dewpoint_unit(" celsius ") == TemperatureUnit.CELSIUS + + +# --- calculate_dewpoint --- + +class TestCalculateDewpoint: + def test_basic_fahrenheit(self): + result = calculate_dewpoint(72, 50, unit=TemperatureUnit.FAHRENHEIT) + assert result is not None + assert 50 < result < 60 # rough sanity + + def test_basic_celsius(self): + result = calculate_dewpoint(22, 50, unit=TemperatureUnit.CELSIUS) + assert result is not None + assert 10 < result < 15 + + def test_none_temperature(self): + assert calculate_dewpoint(None, 50) is None + + def test_none_humidity(self): + assert calculate_dewpoint(72, None) is None + + def test_both_none(self): + assert calculate_dewpoint(None, None) is None + + def test_non_numeric_temperature(self): + assert calculate_dewpoint("abc", 50) is None + + def test_non_numeric_humidity(self): + assert calculate_dewpoint(72, "xyz") is None + + def test_zero_humidity(self): + assert calculate_dewpoint(72, 0) is None + + def test_negative_humidity(self): + assert calculate_dewpoint(72, -5) is None + + def test_hundred_percent_humidity(self): + # At 100% humidity, dewpoint should equal temperature + result = calculate_dewpoint(20, 100, unit=TemperatureUnit.CELSIUS) + assert result is not None + assert abs(result - 20) < 0.5 + + def test_string_unit(self): + result = calculate_dewpoint(72, 50, unit="celsius") + # unit="celsius" means input is Celsius + assert result is not None + + def test_very_low_humidity(self): + # 0.05% humidity should clamp to 0.1 + result = calculate_dewpoint(72, 0.05, unit=TemperatureUnit.FAHRENHEIT) + assert result is not None + + +# --- format_temperature --- + +class TestFormatTemperature: + def test_fahrenheit_only(self): + result = format_temperature(72, TemperatureUnit.FAHRENHEIT) + assert "72" in result + assert "°F" in result + + def test_celsius_only(self): + result = format_temperature(72, TemperatureUnit.CELSIUS) + assert "°C" in result + + def test_both_units(self): + result = format_temperature(72, TemperatureUnit.BOTH) + assert "°F" in result + assert "°C" in result + + def test_none_temperature_with_celsius(self): + result = format_temperature(None, TemperatureUnit.FAHRENHEIT, temperature_c=20) + assert "°F" in result + + def test_none_temperature_none_celsius(self): + assert format_temperature(None) == "N/A" + + def test_smart_precision_whole_number(self): + result = format_temperature(72.0, TemperatureUnit.FAHRENHEIT, smart_precision=True) + assert result == "72°F" + + def test_smart_precision_decimal(self): + result = format_temperature(72.5, TemperatureUnit.FAHRENHEIT, smart_precision=True) + assert result == "72.5°F" + + def test_smart_precision_off(self): + result = format_temperature(72.0, TemperatureUnit.FAHRENHEIT, smart_precision=False, precision=1) + assert result == "72.0°F" + + def test_celsius_smart_precision_whole(self): + result = format_temperature(None, TemperatureUnit.CELSIUS, temperature_c=20.0, smart_precision=True) + assert result == "20°C" + + def test_both_with_temperature_c_none(self): + result = format_temperature(72, TemperatureUnit.BOTH) + assert "°F" in result + assert "°C" in result + + def test_both_smart_precision(self): + result = format_temperature(32.0, TemperatureUnit.BOTH, temperature_c=0.0, smart_precision=True) + assert result == "32°F (0°C)" + + def test_precision_2(self): + result = format_temperature(72.123, TemperatureUnit.FAHRENHEIT, smart_precision=False, precision=2) + assert result == "72.12°F" + + def test_only_temperature_c_celsius_unit(self): + result = format_temperature(None, TemperatureUnit.CELSIUS, temperature_c=25.0) + assert "25" in result + assert "°C" in result + + def test_only_temperature_c_both_unit(self): + result = format_temperature(None, TemperatureUnit.BOTH, temperature_c=0.0) + assert "°F" in result + assert "°C" in result + + +# --- get_temperature_values --- + +class TestGetTemperatureValues: + def test_both_none(self): + assert get_temperature_values(None, None) == (None, None) + + def test_only_fahrenheit(self): + f, c = get_temperature_values(32, None) + assert f == 32 + assert abs(c - 0) < 0.01 + + def test_only_celsius(self): + f, c = get_temperature_values(None, 100) + assert abs(f - 212) < 0.01 + assert c == 100 + + def test_both_provided(self): + f, c = get_temperature_values(72, 22.2) + assert f == 72 + assert c == 22.2