diff --git a/climada/test/common_test_fixtures.py b/climada/test/common_test_fixtures.py
new file mode 100644
index 0000000000..64ed519fe4
--- /dev/null
+++ b/climada/test/common_test_fixtures.py
@@ -0,0 +1,205 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+---
+
+A set of reusable objects for testing purpose.
+
+The objective of this file is to provide minimalistic, understandable and consistent
+default objects for unit and integration testing.
+
+"""
+
+import geopandas as gpd
+import numpy as np
+from scipy.sparse import csr_matrix
+from shapely.geometry import Point
+
+from climada.entity import Exposures, ImpactFunc, ImpactFuncSet
+from climada.hazard import Centroids, Hazard
+from climada.trajectories.snapshot import Snapshot
+
+# ---------------------------------------------------------------------------
+# Coordinate system and metadata
+# ---------------------------------------------------------------------------
+CRS_WGS84 = "EPSG:4326"
+
+# ---------------------------------------------------------------------------
+# Exposure attributes
+# ---------------------------------------------------------------------------
+EXP_DESC = "Test exposure dataset"
+EXP_DESC_LATLON = "Test exposure dataset (lat/lon)"
+EXPOSURE_REF_YEAR = 2020
+EXPOSURE_VALUE_UNIT = "USD"
+VALUES = np.array([0, 1000, 2000, 3000])
+REGIONS = np.array(["A", "A", "B", "B"])
+CATEGORIES = np.array([1, 1, 2, 1])
+
+# Exposure coordinates
+EXP_LONS = np.array([4, 4.5, 4, 4.5])
+EXP_LATS = np.array([45, 45, 45.5, 45.5])
+
+# ---------------------------------------------------------------------------
+# Hazard definition
+# ---------------------------------------------------------------------------
+HAZARD_TYPE = "TEST_HAZARD_TYPE"
+HAZARD_UNIT = "TEST_HAZARD_UNIT"
+
+# Hazard centroid positions
+HAZ_JITTER = 0.1 # To test centroid matching
+HAZ_LONS = EXP_LONS + HAZ_JITTER
+HAZ_LATS = EXP_LATS + HAZ_JITTER
+
+# Hazard events
+EVENT_IDS = np.array([1, 2, 3, 4])
+EVENT_NAMES = ["ev1", "ev2", "ev3", "ev4"]
+DATES = np.array([1, 2, 3, 4])
+
+# Frequency are choosen so that they cumulate nicely
+# to correspond to 100, 50, and 20y return periods (for impacts)
+FREQUENCY = np.array([0.1, 0.03, 0.01, 0.01])
+FREQUENCY_UNIT = "1/year"
+
+# Hazard maximum intensity
+# 100 to match 0 to 100% idea
+# also in line with linear 1:1 impact function
+# for easy mental calculus
+HAZARD_MAX_INTENSITY = 100
+
+# ---------------------------------------------------------------------------
+# Impact function
+# ---------------------------------------------------------------------------
+IMPF_ID = 1
+IMPF_NAME = "IMPF_1"
+
+# ---------------------------------------------------------------------------
+# Future years
+# ---------------------------------------------------------------------------
+EXPOSURE_FUTURE_YEAR = 2040
+
+
+def reusable_minimal_exposures(
+ values=VALUES,
+ regions=REGIONS,
+ group_id=None,
+ lon=EXP_LONS,
+ lat=EXP_LATS,
+ crs=CRS_WGS84,
+ desc=EXP_DESC,
+ ref_year=EXPOSURE_REF_YEAR,
+ value_unit=EXPOSURE_VALUE_UNIT,
+ assign_impf=IMPF_ID,
+ increase_value_factor=1,
+) -> Exposures:
+ data = gpd.GeoDataFrame(
+ {
+ "value": values * increase_value_factor,
+ "region_id": regions,
+ f"impf_{HAZARD_TYPE}": assign_impf,
+ "geometry": [Point(lon, lat) for lon, lat in zip(lon, lat)],
+ },
+ crs=crs,
+ )
+ if group_id is not None:
+ data["group_id"] = group_id
+ return Exposures(
+ data=data,
+ description=desc,
+ ref_year=ref_year,
+ value_unit=value_unit,
+ )
+
+
+def reusable_intensity_mat(max_intensity=HAZARD_MAX_INTENSITY):
+ # Choosen such that:
+ # - 1st event has 0 intensity
+ # - 2nd event has max intensity in first exposure point (defaulting to 0 value)
+ # - 3rd event has 1/2* of max intensity in second centroid
+ # - 4th event has 1/4* of max intensity everywhere
+ # *: So that you can double intensity of the hazard and expect double impacts
+ return csr_matrix(
+ [
+ [0, 0, 0, 0],
+ [max_intensity, 0, 0, 0],
+ [0, max_intensity / 2, 0, 0],
+ [
+ max_intensity / 4,
+ max_intensity / 4,
+ max_intensity / 4,
+ max_intensity / 4,
+ ],
+ ]
+ )
+
+
+def reusable_minimal_hazard(
+ haz_type=HAZARD_TYPE,
+ units=HAZARD_UNIT,
+ lat=HAZ_LATS,
+ lon=HAZ_LONS,
+ crs=CRS_WGS84,
+ event_id=EVENT_IDS,
+ event_name=EVENT_NAMES,
+ date=DATES,
+ frequency=FREQUENCY,
+ frequency_unit=FREQUENCY_UNIT,
+ intensity=None,
+ intensity_factor=1,
+) -> Hazard:
+ intensity = reusable_intensity_mat() if intensity is None else intensity
+ intensity *= intensity_factor
+ return Hazard(
+ haz_type=haz_type,
+ units=units,
+ centroids=Centroids(lat=lat, lon=lon, crs=crs),
+ event_id=event_id,
+ event_name=event_name,
+ date=date,
+ frequency=frequency,
+ frequency_unit=frequency_unit,
+ intensity=intensity,
+ )
+
+
+def reusable_minimal_impfset(
+ hazard=None, name=IMPF_NAME, impf_id=IMPF_ID, max_intensity=HAZARD_MAX_INTENSITY
+):
+ hazard = reusable_minimal_hazard() if hazard is None else hazard
+ return ImpactFuncSet(
+ [
+ ImpactFunc(
+ haz_type=hazard.haz_type,
+ intensity_unit=hazard.units,
+ name=name,
+ intensity=np.array([0, max_intensity / 2, max_intensity]),
+ mdd=np.array([0, 0.5, 1]),
+ paa=np.array([1, 1, 1]),
+ id=impf_id,
+ )
+ ]
+ )
+
+
+def reusable_snapshot(
+ hazard_intensity_increase_factor=1,
+ exposure_value_increase_factor=1,
+ date=EXPOSURE_REF_YEAR,
+):
+ exposures = reusable_minimal_exposures(
+ increase_value_factor=exposure_value_increase_factor
+ )
+ hazard = reusable_minimal_hazard(intensity_factor=hazard_intensity_increase_factor)
+ impfset = reusable_minimal_impfset()
+ return Snapshot(exposure=exposures, hazard=hazard, impfset=impfset, date=date)
diff --git a/climada/test/test_trajectories.py b/climada/test/test_trajectories.py
new file mode 100644
index 0000000000..5df15e2651
--- /dev/null
+++ b/climada/test/test_trajectories.py
@@ -0,0 +1,278 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+---
+
+Test trajectories.
+
+"""
+
+from unittest import TestCase
+
+import numpy as np
+import pandas as pd
+
+from climada.engine.impact_calc import ImpactCalc
+from climada.entity.disc_rates.base import DiscRates
+from climada.test.common_test_fixtures import (
+ CATEGORIES,
+ reusable_minimal_exposures,
+ reusable_minimal_hazard,
+ reusable_minimal_impfset,
+ reusable_snapshot,
+)
+from climada.trajectories import StaticRiskTrajectory
+from climada.trajectories.constants import (
+ AAI_METRIC_NAME,
+ DATE_COL_NAME,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ NO_MEASURE_VALUE,
+ RISK_COL_NAME,
+ UNIT_COL_NAME,
+)
+from climada.trajectories.snapshot import Snapshot
+from climada.trajectories.trajectory import DEFAULT_RP
+
+
+class TestStaticTrajectory(TestCase):
+ PRESENT_DATE = 2020
+ HAZ_INCREASE_INTENSITY_FACTOR = 2
+ EXP_INCREASE_VALUE_FACTOR = 10
+ FUTURE_DATE = 2040
+
+ def setUp(self) -> None:
+ self.base_snapshot = reusable_snapshot(date=self.PRESENT_DATE)
+ self.future_snapshot = reusable_snapshot(
+ hazard_intensity_increase_factor=self.HAZ_INCREASE_INTENSITY_FACTOR,
+ exposure_value_increase_factor=self.EXP_INCREASE_VALUE_FACTOR,
+ date=self.FUTURE_DATE,
+ )
+
+ self.expected_base_imp = ImpactCalc(
+ **self.base_snapshot.impact_calc_data
+ ).impact()
+ self.expected_future_imp = ImpactCalc(
+ **self.future_snapshot.impact_calc_data
+ ).impact()
+ # self.group_vector = self.base_snapshot.exposure.gdf[GROUP_ID_COL_NAME]
+ self.expected_base_return_period_impacts = {
+ rp: imp
+ for rp, imp in zip(
+ self.expected_base_imp.calc_freq_curve(DEFAULT_RP).return_per,
+ self.expected_base_imp.calc_freq_curve(DEFAULT_RP).impact,
+ )
+ }
+ self.expected_future_return_period_impacts = {
+ rp: imp
+ for rp, imp in zip(
+ self.expected_future_imp.calc_freq_curve(DEFAULT_RP).return_per,
+ self.expected_future_imp.calc_freq_curve(DEFAULT_RP).impact,
+ )
+ }
+
+ # fmt: off
+ self.expected_static_metrics = pd.DataFrame.from_dict(
+ {'index': [0, 1, 2, 3, 4, 5, 6, 7],
+ 'columns': [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME, UNIT_COL_NAME, RISK_COL_NAME],
+ 'data': [
+ [pd.Timestamp(str(self.PRESENT_DATE)), 'All', NO_MEASURE_VALUE, 'aai', 'USD', self.expected_base_imp.aai_agg],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 'All', NO_MEASURE_VALUE, 'aai', 'USD', self.expected_future_imp.aai_agg],
+ [pd.Timestamp(str(self.PRESENT_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[0]}', 'USD', self.expected_base_return_period_impacts[DEFAULT_RP[0]]],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[0]}', 'USD', self.expected_future_return_period_impacts[DEFAULT_RP[0]]],
+ [pd.Timestamp(str(self.PRESENT_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[1]}', 'USD', self.expected_base_return_period_impacts[DEFAULT_RP[1]]],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[1]}', 'USD', self.expected_future_return_period_impacts[DEFAULT_RP[1]]],
+ [pd.Timestamp(str(self.PRESENT_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[2]}', 'USD', self.expected_base_return_period_impacts[DEFAULT_RP[2]]],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 'All', NO_MEASURE_VALUE, f'rp_{DEFAULT_RP[2]}', 'USD', self.expected_future_return_period_impacts[DEFAULT_RP[2]]],
+ ],
+ 'index_names': [None],
+ 'column_names': [None]},
+ orient="tight"
+ )
+ # fmt: on
+
+ def test_static_trajectory(self):
+ static_traj = StaticRiskTrajectory([self.base_snapshot, self.future_snapshot])
+ print(static_traj.per_date_risk_metrics())
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ self.expected_static_metrics,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ def test_static_trajectory_one_snap(self):
+ static_traj = StaticRiskTrajectory([self.base_snapshot])
+ expected = pd.DataFrame.from_dict(
+ # fmt: off
+ {
+ "index": [0, 1, 2, 3],
+ "columns": [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME, UNIT_COL_NAME, RISK_COL_NAME,],
+ "data": [
+ [pd.Timestamp(str(self.PRESENT_DATE)), "All", NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_base_imp.aai_agg,],
+ [pd.Timestamp(str(self.PRESENT_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[0]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[0]],],
+ [pd.Timestamp(str(self.PRESENT_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[1]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[1]],],
+ [pd.Timestamp(str(self.PRESENT_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[2]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[2]],],
+ ],
+ "index_names": [None],
+ "column_names": [None],
+ },
+ # fmt: on
+ orient="tight",
+ )
+
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ expected,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ def test_static_trajectory_with_group(self):
+ exp0 = reusable_minimal_exposures(group_id=CATEGORIES)
+ exp1 = reusable_minimal_exposures(
+ group_id=CATEGORIES, increase_value_factor=self.EXP_INCREASE_VALUE_FACTOR
+ )
+ snap0 = Snapshot(
+ exposure=exp0,
+ hazard=reusable_minimal_hazard(),
+ impfset=reusable_minimal_impfset(),
+ date=self.PRESENT_DATE,
+ )
+ snap1 = Snapshot(
+ exposure=exp1,
+ hazard=reusable_minimal_hazard(
+ intensity_factor=self.HAZ_INCREASE_INTENSITY_FACTOR
+ ),
+ impfset=reusable_minimal_impfset(),
+ date=self.FUTURE_DATE,
+ )
+
+ expected_static_metrics = pd.concat(
+ [
+ self.expected_static_metrics,
+ pd.DataFrame.from_dict(
+ # fmt: off
+ {
+ "index": [8, 9, 10, 11],
+ "columns": [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME, UNIT_COL_NAME, RISK_COL_NAME,],
+ "data": [
+ [pd.Timestamp(str(self.PRESENT_DATE)), 1, NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_base_imp.eai_exp[CATEGORIES == 1].sum(),],
+ [pd.Timestamp(str(self.PRESENT_DATE)), 2, NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_base_imp.eai_exp[CATEGORIES == 2].sum(),],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 1, NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_future_imp.eai_exp[CATEGORIES == 1].sum(),],
+ [pd.Timestamp(str(self.FUTURE_DATE)), 2, NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_future_imp.eai_exp[CATEGORIES == 2].sum(),],
+ ],
+ "index_names": [None],
+ "column_names": [None],
+ },
+ # fmt: on
+ orient="tight",
+ ),
+ ]
+ )
+
+ static_traj = StaticRiskTrajectory([snap0, snap1])
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ expected_static_metrics,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ def test_static_trajectory_change_rp(self):
+ static_traj = StaticRiskTrajectory(
+ [self.base_snapshot, self.future_snapshot], return_periods=[10, 60, 1000]
+ )
+ expected = pd.DataFrame.from_dict(
+ # fmt: off
+ {
+ "index": [0, 1, 2, 3, 4, 5, 6, 7],
+ "columns": [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME, UNIT_COL_NAME, RISK_COL_NAME,],
+ "data": [
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_base_imp.aai_agg,],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_future_imp.aai_agg,],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, "rp_10", "USD", 0.0,],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, "rp_10", "USD", 0.0,],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, "rp_60", "USD", 700.0,],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, "rp_60", "USD", 14000.0,],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, "rp_1000", "USD", 1500.0,],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, "rp_1000", "USD", 30000.0,],
+ ],
+ "index_names": [None],
+ "column_names": [None],
+ },
+ # fmt: on
+ orient="tight",
+ )
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ expected,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ # Also check change to other return period
+ static_traj.return_periods = DEFAULT_RP
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ self.expected_static_metrics,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ def test_static_trajectory_risk_disc_rate(self):
+ risk_disc_rate = DiscRates(
+ years=np.array(range(self.PRESENT_DATE, 2041)), rates=np.ones(21) * 0.01
+ )
+ static_traj = StaticRiskTrajectory(
+ [self.base_snapshot, self.future_snapshot], risk_disc_rates=risk_disc_rate
+ )
+ expected = pd.DataFrame.from_dict(
+ # fmt: off
+ {
+ "index": [0, 1, 2, 3, 4, 5, 6, 7],
+ "columns": [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME, UNIT_COL_NAME, RISK_COL_NAME,],
+ "data": [
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_base_imp.aai_agg,],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, AAI_METRIC_NAME, "USD", self.expected_future_imp.aai_agg * ((1 / (1 + 0.01)) ** 20),],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[0]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[0]],],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[0]}", "USD", self.expected_future_return_period_impacts[DEFAULT_RP[0]] * ((1 / (1 + 0.01)) ** 20),],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[1]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[1]],],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[1]}", "USD", self.expected_future_return_period_impacts[DEFAULT_RP[1]] * ((1 / (1 + 0.01)) ** 20),],
+ [pd.Timestamp(str(self.PRESENT_DATE)),"All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[2]}", "USD", self.expected_base_return_period_impacts[DEFAULT_RP[2]],],
+ [pd.Timestamp(str(self.FUTURE_DATE)), "All", NO_MEASURE_VALUE, f"rp_{DEFAULT_RP[2]}", "USD", self.expected_future_return_period_impacts[DEFAULT_RP[2]] * ((1 / (1 + 0.01)) ** 20),],
+ ],
+ "index_names": [None],
+ "column_names": [None],
+ },
+ # fmt: on
+ orient="tight",
+ )
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ expected,
+ check_dtype=False,
+ check_categorical=False,
+ )
+
+ # Also check change to other return period
+ static_traj.risk_disc_rates = None
+ pd.testing.assert_frame_equal(
+ static_traj.per_date_risk_metrics(),
+ self.expected_static_metrics,
+ check_dtype=False,
+ check_categorical=False,
+ )
diff --git a/climada/trajectories/__init__.py b/climada/trajectories/__init__.py
new file mode 100644
index 0000000000..716081568e
--- /dev/null
+++ b/climada/trajectories/__init__.py
@@ -0,0 +1,30 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This module implements risk trajectory objects which enable computation and
+possibly interpolation of risk metric over multiple dates.
+
+"""
+
+from .snapshot import Snapshot
+from .static_trajectory import StaticRiskTrajectory
+
+__all__ = [
+ "Snapshot",
+ "StaticRiskTrajectory",
+]
diff --git a/climada/trajectories/calc_risk_metrics.py b/climada/trajectories/calc_risk_metrics.py
new file mode 100644
index 0000000000..674b3eb599
--- /dev/null
+++ b/climada/trajectories/calc_risk_metrics.py
@@ -0,0 +1,357 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This modules implements the CalcRiskMetrics classes.
+
+CalcRiskMetrics are used to compute risk metrics (and intermediate requirements)
+in between two snapshots.
+
+As these computations are not always required and can become "heavy", a so called "lazy"
+approach is used: computation is only done when required, and then stored.
+
+"""
+
+import logging
+
+import numpy as np
+import pandas as pd
+
+from climada.engine.impact import Impact
+from climada.entity.measures.base import Measure
+from climada.trajectories.constants import (
+ AAI_METRIC_NAME,
+ COORD_ID_COL_NAME,
+ DATE_COL_NAME,
+ EAI_METRIC_NAME,
+ GROUP_COL_NAME,
+ GROUP_ID_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ NO_MEASURE_VALUE,
+ RISK_COL_NAME,
+ RP_VALUE_PREFIX,
+ UNIT_COL_NAME,
+)
+from climada.trajectories.impact_calc_strat import ImpactComputationStrategy
+from climada.trajectories.snapshot import Snapshot
+
+LOGGER = logging.getLogger(__name__)
+
+__all__ = [
+ "CalcRiskMetricsPoints",
+]
+
+_CACHE_SETTINGS = {"ENABLE_LAZY_CACHE": False}
+
+
+def lazy_property(method):
+ """
+ Decorator that converts a method into a cached, lazy-evaluated property.
+
+ This decorator is intended for properties that require heavy computation.
+ The result is calculated only when first accessed and then stored in a
+ corresponding private attribute (e.g., a method named `impact` will
+ cache its result in `_impact`).
+
+ Parameters
+ ----------
+ method : callable
+ The method to be converted into a lazy property.
+
+ Returns
+ -------
+ property
+ A property object that handles the caching logic and attribute access.
+
+ Notes
+ -----
+ The caching behavior can be globally toggled via the
+ `_CACHE_SETTINGS["ENABLE_LAZY_CACHE"]` flag. If disabled, the
+ method will be re-evaluated on every access.
+
+ """
+ attr_name = f"_{method.__name__}"
+
+ @property
+ def _lazy(self):
+ if not _CACHE_SETTINGS.get("ENABLE_LAZY_CACHE", True):
+ return method(self)
+
+ if getattr(self, attr_name) is None:
+ setattr(self, attr_name, method(self))
+
+ return getattr(self, attr_name)
+
+ return _lazy
+
+
+class CalcRiskMetricsPoints:
+ """This class handles the computation of impacts for a list of `Snapshot`.
+
+ Note that most attribute like members are properties with their own docstring.
+
+ Attributes
+ ----------
+
+ impact_computation_strategy: ImpactComputationStrategy, optional
+ The method used to calculate the impact from the (Haz,Exp,Vul) of the snapshots.
+ Defaults to ImpactCalc
+ measure: Measure, optional
+ The measure applied to snapshots. Defaults to None.
+
+ Notes
+ -----
+
+ This class is intended for internal computation.
+ """
+
+ def __init__(
+ self,
+ snapshots: list[Snapshot],
+ impact_computation_strategy: ImpactComputationStrategy,
+ ) -> None:
+ """Initialize a new `CalcRiskMetricsPoints`
+
+ This initializes and instantiate a new `CalcRiskMetricsPoints` object.
+ No computation is done at initialisation and only done "just in time".
+
+ Parameters
+ ----------
+ snapshots : List[Snapshot]
+ The `Snapshot` list to compute risk for.
+ impact_computation_strategy: ImpactComputationStrategy, optional
+ The method used to calculate the impact from the (Haz,Exp,Vul) of the two snapshots.
+ Defaults to ImpactCalc
+
+ """
+
+ self._reset_impact_data()
+ self.snapshots = snapshots
+ self.impact_computation_strategy = impact_computation_strategy
+ self._date_idx = pd.DatetimeIndex(
+ [snap.date for snap in self.snapshots], name=DATE_COL_NAME
+ )
+ self.measure = None
+ try:
+ self._group_id = np.unique(
+ np.concatenate(
+ [
+ snap.exposure.gdf[GROUP_ID_COL_NAME]
+ for snap in self.snapshots
+ if GROUP_ID_COL_NAME in snap.exposure.gdf.columns
+ ]
+ )
+ )
+ except ValueError as exc:
+ error_message = str(exc).lower()
+ if "need at least one array to concatenate" in error_message:
+ self._group_id = np.array([])
+
+ def _reset_impact_data(self):
+ """Util method that resets computed data, for instance when
+ changing the computation strategy.
+
+ """
+ self._impacts = None
+ self._eai_gdf = None
+ self._per_date_eai = None
+ self._per_date_aai = None
+
+ @property
+ def impact_computation_strategy(self) -> ImpactComputationStrategy:
+ """The method used to calculate the impact from the (Haz,Exp,Vul)
+ of the snapshots.
+
+ """
+ return self._impact_computation_strategy
+
+ @impact_computation_strategy.setter
+ def impact_computation_strategy(self, value, /):
+ if not isinstance(value, ImpactComputationStrategy):
+ raise ValueError("Not an impact computation strategy")
+
+ self._impact_computation_strategy = value
+ self._reset_impact_data()
+
+ @lazy_property
+ def impacts(self) -> list[Impact]:
+ """Return Impact object for the different snapshots."""
+
+ return [
+ self.impact_computation_strategy.compute_impacts(
+ snap.exposure, snap.hazard, snap.impfset
+ )
+ for snap in self.snapshots
+ ]
+
+ @lazy_property
+ def per_date_eai(self) -> np.ndarray:
+ """Expected annual impacts per snapshot."""
+
+ return np.array([imp.eai_exp for imp in self.impacts])
+
+ @lazy_property
+ def per_date_aai(self) -> np.ndarray:
+ """Average annual impacts per snapshot."""
+
+ return np.array([imp.aai_agg for imp in self.impacts])
+
+ def calc_eai_gdf(self) -> pd.DataFrame:
+ """Convenience function returning a DataFrame
+ from `per_date_eai`.
+
+ This can easily be merged with the GeoDataFrame of
+ the exposure object of one of the `Snapshot`.
+
+ Notes
+ -----
+
+ The DataFrame from the first snapshot of the list is used
+ as a basis (notably for `value` and `group_id`).
+
+ """
+
+ metric_df = pd.DataFrame(self.per_date_eai, index=self._date_idx)
+ metric_df = metric_df.reset_index().melt(
+ id_vars=DATE_COL_NAME, var_name=COORD_ID_COL_NAME, value_name=RISK_COL_NAME
+ )
+ eai_gdf = pd.concat(
+ [
+ snap.exposure.gdf.reset_index(names=[COORD_ID_COL_NAME]).assign(
+ date=pd.to_datetime(snap.date)
+ )
+ for snap in self.snapshots
+ ]
+ )
+ if GROUP_ID_COL_NAME in eai_gdf.columns:
+ eai_gdf = eai_gdf[[DATE_COL_NAME, COORD_ID_COL_NAME, GROUP_ID_COL_NAME]]
+ else:
+ eai_gdf[[GROUP_ID_COL_NAME]] = pd.NA
+ eai_gdf = eai_gdf[[DATE_COL_NAME, COORD_ID_COL_NAME, GROUP_ID_COL_NAME]]
+
+ eai_gdf = eai_gdf.merge(metric_df, on=[DATE_COL_NAME, COORD_ID_COL_NAME])
+ eai_gdf = eai_gdf.rename(columns={GROUP_ID_COL_NAME: GROUP_COL_NAME})
+ eai_gdf[GROUP_COL_NAME] = pd.Categorical(
+ eai_gdf[GROUP_COL_NAME], categories=self._group_id
+ )
+ eai_gdf[METRIC_COL_NAME] = EAI_METRIC_NAME
+ eai_gdf[MEASURE_COL_NAME] = (
+ self.measure.name if self.measure else NO_MEASURE_VALUE
+ )
+ eai_gdf[UNIT_COL_NAME] = self.snapshots[0].exposure.value_unit
+ return eai_gdf
+
+ def calc_aai_metric(self) -> pd.DataFrame:
+ """Compute a DataFrame of the AAI for each snapshot."""
+
+ aai_df = pd.DataFrame(
+ index=self._date_idx, columns=[RISK_COL_NAME], data=self.per_date_aai
+ )
+ aai_df[GROUP_COL_NAME] = pd.Categorical(
+ [pd.NA] * len(aai_df), categories=self._group_id
+ )
+ aai_df[METRIC_COL_NAME] = AAI_METRIC_NAME
+ aai_df[MEASURE_COL_NAME] = (
+ self.measure.name if self.measure else NO_MEASURE_VALUE
+ )
+ aai_df[UNIT_COL_NAME] = self.snapshots[0].exposure.value_unit
+ aai_df.reset_index(inplace=True)
+ return aai_df
+
+ def calc_aai_per_group_metric(self) -> pd.DataFrame | None:
+ """Compute a DataFrame of the AAI distinguised per group id
+ in the exposures, for each snapshot.
+
+ """
+
+ if len(self._group_id) < 1:
+ LOGGER.warning(
+ "No group id defined in the Exposures object. Per group aai will be empty."
+ )
+ return None
+
+ eai_pres_groups = self.calc_eai_gdf()[
+ [DATE_COL_NAME, COORD_ID_COL_NAME, GROUP_COL_NAME, RISK_COL_NAME]
+ ].copy()
+ aai_per_group_df = eai_pres_groups.groupby(
+ [DATE_COL_NAME, GROUP_COL_NAME], as_index=False, observed=True
+ )[RISK_COL_NAME].sum()
+ aai_per_group_df[METRIC_COL_NAME] = AAI_METRIC_NAME
+ aai_per_group_df[MEASURE_COL_NAME] = (
+ self.measure.name if self.measure else NO_MEASURE_VALUE
+ )
+ aai_per_group_df[UNIT_COL_NAME] = self.snapshots[0].exposure.value_unit
+ return aai_per_group_df
+
+ def calc_return_periods_metric(self, return_periods: list[int]) -> pd.DataFrame:
+ """Compute a DataFrame of the estimated impacts for a list
+ of return periods, for each snapshot.
+
+ Parameters
+ ----------
+
+ return_periods : list of int
+ The return periods to estimate impacts for.
+ """
+
+ per_date_rp = np.array(
+ [
+ imp.calc_freq_curve(return_per=return_periods).impact
+ for imp in self.impacts
+ ]
+ )
+ rp_df = pd.DataFrame(
+ index=self._date_idx, columns=return_periods, data=per_date_rp
+ ).melt(value_name=RISK_COL_NAME, var_name="rp", ignore_index=False)
+ rp_df.reset_index(inplace=True)
+ rp_df[GROUP_COL_NAME] = pd.Categorical(
+ [pd.NA] * len(rp_df), categories=self._group_id
+ )
+ rp_df[METRIC_COL_NAME] = RP_VALUE_PREFIX + "_" + rp_df["rp"].astype(str)
+ rp_df = rp_df.drop("rp", axis=1)
+ rp_df[MEASURE_COL_NAME] = (
+ self.measure.name if self.measure else NO_MEASURE_VALUE
+ )
+ rp_df[UNIT_COL_NAME] = self.snapshots[0].exposure.value_unit
+ return rp_df
+
+ def apply_measure(self, measure: Measure) -> "CalcRiskMetricsPoints":
+ """Creates a new `CalcRiskMetricsPoints` object with a measure.
+
+ The given measure is applied to both snapshot of the risk period.
+
+ Parameters
+ ----------
+ measure : Measure
+ The measure to apply.
+
+ Returns
+ -------
+
+ CalcRiskPeriod
+ The risk period with given measure applied.
+
+ """
+ snapshots = [snap.apply_measure(measure) for snap in self.snapshots]
+ risk_period = CalcRiskMetricsPoints(
+ snapshots,
+ self.impact_computation_strategy,
+ )
+
+ risk_period.measure = measure
+ return risk_period
diff --git a/climada/trajectories/constants.py b/climada/trajectories/constants.py
new file mode 100644
index 0000000000..969e585531
--- /dev/null
+++ b/climada/trajectories/constants.py
@@ -0,0 +1,55 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Define constants for trajectories module.
+"""
+
+DEFAULT_TIME_RESOLUTION = "Y"
+DATE_COL_NAME = "date"
+PERIOD_COL_NAME = "period"
+GROUP_COL_NAME = "group"
+GROUP_ID_COL_NAME = "group_id"
+MEASURE_COL_NAME = "measure"
+NO_MEASURE_VALUE = "no_measure"
+METRIC_COL_NAME = "metric"
+UNIT_COL_NAME = "unit"
+RISK_COL_NAME = "risk"
+COORD_ID_COL_NAME = "coord_id"
+
+DEFAULT_PERIOD_INDEX_NAME = "date"
+
+DEFAULT_RP = (20, 50, 100)
+"""Default return periods to use when computing return period impact estimates."""
+
+DEFAULT_ALLGROUP_NAME = "All"
+"""Default string to use to define the exposure subgroup containing all exposure points."""
+
+EAI_METRIC_NAME = "eai"
+AAI_METRIC_NAME = "aai"
+AAI_PER_GROUP_METRIC_NAME = "aai_per_group"
+CONTRIBUTIONS_METRIC_NAME = "risk_contributions"
+RETURN_PERIOD_METRIC_NAME = "return_periods"
+RP_VALUE_PREFIX = "rp"
+
+
+CONTRIBUTION_BASE_RISK_NAME = "base risk"
+CONTRIBUTION_TOTAL_RISK_NAME = "total risk"
+CONTRIBUTION_EXPOSURE_NAME = "exposure contribution"
+CONTRIBUTION_HAZARD_NAME = "hazard contribution"
+CONTRIBUTION_VULNERABILITY_NAME = "vulnerability contribution"
+CONTRIBUTION_INTERACTION_TERM_NAME = "interaction contribution"
diff --git a/climada/trajectories/impact_calc_strat.py b/climada/trajectories/impact_calc_strat.py
new file mode 100644
index 0000000000..b1bb6eebd3
--- /dev/null
+++ b/climada/trajectories/impact_calc_strat.py
@@ -0,0 +1,114 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This modules implements the impact computation strategy objects for risk
+trajectories.
+
+"""
+
+from abc import ABC, abstractmethod
+
+from climada.engine.impact import Impact
+from climada.engine.impact_calc import ImpactCalc
+from climada.entity.exposures.base import Exposures
+from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet
+from climada.hazard.base import Hazard
+
+__all__ = ["ImpactCalcComputation"]
+
+
+# The following is acceptable.
+# We design a pattern, and currently it requires only to
+# define the compute_impacts method.
+# pylint: disable=too-few-public-methods
+class ImpactComputationStrategy(ABC):
+ """
+ Interface for impact computation strategies.
+
+ This abstract class defines the contract for all concrete strategies
+ responsible for calculating and optionally modifying with a risk transfer,
+ the impact computation, based on a set of inputs (exposure, hazard, vulnerability).
+
+ It revolves around a `compute_impacts()` method that takes as arguments
+ the three dimensions of risk (exposure, hazard, vulnerability) and return an
+ Impact object.
+ """
+
+ @abstractmethod
+ def compute_impacts(
+ self,
+ exp: Exposures,
+ haz: Hazard,
+ vul: ImpactFuncSet,
+ ) -> Impact:
+ """
+ Calculates the total impact, including optional risk transfer application.
+
+ Parameters
+ ----------
+ exp : Exposures
+ The exposure data.
+ haz : Hazard
+ The hazard data (e.g., event intensity).
+ vul : ImpactFuncSet
+ The set of vulnerability functions.
+
+ Returns
+ -------
+ Impact
+ An object containing the computed total impact matrix and metrics.
+
+ See Also
+ --------
+ ImpactCalcComputation : The default implementation of this interface.
+ """
+
+
+class ImpactCalcComputation(ImpactComputationStrategy):
+ r"""
+ Default impact computation strategy using the core engine of climada.
+
+ This strategy first calculates the raw impact using the standard
+ :class:`ImpactCalc` logic.
+
+ """
+
+ def compute_impacts(
+ self,
+ exp: Exposures,
+ haz: Hazard,
+ vul: ImpactFuncSet,
+ ) -> Impact:
+ """
+ Calculates the impact.
+
+ Parameters
+ ----------
+ exp : Exposures
+ The exposure data.
+ haz : Hazard
+ The hazard data.
+ vul : ImpactFuncSet
+ The set of vulnerability functions.
+
+ Returns
+ -------
+ Impact
+ The final impact object.
+ """
+ return ImpactCalc(exposures=exp, impfset=vul, hazard=haz).impact()
diff --git a/climada/trajectories/snapshot.py b/climada/trajectories/snapshot.py
new file mode 100644
index 0000000000..cf93171e0e
--- /dev/null
+++ b/climada/trajectories/snapshot.py
@@ -0,0 +1,232 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This modules implements the Snapshot class.
+
+Snapshot are used to store a snapshot of Exposure, Hazard and Vulnerability
+at a specific date.
+
+"""
+
+import copy
+import datetime
+import logging
+import warnings
+
+from climada.entity.exposures import Exposures
+from climada.entity.impact_funcs import ImpactFuncSet
+from climada.entity.measures.base import Measure
+from climada.hazard import Hazard
+
+LOGGER = logging.getLogger(__name__)
+
+__all__ = ["Snapshot"]
+
+
+class Snapshot:
+ """
+ A snapshot of exposure, hazard, and impact function at a specific date.
+
+ Parameters
+ ----------
+ exposure : Exposures
+ hazard : Hazard
+ impfset : ImpactFuncSet
+ date : int | datetime.date | str
+ The date of the Snapshot, it can be an integer representing a year,
+ a datetime object or a string representation of a datetime object
+ with format "YYYY-MM-DD".
+ ref_only : bool, default False
+ Should the `Snapshot` contain deep copies of the Exposures, Hazard and Impfset (False)
+ or references only (True).
+
+ Attributes
+ ----------
+ date : datetime
+ Date of the snapshot.
+ measure: Measure | None
+ The possible measure applied to the snapshot.
+
+ Notes
+ -----
+
+ The object creates deep copies of the exposure hazard and impact function set.
+
+ Also note that exposure, hazard and impfset are read-only properties.
+ Consider snapshot as immutable objects.
+
+ To create a snapshot with a measure, create a snapshot `snap` without
+ the measure and call `snap.apply_measure(measure)`, which returns a new Snapshot object
+ with the measure applied to its risk dimensions.
+ """
+
+ def __init__(
+ self,
+ *,
+ exposure: Exposures,
+ hazard: Hazard,
+ impfset: ImpactFuncSet,
+ measure: Measure | None,
+ date: int | datetime.date | str,
+ ref_only: bool = False,
+ _from_factory: bool = False,
+ ) -> None:
+ if not _from_factory:
+ warnings.warn(
+ "Direct instantiation of 'Snapshot' is discouraged. "
+ "Use 'Snapshot.from_triplet()' instead.",
+ UserWarning,
+ stacklevel=2,
+ )
+ self._exposure = exposure if ref_only else copy.deepcopy(exposure)
+ self._hazard = hazard if ref_only else copy.deepcopy(hazard)
+ self._impfset = impfset if ref_only else copy.deepcopy(impfset)
+ self._measure = measure if ref_only else copy.deepcopy(measure)
+ self._date = self._convert_to_date(date)
+
+ @classmethod
+ def from_triplet(
+ cls,
+ *,
+ exposure: Exposures,
+ hazard: Hazard,
+ impfset: ImpactFuncSet,
+ date: int | datetime.date | str,
+ ref_only: bool = False,
+ ) -> "Snapshot":
+ """Create a Snapshot from exposure, hazard and impact functions set
+
+ This method is the main point of entry for the creation of Snapshot. It
+ creates a new Snapshot object for the given date with copies of the
+ hazard, exposure and impact function set given in argument (or
+ references if ref_only is True)
+
+ Parameters
+ ----------
+ exposure : Exposures
+ hazard : Hazard
+ impfset : ImpactFuncSet
+ date : int | datetime.date | str
+ ref_only : bool
+ If true, uses references to the exposure, hazard and impact
+ function objects. Note that modifying the original objects after
+ computations using the Snapshot might lead to inconsistencies in
+ results.
+
+ Returns
+ -------
+ Snapshot
+
+ Notes
+ -----
+
+ To create a Snapshot with a measure, first create the Snapshot without
+ the measure using this method, and use `apply_measure(measure)` afterward.
+
+ """
+ return cls(
+ exposure=exposure,
+ hazard=hazard,
+ impfset=impfset,
+ measure=None,
+ date=date,
+ ref_only=ref_only,
+ _from_factory=True,
+ )
+
+ @property
+ def exposure(self) -> Exposures:
+ """Exposure data for the snapshot."""
+ return self._exposure
+
+ @property
+ def hazard(self) -> Hazard:
+ """Hazard data for the snapshot."""
+ return self._hazard
+
+ @property
+ def impfset(self) -> ImpactFuncSet:
+ """Impact function set data for the snapshot."""
+ return self._impfset
+
+ @property
+ def measure(self) -> Measure | None:
+ """(Adaptation) Measure data for the snapshot."""
+ return self._measure
+
+ @property
+ def date(self) -> datetime.date:
+ """Date of the snapshot."""
+ return self._date
+
+ @property
+ def impact_calc_data(self) -> dict:
+ """Convenience function for ImpactCalc class."""
+ return {
+ "exposures": self.exposure,
+ "hazard": self.hazard,
+ "impfset": self.impfset,
+ }
+
+ @staticmethod
+ def _convert_to_date(date_arg) -> datetime.date:
+ """Convert date argument of type int or str to a datetime.date object."""
+ if isinstance(date_arg, int):
+ # Assume the integer represents a year
+ return datetime.date(date_arg, 1, 1)
+ if isinstance(date_arg, str):
+ # Try to parse the string as a date
+ try:
+ return datetime.datetime.strptime(date_arg, "%Y-%m-%d").date()
+ except ValueError as exc:
+ raise ValueError("String must be in the format 'YYYY-MM-DD'") from exc
+ if isinstance(date_arg, datetime.date):
+ # Already a date object
+ return date_arg
+
+ raise TypeError("date_arg must be an int, str, or datetime.date")
+
+ def apply_measure(self, measure: Measure) -> "Snapshot":
+ """Create a new snapshot by applying a Measure object.
+
+ This method creates a new `Snapshot` object by applying a measure on
+ the current one.
+
+ Parameters
+ ----------
+ measure : Measure
+ The measure to be applied to the snapshot.
+
+ Returns
+ -------
+ The Snapshot with the measure applied.
+
+ """
+
+ LOGGER.debug("Applying measure %s on snapshot %s", measure.name, id(self))
+ exp, impfset, haz = measure.apply(self.exposure, self.impfset, self.hazard)
+ snap = Snapshot(
+ exposure=exp,
+ hazard=haz,
+ impfset=impfset,
+ date=self.date,
+ measure=measure,
+ ref_only=True, # Avoid unecessary copies of new objects
+ _from_factory=True,
+ )
+ return snap
diff --git a/climada/trajectories/static_trajectory.py b/climada/trajectories/static_trajectory.py
new file mode 100644
index 0000000000..42a9e8b84a
--- /dev/null
+++ b/climada/trajectories/static_trajectory.py
@@ -0,0 +1,320 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This file implements \"static\" risk trajectory objects, for an easier evaluation
+of risk at multiple points in time (snapshots).
+
+"""
+
+import logging
+from typing import Iterable
+
+import pandas as pd
+
+from climada.entity.disc_rates.base import DiscRates
+from climada.trajectories.calc_risk_metrics import CalcRiskMetricsPoints
+from climada.trajectories.constants import (
+ AAI_METRIC_NAME,
+ AAI_PER_GROUP_METRIC_NAME,
+ COORD_ID_COL_NAME,
+ DATE_COL_NAME,
+ EAI_METRIC_NAME,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ RETURN_PERIOD_METRIC_NAME,
+)
+from climada.trajectories.impact_calc_strat import (
+ ImpactCalcComputation,
+ ImpactComputationStrategy,
+)
+from climada.trajectories.snapshot import Snapshot
+from climada.trajectories.trajectory import (
+ DEFAULT_ALLGROUP_NAME,
+ DEFAULT_DF_COLUMN_PRIORITY,
+ DEFAULT_RP,
+ RiskTrajectory,
+)
+from climada.util import log_level
+from climada.util.dataframe_handling import reorder_dataframe_columns
+
+LOGGER = logging.getLogger(__name__)
+
+__all__ = ["StaticRiskTrajectory"]
+
+
+class StaticRiskTrajectory(RiskTrajectory):
+ """This class implements static risk trajectories, objects that
+ regroup impacts computations for multiple dates.
+
+ This class computes risk metrics over a series of snapshots,
+ optionally applying risk discounting. It does not interpolate risk
+ between the snapshot and only provides results for each snapshot.
+
+ """
+
+ POSSIBLE_METRICS = [
+ EAI_METRIC_NAME,
+ AAI_METRIC_NAME,
+ RETURN_PERIOD_METRIC_NAME,
+ AAI_PER_GROUP_METRIC_NAME,
+ ]
+ """Class variable listing the risk metrics that can be computed.
+
+ Currently:
+
+ - eai, expected impact (per exposure point within a period of 1/frequency
+ unit of the hazard object)
+ - aai, average annual impact (aggregated eai over the whole exposure)
+ - aai_per_group, average annual impact per exposure subgroup (defined from
+ the exposure geodataframe)
+ - return_periods, estimated impacts aggregated over the whole exposure for
+ different return periods
+
+ """
+
+ _DEFAULT_ALL_METRICS = [
+ AAI_METRIC_NAME,
+ RETURN_PERIOD_METRIC_NAME,
+ AAI_PER_GROUP_METRIC_NAME,
+ ]
+
+ def __init__(
+ self,
+ snapshots_list: Iterable[Snapshot],
+ *,
+ return_periods: Iterable[int] = DEFAULT_RP,
+ all_groups_name: str = DEFAULT_ALLGROUP_NAME,
+ risk_disc_rates: DiscRates | None = None,
+ impact_computation_strategy: ImpactComputationStrategy | None = None,
+ ):
+ """Initialize a new `StaticRiskTrajectory`.
+
+ Parameters
+ ----------
+ snapshots_list : list[Snapshot]
+ The list of `Snapshot` object to compute risk from.
+ return_periods: list[int], optional
+ The return periods to use when computing the `return_periods_metric`.
+ Defaults to `DEFAULT_RP` ([20, 50, 100]).
+ all_groups_name: str, optional
+ The string that should be used to define "all exposure points" subgroup.
+ Defaults to `DEFAULT_ALLGROUP_NAME` ("All").
+ risk_disc_rates: DiscRates, optional
+ The discount rate to apply to future risk. Defaults to None.
+ impact_computation_strategy: ImpactComputationStrategy, optional
+ The method used to calculate the impact from the (Haz,Exp,Vul)
+ of the two snapshots. Defaults to :class:`ImpactCalcComputation`.
+
+ """
+ super().__init__(
+ snapshots_list,
+ return_periods=return_periods,
+ all_groups_name=all_groups_name,
+ risk_disc_rates=risk_disc_rates,
+ )
+ self._risk_metrics_calculators = CalcRiskMetricsPoints(
+ self._snapshots,
+ impact_computation_strategy=impact_computation_strategy
+ or ImpactCalcComputation(),
+ )
+
+ @property
+ def impact_computation_strategy(self) -> ImpactComputationStrategy:
+ """The approach or strategy used to calculate the impact from the snapshots."""
+ return self._risk_metrics_calculators.impact_computation_strategy
+
+ @impact_computation_strategy.setter
+ def impact_computation_strategy(self, value, /):
+ if not isinstance(value, ImpactComputationStrategy):
+ raise ValueError("Not an interpolation strategy")
+
+ self._reset_metrics()
+ self._risk_metrics_calculators.impact_computation_strategy = value
+
+ def _generic_metrics(
+ self,
+ /,
+ metric_name: str | None = None,
+ metric_meth: str | None = None,
+ **kwargs,
+ ) -> pd.DataFrame:
+ """Generic method to compute metrics based on the provided metric name and method.
+
+ This method calls the appropriate method from the calculator to return
+ the results for the given metric, in a tidy formatted dataframe.
+
+ It first checks whether the requested metric is a valid one.
+ Then looks for a possible cached value and otherwised asks the
+ calculators (`self._risk_metric_calculators`) to run the computation.
+ The results are then regrouped in a nice and tidy DataFrame.
+ If a `risk_disc_rates` was set, values are converted to net present values.
+ Results are then cached within `self.__metrics` and returned.
+
+ Parameters
+ ----------
+ metric_name : str, optional
+ The name of the metric to return results for.
+ metric_meth : str, optional
+ The name of the specific method of the calculator to call.
+
+ Returns
+ -------
+ pd.DataFrame
+ A tidy formatted dataframe of the risk metric computed for the
+ different snapshots.
+
+ Raises
+ ------
+ NotImplementedError
+ If the requested metric is not part of `POSSIBLE_METRICS`.
+ ValueError
+ If either of the arguments are not provided.
+
+ """
+ if metric_name is None or metric_meth is None:
+ raise ValueError("Both metric_name and metric_meth must be provided.")
+
+ if metric_name not in self.POSSIBLE_METRICS:
+ raise NotImplementedError(
+ f"{metric_name} not implemented ({self.POSSIBLE_METRICS})."
+ )
+
+ # Construct the attribute name for storing the metric results
+ attr_name = f"_{metric_name}_metrics"
+
+ if getattr(self, attr_name) is not None:
+ LOGGER.debug("Returning cached %s", attr_name)
+ return getattr(self, attr_name)
+
+ with log_level(level="WARNING", name_prefix="climada"):
+ tmp = getattr(self._risk_metrics_calculators, metric_meth)(**kwargs)
+ if tmp is None:
+ return tmp
+
+ tmp = tmp.set_index(
+ [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME]
+ )
+ if COORD_ID_COL_NAME in tmp.columns:
+ tmp = tmp.set_index([COORD_ID_COL_NAME], append=True)
+
+ # When more than 2 snapshots, there might be duplicated rows, we need to remove them.
+ # Should not be the case in static trajectory, but in any case we really don't want
+ # duplicated rows, which would mess up some dataframe manipulation down the road.
+ tmp = tmp[~tmp.index.duplicated(keep="first")]
+ tmp = tmp.reset_index()
+ if self._all_groups_name not in tmp[GROUP_COL_NAME].cat.categories:
+ tmp[GROUP_COL_NAME] = tmp[GROUP_COL_NAME].cat.add_categories(
+ [self._all_groups_name]
+ )
+ tmp[GROUP_COL_NAME] = tmp[GROUP_COL_NAME].fillna(self._all_groups_name)
+
+ if self._risk_disc_rates:
+ tmp = self.npv_transform(tmp, self._risk_disc_rates)
+
+ tmp = reorder_dataframe_columns(tmp, DEFAULT_DF_COLUMN_PRIORITY)
+
+ setattr(self, attr_name, tmp)
+ return getattr(self, attr_name)
+
+ def eai_metrics(self, **kwargs) -> pd.DataFrame:
+ """Return the estimated annual impacts at each exposure point for each date.
+
+ This method computes and return a `DataFrame` with eai metric
+ (for each exposure point) for each date.
+
+ Notes
+ -----
+
+ This computation may become quite expensive for big areas with high resolution.
+
+ """
+ metric_df = self._compute_metrics(
+ metric_name=EAI_METRIC_NAME, metric_meth="calc_eai_gdf", **kwargs
+ )
+ return metric_df
+
+ def aai_metrics(self, **kwargs) -> pd.DataFrame:
+ """Return the average annual impacts for each date.
+
+ This method computes and return a `DataFrame` with aai metric for each date.
+
+ """
+
+ return self._compute_metrics(
+ metric_name=AAI_METRIC_NAME, metric_meth="calc_aai_metric", **kwargs
+ )
+
+ def return_periods_metrics(self, **kwargs) -> pd.DataFrame:
+ """Return the estimated impacts for different return periods.
+
+ Return periods to estimate impacts for are defined by `self.return_periods`.
+
+ """
+ return self._compute_metrics(
+ metric_name=RETURN_PERIOD_METRIC_NAME,
+ metric_meth="calc_return_periods_metric",
+ return_periods=self.return_periods,
+ **kwargs,
+ )
+
+ def aai_per_group_metrics(self, **kwargs) -> pd.DataFrame:
+ """Return the average annual impacts for each exposure group ID.
+
+ This method computes and return a `DataFrame` with aai metric for each
+ of the exposure group defined by a group id, for each date.
+
+ """
+
+ return self._compute_metrics(
+ metric_name=AAI_PER_GROUP_METRIC_NAME,
+ metric_meth="calc_aai_per_group_metric",
+ **kwargs,
+ )
+
+ def per_date_risk_metrics(
+ self,
+ metrics: list[str] | None = None,
+ ) -> pd.DataFrame | pd.Series:
+ """Returns a DataFrame of risk metrics for each dates.
+
+ This methods collects (and if needed computes) the `metrics`
+ (Defaulting to AAI_METRIC_NAME, RETURN_PERIOD_METRIC_NAME and AAI_PER_GROUP_METRIC_NAME).
+
+ Parameters
+ ----------
+ metrics : list[str], optional
+ The list of metrics to return (defaults to
+ [AAI_METRIC_NAME,RETURN_PERIOD_METRIC_NAME,AAI_PER_GROUP_METRIC_NAME])
+
+ Returns
+ -------
+ pd.DataFrame | pd.Series
+ A tidy DataFrame with metric values for all possible dates.
+
+ """
+
+ metrics = (
+ [AAI_METRIC_NAME, RETURN_PERIOD_METRIC_NAME, AAI_PER_GROUP_METRIC_NAME]
+ if metrics is None
+ else metrics
+ )
+ return pd.concat(
+ [getattr(self, f"{metric}_metrics")() for metric in metrics],
+ ignore_index=True,
+ )
diff --git a/climada/trajectories/test/test_calc_risk_metrics.py b/climada/trajectories/test/test_calc_risk_metrics.py
new file mode 100644
index 0000000000..7485f3cd3f
--- /dev/null
+++ b/climada/trajectories/test/test_calc_risk_metrics.py
@@ -0,0 +1,448 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This modules implements different sparce matrices interpolation approaches.
+
+"""
+
+import unittest
+from unittest.mock import MagicMock, call, patch
+
+import numpy as np
+import pandas as pd
+
+# Assuming these are the necessary imports from climada
+from climada.entity.exposures import Exposures
+from climada.entity.impact_funcs import ImpactFuncSet
+from climada.entity.impact_funcs.trop_cyclone import ImpfTropCyclone
+from climada.entity.measures.base import Measure
+from climada.hazard import Hazard
+from climada.trajectories.calc_risk_metrics import CalcRiskMetricsPoints
+from climada.trajectories.constants import (
+ AAI_METRIC_NAME,
+ COORD_ID_COL_NAME,
+ DATE_COL_NAME,
+ EAI_METRIC_NAME,
+ GROUP_COL_NAME,
+ GROUP_ID_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ NO_MEASURE_VALUE,
+ RISK_COL_NAME,
+ UNIT_COL_NAME,
+)
+
+# Import the CalcRiskPeriod class and other necessary classes/functions
+from climada.trajectories.impact_calc_strat import (
+ ImpactCalcComputation,
+ ImpactComputationStrategy,
+)
+from climada.trajectories.snapshot import Snapshot
+from climada.util.constants import EXP_DEMO_H5, HAZ_DEMO_H5
+
+
+class TestCalcRiskMetricsPoints(unittest.TestCase):
+ def setUp(self):
+ # Create mock objects for testing
+ self.present_date = 2020
+ self.future_date = 2025
+ self.exposure_present = Exposures.from_hdf5(EXP_DEMO_H5)
+ self.exposure_present.gdf.rename(columns={"impf_": "impf_TC"}, inplace=True)
+ self.exposure_present.gdf["impf_TC"] = 1
+ self.exposure_present.gdf[GROUP_ID_COL_NAME] = (
+ self.exposure_present.gdf["value"]
+ > self.exposure_present.gdf["value"].mean()
+ ) * 1
+ self.hazard_present = Hazard.from_hdf5(HAZ_DEMO_H5)
+ self.exposure_present.assign_centroids(self.hazard_present, distance="approx")
+ self.impfset_present = ImpactFuncSet([ImpfTropCyclone.from_emanuel_usa()])
+
+ self.exposure_future = Exposures.from_hdf5(EXP_DEMO_H5)
+ n_years = self.future_date - self.present_date + 1
+ growth_rate = 1.02
+ growth = growth_rate**n_years
+ self.exposure_future.gdf["value"] = self.exposure_future.gdf["value"] * growth
+ self.exposure_future.gdf.rename(columns={"impf_": "impf_TC"}, inplace=True)
+ self.exposure_future.gdf["impf_TC"] = 1
+ self.exposure_future.gdf[GROUP_ID_COL_NAME] = (
+ self.exposure_future.gdf["value"] > self.exposure_future.gdf["value"].mean()
+ ) * 1
+ self.hazard_future = Hazard.from_hdf5(HAZ_DEMO_H5)
+ self.hazard_future.intensity *= 1.1
+ self.exposure_future.assign_centroids(self.hazard_future, distance="approx")
+ self.impfset_future = ImpactFuncSet(
+ [
+ ImpfTropCyclone.from_emanuel_usa(impf_id=1, v_half=60.0),
+ ]
+ )
+
+ self.measure = MagicMock(spec=Measure)
+ self.measure.name = "Test Measure"
+
+ # Setup mock return values for measure.apply
+ self.measure_exposure = MagicMock(spec=Exposures)
+ self.measure_hazard = MagicMock(spec=Hazard)
+ self.measure_impfset = MagicMock(spec=ImpactFuncSet)
+ self.measure.apply.return_value = (
+ self.measure_exposure,
+ self.measure_impfset,
+ self.measure_hazard,
+ )
+
+ # Create mock snapshots
+ self.mock_snapshot_start = Snapshot(
+ exposure=self.exposure_present,
+ hazard=self.hazard_present,
+ impfset=self.impfset_present,
+ date=self.present_date,
+ )
+ self.mock_snapshot_end = Snapshot(
+ exposure=self.exposure_future,
+ hazard=self.hazard_future,
+ impfset=self.impfset_future,
+ date=self.future_date,
+ )
+
+ # Create an instance of CalcRiskPeriod
+ self.calc_risk_metrics_points = CalcRiskMetricsPoints(
+ [self.mock_snapshot_start, self.mock_snapshot_end],
+ impact_computation_strategy=ImpactCalcComputation(),
+ )
+
+ self.expected_eai = np.array(
+ [
+ [
+ 8702904.63375606,
+ 7870925.19290905,
+ 1805021.12653289,
+ 3827196.02428828,
+ 5815346.97427834,
+ 7870925.19290905,
+ 7871847.53906951,
+ 7870925.19290905,
+ 7886487.76136572,
+ 7870925.19290905,
+ 7876058.84500811,
+ 3858228.67061225,
+ 8401461.85304853,
+ 9210350.19520265,
+ 1806363.23553602,
+ 6922250.59852326,
+ 6711006.70101515,
+ 6886568.00391817,
+ 6703749.80009753,
+ 6704689.17531993,
+ 6703401.93516038,
+ 6818839.81873556,
+ 6716262.5286998,
+ 6703369.87656195,
+ 6703952.06070945,
+ 5678897.05935781,
+ 4984034.77073219,
+ 6708908.84462217,
+ 6702586.9472999,
+ 4961843.43826371,
+ 5139913.92380089,
+ 5255310.96072403,
+ 4981705.85074492,
+ 4926529.74583162,
+ 4973726.6063121,
+ 4926015.68274236,
+ 4937618.79350358,
+ 4926144.19851468,
+ 4926015.68274236,
+ 9575288.06765627,
+ 5100904.22956578,
+ 3501325.10900064,
+ 5093920.89144773,
+ 3505527.05928994,
+ 4002552.92232482,
+ 3512012.80001039,
+ 3514993.26161994,
+ 3562009.79687436,
+ 3869298.39771648,
+ 3509317.94922485,
+ ],
+ [
+ 46651387.10647343,
+ 42191612.28496882,
+ 14767621.68800634,
+ 24849532.38841432,
+ 32260334.11128166,
+ 42191612.28496882,
+ 42196556.46505447,
+ 42191612.28496882,
+ 42275034.47974126,
+ 42191612.28496882,
+ 42219130.91253302,
+ 24227735.90988531,
+ 45035521.54835925,
+ 49371517.94999501,
+ 14778602.03484606,
+ 39909758.65668079,
+ 38691846.52720026,
+ 39834520.43061425,
+ 38650007.36519716,
+ 38655423.2682883,
+ 38648001.77388126,
+ 39313550.93419428,
+ 38722148.63941796,
+ 38647816.9422419,
+ 38651173.48481285,
+ 33700748.42359267,
+ 30195870.8789255,
+ 38679751.48077733,
+ 38643303.01755095,
+ 30061424.26274527,
+ 31140267.73715352,
+ 31839402.91317674,
+ 30181761.07222111,
+ 29847475.57538872,
+ 30133418.66577969,
+ 29844361.11423809,
+ 29914658.78479145,
+ 29845139.72952577,
+ 29844361.11423809,
+ 58012067.61585025,
+ 30903926.75151934,
+ 23061159.87895984,
+ 33550647.3781805,
+ 23088835.64296583,
+ 26362451.35547444,
+ 23131553.38525813,
+ 23151183.92499699,
+ 23460854.06493051,
+ 24271571.95828693,
+ 23113803.99527559,
+ ],
+ ]
+ )
+
+ self.expected_aai = np.array([2.88895461e08, 1.69310367e09])
+ self.expected_aai_per_group = np.array(
+ [2.33513758e08, 5.53817034e07, 1.37114041e09, 3.21963264e08]
+ )
+ self.expected_return_period_metric = np.array(
+ [
+ 0.00000000e00,
+ 0.00000000e00,
+ 7.10925472e09,
+ 4.53975437e10,
+ 1.36547014e10,
+ 7.69981714e10,
+ ]
+ )
+
+ def test_reset_impact_data(self):
+ self.calc_risk_metrics_points._impacts = "A" # type:ignore
+ self.calc_risk_metrics_points._eai_gdf = "B" # type:ignore
+ self.calc_risk_metrics_points._per_date_eai = "C" # type:ignore
+ self.calc_risk_metrics_points._per_date_aai = "D" # type:ignore
+ self.calc_risk_metrics_points._reset_impact_data()
+ self.assertIsNone(self.calc_risk_metrics_points._impacts)
+ self.assertIsNone(self.calc_risk_metrics_points._eai_gdf)
+ self.assertIsNone(self.calc_risk_metrics_points._per_date_aai)
+ self.assertIsNone(self.calc_risk_metrics_points._per_date_eai)
+
+ def test_set_impact_computation_strategy(self):
+ new_impact_computation_strategy = MagicMock(spec=ImpactComputationStrategy)
+ self.calc_risk_metrics_points.impact_computation_strategy = (
+ new_impact_computation_strategy
+ )
+ self.assertEqual(
+ self.calc_risk_metrics_points.impact_computation_strategy,
+ new_impact_computation_strategy,
+ )
+
+ def test_set_impact_computation_strategy_wtype(self):
+ with self.assertRaises(ValueError):
+ self.calc_risk_metrics_points.impact_computation_strategy = "A"
+
+ @patch.object(CalcRiskMetricsPoints, "impact_computation_strategy")
+ def test_impacts_arrays(self, mock_impact_compute):
+ mock_impact_compute.compute_impacts.side_effect = ["A", "B"]
+ results = self.calc_risk_metrics_points.impacts
+ mock_impact_compute.compute_impacts.assert_has_calls(
+ [
+ call(
+ self.mock_snapshot_start.exposure,
+ self.mock_snapshot_start.hazard,
+ self.mock_snapshot_start.impfset,
+ ),
+ call(
+ self.mock_snapshot_end.exposure,
+ self.mock_snapshot_end.hazard,
+ self.mock_snapshot_end.impfset,
+ ),
+ ]
+ )
+ self.assertEqual(results, ["A", "B"])
+
+ def test_per_date_eai(self):
+ np.testing.assert_allclose(
+ self.calc_risk_metrics_points.per_date_eai, self.expected_eai
+ )
+
+ def test_per_date_aai(self):
+ np.testing.assert_allclose(
+ self.calc_risk_metrics_points.per_date_aai,
+ self.expected_aai,
+ )
+
+ def test_eai_gdf(self):
+ result_gdf = self.calc_risk_metrics_points.calc_eai_gdf()
+ self.assertIsInstance(result_gdf, pd.DataFrame)
+ self.assertEqual(
+ result_gdf.shape[0],
+ len(self.mock_snapshot_start.exposure.gdf)
+ + len(self.mock_snapshot_end.exposure.gdf),
+ )
+ expected_columns = [
+ DATE_COL_NAME,
+ COORD_ID_COL_NAME,
+ GROUP_COL_NAME,
+ RISK_COL_NAME,
+ METRIC_COL_NAME,
+ MEASURE_COL_NAME,
+ UNIT_COL_NAME,
+ ]
+ self.assertTrue(
+ all(col in list(result_gdf.columns) for col in expected_columns)
+ )
+ np.testing.assert_allclose(
+ np.array(result_gdf[RISK_COL_NAME].values), self.expected_eai.flatten()
+ )
+ # Check constants and column transformations
+ self.assertEqual(result_gdf[METRIC_COL_NAME].unique(), EAI_METRIC_NAME)
+ self.assertEqual(result_gdf[MEASURE_COL_NAME].iloc[0], NO_MEASURE_VALUE)
+ self.assertEqual(
+ result_gdf[UNIT_COL_NAME].iloc[0],
+ self.mock_snapshot_start.exposure.value_unit,
+ )
+ self.assertEqual(result_gdf[GROUP_COL_NAME].dtype.name, "category")
+ self.assertListEqual(
+ list(result_gdf[GROUP_COL_NAME].cat.categories),
+ list(self.calc_risk_metrics_points._group_id),
+ )
+
+ def test_calc_aai_metric(self):
+ result_df = self.calc_risk_metrics_points.calc_aai_metric()
+ self.assertIsInstance(result_df, pd.DataFrame)
+ self.assertEqual(
+ result_df.shape[0], len(self.calc_risk_metrics_points.snapshots)
+ )
+ expected_columns = [
+ DATE_COL_NAME,
+ GROUP_COL_NAME,
+ RISK_COL_NAME,
+ METRIC_COL_NAME,
+ MEASURE_COL_NAME,
+ UNIT_COL_NAME,
+ ]
+ self.assertTrue(all(col in result_df.columns for col in expected_columns))
+ np.testing.assert_allclose(
+ np.array(result_df[RISK_COL_NAME].values), self.expected_aai
+ )
+ # Check constants and column transformations
+ self.assertEqual(result_df[METRIC_COL_NAME].unique(), AAI_METRIC_NAME)
+ self.assertEqual(result_df[MEASURE_COL_NAME].iloc[0], NO_MEASURE_VALUE)
+ self.assertEqual(
+ result_df[UNIT_COL_NAME].iloc[0],
+ self.mock_snapshot_start.exposure.value_unit,
+ )
+ self.assertEqual(result_df[GROUP_COL_NAME].dtype.name, "category")
+
+ def test_calc_aai_per_group_metric(self):
+ result_df = self.calc_risk_metrics_points.calc_aai_per_group_metric()
+ self.assertIsInstance(result_df, pd.DataFrame)
+ self.assertEqual(
+ result_df.shape[0],
+ len(self.calc_risk_metrics_points.snapshots)
+ * len(self.calc_risk_metrics_points._group_id),
+ )
+ expected_columns = [
+ DATE_COL_NAME,
+ GROUP_COL_NAME,
+ RISK_COL_NAME,
+ METRIC_COL_NAME,
+ MEASURE_COL_NAME,
+ UNIT_COL_NAME,
+ ]
+ self.assertTrue(all(col in result_df.columns for col in expected_columns))
+ np.testing.assert_allclose(
+ np.array(result_df[RISK_COL_NAME].values), self.expected_aai_per_group
+ )
+ # Check constants and column transformations
+ self.assertEqual(result_df[METRIC_COL_NAME].unique(), AAI_METRIC_NAME)
+ self.assertEqual(result_df[MEASURE_COL_NAME].iloc[0], NO_MEASURE_VALUE)
+ self.assertEqual(
+ result_df[UNIT_COL_NAME].iloc[0],
+ self.mock_snapshot_start.exposure.value_unit,
+ )
+ self.assertEqual(result_df[GROUP_COL_NAME].dtype.name, "category")
+ self.assertListEqual(list(result_df[GROUP_COL_NAME].unique()), [0, 1])
+
+ def test_calc_return_periods_metric(self):
+ result_df = self.calc_risk_metrics_points.calc_return_periods_metric(
+ [20, 50, 100]
+ )
+ self.assertIsInstance(result_df, pd.DataFrame)
+ self.assertEqual(
+ result_df.shape[0], len(self.calc_risk_metrics_points.snapshots) * 3
+ )
+ expected_columns = [
+ DATE_COL_NAME,
+ GROUP_COL_NAME,
+ RISK_COL_NAME,
+ METRIC_COL_NAME,
+ MEASURE_COL_NAME,
+ UNIT_COL_NAME,
+ ]
+ self.assertTrue(all(col in result_df.columns for col in expected_columns))
+ np.testing.assert_allclose(
+ np.array(result_df[RISK_COL_NAME].values),
+ self.expected_return_period_metric,
+ )
+ # Check constants and column transformations
+ self.assertListEqual(
+ list(result_df[METRIC_COL_NAME].unique()), ["rp_20", "rp_50", "rp_100"]
+ )
+ self.assertEqual(result_df[MEASURE_COL_NAME].iloc[0], NO_MEASURE_VALUE)
+ self.assertEqual(
+ result_df[UNIT_COL_NAME].iloc[0],
+ self.mock_snapshot_start.exposure.value_unit,
+ )
+ self.assertEqual(result_df[GROUP_COL_NAME].dtype.name, "category")
+
+ @patch.object(Snapshot, "apply_measure")
+ @patch("climada.trajectories.calc_risk_metrics.CalcRiskMetricsPoints")
+ def test_apply_measure(self, mock_CalcRiskMetricPoints, mock_snap_apply_measure):
+ mock_CalcRiskMetricPoints.return_value = MagicMock(spec=CalcRiskMetricsPoints)
+ mock_snap_apply_measure.return_value = 42
+ result = self.calc_risk_metrics_points.apply_measure(self.measure)
+ mock_snap_apply_measure.assert_called_with(self.measure)
+ mock_CalcRiskMetricPoints.assert_called_with(
+ [42, 42],
+ self.calc_risk_metrics_points.impact_computation_strategy,
+ )
+ self.assertEqual(result.measure, self.measure)
+
+
+if __name__ == "__main__":
+ TESTS = unittest.TestLoader().loadTestsFromTestCase(TestCalcRiskMetricsPoints)
+ unittest.TextTestRunner(verbosity=2).run(TESTS)
diff --git a/climada/trajectories/test/test_impact_calc_strat.py b/climada/trajectories/test/test_impact_calc_strat.py
new file mode 100644
index 0000000000..eb5a53a2c0
--- /dev/null
+++ b/climada/trajectories/test/test_impact_calc_strat.py
@@ -0,0 +1,97 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Tests for impact_calc_strat
+
+"""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from climada.engine import Impact
+from climada.entity import ImpactFuncSet
+from climada.entity.exposures import Exposures
+from climada.hazard import Hazard
+from climada.trajectories import Snapshot
+from climada.trajectories.impact_calc_strat import (
+ ImpactCalcComputation,
+ ImpactComputationStrategy,
+)
+
+# --- Fixtures ---
+
+
+@pytest.fixture
+def mock_snapshot():
+ """Provides a snapshot with mocked exposure, hazard, and impact functions."""
+ snap = MagicMock(spec=Snapshot)
+ snap.exposure = MagicMock(spec=Exposures)
+ snap.hazard = MagicMock(spec=Hazard)
+ snap.impfset = MagicMock(spec=ImpactFuncSet)
+ return snap
+
+
+@pytest.fixture
+def strategy():
+ """Provides an instance of the ImpactCalcComputation strategy."""
+ return ImpactCalcComputation()
+
+
+# --- Tests ---
+def test_interface_compliance(strategy):
+ """Ensure the class correctly inherits from the Abstract Base Class."""
+ assert isinstance(strategy, ImpactComputationStrategy)
+ assert isinstance(strategy, ImpactCalcComputation)
+
+
+def test_compute_impacts(strategy, mock_snapshot):
+ """Test that compute_impacts calls the pre-transfer method correctly."""
+ mock_impacts = MagicMock(spec=Impact)
+
+ # We patch the ImpactCalc within trajectories
+ with patch("climada.trajectories.impact_calc_strat.ImpactCalc") as mock_ImpactCalc:
+ mock_ImpactCalc.return_value.impact.return_value = mock_impacts
+ result = strategy.compute_impacts(
+ exp=mock_snapshot.exposure,
+ haz=mock_snapshot.hazard,
+ vul=mock_snapshot.impfset,
+ )
+ mock_ImpactCalc.assert_called_once_with(
+ exposures=mock_snapshot.exposure,
+ impfset=mock_snapshot.impfset,
+ hazard=mock_snapshot.hazard,
+ )
+ mock_ImpactCalc.return_value.impact.assert_called_once()
+ assert result == mock_impacts
+
+
+def test_cannot_instantiate_abstract_base_class():
+ """Ensure ImpactComputationStrategy cannot be instantiated directly."""
+ with pytest.raises(TypeError, match="Can't instantiate abstract class"):
+ ImpactComputationStrategy() # type: ignore
+
+
+@pytest.mark.parametrize("invalid_input", [None, 123, "string"])
+def test_compute_impacts_type_errors(strategy, invalid_input):
+ """
+ Smoke test: Ensure that if ImpactCalc raises errors due to bad input,
+ the strategy correctly propagates them.
+ """
+ with pytest.raises(AttributeError):
+ strategy.compute_impacts(invalid_input, invalid_input, invalid_input)
diff --git a/climada/trajectories/test/test_snapshot.py b/climada/trajectories/test/test_snapshot.py
new file mode 100644
index 0000000000..cecfa39395
--- /dev/null
+++ b/climada/trajectories/test/test_snapshot.py
@@ -0,0 +1,168 @@
+import datetime
+from unittest.mock import MagicMock
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from climada.entity.exposures import Exposures
+from climada.entity.impact_funcs import ImpactFunc, ImpactFuncSet
+from climada.entity.measures.base import Measure
+from climada.hazard import Hazard
+from climada.trajectories.snapshot import Snapshot
+from climada.util.constants import EXP_DEMO_H5, HAZ_DEMO_H5
+
+# --- Fixtures ---
+
+
+@pytest.fixture(scope="module")
+def shared_data():
+ """Load heavy HDF5 data once per module to speed up tests."""
+ exposure = Exposures.from_hdf5(EXP_DEMO_H5)
+ hazard = Hazard.from_hdf5(HAZ_DEMO_H5)
+ impfset = ImpactFuncSet(
+ [
+ ImpactFunc(
+ "TC",
+ 3,
+ intensity=np.array([0, 20]),
+ mdd=np.array([0, 0.5]),
+ paa=np.array([0, 1]),
+ )
+ ]
+ )
+ return exposure, hazard, impfset
+
+
+@pytest.fixture
+def mock_context(shared_data):
+ """Provides the exposure/hazard/impfset and a pre-configured mock measure."""
+ exp, haz, impf = shared_data
+
+ # Setup Mock Measure
+ mock_measure = MagicMock(spec=Measure)
+ mock_measure.name = "Test Measure"
+
+ modified_exp = MagicMock(spec=Exposures)
+ modified_haz = MagicMock(spec=Hazard)
+ modified_imp = MagicMock(spec=ImpactFuncSet)
+
+ mock_measure.apply.return_value = (modified_exp, modified_imp, modified_haz)
+
+ return {
+ "exp": exp,
+ "haz": haz,
+ "imp": impf,
+ "measure": mock_measure,
+ "mod_exp": modified_exp,
+ "mod_haz": modified_haz,
+ "mod_imp": modified_imp,
+ }
+
+
+# --- Tests ---
+
+
+def test_not_from_factory_warning(mock_context):
+ """Test that direct __init__ call raises a warning"""
+ with pytest.warns(UserWarning):
+ Snapshot(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ measure=None,
+ date=2001,
+ )
+
+
+@pytest.mark.parametrize(
+ "input_date,expected",
+ [
+ (2023, datetime.date(2023, 1, 1)),
+ ("2023-01-01", datetime.date(2023, 1, 1)),
+ (datetime.date(2023, 1, 1), datetime.date(2023, 1, 1)),
+ ],
+)
+def test_init_valid_dates(mock_context, input_date, expected):
+ """Test various valid date input formats using parametrization."""
+ snapshot = Snapshot.from_triplet(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ date=input_date,
+ )
+ assert snapshot.date == expected
+
+
+def test_init_invalid_date_format(mock_context):
+ with pytest.raises(ValueError, match="String must be in the format"):
+ Snapshot.from_triplet(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ date="invalid-date",
+ )
+
+
+def test_init_invalid_date_type(mock_context):
+ with pytest.raises(
+ TypeError, match=r"date_arg must be an int, str, or datetime.date"
+ ):
+ Snapshot.from_triplet(exposure=mock_context["exp"], hazard=mock_context["haz"], impfset=mock_context["imp"], date=2023.5) # type: ignore
+
+
+def test_properties(mock_context):
+ snapshot = Snapshot.from_triplet(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ date=2023,
+ )
+
+ # Check that it's a deep copy (new reference)
+ assert snapshot.exposure is not mock_context["exp"]
+ assert snapshot.hazard is not mock_context["haz"]
+
+ assert snapshot.measure is None
+
+ # Check data equality
+ pd.testing.assert_frame_equal(snapshot.exposure.gdf, mock_context["exp"].gdf)
+ assert snapshot.hazard.haz_type == mock_context["haz"].haz_type
+ assert snapshot.impfset == mock_context["imp"]
+
+
+def test_reference(mock_context):
+ snapshot = Snapshot.from_triplet(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ date=2023,
+ ref_only=True,
+ )
+
+ # Check that it is a reference
+ assert snapshot.exposure is mock_context["exp"]
+ assert snapshot.hazard is mock_context["haz"]
+ assert snapshot.impfset is mock_context["imp"]
+ assert snapshot.measure is None
+
+ # Check data equality
+ pd.testing.assert_frame_equal(snapshot.exposure.gdf, mock_context["exp"].gdf)
+ assert snapshot.hazard.haz_type == mock_context["haz"].haz_type
+ assert snapshot.impfset == mock_context["imp"]
+
+
+def test_apply_measure(mock_context):
+ snapshot = Snapshot.from_triplet(
+ exposure=mock_context["exp"],
+ hazard=mock_context["haz"],
+ impfset=mock_context["imp"],
+ date=2023,
+ )
+ new_snapshot = snapshot.apply_measure(mock_context["measure"])
+
+ assert new_snapshot.measure is not None
+ assert new_snapshot.measure.name == "Test Measure"
+ assert new_snapshot.exposure == mock_context["mod_exp"]
+ assert new_snapshot.hazard == mock_context["mod_haz"]
+ assert new_snapshot.impfset == mock_context["mod_imp"]
diff --git a/climada/trajectories/test/test_static_risk_trajectory.py b/climada/trajectories/test/test_static_risk_trajectory.py
new file mode 100644
index 0000000000..ec4016a021
--- /dev/null
+++ b/climada/trajectories/test/test_static_risk_trajectory.py
@@ -0,0 +1,376 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+unit tests for static_risk_trajectory
+
+"""
+
+import datetime
+from itertools import product
+from unittest.mock import MagicMock, call, patch
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from climada.entity.disc_rates.base import DiscRates
+from climada.trajectories.calc_risk_metrics import CalcRiskMetricsPoints
+from climada.trajectories.constants import (
+ AAI_METRIC_NAME,
+ AAI_PER_GROUP_METRIC_NAME,
+ DATE_COL_NAME,
+ EAI_METRIC_NAME,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ RETURN_PERIOD_METRIC_NAME,
+ RISK_COL_NAME,
+)
+from climada.trajectories.impact_calc_strat import ImpactCalcComputation
+from climada.trajectories.snapshot import Snapshot
+from climada.trajectories.static_trajectory import (
+ DEFAULT_ALLGROUP_NAME,
+ DEFAULT_RP,
+ StaticRiskTrajectory,
+)
+from climada.trajectories.trajectory import RiskTrajectory
+
+# --- Fixtures ---
+
+
+@pytest.fixture
+def mock_snapshots():
+ """Provides a list of mock Snapshot objects with sequential dates."""
+ snaps = []
+ for year in [2023, 2024, 2025]:
+ m = MagicMock(spec=Snapshot)
+ m.date = datetime.date(year, 1, 1)
+ snaps.append(m)
+ return snaps
+
+
+@pytest.fixture
+def mock_disc_rates():
+ """Provides a mock DiscRates object."""
+ dr = MagicMock(spec=DiscRates)
+ dr.years = [2023, 2024, 2025]
+ dr.rates = [0.01, 0.02, 0.03]
+ return dr
+
+
+@pytest.fixture
+def rt_basic(mock_snapshots):
+ """A basic StaticRiskTrajectory instance."""
+ return StaticRiskTrajectory(mock_snapshots)
+
+
+@pytest.fixture
+def trajectory_metadata():
+ """Common metadata for DataFrame generation."""
+ return {
+ "dates1": [pd.Timestamp("2023-01-01"), pd.Timestamp("2024-01-01")],
+ "dates2": [pd.Timestamp("2026-01-01")],
+ "groups": ["GroupA", "GroupB", pd.NA],
+ "measures": ["MEAS1", "MEAS2"],
+ "metrics": [AAI_METRIC_NAME],
+ }
+
+
+@pytest.fixture
+def expected_aai_data(trajectory_metadata):
+ """Generates the expected AAI DataFrames used for comparison."""
+ meta = trajectory_metadata
+ all_dates = meta["dates1"] + meta["dates2"]
+
+ df = pd.DataFrame(
+ product(meta["groups"], all_dates, meta["measures"], meta["metrics"]),
+ columns=[GROUP_COL_NAME, DATE_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME],
+ )
+ df[RISK_COL_NAME] = np.arange(len(df)) * 100.0
+
+ # Handle Categories and Nulls
+ df[GROUP_COL_NAME] = df[GROUP_COL_NAME].astype("category")
+ df[GROUP_COL_NAME] = df[GROUP_COL_NAME].cat.add_categories([DEFAULT_ALLGROUP_NAME])
+ df[GROUP_COL_NAME] = df[GROUP_COL_NAME].fillna(DEFAULT_ALLGROUP_NAME)
+
+ cols = [
+ DATE_COL_NAME,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ RISK_COL_NAME,
+ ]
+ return df[cols]
+
+
+@pytest.fixture
+def mock_components():
+ """Provides standard CLIMADA mock objects."""
+ snaps = [
+ MagicMock(spec=Snapshot, date=datetime.date(2023 + i, 1, 1)) for i in range(3)
+ ]
+ strat = MagicMock(spec=ImpactCalcComputation)
+ dr = MagicMock(
+ spec=DiscRates, years=[2023, 2024, 2025, 2026], rates=[0.01, 0.02, 0.03, 0.04]
+ )
+ return {"snaps": snaps, "strat": strat, "disc_rates": dr}
+
+
+# --- Pure RiskTrajectory Tests ---
+
+
+def test_init_basic(rt_basic, mock_snapshots):
+ assert rt_basic.start_date == mock_snapshots[0].date
+ assert rt_basic.end_date == mock_snapshots[-1].date
+ assert rt_basic._risk_disc_rates is None
+ assert rt_basic._all_groups_name == DEFAULT_ALLGROUP_NAME
+ assert rt_basic._return_periods == DEFAULT_RP
+
+ for metric in StaticRiskTrajectory.POSSIBLE_METRICS:
+ assert getattr(rt_basic, f"_{metric}_metrics") is None
+
+
+def test_init_args(mock_snapshots, mock_disc_rates):
+ custom_rp = [10, 20]
+ custom_name = "custom"
+ rt = StaticRiskTrajectory(
+ mock_snapshots,
+ return_periods=custom_rp,
+ all_groups_name=custom_name,
+ risk_disc_rates=mock_disc_rates,
+ )
+ assert rt._risk_disc_rates == mock_disc_rates
+ assert rt._all_groups_name == custom_name
+ assert rt.return_periods == custom_rp
+
+
+# --- Property & Setter Tests ---
+
+
+def test_set_return_periods(rt_basic):
+ with pytest.raises(ValueError):
+ rt_basic.return_periods = "A"
+
+ rt_basic.return_periods = [1, 2]
+ assert rt_basic.return_periods == [1, 2]
+
+
+def test_set_disc_rates(rt_basic, mock_disc_rates):
+ # Mock the reset_metrics method on the instance
+ with patch.object(rt_basic, "_reset_metrics", wraps=rt_basic._reset_metrics) as spy:
+ with pytest.raises(ValueError):
+ rt_basic.risk_disc_rates = "A"
+
+ rt_basic.risk_disc_rates = mock_disc_rates
+ # Once in __init__, once in setter
+ assert spy.call_count == 1
+ assert rt_basic.risk_disc_rates == mock_disc_rates
+
+
+# --- NPV Transformation Tests ---
+
+
+def test_npv_transform_no_group_col(mock_disc_rates):
+ df_input = pd.DataFrame(
+ {
+ "date": pd.to_datetime(["2023-01-01", "2024-01-01"] * 2),
+ "measure": ["m1", "m1", "m2", "m2"],
+ "metric": [AAI_METRIC_NAME] * 4,
+ "risk": [100.0, 200.0, 80.0, 180.0],
+ }
+ )
+
+ with patch(
+ "climada.trajectories.trajectory.RiskTrajectory._calc_npv_cash_flows"
+ ) as mock_calc:
+ # Side effects to simulate discounted values
+ mock_calc.side_effect = [
+ pd.Series(
+ [99.0, 196.0], index=pd.to_datetime(["2023-01-01", "2024-01-01"])
+ ),
+ pd.Series(
+ [79.2, 176.4], index=pd.to_datetime(["2023-01-01", "2024-01-01"])
+ ),
+ ]
+
+ _ = RiskTrajectory.npv_transform(df_input.copy(), mock_disc_rates)
+
+ # Check calls: Grouping should happen by (measure, metric)
+ assert mock_calc.call_count == 2
+ # Verify first group args
+ args, _ = mock_calc.call_args_list[0]
+ assert args[1] == pd.Timestamp("2023-01-01")
+ assert args[2] == mock_disc_rates
+
+
+def test_calc_npv_cash_flows_logic(mock_disc_rates):
+ """Standalone test for the math inside _calc_npv_cash_flows."""
+ cash_flows = pd.Series(
+ [100, 200, 300],
+ index=pd.to_datetime(["2023-01-01", "2024-01-01", "2025-01-01"]),
+ )
+ start_date = datetime.date(2023, 1, 1)
+
+ # NPV Factor: (1 / (1 + rate)) ^ year_delta
+ # 2023: (1/1.01)^0 = 1.0 -> 100
+ # 2024: (1/1.02)^1 = 0.98039... -> 196.078...
+ # 2025: (1/1.03)^2 = 0.94259... -> 282.778...
+
+ result = RiskTrajectory._calc_npv_cash_flows(
+ cash_flows, start_date, mock_disc_rates
+ )
+
+ assert result.iloc[0] == pytest.approx(100.0)
+ assert result.iloc[1] == pytest.approx(200 / 1.02)
+ assert result.iloc[2] == pytest.approx(300 / (1.03**2))
+
+
+def test_calc_npv_cash_flows_invalid_index(mock_disc_rates):
+ cash_flows = pd.Series([100, 200]) # No datetime index
+ with pytest.raises(ValueError, match="PeriodIndex or DatetimeIndex"):
+ RiskTrajectory._calc_npv_cash_flows(
+ cash_flows, datetime.date(2023, 1, 1), mock_disc_rates
+ )
+
+
+# ---- StaticRiskTrajectory tests ---
+
+# --- Metric Computation Tests ---
+
+
+def test_compute_metrics(rt_basic):
+ with patch.object(
+ StaticRiskTrajectory, "_generic_metrics", return_value="42"
+ ) as mock_generic:
+ result = rt_basic._compute_metrics(
+ metric_name="dummy", metric_meth="meth", arg1="A", arg2=12
+ )
+
+ mock_generic.assert_called_once_with(
+ metric_name="dummy", metric_meth="meth", arg1="A", arg2=12
+ )
+ assert result == "42"
+
+
+def test_init_basic_static(mock_components):
+ # Patch the calculator class used inside __init__
+ with patch(
+ "climada.trajectories.static_trajectory.CalcRiskMetricsPoints", autospec=True
+ ) as mock_calc_cls:
+ rt = StaticRiskTrajectory(
+ mock_components["snaps"],
+ impact_computation_strategy=mock_components["strat"],
+ )
+
+ mock_calc_cls.assert_called_once_with(
+ mock_components["snaps"],
+ impact_computation_strategy=mock_components["strat"],
+ )
+ assert rt.start_date == mock_components["snaps"][0].date
+
+
+def test_set_impact_strategy_resets(mock_components):
+ rt = StaticRiskTrajectory(mock_components["snaps"])
+ with patch.object(rt, "_reset_metrics", wraps=rt._reset_metrics) as spy_reset:
+ new_strat = ImpactCalcComputation()
+ rt.impact_computation_strategy = new_strat
+
+ assert rt.impact_computation_strategy == new_strat
+ # Called once in init, once in setter
+ assert spy_reset.call_count == 1
+
+
+# --- Logic & Metric Tests ---
+
+
+def test_generic_metrics_caching_and_npv(mock_components, expected_aai_data):
+ """Tests the complex logic of _generic_metrics including NPV transform and caching."""
+ rt = StaticRiskTrajectory(
+ mock_components["snaps"], risk_disc_rates=mock_components["disc_rates"]
+ )
+
+ # Mock the internal calculator's method
+ mock_calc = MagicMock()
+ mock_calc.calc_aai_metric.return_value = expected_aai_data
+ rt._risk_metrics_calculators = mock_calc
+
+ # Mock NPV transform to return a modified version
+ npv_data = expected_aai_data.copy()
+ npv_data[RISK_COL_NAME] *= 0.9
+ with patch.object(rt, "npv_transform", return_value=npv_data) as mock_npv:
+
+ # First call
+ result = rt._generic_metrics(AAI_METRIC_NAME, "calc_aai_metric")
+
+ mock_calc.calc_aai_metric.assert_called_once()
+ mock_npv.assert_called_once()
+ pd.testing.assert_frame_equal(result, npv_data)
+
+ # Verify Internal Cache
+ assert rt._aai_metrics is not None
+
+ # Second call (should be cached)
+ result2 = rt._generic_metrics(AAI_METRIC_NAME, "calc_aai_metric")
+ assert mock_calc.calc_aai_metric.call_count == 1 # No new call
+ pd.testing.assert_frame_equal(result2, npv_data)
+
+
+@pytest.mark.parametrize(
+ "metric_name, method_name, attr_name",
+ [
+ (EAI_METRIC_NAME, "calc_eai_gdf", "eai_metrics"),
+ (AAI_METRIC_NAME, "calc_aai_metric", "aai_metrics"),
+ (
+ AAI_PER_GROUP_METRIC_NAME,
+ "calc_aai_per_group_metric",
+ "aai_per_group_metrics",
+ ),
+ ],
+)
+def test_metric_wrappers(mock_components, metric_name, method_name, attr_name):
+ """Uses parametrization to test all simple metric wrapper methods at once."""
+ rt = StaticRiskTrajectory(mock_components["snaps"])
+ with patch.object(rt, "_compute_metrics") as mock_compute:
+ wrapper_func = getattr(rt, attr_name)
+ wrapper_func(test_arg="val")
+ mock_compute.assert_called_once_with(
+ metric_name=metric_name, metric_meth=method_name, test_arg="val"
+ )
+
+
+def test_per_date_risk_metrics_aggregation(mock_components):
+ rt = StaticRiskTrajectory(mock_components["snaps"])
+
+ # Setup mock returns for the constituent parts
+ df_aai = pd.DataFrame({METRIC_COL_NAME: ["aai"], RISK_COL_NAME: [100]})
+ df_rp = pd.DataFrame({METRIC_COL_NAME: ["rp"], RISK_COL_NAME: [50]})
+ df_grp = pd.DataFrame({METRIC_COL_NAME: ["grp"], RISK_COL_NAME: [10]})
+
+ with (
+ patch.object(rt, "aai_metrics", return_value=df_aai) as m1,
+ patch.object(rt, "return_periods_metrics", return_value=df_rp) as m2,
+ patch.object(rt, "aai_per_group_metrics", return_value=df_grp) as m3,
+ ):
+
+ result = rt.per_date_risk_metrics()
+ assert len(result) == 3
+ assert list(result[METRIC_COL_NAME]) == ["aai", "rp", "grp"]
+ # Verify it called all three internal methods
+ m1.assert_called_once()
+ m2.assert_called_once()
+ m3.assert_called_once()
diff --git a/climada/trajectories/test/test_trajectory.py b/climada/trajectories/test/test_trajectory.py
new file mode 100644
index 0000000000..4e0259483b
--- /dev/null
+++ b/climada/trajectories/test/test_trajectory.py
@@ -0,0 +1,52 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+unit tests for RiskTrajectory (Being an abstract )
+
+"""
+
+import datetime
+from unittest.mock import MagicMock, call
+
+import pandas as pd
+import pytest
+
+from climada.entity.disc_rates.base import DiscRates
+from climada.trajectories.constants import AAI_METRIC_NAME
+from climada.trajectories.snapshot import Snapshot
+from climada.trajectories.trajectory import (
+ DEFAULT_ALLGROUP_NAME,
+ DEFAULT_RP,
+ RiskTrajectory,
+)
+
+
+@pytest.fixture
+def mock_snapshots():
+ """Provides a list of mock Snapshot objects with sequential dates."""
+ snaps = []
+ for year in [2023, 2024, 2025]:
+ m = MagicMock(spec=Snapshot)
+ m.date = datetime.date(year, 1, 1)
+ snaps.append(m)
+ return snaps
+
+
+def test_abstract():
+ with pytest.raises(TypeError, match="abstract class"):
+ RiskTrajectory(mock_snapshots) # type: ignore
diff --git a/climada/trajectories/trajectory.py b/climada/trajectories/trajectory.py
new file mode 100644
index 0000000000..c64123be92
--- /dev/null
+++ b/climada/trajectories/trajectory.py
@@ -0,0 +1,274 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+This file implements abstract trajectory objects, to factorise the code common to
+interpolated and static trajectories.
+
+"""
+
+import datetime
+import logging
+from abc import ABC, abstractmethod
+from typing import Iterable
+
+import pandas as pd
+
+from climada.entity.disc_rates.base import DiscRates
+from climada.trajectories.constants import (
+ DATE_COL_NAME,
+ DEFAULT_ALLGROUP_NAME,
+ DEFAULT_RP,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ PERIOD_COL_NAME,
+ RISK_COL_NAME,
+ UNIT_COL_NAME,
+)
+from climada.trajectories.snapshot import Snapshot
+
+LOGGER = logging.getLogger(__name__)
+
+__all__ = ["RiskTrajectory"]
+
+DEFAULT_DF_COLUMN_PRIORITY = [
+ DATE_COL_NAME,
+ PERIOD_COL_NAME,
+ GROUP_COL_NAME,
+ MEASURE_COL_NAME,
+ METRIC_COL_NAME,
+ UNIT_COL_NAME,
+]
+INDEXING_COLUMNS = [DATE_COL_NAME, GROUP_COL_NAME, MEASURE_COL_NAME, METRIC_COL_NAME]
+
+
+class RiskTrajectory(ABC):
+ """Base abstract class for risk trajectory objects.
+
+ See concrete implementation :class:`StaticRiskTrajectory` and
+ :class:`InterpolatedRiskTrajectory` for more details.
+
+ """
+
+ _grouper = [MEASURE_COL_NAME, METRIC_COL_NAME]
+ """Results dataframe grouper used in most `groupby()` calls."""
+
+ POSSIBLE_METRICS = []
+ """Class variable listing the risk metrics that can be computed."""
+
+ def __init__(
+ self,
+ snapshots_list: Iterable[Snapshot],
+ *,
+ return_periods: Iterable[int] = DEFAULT_RP,
+ all_groups_name: str = DEFAULT_ALLGROUP_NAME,
+ risk_disc_rates: DiscRates | None = None,
+ ):
+ self._reset_metrics()
+ self._snapshots = sorted(snapshots_list, key=lambda snap: snap.date)
+ self._all_groups_name = all_groups_name
+ self._return_periods = return_periods
+ self.start_date = min((snapshot.date for snapshot in snapshots_list))
+ self.end_date = max((snapshot.date for snapshot in snapshots_list))
+ self._risk_disc_rates = risk_disc_rates
+
+ def _reset_metrics(self) -> None:
+ """Resets the computed metrics to None.
+
+ This method is called to inititialize the `POSSIBLE_METRICS` to `None` during
+ the initialisation.
+
+ It is also called when properties that would change the results of
+ computed metrics (for instance changing the time resolution in
+ :class:`InterpolatedRiskMetrics`)
+
+ """
+ for metric in self.POSSIBLE_METRICS:
+ setattr(self, "_" + metric + "_metrics", None)
+
+ @abstractmethod
+ def _generic_metrics(
+ self, /, metric_name: str, metric_meth: str, **kwargs
+ ) -> pd.DataFrame:
+ """Main method to return the results of a specific metric.
+
+ This method should call the `_generic_metrics()` of its parent and
+ define the part of the computation and treatment that
+ is specific to a child class of :class:`RiskTrajectory`.
+
+ See also
+ --------
+
+ - :method:`_compute_metrics`
+
+ """
+ raise NotImplementedError(
+ f"'_generic_metrics' must be implemented by subclasses of {self.__class__.__name__}"
+ )
+
+ def _compute_metrics(
+ self, /, metric_name: str, metric_meth: str, **kwargs
+ ) -> pd.DataFrame:
+ """Helper method to compute metrics.
+
+ Notes
+ -----
+
+ This method exists for the sake of the children classes for option appraisal, for which
+ `_generic_metrics` can have a different signature and extend on its
+ parent method. This method can stay the same (same signature) for all classes.
+ """
+ return self._generic_metrics(
+ metric_name=metric_name, metric_meth=metric_meth, **kwargs
+ )
+
+ @property
+ def return_periods(self) -> Iterable[int]:
+ """The return period values to use when computing risk period metrics.
+
+ Notes
+ -----
+
+ Changing its value resets the corresponding metric.
+ """
+ return self._return_periods
+
+ @return_periods.setter
+ def return_periods(self, value, /):
+ if not isinstance(value, list):
+ raise ValueError("Return periods need to be a list of int.")
+ if any(not isinstance(i, int) for i in value):
+ raise ValueError("Return periods need to be a list of int.")
+ self._return_periods_metrics = None
+ self._return_periods = value
+
+ @property
+ def risk_disc_rates(self) -> DiscRates | None:
+ """The discount rate applied to compute net present values.
+ None means no discount rate.
+
+ Notes
+ -----
+
+ Changing its value resets all the metrics.
+ """
+ return self._risk_disc_rates
+
+ @risk_disc_rates.setter
+ def risk_disc_rates(self, value, /):
+ if value is not None and not isinstance(value, (DiscRates)):
+ raise ValueError("Risk discount needs to be a `DiscRates` object.")
+
+ self._reset_metrics()
+ self._risk_disc_rates = value
+
+ @classmethod
+ def npv_transform(
+ cls, metric_df: pd.DataFrame, risk_disc_rates: DiscRates
+ ) -> pd.DataFrame:
+ """Apply provided discount rate to the provided metric `DataFrame`.
+
+ Parameters
+ ----------
+ metric_df : pd.DataFrame
+ The `DataFrame` of the metric to discount.
+ risk_disc_rates : DiscRate
+ The discount rate to apply.
+
+ Returns
+ -------
+ pd.DataFrame
+ The discounted risk metric.
+
+ """
+
+ def _npv_group(group, disc):
+ start_date = group.index.get_level_values(DATE_COL_NAME).min()
+ return cls._calc_npv_cash_flows(group, start_date, disc)
+
+ metric_df = metric_df.set_index(DATE_COL_NAME)
+ grouper = cls._grouper
+ if GROUP_COL_NAME in metric_df.columns:
+ grouper = [GROUP_COL_NAME] + grouper
+
+ metric_df[RISK_COL_NAME] = metric_df.groupby(
+ grouper,
+ dropna=False,
+ as_index=False,
+ group_keys=False,
+ observed=True,
+ )[RISK_COL_NAME].transform(_npv_group, risk_disc_rates)
+ metric_df = metric_df.reset_index()
+ return metric_df
+
+ @staticmethod
+ def _calc_npv_cash_flows(
+ cash_flows: pd.DataFrame | pd.Series,
+ start_date: datetime.date,
+ disc_rates: DiscRates | None = None,
+ ):
+ """Apply discount rate to cash flows.
+
+ If it is defined, applies a discount rate `disc` to a given cash flow
+ `cash_flows` assuming present year corresponds to `start_date`.
+
+ Parameters
+ ----------
+ cash_flows : pd.DataFrame
+ The cash flow to apply the discount rate to.
+ start_date : datetime.date
+ The date representing the present.
+ end_date : datetime.date, optional
+ disc : DiscRates, optional
+ The discount rate to apply.
+
+ Returns
+ -------
+
+ A dataframe (copy) of `cash_flows` where values are discounted according to `disc`.
+
+ """
+
+ if not disc_rates:
+ return cash_flows
+
+ if not isinstance(cash_flows.index, (pd.PeriodIndex, pd.DatetimeIndex)):
+ raise ValueError(
+ "cash_flows must be a pandas Series with a PeriodIndex or DatetimeIndex"
+ )
+
+ metric_df = cash_flows.to_frame(name="cash_flow") # type: ignore
+ metric_df["year"] = metric_df.index.year
+
+ # Merge with the discount rates based on the year
+ tmp = metric_df.merge(
+ pd.DataFrame({"year": disc_rates.years, "rate": disc_rates.rates}),
+ on="year",
+ how="left",
+ )
+ tmp.index = metric_df.index
+ metric_df = tmp.copy()
+ metric_df["discount_factor"] = (1 / (1 + metric_df["rate"])) ** (
+ metric_df.index.year - start_date.year
+ )
+
+ # Apply the discount factors to the cash flows
+ metric_df["npv_cash_flow"] = (
+ metric_df["cash_flow"] * metric_df["discount_factor"]
+ )
+ return metric_df["npv_cash_flow"]
diff --git a/climada/util/dataframe_handling.py b/climada/util/dataframe_handling.py
new file mode 100644
index 0000000000..b5ac6bef97
--- /dev/null
+++ b/climada/util/dataframe_handling.py
@@ -0,0 +1,63 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Define functions to handle with coordinates
+"""
+
+import pandas as pd
+
+
+def reorder_dataframe_columns(
+ df: pd.DataFrame, priority_order: list[str], keep_remaining: bool = True
+) -> pd.DataFrame | pd.Series:
+ """
+ Applies a column priority list to a DataFrame to reorder its columns.
+
+ This function is robust to cases where:
+ 1. Columns in 'priority_order' are not in the DataFrame (they are ignored).
+ 2. Columns in the DataFrame are not in 'priority_order'.
+
+ Parameters
+ ----------
+ df: pd.DataFrame
+ The input DataFrame.
+ priority_order: list[str]
+ A list of strings defining the desired column
+ order. Columns listed first have higher priority.
+ keep_remaining: bool
+ If True, any columns in the DataFrame but NOT in
+ 'priority_order' will be appended to the end in their
+ original relative order. If False, these columns
+ are dropped.
+
+ Returns:
+ pd.DataFrame: The DataFrame with columns reordered according to the priority list.
+ """
+
+ present_priority_columns = [col for col in priority_order if col in df.columns]
+
+ new_column_order = present_priority_columns
+
+ if keep_remaining:
+ remaining_columns = [
+ col for col in df.columns if col not in present_priority_columns
+ ]
+
+ new_column_order.extend(remaining_columns)
+
+ return df[new_column_order]
diff --git a/doc/api/climada/climada.rst b/doc/api/climada/climada.rst
index 557532912f..2e8d053946 100644
--- a/doc/api/climada/climada.rst
+++ b/doc/api/climada/climada.rst
@@ -7,4 +7,5 @@ Software documentation per package
climada.engine
climada.entity
climada.hazard
+ climada.trajectories
climada.util
diff --git a/doc/api/climada/climada.trajectories.impact_calc_strat.rst b/doc/api/climada/climada.trajectories.impact_calc_strat.rst
new file mode 100644
index 0000000000..1bf211b4c0
--- /dev/null
+++ b/doc/api/climada/climada.trajectories.impact_calc_strat.rst
@@ -0,0 +1,7 @@
+climada\.trajectories\.impact_calc_strat module
+----------------------------------------
+
+.. automodule:: climada.trajectories.impact_calc_strat
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/doc/api/climada/climada.trajectories.rst b/doc/api/climada/climada.trajectories.rst
new file mode 100644
index 0000000000..67b37809a6
--- /dev/null
+++ b/doc/api/climada/climada.trajectories.rst
@@ -0,0 +1,9 @@
+
+climada\.trajectories module
+============================
+
+.. toctree::
+
+ climada.trajectories.snapshot
+ climada.trajectories.trajectories
+ climada.trajectories.impact_calc_strat
diff --git a/doc/api/climada/climada.trajectories.snapshot.rst b/doc/api/climada/climada.trajectories.snapshot.rst
new file mode 100644
index 0000000000..ba0faf57ac
--- /dev/null
+++ b/doc/api/climada/climada.trajectories.snapshot.rst
@@ -0,0 +1,7 @@
+climada\.trajectories\.snapshot module
+----------------------------------------
+
+.. automodule:: climada.trajectories.snapshot
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/doc/api/climada/climada.trajectories.trajectories.rst b/doc/api/climada/climada.trajectories.trajectories.rst
new file mode 100644
index 0000000000..4abe8acb15
--- /dev/null
+++ b/doc/api/climada/climada.trajectories.trajectories.rst
@@ -0,0 +1,15 @@
+climada\.trajectories\.static_trajectory module
+----------------------------------------
+
+.. automodule:: climada.trajectories.static_trajectory
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+climada\.trajectories\.trajectory module
+----------------------------------------
+
+.. automodule:: climada.trajectories.trajectory
+ :members:
+ :undoc-members:
+ :show-inheritance: