diff --git a/climada/trajectories/__init__.py b/climada/trajectories/__init__.py new file mode 100644 index 0000000000..5e0016c39d --- /dev/null +++ b/climada/trajectories/__init__.py @@ -0,0 +1,29 @@ +""" +This file is part of CLIMADA. + +Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. + +CLIMADA is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free +Software Foundation, version 3. + +CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along +with CLIMADA. If not, see . + +--- + +This module implements risk trajectory objects which enable computation and +possibly interpolation of risk metric over multiple dates. + +""" + +from .interpolation import AllLinearStrategy, ExponentialExposureStrategy + +__all__ = [ + "AllLinearStrategy", + "ExponentialExposureStrategy", +] diff --git a/climada/trajectories/interpolation.py b/climada/trajectories/interpolation.py new file mode 100644 index 0000000000..45fc18f575 --- /dev/null +++ b/climada/trajectories/interpolation.py @@ -0,0 +1,473 @@ +""" +This file is part of CLIMADA. + +Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. + +CLIMADA is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free +Software Foundation, version 3. + +CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along +with CLIMADA. If not, see . + +--- + +This modules implements different sparce matrices and numpy arrays +interpolation approaches. + +""" + +import logging +from abc import ABC +from collections.abc import Callable +from typing import Any, Dict, List, Optional + +import numpy as np +from scipy import sparse + +LOGGER = logging.getLogger(__name__) + +__all__ = [ + "AllLinearStrategy", + "ExponentialExposureStrategy", + "linear_convex_combination", + "linear_interp_matrix_elemwise", + "exponential_convex_combination", + "exponential_interp_matrix_elemwise", +] + + +def linear_interp_matrix_elemwise( + mat_start: sparse.csr_matrix, + mat_end: sparse.csr_matrix, + number_of_interpolation_points: int, +) -> List[sparse.csr_matrix]: + r""" + Linearly interpolates between two sparse impact matrices. + + Creates a sequence of matrices representing a linear transition from a starting + matrix to an ending matrix. The interpolation includes both the start and end + points. + + Parameters + ---------- + mat_start : scipy.sparse.csr_matrix + The starting impact matrix. Must have a shape compatible with `mat_end` + for arithmetic operations. + mat_end : scipy.sparse.csr_matrix + The ending impact matrix. Must have a shape compatible with `mat_start` + for arithmetic operations. + number_of_interpolation_points : int + The total number of matrices to return, including the start and end points. + Must be $\ge 2$. + + Returns + ------- + list of scipy.sparse.csr_matrix + A list of matrices, where the first element is `mat_start` and the last + element is `mat_end`. The total length of the list is + `number_of_interpolation_points`. + + Notes + ----- + The formula used for interpolation at proportion $p$ is: + $$M_p = M_{start} \cdot (1 - p) + M_{end} \cdot p$$ + The proportions $p$ range from 0 to 1, inclusive. + """ + + return [ + mat_start + prop * (mat_end - mat_start) + for prop in np.linspace(0, 1, number_of_interpolation_points) + ] + + +def exponential_interp_matrix_elemwise( + mat_start: sparse.csr_matrix, + mat_end: sparse.csr_matrix, + number_of_interpolation_points: int, +) -> List[sparse.csr_matrix]: + r""" + Exponentially interpolates between two "impact matrices". + + This function performs interpolation in a logarithmic space, effectively + achieving an exponential-like transition between `mat_start` and `mat_end`. + It is designed for objects that wrap NumPy arrays and expose them via a + `.data` attribute. + + Parameters + ---------- + mat_start : object + The starting matrix object. Must have a `.data` attribute that is a + NumPy array of positive values. + mat_end : object + The ending matrix object. Must have a `.data` attribute that is a + NumPy array of positive values and have a compatible shape with `mat_start`. + number_of_interpolation_points : int + The total number of matrix objects to return, including the start and + end points. Must be $\ge 2$. + + Returns + ------- + list of object + A list of interpolated matrix objects. The first element corresponds to + `mat_start` and the last to `mat_end` (after the conversion/reversion). + The list length is `number_of_interpolation_points`. + + Notes + ----- + The interpolation is achieved by: + + 1. Mapping the matrix data to a transformed logarithmic space: + $$M'_{i} = \ln(M_{i})}$$ + (where $\ln$ is the natural logarithm, and $\epsilon$ is added to $M_{i}$ + to prevent $\ln(0)$). + 2. Performing standard linear interpolation on the transformed matrices + $M'_{start}$ and $M'_{end}$ to get $M'_{interp}$: + $$M'_{interp} = M'_{start} \cdot (1 - \text{ratio}) + M'_{end} \cdot \text{ratio}$$ + 3. Mapping the result back to the original domain: + $$M_{interp} = \exp(M'_{interp}$$ + """ + + mat_start = mat_start.copy() + mat_end = mat_end.copy() + mat_start.data = np.log(mat_start.data + np.finfo(float).eps) + mat_end.data = np.log(mat_end.data + np.finfo(float).eps) + + # Perform linear interpolation in the logarithmic domain + res = [] + num_points = number_of_interpolation_points + for point in range(num_points): + ratio = point / (num_points - 1) + mat_interpolated = mat_start * (1 - ratio) + ratio * mat_end + mat_interpolated.data = np.exp(mat_interpolated.data) + res.append(mat_interpolated) + return res + + +def linear_convex_combination(arr_start: np.ndarray, arr_end: np.ndarray) -> np.ndarray: + r""" + Performs a linear convex combination between two n x m NumPy arrays over their + first dimension (n rows). + + This function interpolates each metric (column) linearly across the time steps + (rows), including both the start and end states. + + Parameters + ---------- + arr_start : numpy.ndarray + The starting array of metrics. The first dimension (rows) is assumed to + represent the interpolation steps (e.g., dates/time points). + arr_end : numpy.ndarray + The ending array of metrics. Must have the exact same shape as `arr_start`. + + Returns + ------- + numpy.ndarray + An array with the same shape as `arr_start` and `arr_end`. The values + in the first dimension transition linearly from those in `arr_start` + to those in `arr_end`. + + Raises + ------ + ValueError + If `arr_start` and `arr_end` do not have the same shape. + + Example + -------- + >>> arr_start = [ [ 1, 1], [1, 2], [10, 20] ] + >>> arr_end = [ [2, 2], [5, 6], [10, 30] ] + >>> linear_interp_arrays(arr_start, arr_end) + >>> [[1, 1], [3, 4], [10, 30]] + + Notes + ----- + The interpolation is performed element-wise along the first dimension + (axis 0). For each row $i$ and proportion $p_i$, the result $R_i$ is calculated as: + + $$R_i = arr\_start_i \cdot (1 - p_i) + arr\_end_i \cdot p_i$$ + + where $p_i$ is generated by $\text{np.linspace}(0, 1, n)$ and $n$ is the + size of the first dimension ($\text{arr\_start.shape}[0]$). + """ + if arr_start.shape != arr_end.shape: + raise ValueError( + f"Cannot interpolate arrays of different shapes: {arr_start.shape} and {arr_end.shape}." + ) + interpolation_range = arr_start.shape[0] + prop1 = np.linspace(0, 1, interpolation_range) + prop0 = 1 - prop1 + if arr_start.ndim > 1: + prop0, prop1 = prop0.reshape(-1, 1), prop1.reshape(-1, 1) + + return np.multiply(arr_start, prop0) + np.multiply(arr_end, prop1) + + +def exponential_convex_combination( + arr_start: np.ndarray, arr_end: np.ndarray +) -> np.ndarray: + r""" + Performs exponential convex combination between two NumPy arrays over their first dimension. + + This function achieves an exponential-like transition by performing linear + interpolation in the logarithmic space. + + Parameters + ---------- + arr_start : numpy.ndarray + The starting array of metrics. Values must be positive. + arr_end : numpy.ndarray + The ending array of metrics. Must have the exact same shape as `arr_start`. + + Returns + ------- + numpy.ndarray + An array with the same shape as `arr_start` and `arr_end`. The values + in the first dimension transition exponentially from those in `arr_start` + to those in `arr_end`. + + Raises + ------ + ValueError + If `arr_start` and `arr_end` do not have the same shape. + + See Also + --------- + linear_interp_arrays: linear version of the interpolation. + + Notes + ----- + The interpolation is performed by transforming the arrays to a logarithmic + domain, linearly interpolating, and then transforming back. + + The formula for the interpolated result $R$ at proportion $\text{prop}$ is: + $$ + R = \exp \left( + \ln(A_{start}) \cdot (1 - \text{prop}) + + \ln(A_{end}) \cdot \text{prop} + \right) + $$ + where $A_{start}$ and $A_{end}$ are the input arrays (with $\epsilon$ added + to prevent $\ln(0)$) and $\text{prop}$ ranges from 0 to 1. + """ + if arr_start.shape != arr_end.shape: + raise ValueError( + f"Cannot interpolate arrays of different shapes: {arr_start.shape} and {arr_end.shape}." + ) + interpolation_range = arr_start.shape[0] + + prop1 = np.linspace(0, 1, interpolation_range) + prop0 = 1 - prop1 + if arr_start.ndim > 1: + prop0, prop1 = prop0.reshape(-1, 1), prop1.reshape(-1, 1) + + # Perform log transformation, linear interpolation, and exponential back-transformation + log_arr_start = np.log(arr_start + np.finfo(float).eps) + log_arr_end = np.log(arr_end + np.finfo(float).eps) + + interpolated_log_arr = np.multiply(log_arr_start, prop0) + np.multiply( + log_arr_end, prop1 + ) + + return np.exp(interpolated_log_arr) + + +class ImpactInterpolationStrategy(ABC): + r""" + Base abstract class for defining a set of interpolation strategies for impact outputs. + + This class serves as a blueprint for implementing specific interpolation + methods (e.g., 'Linear', 'Exponential') describing how impact outputs + should evolve between two points in time. + + Impacts result from three dimensions—Exposure, Hazard, and Vulnerability— + each of which may change differently over time. Consequently, a distinct + interpolation strategy is defined for each dimension. + + Exposure interpolation differs from Hazard and Vulnerability interpolation. + Changes in exposure do not alter the shape of the impact matrices, which + allows direct interpolation of the matrices themselves. For the Exposure + dimension, interpolation therefore consists of generating intermediate + impact matrices between the two time points, with exposure evolving while + hazard and vulnerability remain fixed (to either the first or second point). + + In contrast, changes in Hazard may alter the + set of events between the two time points, making direct interpolation of + impact matrices impossible. Instead, impacts are first aggregated over the + event dimension (i.e. the EAI metric). The evolution of impacts is then + interpolated as a convex combination of metric sequences computed from two + scenarios: one with hazard fixed at the initial time point and one with + hazard fixed at the final time point. + + The same aggregation-based interpolation approach is applied to the + Vulnerability dimension. + + Attributes + ---------- + exposure_interp : Callable + The function used to interpolate sparse impact matrices over time + with changing exposure dimension. + Signature: (mat_start, mat_end, num_points, **kwargs) -> list[sparse.csr_matrix]. + hazard_interp : Callable + The function used to interpolate NumPy arrays of metrics over time + with changing hazard dimension. + Signature: (arr_start, arr_end, **kwargs) -> np.ndarray. + vulnerability_interp : Callable + The function used to interpolate NumPy arrays of metrics over time + with changing vulnerability dimension. + Signature: (arr_start, arr_end, **kwargs) -> np.ndarray. + """ + + exposure_interp: Callable + hazard_interp: Callable + vulnerability_interp: Callable + + def interp_over_exposure_dim( + self, + imp_E0: sparse.csr_matrix, + imp_E1: sparse.csr_matrix, + interpolation_range: int, + /, + **kwargs: Optional[Dict[str, Any]], + ) -> List[sparse.csr_matrix]: + """ + Interpolates between two impact matrices using the defined strategy for the exposure + dimension. + + This method calls the function assigned to :attr:`exposure_interp` to generate + a sequence of impact matrices of length "interpolation_range". + + Parameters + ---------- + imp_E0 : scipy.sparse.csr_matrix + A sparse matrix of the impacts at the start of the range. + imp_E1 : scipy.sparse.csr_matrix + A sparse matrix of the impacts at the end of the range. + interpolation_range : int + The total number of time points to interpolate, including the start and end. + **kwargs : Optional[Dict[str, Any]] + Keyword arguments to pass to the underlying :attr:`exposure_interp` function. + + Returns + ------- + list of scipy.sparse.csr_matrix + A list of ``interpolation_range`` interpolated impact matrices. + + Raises + ------ + ValueError + If the underlying interpolation function raises a ``ValueError`` + indicating incompatible matrix shapes. + """ + try: + res = self.exposure_interp(imp_E0, imp_E1, interpolation_range, **kwargs) + except ValueError as err: + if str(err) == "inconsistent shapes": + raise ValueError( + "Tried to interpolate impact matrices of different shapes. " + "A possible reason could be Exposures of different shapes." + ) from err + + raise err + + return res + + def interp_over_hazard_dim( + self, + metric_0: np.ndarray, + metric_1: np.ndarray, + /, + **kwargs: Optional[Dict[str, Any]], + ) -> np.ndarray: + """ + Generates the convex combination between two arrays of metrics using + the defined interpolation strategy for the hazard dimension. + + This method calls the function assigned to :attr:`hazard_interp`. + + Parameters + ---------- + metric_0 : numpy.ndarray + The starting array of metrics. + metric_1 : numpy.ndarray + The ending array of metrics. Must have the same shape as ``metric_0``. + **kwargs : Optional [Dict[str, Any]] + Keyword arguments to pass to the underlying :attr:`hazard_interp` function. + + Returns + ------- + numpy.ndarray + The resulting interpolated array. + """ + return self.hazard_interp(metric_0, metric_1, **kwargs) + + def interp_over_vulnerability_dim( + self, + metric_0: np.ndarray, + metric_1: np.ndarray, + /, + **kwargs: Optional[Dict[str, Any]], + ) -> np.ndarray: + """ + Generates the convex combination between two arrays of metrics using + the defined interpolation strategy for the hazard dimension. + + This method calls the function assigned to :attr:`vulnerability_interp`. + + Parameters + ---------- + metric_0 : numpy.ndarray + The starting array of metrics. + metric_1 : numpy.ndarray + The ending array of metrics. Must have the same shape as ``metric_0``. + **kwargs : Optional[Dict[str, Any]] + Keyword arguments to pass to the underlying :attr:`vulnerability_interp` function. + + Returns + ------- + numpy.ndarray + The resulting interpolated array. + """ + # Note: Assuming the Callable takes the exact positional arguments + return self.vulnerability_interp(metric_0, metric_1, **kwargs) + + +class CustomImpactInterpolationStrategy(ImpactInterpolationStrategy): + r"""Interface for interpolation strategies. + + This is the class to use to define custom interpolation strategies. + """ + + def __init__( + self, + exposure_interp: Callable, + hazard_interp: Callable, + vulnerability_interp: Callable, + ) -> None: + super().__init__() + self.exposure_interp = exposure_interp + self.hazard_interp = hazard_interp + self.vulnerability_interp = vulnerability_interp + + +class AllLinearStrategy(ImpactInterpolationStrategy): + r"""Linear interpolation strategy over all dimensions.""" + + def __init__(self) -> None: + super().__init__() + self.exposure_interp = linear_interp_matrix_elemwise + self.hazard_interp = linear_convex_combination + self.vulnerability_interp = linear_convex_combination + + +class ExponentialExposureStrategy(ImpactInterpolationStrategy): + r"""Exponential interpolation strategy for exposure and linear for Hazard and Vulnerability.""" + + def __init__(self) -> None: + super().__init__() + self.exposure_interp = exponential_interp_matrix_elemwise + self.hazard_interp = linear_convex_combination + self.vulnerability_interp = linear_convex_combination diff --git a/climada/trajectories/test/test_interpolation.py b/climada/trajectories/test/test_interpolation.py new file mode 100644 index 0000000000..da6f614fe1 --- /dev/null +++ b/climada/trajectories/test/test_interpolation.py @@ -0,0 +1,217 @@ +""" +This file is part of CLIMADA. + +Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. + +CLIMADA is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free +Software Foundation, version 3. + +CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along +with CLIMADA. If not, see . + +--- + +Tests for interpolation + +""" + +from unittest.mock import MagicMock + +import numpy as np +import pytest +from scipy.sparse import csr_matrix + +from climada.trajectories.interpolation import ( + AllLinearStrategy, + CustomImpactInterpolationStrategy, + ExponentialExposureStrategy, + exponential_convex_combination, + exponential_interp_matrix_elemwise, + linear_convex_combination, + linear_interp_matrix_elemwise, +) + +# --- Fixtures --- + + +@pytest.fixture +def interpolation_data(): + """Provides common matrices and constants for interpolation tests.""" + return { + "imp_mat0": csr_matrix(np.array([[1, 2], [3, 4]])), + "imp_mat1": csr_matrix(np.array([[5, 6], [7, 8]])), + "imp_mat2": csr_matrix(np.array([[5, 6, 7], [8, 9, 10]])), + "time_points": 5, + "rtol": 1e-5, + "atol": 1e-8, + "dummy_metric_0": np.array([10, 20, 30]), + "dummy_metric_1": np.array([100, 200, 300]), + "dummy_matrix_0": csr_matrix([[1, 2], [3, 4]]), + "dummy_matrix_1": csr_matrix([[10, 20], [30, 40]]), + } + + +# --- Tests for Interpolation Functions --- + + +def test_linear_interp_arrays(interpolation_data): + arr_start = np.array([10, 50, 100]) + arr_end = np.array([20, 100, 200]) + expected = np.array([10.0, 75.0, 200.0]) + result = linear_convex_combination(arr_start, arr_end) + np.testing.assert_allclose( + result, + expected, + rtol=interpolation_data["rtol"], + atol=interpolation_data["atol"], + ) + + +@pytest.mark.parametrize( + "func", [linear_convex_combination, exponential_convex_combination] +) +def test_convex_combination_shape_error(func): + arr_start = np.array([10, 100, 5]) + arr_end = np.array([20, 200]) + with pytest.raises(ValueError, match="different shapes"): + func(arr_start, arr_end) + + +def test_exponential_convex_combination_2d(interpolation_data): + arr_start = np.array([[1, 10, 100]] * 3) + arr_end = np.array([[2, 20, 200]] * 3) + expected = np.array( + [[1.0, 10.0, 100.0], [1.4142136, 14.142136, 141.42136], [2, 20, 200]] + ) + result = exponential_convex_combination(arr_start, arr_end) + np.testing.assert_allclose( + result, + expected, + rtol=interpolation_data["rtol"], + atol=interpolation_data["atol"], + ) + + +@pytest.mark.parametrize( + "func", [linear_convex_combination, exponential_convex_combination] +) +def test_convex_combinations_start_equals_end(interpolation_data, func): + """Test that if start and end are identical, the result is the same array.""" + arr = np.array([5.0, 5.0]) + result = func(arr, arr) + np.testing.assert_allclose(result, arr, rtol=interpolation_data["rtol"]) + + +@pytest.mark.parametrize( + "func,expected", + [ + ( + linear_interp_matrix_elemwise, + np.array( + [ + [[1.0, 2.0], [3.0, 4.0]], + [[2.0, 3.0], [4.0, 5.0]], + [[3.0, 4.0], [5.0, 6.0]], + [[4.0, 5.0], [6.0, 7.0]], + [[5.0, 6.0], [7.0, 8.0]], + ] + ), + ), + ( + exponential_interp_matrix_elemwise, + np.array( + [ + [[1.0, 2.0], [3.0, 4.0]], + [[1.49534878, 2.63214803], [3.70779275, 4.75682846]], + [[2.23606798, 3.46410162], [4.58257569, 5.65685425]], + [[3.34370152, 4.55901411], [5.66374698, 6.72717132]], + [[5.0, 6.0], [7.0, 8.0]], + ] + ), + ), + ], +) +def test_impmat_interpolate(interpolation_data, func, expected): + data = interpolation_data + result = func(data["imp_mat0"], data["imp_mat1"], data["time_points"]) + + assert len(result) == data["time_points"] + assert all(isinstance(mat, csr_matrix) for mat in result) + + dense = np.array([r.todense() for r in result]) + np.testing.assert_array_almost_equal(dense, expected) + + +# --- Tests for Interpolation Strategies --- + + +def test_custom_strategy_init(): + mock_func = lambda a, b, r: a + b + strategy = CustomImpactInterpolationStrategy(mock_func, mock_func, mock_func) + assert strategy.exposure_interp == mock_func + assert strategy.hazard_interp == mock_func + assert strategy.vulnerability_interp == mock_func + + +def test_custom_strategy_exposure_dim_error(interpolation_data): + mock_exposure = MagicMock(side_effect=ValueError("inconsistent shapes")) + strategy = CustomImpactInterpolationStrategy( + mock_exposure, linear_convex_combination, linear_convex_combination + ) + + with pytest.raises( + ValueError, match="Tried to interpolate impact matrices of different shape" + ): + strategy.interp_over_exposure_dim( + interpolation_data["dummy_matrix_0"], csr_matrix(np.array([[1]])), 3 + ) + + +# --- Tests for Concrete Strategies --- + + +def test_all_linear_strategy(interpolation_data): + data = interpolation_data + strategy = AllLinearStrategy() + + # Test property assignment + assert strategy.exposure_interp == linear_interp_matrix_elemwise + + # Test Hazard dim + result_haz = strategy.interp_over_hazard_dim( + data["dummy_metric_0"], data["dummy_metric_1"] + ) + expected_haz = linear_convex_combination( + data["dummy_metric_0"], data["dummy_metric_1"] + ) + np.testing.assert_allclose(result_haz, expected_haz) + + # Test Exposure dim + result_exp = strategy.interp_over_exposure_dim( + data["dummy_matrix_0"], data["dummy_matrix_1"], 3 + ) + assert len(result_exp) == 3 + # Check midpoint (index 1) manually + expected_mid = csr_matrix([[5.5, 11], [16.5, 22]]) + np.testing.assert_allclose(result_exp[1].data, expected_mid.data) + + +def test_exponential_exposure_strategy(interpolation_data): + data = interpolation_data + strategy = ExponentialExposureStrategy() + + result_exp = strategy.interp_over_exposure_dim( + data["dummy_matrix_0"], data["dummy_matrix_1"], 3 + ) + + # Midpoint should be geometric mean for exponential strategy + # sqrt(1*10) = 3.162278 + expected_mid_data = np.array([3.162278, 6.324555, 9.486833, 12.649111]) + np.testing.assert_allclose( + result_exp[1].data, expected_mid_data, rtol=data["rtol"], atol=data["atol"] + ) diff --git a/doc/api/climada/climada.trajectories.interpolation.rst b/doc/api/climada/climada.trajectories.interpolation.rst new file mode 100644 index 0000000000..98e1ec7b32 --- /dev/null +++ b/doc/api/climada/climada.trajectories.interpolation.rst @@ -0,0 +1,7 @@ +climada\.trajectories\.interpolation module +---------------------------------------- + +.. automodule:: climada.trajectories.interpolation + :members: + :undoc-members: + :show-inheritance: diff --git a/doc/api/climada/climada.trajectories.rst b/doc/api/climada/climada.trajectories.rst new file mode 100644 index 0000000000..ca449199d5 --- /dev/null +++ b/doc/api/climada/climada.trajectories.rst @@ -0,0 +1,7 @@ + +climada\.trajectories module +============================ + +.. toctree:: + + climada.trajectories.interpolation