diff --git a/climada/engine/__init__.py b/climada/engine/__init__.py
index ef8292f75d..1970f7706b 100755
--- a/climada/engine/__init__.py
+++ b/climada/engine/__init__.py
@@ -22,3 +22,4 @@
from .cost_benefit import *
from .impact import *
from .impact_calc import *
+from .impact_forecast import ImpactForecast
diff --git a/climada/engine/impact.py b/climada/engine/impact.py
index d8c944c7b9..71ab1c06a7 100644
--- a/climada/engine/impact.py
+++ b/climada/engine/impact.py
@@ -1431,6 +1431,8 @@ def write_attribute(group, name, value):
def write_dataset(group, name, value):
"""Write a dataset"""
+ if name == "lead_time":
+ value = value.astype("timedelta64[ns]").astype("int64")
group.create_dataset(name, data=value, dtype=_str_type_helper(value))
def write_dict(group, name, value):
@@ -1618,7 +1620,9 @@ def read_excel(self, *args, **kwargs):
self.__dict__ = Impact.from_excel(*args, **kwargs).__dict__
@classmethod
- def from_hdf5(cls, file_path: Union[str, Path]):
+ def from_hdf5(
+ cls, file_path: Union[str, Path], *, add_scalar_attrs=None, add_array_attrs=None
+ ):
"""Create an impact object from an H5 file.
This assumes a specific layout of the file. If values are not found in the
@@ -1663,6 +1667,10 @@ def from_hdf5(cls, file_path: Union[str, Path]):
----------
file_path : str or Path
The file path of the file to read.
+ add_scalar_attrs : Iterable of str, optional
+ Scalar attributes to read from file. Defaults to None.
+ add_array_attrs : Iterable of str, optional
+ Array attributes to read from file. Defaults to None.
Returns
-------
@@ -1691,7 +1699,10 @@ def from_hdf5(cls, file_path: Union[str, Path]):
# Scalar attributes
scalar_attrs = set(
("crs", "tot_value", "unit", "aai_agg", "frequency_unit", "haz_type")
- ).intersection(file.attrs.keys())
+ )
+ if add_scalar_attrs is not None:
+ scalar_attrs = scalar_attrs.union(add_scalar_attrs)
+ scalar_attrs = scalar_attrs.intersection(file.attrs.keys())
kwargs.update({attr: file.attrs[attr] for attr in scalar_attrs})
# Array attributes
@@ -1699,9 +1710,16 @@ def from_hdf5(cls, file_path: Union[str, Path]):
# invalidated once we close the file.
array_attrs = set(
("event_id", "date", "coord_exp", "eai_exp", "at_event", "frequency")
- ).intersection(file.keys())
+ )
+ if add_array_attrs is not None:
+ array_attrs = array_attrs.union(add_array_attrs)
+ array_attrs = array_attrs.intersection(file.keys())
kwargs.update({attr: file[attr][:] for attr in array_attrs})
-
+ # correct lead_time attribut to timedelta
+ if "lead_time" in kwargs:
+ kwargs["lead_time"] = np.array(file["lead_time"][:]).astype(
+ "timedelta64[ns]"
+ )
# Special handling for 'event_name' because it should be a list of strings
if "event_name" in file:
# pylint: disable=no-member
@@ -2208,9 +2226,12 @@ def stack_attribute(attr_name: str) -> np.ndarray:
imp_mat = sparse.vstack(imp_mats)
# Concatenate other attributes
- kwargs = {
- attr: stack_attribute(attr) for attr in ("date", "frequency", "at_event")
- }
+ concat_attrs = {
+ name.lstrip("_") # Private attributes with getter/setter
+ for name, value in first_imp.__dict__.items()
+ if isinstance(value, np.ndarray)
+ }.difference(("event_id", "coord_exp", "eai_exp", "aai_agg"))
+ kwargs = {attr: stack_attribute(attr) for attr in concat_attrs}
# Get remaining attributes from first impact object in list
return cls(
diff --git a/climada/engine/impact_calc.py b/climada/engine/impact_calc.py
index 0586166173..f58316bf56 100644
--- a/climada/engine/impact_calc.py
+++ b/climada/engine/impact_calc.py
@@ -29,6 +29,8 @@
from climada import CONFIG
from climada.engine.impact import Impact
+from climada.engine.impact_forecast import ImpactForecast
+from climada.hazard.forecast import HazardForecast
LOGGER = logging.getLogger(__name__)
@@ -217,7 +219,7 @@ def _return_impact(self, imp_mat_gen, save_mat):
Returns
-------
- Impact
+ Impact or ImpactForecast
Impact Object initialize from the impact matrix
See Also
@@ -230,12 +232,31 @@ def _return_impact(self, imp_mat_gen, save_mat):
at_event, eai_exp, aai_agg = self.risk_metrics(
imp_mat, self.hazard.frequency
)
+ if isinstance(self.hazard, HazardForecast):
+ eai_exp = np.full_like(eai_exp, np.nan, dtype=eai_exp.dtype)
+ aai_agg = np.full_like(aai_agg, np.nan, dtype=aai_agg.dtype)
+ LOGGER.warning(
+ "eai_exp and aai_agg are undefined with forecasts. "
+ "Setting them to NaN arrays."
+ )
+
else:
+ if isinstance(self.hazard, HazardForecast):
+ raise ValueError(
+ "Saving impact matrix is required when using HazardForecast."
+ "Please set save_mat=True."
+ )
imp_mat = None
at_event, eai_exp, aai_agg = self.stitch_risk_metrics(imp_mat_gen)
- return Impact.from_eih(
+
+ impact = Impact.from_eih(
self.exposures, self.hazard, at_event, eai_exp, aai_agg, imp_mat
)
+ if isinstance(self.hazard, HazardForecast):
+ return ImpactForecast.from_impact(
+ impact, self.hazard.lead_time, self.hazard.member
+ )
+ return impact
def _return_empty(self, save_mat):
"""
@@ -248,21 +269,37 @@ def _return_empty(self, save_mat):
Returns
-------
- Impact
+ Impact or ImpactForecast
Empty impact object with correct array sizes.
"""
at_event = np.zeros(self.n_events)
- eai_exp = np.zeros(self.n_exp_pnt)
- aai_agg = 0.0
+ if isinstance(self.hazard, HazardForecast):
+ eai_exp = np.full(self.n_exp_pnt, np.nan)
+ aai_agg = np.nan
+ else:
+ eai_exp = np.zeros(self.n_exp_pnt)
+ aai_agg = 0.0
+
if save_mat:
imp_mat = sparse.csr_matrix(
(self.n_events, self.n_exp_pnt), dtype=np.float64
)
else:
+ if isinstance(self.hazard, HazardForecast):
+ raise ValueError(
+ "Saving impact matrix is required when using HazardForecast. "
+ "Please set save_mat=True."
+ )
imp_mat = None
- return Impact.from_eih(
+
+ impact = Impact.from_eih(
self.exposures, self.hazard, at_event, eai_exp, aai_agg, imp_mat
)
+ if isinstance(self.hazard, HazardForecast):
+ return ImpactForecast.from_impact(
+ impact, self.hazard.lead_time, self.hazard.member
+ )
+ return impact
def minimal_exp_gdf(
self, impf_col, assign_centroids, ignore_cover, ignore_deductible
diff --git a/climada/engine/impact_forecast.py b/climada/engine/impact_forecast.py
new file mode 100644
index 0000000000..3994f38ea3
--- /dev/null
+++ b/climada/engine/impact_forecast.py
@@ -0,0 +1,467 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Define Forecast variant of Impact.
+"""
+
+import logging
+from pathlib import Path
+from typing import Union
+
+import numpy as np
+import scipy.sparse as sparse
+
+from ..util import log_level
+from ..util.checker import size
+from ..util.forecast import Forecast
+from .impact import Impact
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ImpactForecast(Forecast, Impact):
+ """An impact object with forecast information"""
+
+ def __init__(
+ self,
+ *,
+ lead_time: np.ndarray | None,
+ member: np.ndarray | None,
+ **impact_kwargs,
+ ):
+ """Initialize the impact forecast.
+
+ Parameters
+ ----------
+ lead_time : np.ndarray, optional
+ The lead time associated with each event entry, given as timedelta64 type
+ member : np.ndarray, optional
+ The ensemble member associated with each event entry, given as integers
+ impact_kwargs
+ Keyword-arguments passed to ~:py:class`climada.engine.impact.Impact`.
+ """
+ super().__init__(lead_time=lead_time, member=member, **impact_kwargs)
+ self._check_sizes()
+
+ @classmethod
+ def from_impact(
+ cls, impact: Impact, lead_time: np.ndarray | None, member: np.ndarray | None
+ ):
+ """Create an impact forecast from an impact object and forecast information.
+
+ Parameters
+ ----------
+ impact : climada.engine.impact.Impact
+ The impact object whose data to use in the forecast object
+ lead_time : np.ndarray, optional
+ The lead time associated with each event entry, given as timedelta64 type
+ member : np.ndarray, optional
+ The ensemble member associated with each event entry, given as integers
+ """
+ with log_level("WARNING", "climada.engine.impact"):
+ return cls(
+ lead_time=lead_time,
+ member=member,
+ event_id=impact.event_id,
+ event_name=impact.event_name,
+ date=impact.date,
+ frequency=impact.frequency,
+ frequency_unit=impact.frequency_unit,
+ coord_exp=impact.coord_exp,
+ crs=impact.crs,
+ eai_exp=impact.eai_exp,
+ at_event=impact.at_event,
+ tot_value=impact.tot_value,
+ aai_agg=impact.aai_agg,
+ unit=impact.unit,
+ imp_mat=impact.imp_mat,
+ haz_type=impact.haz_type,
+ )
+
+ @property
+ def at_event(self):
+ """Get the total impact for each member/lead_time combination."""
+ LOGGER.warning(
+ "at_event gives the total impact for one specific combination of member and "
+ "lead_time."
+ )
+ return self._at_event
+
+ @at_event.setter
+ def at_event(self, value):
+ """Set the total impact for each member/lead_time combination."""
+ self._at_event = value
+
+ def local_exceedance_impact(
+ self,
+ return_periods=(25, 50, 100, 250),
+ method="interpolate",
+ min_impact=0,
+ log_frequency=True,
+ log_impact=True,
+ bin_decimals=None,
+ ):
+ """Compution of local exceedance impact for given return periods is not
+ implemented for ImpactForecast.
+
+ See Also
+ --------
+ See :py:meth:`~climada.engine.impact.Impact.local_exceedance_impact`
+
+ Raises
+ ------
+ NotImplementedError
+ """
+
+ LOGGER.error("local_exceedance_impact is not defined for ImpactForecast")
+ raise NotImplementedError(
+ "local_exceedance_impact is not defined for ImpactForecast"
+ )
+
+ def local_return_period(
+ self,
+ threshold_impact=(1000.0, 10000.0),
+ method="interpolate",
+ min_impact=0,
+ log_frequency=True,
+ log_impact=True,
+ bin_decimals=None,
+ ):
+ """Compution of local return period for given impact thresholds is not
+ implemented for ImpactForecast.
+
+ See Also
+ --------
+ See :py:meth:`~climada.engine.impact.Impact.local_return_period`
+
+ Raises
+ -------
+ NotImplementedError
+ """
+
+ LOGGER.error("local_return_period is not defined for ImpactForecast")
+ raise NotImplementedError(
+ "local_return_period is not defined for ImpactForecast"
+ )
+
+ def calc_freq_curve(self, return_per=None):
+ """Computation of the impact exceedance frequency curve is not
+ implemented for ImpactForecast.
+
+ See Also
+ --------
+ See :py:meth:`~climada.engine.impact.Impact.calc_freq_curve`
+
+ Raises
+ ------
+ NotImplementedError
+ """
+
+ LOGGER.error("calc_freq_curve is not defined for ImpactForecast")
+ raise NotImplementedError("calc_freq_curve is not defined for ImpactForecast")
+
+ @classmethod
+ def from_hdf5(cls, file_path: Union[str, Path]):
+ """Create an ImpactForecast object from an H5 file.
+
+ This assumes a specific layout of the file. If values are not found in the
+ expected places, they will be set to the default values for an ``Impact`` object.
+
+ The following H5 file structure is assumed (H5 groups are terminated with ``/``,
+ attributes are denoted by ``.attrs/``)::
+
+ file.h5
+ ├─ at_event
+ ├─ coord_exp
+ ├─ eai_exp
+ ├─ event_id
+ ├─ event_name
+ ├─ frequency
+ ├─ imp_mat
+ ├─ lead_time
+ ├─ member
+ ├─ .attrs/
+ │ ├─ aai_agg
+ │ ├─ crs
+ │ ├─ frequency_unit
+ │ ├─ haz_type
+ │ ├─ tot_value
+ │ ├─ unit
+
+ As per the :py:func:`climada.engine.impact.Impact.__init__`, any of these entries
+ is optional. If it is not found, the default value will be used when constructing
+ the Impact.
+
+ The impact matrix ``imp_mat`` can either be an H5 dataset, in which case it is
+ interpreted as dense representation of the matrix, or an H5 group, in which case
+ the group is expected to contain the following data for instantiating a
+ `scipy.sparse.csr_matrix `_::
+
+ imp_mat/
+ ├─ data
+ ├─ indices
+ ├─ indptr
+ ├─ .attrs/
+ │ ├─ shape
+
+ Parameters
+ ----------
+ file_path : str or Path
+ The file path of the file to read.
+
+ Returns
+ -------
+ imp : ImpactForecast
+ ImpactForecast with data from the given file
+ """
+ return super().from_hdf5(file_path, add_array_attrs={"member", "lead_time"})
+
+ def _check_sizes(self):
+ """Check sizes of forecast data vs. impact data.
+
+ Raises
+ ------
+ ValueError
+ If the sizes of the forecast data do not match the
+ :py:attr:`~climada.engine.impact.Impact.event_id`
+ """
+ num_entries = len(self.event_id)
+ size(exp_len=num_entries, var=self.member, var_name="Forecast.member")
+ size(exp_len=num_entries, var=self.lead_time, var_name="Forecast.lead_time")
+
+ def _reduce_attrs(self, event_name: str):
+ """
+ Reduce the attributes of an ImpactForecast to a single value.
+
+ Attributes are modified as follows:
+ - lead_time: set to NaT
+ - member: set to -1
+ - event_id: set to 0
+ - event_name: set to the name of the reduction method (default)
+ - date: set to 0
+ - frequency: set to 1
+
+ Parameters
+ ----------
+ event_name : str
+ The event name given to the reduced data.
+ """
+ reduced_attrs = {
+ "lead_time": np.array([np.timedelta64("NaT")]),
+ "member": np.array([-1]),
+ "event_id": np.array([0]),
+ "event_name": np.array([event_name]),
+ "date": np.array([0]),
+ "frequency": np.array([1]),
+ }
+
+ return reduced_attrs
+
+ def min(self):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the minimum
+ value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ ImpactForecast
+ An ImpactForecast object with the min impact matrix and at_event.
+ """
+ red_imp_mat = self.imp_mat.min(axis=0).tocsr()
+ red_at_event = np.array([red_imp_mat.sum()])
+ return ImpactForecast(
+ frequency_unit=self.frequency_unit,
+ coord_exp=self.coord_exp,
+ crs=self.crs,
+ eai_exp=self.eai_exp,
+ at_event=red_at_event,
+ tot_value=self.tot_value,
+ aai_agg=self.aai_agg,
+ unit=self.unit,
+ imp_mat=red_imp_mat,
+ haz_type=self.haz_type,
+ **self._reduce_attrs("min"),
+ )
+
+ def max(self):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the maximum
+ value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ ImpactForecast
+ An ImpactForecast object with the max impact matrix and at_event.
+ """
+ red_imp_mat = self.imp_mat.max(axis=0).tocsr()
+ red_at_event = np.array([red_imp_mat.sum()])
+ return ImpactForecast(
+ frequency_unit=self.frequency_unit,
+ coord_exp=self.coord_exp,
+ crs=self.crs,
+ eai_exp=self.eai_exp,
+ at_event=red_at_event,
+ tot_value=self.tot_value,
+ aai_agg=self.aai_agg,
+ unit=self.unit,
+ imp_mat=red_imp_mat,
+ haz_type=self.haz_type,
+ **self._reduce_attrs("max"),
+ )
+
+ def mean(self):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the mean value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ ImpactForecast
+ An ImpactForecast object with the mean impact matrix and at_event.
+ """
+ red_imp_mat = sparse.csr_matrix(self.imp_mat.mean(axis=0))
+ red_at_event = np.array([red_imp_mat.sum()])
+ return ImpactForecast(
+ frequency_unit=self.frequency_unit,
+ coord_exp=self.coord_exp,
+ crs=self.crs,
+ eai_exp=self.eai_exp,
+ at_event=red_at_event,
+ tot_value=self.tot_value,
+ aai_agg=self.aai_agg,
+ unit=self.unit,
+ imp_mat=red_imp_mat,
+ haz_type=self.haz_type,
+ **self._reduce_attrs("mean"),
+ )
+
+ def select(
+ self,
+ event_ids=None,
+ event_names=None,
+ dates=None,
+ coord_exp=None,
+ reset_frequency=False,
+ member=None,
+ lead_time=None,
+ ):
+ """Select entries based on the parameters and return a new instance.
+ The selection will contain the intersection of all given parameters.
+
+ Parameters
+ ----------
+ member : Sequence of ints
+ Ensemble members to select
+ lead_time : Sequence of numpy.timedelta64
+ Lead times to select
+
+ See Also
+ --------
+ :py:meth:`~climada.engine.impact.Impact.select`
+ """
+ if member is not None or lead_time is not None:
+ mask_member = (
+ self.idx_member(member)
+ if member is not None
+ else np.full_like(self.member, True, dtype=bool)
+ )
+ mask_lead_time = (
+ self.idx_lead_time(lead_time)
+ if lead_time is not None
+ else np.full_like(self.lead_time, True, dtype=bool)
+ )
+ event_id_from_forecast_mask = np.asarray(self.event_id)[
+ (mask_member & mask_lead_time)
+ ]
+ event_ids = (
+ np.intersect1d(event_ids, event_id_from_forecast_mask)
+ if event_ids is not None
+ else event_id_from_forecast_mask
+ )
+
+ return super().select(
+ event_ids=event_ids,
+ event_names=event_names,
+ dates=dates,
+ coord_exp=coord_exp,
+ reset_frequency=reset_frequency,
+ )
+
+ def _quantile(self, q: float, event_name: str | None = None):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the quantile value.
+ """
+ red_imp_mat = sparse.csr_matrix(np.quantile(self.imp_mat.toarray(), q, axis=0))
+ red_at_event = np.array([red_imp_mat.sum()])
+ if event_name is None:
+ event_name = f"quantile_{q}"
+ return ImpactForecast(
+ frequency_unit=self.frequency_unit,
+ coord_exp=self.coord_exp,
+ crs=self.crs,
+ eai_exp=self.eai_exp,
+ at_event=red_at_event,
+ tot_value=self.tot_value,
+ aai_agg=self.aai_agg,
+ unit=self.unit,
+ imp_mat=red_imp_mat,
+ haz_type=self.haz_type,
+ **self._reduce_attrs(event_name),
+ )
+
+ def quantile(self, q: float):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the quantile value.
+
+ Parameters
+ ----------
+ q : float
+ The quantile to compute, which must be between 0 and 1.
+
+ Returns
+ -------
+ ImpactForecast
+ An ImpactForecast object with the quantile impact matrix and at_event.
+ """
+ return self._quantile(q=q)
+
+ def median(self):
+ """
+ Reduce the impact matrix and at_event of an ImpactForecast to the median value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ ImpactForecast
+ An ImpactForecast object with the median impact matrix and at_event.
+ """
+ return self._quantile(q=0.5, event_name="median")
diff --git a/climada/engine/test/test_impact.py b/climada/engine/test/test_impact.py
index 38b3def3d8..f91fc2de98 100644
--- a/climada/engine/test/test_impact.py
+++ b/climada/engine/test/test_impact.py
@@ -47,26 +47,30 @@
STR_DT = h5py.special_dtype(vlen=str)
-def dummy_impact():
- """Return an impact object for testing"""
- return Impact(
- event_id=np.arange(6) + 10,
- event_name=[0, 1, "two", "three", 30, 31],
- date=np.arange(6),
- coord_exp=np.array([[1, 2], [1.5, 2.5]]),
- crs=DEF_CRS,
- eai_exp=np.array([7.2, 7.2]),
- at_event=np.array([0, 2, 4, 6, 60, 62]),
- frequency=np.array([1 / 6, 1 / 6, 1, 1, 1 / 30, 1 / 30]),
- tot_value=7,
- aai_agg=14.4,
- unit="USD",
- frequency_unit="1/month",
- imp_mat=sparse.csr_matrix(
+def impact_kwargs():
+ return {
+ "event_id": np.arange(6) + 10,
+ "event_name": [0, 1, "two", "three", 30, 31],
+ "date": np.arange(6),
+ "coord_exp": np.array([[1, 2], [1.5, 2.5]]),
+ "crs": DEF_CRS,
+ "eai_exp": np.array([7.2, 7.2]),
+ "at_event": np.array([0, 2, 4, 6, 60, 62]),
+ "frequency": np.array([1 / 6, 1 / 6, 1, 1, 1 / 30, 1 / 30]),
+ "tot_value": 7,
+ "aai_agg": 14.4,
+ "unit": "USD",
+ "frequency_unit": "1/month",
+ "imp_mat": sparse.csr_matrix(
np.array([[0, 0], [1, 1], [2, 2], [3, 3], [30, 30], [31, 31]])
),
- haz_type="TC",
- )
+ "haz_type": "TC",
+ }
+
+
+def dummy_impact():
+ """Return an impact object for testing"""
+ return Impact(**impact_kwargs())
def dummy_impact_yearly():
diff --git a/climada/engine/test/test_impact_calc.py b/climada/engine/test/test_impact_calc.py
index bd606c6e19..dd69de7249 100644
--- a/climada/engine/test/test_impact_calc.py
+++ b/climada/engine/test/test_impact_calc.py
@@ -26,14 +26,18 @@
import geopandas as gpd
import numpy as np
+import pandas as pd
+import pytest
from scipy import sparse
from climada import CONFIG
from climada.engine import Impact, ImpactCalc
from climada.engine.impact_calc import LOGGER as ILOG
+from climada.engine.impact_forecast import ImpactForecast
from climada.entity import Exposures, ImpactFunc, ImpactFuncSet, ImpfTropCyclone
from climada.entity.entity_def import Entity
from climada.hazard.base import Centroids, Hazard
+from climada.hazard.forecast import HazardForecast
from climada.test import get_test_file
from climada.util.api_client import Client
from climada.util.config import Config
@@ -46,17 +50,102 @@
DATA_FOLDER.mkdir(exist_ok=True)
-def check_impact(self, imp, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array=None):
- """Test properties of imapcts"""
- self.assertEqual(len(haz.event_id), len(imp.at_event))
- self.assertIsInstance(imp, Impact)
+@pytest.fixture(params=[50, 1, 0])
+def exposure(request):
+ n_exp = request.param
+ lats = np.linspace(-10, 10, n_exp)
+ lons = np.linspace(-10, 10, n_exp)
+ data = gpd.GeoDataFrame(
+ {
+ "impf_TC": 1,
+ "value": 1,
+ },
+ index=range(n_exp),
+ geometry=gpd.points_from_xy(lons, lats),
+ crs="EPSG:4326",
+ )
+ exposures = Exposures(data=data)
+ return exposures
+
+
+@pytest.fixture
+def hazard(exposure):
+ n_events = 10
+ centroids = Centroids(
+ lat=exposure.gdf.geometry.x,
+ lon=exposure.gdf.geometry.y,
+ )
+ intensity = sparse.csr_matrix(
+ np.ones((n_events, exposure.gdf.shape[0])) * 50
+ ) # uniform intensity
+ haz = Hazard()
+ haz.event_id = np.arange(n_events)
+ haz.event_name = haz.event_id.tolist()
+ haz.haz_type = "TC"
+ haz.date = haz.event_id
+ haz.frequency_unit = "m/s"
+ haz.centroids = centroids
+ haz.intensity = intensity
+ haz.frequency = 1 / 10 * np.ones(n_events) # uniform frequency (10 n_events)
+ return haz
+
+
+@pytest.fixture
+def hazard_forecast(hazard):
+ n_events = hazard.size
+ lead_time = pd.timedelta_range("1h", periods=n_events).to_numpy()
+ member = np.arange(n_events)
+ haz_fc = HazardForecast.from_hazard(
+ hazard=hazard,
+ lead_time=lead_time,
+ member=member,
+ )
+ return haz_fc
+
+
+@pytest.fixture
+def impact_func_set(exposure, hazard):
+ step_impf = ImpactFunc()
+ step_impf.id = 1
+ try:
+ step_impf.id = exposure.data[f"impf_{hazard.haz_type}"].unique()[0]
+ except IndexError:
+ pass
+ step_impf.haz_type = hazard.haz_type
+ step_impf.name = "fixture step function"
+ step_impf.intensity_unit = ""
+ step_impf.intensity = np.array([0, 0.495, 0.4955, 0.5, 1, 10])
+ step_impf.mdd = np.array([0, 0, 0, 1, 1, 1])
+ step_impf.paa = np.sort(np.linspace(1, 1, num=6))
+ return ImpactFuncSet([step_impf])
+
+
+@pytest.fixture
+def impact_calc(exposure, hazard):
+ imp_mat = np.ones((len(hazard.event_id), exposure.gdf.shape[0]))
+ aai_agg = np.sum(exposure.gdf["value"]) * hazard.frequency[0]
+ eai_exp = np.ones(exposure.gdf.shape[0]) * hazard.frequency[0]
+ at_event = np.ones(hazard.size) * np.sum(exposure.gdf["value"])
+ return {
+ "imp_mat": imp_mat,
+ "aai_agg": aai_agg,
+ "eai_exp": eai_exp,
+ "at_event": at_event,
+ }
+
+
+def check_impact(imp, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array=None):
+ """Test properties of impacts"""
+ # NOTE: Correctly compares NaNs!
+ assert len(haz.event_id) == len(imp.at_event)
+ assert isinstance(imp, Impact)
np.testing.assert_allclose(imp.coord_exp[:, 0], exp.latitude)
np.testing.assert_allclose(imp.coord_exp[:, 1], exp.longitude)
- self.assertAlmostEqual(imp.aai_agg, aai_agg, 3)
+ np.testing.assert_allclose(imp.aai_agg, aai_agg, rtol=1e-3)
np.testing.assert_allclose(imp.eai_exp, eai_exp, rtol=1e-5)
np.testing.assert_allclose(imp.at_event, at_event, rtol=1e-5)
if imp_mat_array is not None:
- np.testing.assert_allclose(imp.imp_mat.toarray().ravel(), imp_mat_array.ravel())
+ np.testing.assert_allclose(imp.imp_mat.todense(), imp_mat_array)
class TestImpactCalc(unittest.TestCase):
@@ -300,7 +389,7 @@ def test_calc_impact_RF_pass(self):
]
)
# fmt: on
- check_impact(self, impact, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array)
+ check_impact(impact, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array)
def test_empty_impact(self):
"""Check that empty impact is returned if no centroids match the exposures"""
@@ -311,11 +400,11 @@ def test_empty_impact(self):
aai_agg = 0.0
eai_exp = np.zeros(len(exp.gdf))
at_event = np.zeros(HAZ.size)
- check_impact(self, impact, HAZ, exp, aai_agg, eai_exp, at_event, None)
+ check_impact(impact, HAZ, exp, aai_agg, eai_exp, at_event, None)
impact = icalc.impact(save_mat=True, assign_centroids=False)
imp_mat_array = sparse.csr_matrix((HAZ.size, len(exp.gdf))).toarray()
- check_impact(self, impact, HAZ, exp, aai_agg, eai_exp, at_event, imp_mat_array)
+ check_impact(impact, HAZ, exp, aai_agg, eai_exp, at_event, imp_mat_array)
def test_single_event_impact(self):
"""Check impact for single event"""
@@ -325,11 +414,11 @@ def test_single_event_impact(self):
aai_agg = 0.0
eai_exp = np.zeros(len(ENT.exposures.gdf))
at_event = np.array([0])
- check_impact(self, impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, None)
+ check_impact(impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, None)
impact = icalc.impact(save_mat=True, assign_centroids=False)
imp_mat_array = sparse.csr_matrix((haz.size, len(ENT.exposures.gdf))).toarray()
check_impact(
- self, impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, imp_mat_array
+ impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, imp_mat_array
)
def test_calc_impact_save_mat_pass(self):
@@ -603,7 +692,47 @@ def test_single_exp_zero_mdr(self):
imp = ImpactCalc(exp, impf_set, haz).impact(
assign_centroids=False, save_mat=True
)
- check_impact(self, imp, haz, exp, aai_agg, eai_exp, at_event, at_event)
+ check_impact(imp, haz, exp, aai_agg, eai_exp, at_event, np.array([at_event]).T)
+
+
+class TestImpactCalcForecast:
+ """Test impact calc for forecast hazard"""
+
+ @pytest.fixture
+ def impact_calc_forecast(self, impact_calc):
+ """Write NaNs to attributes that are not used"""
+ impact_calc["aai_agg"] = np.full_like(impact_calc["aai_agg"], np.nan)
+ impact_calc["eai_exp"] = np.full_like(impact_calc["eai_exp"], np.nan)
+
+ def test_impact_forecast(
+ self,
+ exposure,
+ hazard_forecast,
+ impact_func_set,
+ impact_calc,
+ impact_calc_forecast,
+ ):
+ """Test that ImpactForecast is returned correctly"""
+ impact = ImpactCalc(exposure, impact_func_set, hazard_forecast).impact(
+ assign_centroids=True, save_mat=True
+ )
+ # check that impact is indeed ImpactForecast
+ impact_calc["imp_mat_array"] = impact_calc.pop("imp_mat")
+ check_impact(imp=impact, haz=hazard_forecast, exp=exposure, **impact_calc)
+ assert isinstance(impact, ImpactForecast)
+ np.testing.assert_array_equal(impact.lead_time, hazard_forecast.lead_time)
+ assert impact.lead_time.dtype == hazard_forecast.lead_time.dtype
+ np.testing.assert_array_equal(impact.member, hazard_forecast.member)
+
+ def test_impact_forecast_empty_impmat_error(
+ self, hazard_forecast, exposure, impact_func_set
+ ):
+ """Test that error is raised when trying to compute impact forecast
+ without saving impact matrix
+ """
+ icalc = ImpactCalc(exposure, impact_func_set, hazard_forecast)
+ with pytest.raises(ValueError, match="Saving impact matrix is required"):
+ icalc.impact(assign_centroids=True, save_mat=False)
class TestImpactMatrixCalc(unittest.TestCase):
@@ -901,6 +1030,7 @@ def test_skip_mat(self, from_eih_mock):
# Execute Tests
if __name__ == "__main__":
TESTS = unittest.TestLoader().loadTestsFromTestCase(TestImpactCalc)
+ TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactCalcForecast))
TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestReturnImpact))
TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactMatrix))
TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactMatrixCalc))
diff --git a/climada/engine/test/test_impact_forecast.py b/climada/engine/test/test_impact_forecast.py
new file mode 100644
index 0000000000..ac47deab99
--- /dev/null
+++ b/climada/engine/test/test_impact_forecast.py
@@ -0,0 +1,346 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Tests for Impact Forecast.
+"""
+
+import numpy as np
+import numpy.testing as npt
+import pandas as pd
+import pytest
+from scipy.sparse import csr_matrix
+
+from climada.engine import Impact, ImpactForecast
+
+from .test_impact import impact_kwargs as imp_kwargs
+
+
+@pytest.fixture
+def impact_kwargs():
+ return imp_kwargs()
+
+
+@pytest.fixture
+def impact(impact_kwargs):
+ return Impact(**impact_kwargs)
+
+
+@pytest.fixture
+def lead_time(impact_kwargs):
+ return pd.timedelta_range(
+ start="1 day", periods=len(impact_kwargs["event_id"])
+ ).to_numpy()
+
+
+@pytest.fixture
+def member(impact_kwargs):
+ return np.arange(len(impact_kwargs["event_id"]))
+
+
+@pytest.fixture
+def impact_forecast(impact, lead_time, member):
+ return ImpactForecast.from_impact(impact, lead_time=lead_time, member=member)
+
+
+class TestImpactForecastInit:
+ def assert_impact_kwargs(self, impact: Impact, **kwargs):
+ for key, value in kwargs.items():
+ attr = getattr(impact, key)
+ if isinstance(value, (np.ndarray, list)):
+ npt.assert_array_equal(attr, value)
+ elif isinstance(value, csr_matrix):
+ npt.assert_array_equal(attr.todense(), value.todense())
+ else:
+ assert attr == value
+
+ def test_impact_forecast_init(self, impact_kwargs, lead_time, member):
+ forecast1 = ImpactForecast(
+ lead_time=lead_time,
+ member=member,
+ **impact_kwargs,
+ )
+ npt.assert_array_equal(forecast1.lead_time, lead_time)
+ npt.assert_array_equal(forecast1.member, member)
+ self.assert_impact_kwargs(forecast1, **impact_kwargs)
+
+ def test_impact_forecast_init_error(self, impact, impact_kwargs, lead_time, member):
+ with pytest.raises(ValueError, match="Forecast.lead_time"):
+ ImpactForecast(lead_time=lead_time[:-2], member=member, **impact_kwargs)
+ with pytest.raises(ValueError, match="Forecast.member"):
+ ImpactForecast.from_impact(impact, lead_time=lead_time, member=member[1:])
+
+ def test_impact_forecast_from_impact(
+ self, impact_forecast, impact_kwargs, lead_time, member
+ ):
+ npt.assert_array_equal(impact_forecast.lead_time, lead_time)
+ npt.assert_array_equal(impact_forecast.member, member)
+ self.assert_impact_kwargs(impact_forecast, **impact_kwargs)
+
+
+class TestSelect:
+
+ @pytest.mark.parametrize(
+ "var, var_select",
+ [("event_id", "event_ids"), ("event_name", "event_names"), ("date", "dates")],
+ )
+ def test_base_class_select(
+ self, impact_forecast, lead_time, member, impact_kwargs, var, var_select
+ ):
+ """Check if Impact.select works on the derived class"""
+ select_mask = np.array([2, 1])
+ ordered_select_mask = np.array([1, 2])
+ if var == "date":
+ # Date needs to be a valid delta
+ select_mask = np.array([1, 2])
+ ordered_select_mask = np.array([1, 2])
+
+ var_value = np.array(impact_kwargs[var])[select_mask]
+ # event_name is a list, convert to numpy array for indexing
+ impact_fc = impact_forecast.select(**{var_select: var_value})
+ # NOTE: Events keep their original order
+ npt.assert_array_equal(
+ impact_fc.event_id,
+ impact_forecast.event_id[ordered_select_mask],
+ )
+ npt.assert_array_equal(
+ impact_fc.event_name,
+ np.array(impact_forecast.event_name)[ordered_select_mask],
+ )
+ npt.assert_array_equal(
+ impact_fc.date, impact_forecast.date[ordered_select_mask]
+ )
+ npt.assert_array_equal(
+ impact_fc.frequency, impact_forecast.frequency[ordered_select_mask]
+ )
+ npt.assert_array_equal(impact_fc.member, member[ordered_select_mask])
+ npt.assert_array_equal(impact_fc.lead_time, lead_time[ordered_select_mask])
+ npt.assert_array_equal(
+ impact_fc.imp_mat.todense(),
+ impact_forecast.imp_mat.todense()[ordered_select_mask],
+ )
+
+ def test_impact_forecast_select_exposure(
+ self, impact_forecast, lead_time, member, impact_kwargs
+ ):
+ """Check if Impact.select works on the derived class"""
+ exp_col = 0
+ select_mask = np.array([exp_col])
+ coord_exp = impact_kwargs["coord_exp"][select_mask]
+ impact_fc = impact_forecast.select(coord_exp=coord_exp)
+ npt.assert_array_equal(impact_fc.member, member)
+ npt.assert_array_equal(impact_fc.lead_time, lead_time)
+ npt.assert_array_equal(
+ impact_fc.imp_mat.todense(), impact_forecast.imp_mat.todense()[:, exp_col]
+ )
+
+ def test_derived_select_single(self, impact_forecast, lead_time, member):
+ imp_fc_select = impact_forecast.select(member=[2, 0])
+ idx = np.array([0, 2])
+ npt.assert_array_equal(imp_fc_select.event_id, impact_forecast.event_id[idx])
+ npt.assert_array_equal(imp_fc_select.member, member[idx])
+ npt.assert_array_equal(imp_fc_select.lead_time, lead_time[idx])
+
+ imp_fc_select = impact_forecast.select(lead_time=lead_time[np.array([2, 0])])
+ npt.assert_array_equal(imp_fc_select.event_id, impact_forecast.event_id[idx])
+ npt.assert_array_equal(imp_fc_select.member, member[idx])
+ npt.assert_array_equal(imp_fc_select.lead_time, lead_time[idx])
+
+ def test_derived_select_intersections(
+ self, impact_forecast, lead_time, member, impact_kwargs
+ ):
+ imp_fc_select = impact_forecast.select(event_ids=[10, 14], member=[0, 1, 2])
+ npt.assert_array_equal(
+ imp_fc_select.event_id, impact_forecast.event_id[np.array([0])]
+ )
+
+ imp_fc_select = impact_forecast.select(
+ event_ids=[10, 11, 13], member=[0, 1, 2], lead_time=lead_time[1:3]
+ )
+ npt.assert_array_equal(
+ imp_fc_select.event_id, impact_forecast.event_id[np.array([1])]
+ )
+
+ # Test "outer"
+ impact_forecast2 = ImpactForecast(
+ lead_time=lead_time,
+ member=np.zeros_like(member, dtype="int"),
+ **impact_kwargs,
+ )
+ imp_fc_select = impact_forecast2.select(event_ids=[10, 11, 13], member=[0])
+ npt.assert_array_equal(imp_fc_select.event_id, [10, 11, 13])
+ npt.assert_array_equal(imp_fc_select.member, [0, 0, 0])
+
+ def test_no_select(self, impact_forecast, impact_kwargs):
+ imp_fc_select = impact_forecast.select()
+ npt.assert_array_equal(
+ imp_fc_select.imp_mat.todense(), impact_forecast.imp_mat.todense()
+ )
+
+ num_centroids = len(impact_kwargs["coord_exp"])
+ imp_fc_select = impact_forecast.select(event_names=["aaaaa", "foo"])
+ assert imp_fc_select.imp_mat.shape == (0, num_centroids)
+ imp_fc_select = impact_forecast.select(event_ids=[-1, 1002])
+ assert imp_fc_select.imp_mat.shape == (0, num_centroids)
+ imp_fc_select = impact_forecast.select(member=[-1])
+ assert imp_fc_select.imp_mat.shape == (0, num_centroids)
+ imp_fc_select = impact_forecast.select(np.timedelta64("3", "Y"))
+ assert imp_fc_select.imp_mat.shape == (0, num_centroids)
+
+
+def test_impact_forecast_concat(impact_forecast, member, lead_time):
+ """Check if Impact.concat works on the derived class"""
+ impact_fc = ImpactForecast.concat(
+ [impact_forecast, impact_forecast], reset_event_ids=True
+ )
+ npt.assert_array_equal(impact_fc.member, np.concatenate([member, member]))
+ npt.assert_array_equal(impact_fc.lead_time, np.concatenate([lead_time, lead_time]))
+ npt.assert_array_equal(
+ impact_fc.event_id, np.arange(impact_fc.imp_mat.shape[0]) + 1
+ )
+ npt.assert_array_equal(impact_fc.event_name, impact_forecast.event_name * 2)
+ npt.assert_array_equal(
+ impact_fc.imp_mat.toarray(),
+ np.vstack(
+ (impact_forecast.imp_mat.toarray(), impact_forecast.imp_mat.toarray())
+ ),
+ )
+
+
+def test_impact_forecast_blocked_methods(impact_forecast):
+ """Check if ImpactForecast.exceedance_freq_curve raises NotImplementedError"""
+ with pytest.raises(NotImplementedError):
+ impact_forecast.local_exceedance_impact(np.array([10, 50, 100]))
+
+ with pytest.raises(NotImplementedError):
+ impact_forecast.local_return_period(np.array([10, 50, 100]))
+
+ with pytest.raises(NotImplementedError):
+ impact_forecast.calc_freq_curve(np.array([10, 50, 100]))
+
+
+@pytest.mark.parametrize("dense", [True, False])
+def test_write_read_hdf5(impact_forecast, tmp_path, dense):
+
+ file_name = tmp_path / "test_hazard_forecast.h5"
+ # replace dummy_impact event_names with strings
+ impact_forecast.event_name = [str(name) for name in impact_forecast.event_name]
+ impact_forecast.write_hdf5(file_name, dense_imp_mat=dense)
+
+ def compare_attr(obj, attr):
+ actual = getattr(obj, attr)
+ expected = getattr(impact_forecast, attr)
+ if isinstance(actual, csr_matrix):
+ npt.assert_array_equal(actual.todense(), expected.todense())
+ else:
+ npt.assert_array_equal(actual, expected)
+
+ # Read ImpactForecast
+ impact_forecast_read = ImpactForecast.from_hdf5(file_name)
+ assert impact_forecast_read.lead_time.dtype.kind == np.dtype("timedelta64").kind
+ for attr in impact_forecast.__dict__.keys():
+ compare_attr(impact_forecast_read, attr)
+
+ # Read Impact
+ impact_read = Impact.from_hdf5(file_name)
+ for attr in impact_read.__dict__.keys():
+ compare_attr(impact_read, attr)
+ assert "member" not in impact_read.__dict__
+ assert "lead_time" not in impact_read.__dict__
+
+
+@pytest.fixture
+def impact_forecast_stats(impact_kwargs, lead_time, member):
+ max_index = 4
+ for key, val in impact_kwargs.items():
+ if isinstance(val, (np.ndarray, list)):
+ impact_kwargs[key] = val[:max_index]
+ elif isinstance(val, csr_matrix):
+ impact_kwargs[key] = val[:max_index, :]
+ impact_kwargs["imp_mat"] = csr_matrix([[1, 0], [0, 1], [3, 2], [2, 3]])
+ impact_kwargs["at_event"] = np.array([1, 1, 5, 5])
+ return ImpactForecast(
+ lead_time=lead_time[:max_index], member=member[:max_index], **impact_kwargs
+ )
+
+
+@pytest.mark.parametrize("attr", ["min", "mean", "max"])
+def test_impact_forecast_min_mean_max(impact_forecast_stats, attr):
+ """Check mean, min, and max methods for ImpactForecast"""
+ imp_fc_reduced = getattr(impact_forecast_stats, attr)()
+
+ # assert imp_mat
+ npt.assert_array_equal(
+ imp_fc_reduced.imp_mat.todense(),
+ getattr(impact_forecast_stats.imp_mat.todense(), attr)(axis=0),
+ )
+ at_event_expected = {"min": [0], "mean": [3], "max": [6]}
+ npt.assert_array_equal(imp_fc_reduced.at_event, at_event_expected[attr])
+
+ # check that attributes where reduced correctly
+ npt.assert_array_equal(np.isnat(imp_fc_reduced.lead_time), [True])
+ npt.assert_array_equal(imp_fc_reduced.member, [-1])
+ npt.assert_array_equal(imp_fc_reduced.event_name, [attr])
+ npt.assert_array_equal(imp_fc_reduced.event_id, [0])
+ npt.assert_array_equal(imp_fc_reduced.frequency, [1])
+ npt.assert_array_equal(imp_fc_reduced.date, [0])
+
+
+@pytest.mark.parametrize("quantile", [0.3, 0.6, 0.8])
+def test_impact_forecast_quantile(impact_forecast, quantile):
+ """Check quantile method for ImpactForecast"""
+ imp_fcst_quantile = impact_forecast.quantile(q=quantile)
+
+ # assert imp_mat
+ npt.assert_array_equal(
+ imp_fcst_quantile.imp_mat.toarray().squeeze(),
+ np.quantile(impact_forecast.imp_mat.toarray(), quantile, axis=0),
+ )
+ # assert at_event
+ npt.assert_array_equal(
+ imp_fcst_quantile.at_event,
+ np.quantile(impact_forecast.at_event, quantile, axis=0).sum(),
+ )
+
+ # check that attributes where reduced correctly
+ npt.assert_array_equal(imp_fcst_quantile.member, np.array([-1]))
+ npt.assert_array_equal(
+ imp_fcst_quantile.lead_time, np.array([np.timedelta64("NaT")])
+ )
+ npt.assert_array_equal(imp_fcst_quantile.event_id, np.array([0]))
+ npt.assert_array_equal(
+ imp_fcst_quantile.event_name, np.array([f"quantile_{quantile}"])
+ )
+ npt.assert_array_equal(imp_fcst_quantile.frequency, np.array([1]))
+ npt.assert_array_equal(imp_fcst_quantile.date, np.array([0]))
+
+
+def test_median(impact_forecast):
+ imp_fcst_median = impact_forecast.median()
+ imp_fcst_quantile = impact_forecast.quantile(q=0.5)
+ npt.assert_array_equal(
+ imp_fcst_median.imp_mat.toarray(), imp_fcst_quantile.imp_mat.toarray()
+ )
+ npt.assert_array_equal(imp_fcst_median.imp_mat.toarray(), [[2.5, 2.5]])
+
+ # check that attributes where reduced correctly
+ npt.assert_array_equal(imp_fcst_median.member, np.array([-1]))
+ npt.assert_array_equal(imp_fcst_median.lead_time, np.array([np.timedelta64("NaT")]))
+ npt.assert_array_equal(imp_fcst_median.event_id, np.array([0]))
+ npt.assert_array_equal(imp_fcst_median.event_name, np.array(["median"]))
+ npt.assert_array_equal(imp_fcst_median.frequency, np.array([1]))
+ npt.assert_array_equal(imp_fcst_median.date, np.array([0]))
diff --git a/climada/hazard/forecast.py b/climada/hazard/forecast.py
new file mode 100644
index 0000000000..cf6980f5ff
--- /dev/null
+++ b/climada/hazard/forecast.py
@@ -0,0 +1,340 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Define Forecast variant of Hazard.
+"""
+
+import logging
+
+import numpy as np
+import scipy.sparse as sparse
+
+from ..util.checker import size
+from ..util.forecast import Forecast
+from .base import Hazard
+
+LOGGER = logging.getLogger(__name__)
+
+
+class HazardForecast(Forecast, Hazard):
+ """A hazard object with forecast information"""
+
+ def __init__(
+ self,
+ lead_time: np.ndarray | None = None,
+ member: np.ndarray | None = None,
+ **hazard_kwargs,
+ ):
+ """
+ Initialize a HazardForecast object.
+
+ Parameters
+ ----------
+ lead_time : np.ndarray of np.timedelta64 or None, optional
+ Forecast lead times. Default is empty array.
+ member : np.ndarray or None, optional
+ Ensemble member identifiers as integers. Default is empty array.
+ **hazard_kwargs
+ keyword arguments to pass to :py:class:`~climada.hazard.base.Hazard` See
+ py:meth`~climada.hazard.base.Hazard.__init__` for details.
+ """
+ super().__init__(lead_time=lead_time, member=member, **hazard_kwargs)
+ self._check_sizes()
+
+ @classmethod
+ def from_hazard(cls, hazard: Hazard, lead_time: np.ndarray, member: np.ndarray):
+ """
+ Create a HazardForecast object from a Hazard object.
+
+ Parameters
+ ----------
+ hazard : climada.hazard.base.Hazard
+ Hazard object to convert into a HazardForecast.
+ lead_time : np.ndarray of np.timedelta64 or None, optional
+ Forecast lead times. Default is empty array.
+ member : np.ndarray or None, optional
+ Ensemble member identifiers as integers. Default is empty array.
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the same attributes as the input hazard,
+ but with lead_time and member attributes set from instance of HazardForecast.
+ """
+ return cls(
+ lead_time=lead_time,
+ member=member,
+ haz_type=hazard.haz_type,
+ pool=hazard.pool,
+ units=hazard.units,
+ centroids=hazard.centroids,
+ event_id=hazard.event_id,
+ frequency=hazard.frequency,
+ frequency_unit=hazard.frequency_unit,
+ event_name=hazard.event_name,
+ date=hazard.date,
+ orig=hazard.orig,
+ intensity=hazard.intensity,
+ fraction=hazard.fraction,
+ )
+
+ def _check_sizes(self):
+ """Check sizes of forecast data vs. hazard data.
+
+ Raises
+ ------
+ ValueError
+ If the sizes of the forecast data do not match the
+ :py:attr:`~climada.hazard.base.Hazard.event_id`
+ """
+ num_entries = len(self.event_id)
+ size(exp_len=num_entries, var=self.member, var_name="Forecast.member")
+ size(exp_len=num_entries, var=self.lead_time, var_name="Forecast.lead_time")
+
+ def _reduce_attrs(self, event_name: str):
+ """
+ Reduce the attributes of a HazardForecast to a single value.
+
+ Attributes are modified as follows:
+ - lead_time: set to NaT
+ - member: set to -1
+ - event_id: set to 0
+ - event_name: set to the name of the reduction method (default)
+ - date: set to 0
+ - frequency: set to 1
+
+ Parameters
+ ----------
+ event_name : str
+ The event_name given to the reduced data.
+ """
+ reduced_attrs = {
+ "lead_time": np.array([np.timedelta64("NaT")]),
+ "member": np.array([-1]),
+ "event_id": np.array([0]),
+ "event_name": np.array([event_name]),
+ "date": np.array([0]),
+ "frequency": np.array([1]),
+ "orig": np.array([True]),
+ }
+
+ return reduced_attrs
+
+ def min(self):
+ """
+ Reduce the intensity and fraction of a HazardForecast to the minimum
+ value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the min intensity and fraction.
+ """
+ red_intensity = self.intensity.min(axis=0).tocsr()
+ red_fraction = self.fraction.min(axis=0).tocsr()
+ return HazardForecast(
+ haz_type=self.haz_type,
+ pool=self.pool,
+ units=self.units,
+ centroids=self.centroids,
+ frequency_unit=self.frequency_unit,
+ intensity=red_intensity,
+ fraction=red_fraction,
+ **self._reduce_attrs("min"),
+ )
+
+ def max(self):
+ """
+ Reduce the intensity and fraction of a HazardForecast to the maximum
+ value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the min intensity and fraction.
+ """
+ red_intensity = self.intensity.max(axis=0).tocsr()
+ red_fraction = self.fraction.max(axis=0).tocsr()
+ return HazardForecast(
+ haz_type=self.haz_type,
+ pool=self.pool,
+ units=self.units,
+ centroids=self.centroids,
+ frequency_unit=self.frequency_unit,
+ intensity=red_intensity,
+ fraction=red_fraction,
+ **self._reduce_attrs("max"),
+ )
+
+ def mean(self):
+ """
+ Reduce the intensity and fraction of a HazardForecast to the mean value.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the min intensity and fraction.
+ """
+ red_intensity = sparse.csr_matrix(self.intensity.mean(axis=0))
+ red_fraction = sparse.csr_matrix(self.fraction.mean(axis=0))
+ return HazardForecast(
+ haz_type=self.haz_type,
+ pool=self.pool,
+ units=self.units,
+ centroids=self.centroids,
+ frequency_unit=self.frequency_unit,
+ intensity=red_intensity,
+ fraction=red_fraction,
+ **self._reduce_attrs("mean"),
+ )
+
+ @classmethod
+ def concat(cls, haz_list: list):
+ """Concatenate multiple HazardForecast instances and return a new object"""
+ if len(haz_list) == 0:
+ return cls()
+ hazard = Hazard.concat(haz_list)
+ lead_time = np.concatenate(tuple(haz.lead_time for haz in haz_list))
+ member = np.concatenate(tuple(haz.member for haz in haz_list))
+ return cls.from_hazard(hazard, lead_time=lead_time, member=member)
+
+ def select(
+ self,
+ member=None,
+ lead_time=None,
+ event_names=None,
+ event_id=None,
+ date=None,
+ orig=None,
+ reg_id=None,
+ extent=None,
+ reset_frequency=False,
+ ):
+ """Select entries based on the parameters and return a new instance.
+
+ The selection will contain the intersection of all given parameters.
+
+ Parameters
+ ----------
+ member : Sequence of ints
+ Ensemble members to select
+ lead_time : Sequence of numpy.timedelta64
+ Lead times to select
+
+ See Also
+ --------
+ :py:meth:`~climada.hazard.base.Hazard.select`
+ """
+ if member is not None or lead_time is not None:
+ mask_member = (
+ self.idx_member(member)
+ if member is not None
+ else np.full_like(self.member, True, dtype=bool)
+ )
+ mask_lead_time = (
+ self.idx_lead_time(lead_time)
+ if lead_time is not None
+ else np.full_like(self.lead_time, True, dtype=bool)
+ )
+ event_id_from_forecast_mask = np.asarray(self.event_id)[
+ (mask_member & mask_lead_time)
+ ]
+ event_id = (
+ np.intersect1d(event_id, event_id_from_forecast_mask)
+ if event_id is not None
+ else event_id_from_forecast_mask
+ )
+
+ return super().select(
+ event_names=event_names,
+ event_id=event_id,
+ date=date,
+ orig=orig,
+ reg_id=reg_id,
+ extent=extent,
+ reset_frequency=reset_frequency,
+ )
+
+ def _quantile(self, q: float, event_name: str | None = None):
+ """
+ Reduce the impact matrix and at_event of a HazardForecast to the quantile value.
+ """
+ red_intensity = sparse.csr_matrix(
+ np.quantile(self.intensity.toarray(), q, axis=0)
+ )
+ red_fraction = sparse.csr_matrix(
+ np.quantile(self.fraction.toarray(), q, axis=0)
+ )
+ if event_name is None:
+ event_name = f"quantile_{q}"
+ return HazardForecast(
+ haz_type=self.haz_type,
+ pool=self.pool,
+ units=self.units,
+ centroids=self.centroids,
+ frequency_unit=self.frequency_unit,
+ intensity=red_intensity,
+ fraction=red_fraction,
+ **self._reduce_attrs(event_name),
+ )
+
+ def quantile(self, q: float):
+ """
+ Reduce the impact matrix and at_event of a HazardForecast to the quantile value.
+
+ The quantile value is computed by taking the quantile of the impact matrix
+ along the event dimension axis (axis=0) and then taking the quantile of the
+ resulting array.
+
+ Parameters
+ ----------
+ q : float
+ The quantile to compute, between 0 and 1.
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the quantile intensity and fraction.
+ """
+ return self._quantile(q=q)
+
+ def median(self):
+ """
+ Reduce the impact matrix and at_event of a HazardForecast to the median value.
+
+ The median value is computed by taking the median of the impact matrix along the
+ event dimension axis (axis=0) and then taking the median of the resulting array.
+
+ Returns
+ -------
+ HazardForecast
+ A HazardForecast object with the median intensity and fraction.
+ """
+ return self._quantile(q=0.5, event_name="median")
diff --git a/climada/hazard/io.py b/climada/hazard/io.py
index 6f8bd01b09..03e608d050 100644
--- a/climada/hazard/io.py
+++ b/climada/hazard/io.py
@@ -917,6 +917,10 @@ def write_hdf5(self, file_name, todense=False):
# Centroids have their own write_hdf5 method,
# which is invoked at the end of this method (s.b.)
continue
+ elif var_name == "lead_time":
+ hf_data.create_dataset(
+ var_name, data=var_val.astype("timedelta64[ns]").astype("int64")
+ )
elif isinstance(var_val, sparse.csr_matrix):
if todense:
hf_data.create_dataset(var_name, data=var_val.toarray())
@@ -987,7 +991,11 @@ def from_hdf5(cls, file_name):
continue
if var_name == "centroids":
continue
- if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
+ if var_name == "lead_time":
+ hazard_kwargs[var_name] = np.array(hf_data.get(var_name)).astype(
+ "timedelta64[ns]"
+ )
+ elif isinstance(var_val, np.ndarray) and var_val.ndim == 1:
hazard_kwargs[var_name] = np.array(hf_data.get(var_name))
elif isinstance(var_val, sparse.csr_matrix):
hf_csr = hf_data.get(var_name)
diff --git a/climada/hazard/test/test_base.py b/climada/hazard/test/test_base.py
index b4cbafd0e3..ea607893dd 100644
--- a/climada/hazard/test/test_base.py
+++ b/climada/hazard/test/test_base.py
@@ -46,27 +46,29 @@
"""
+def hazard_kwargs():
+ return {
+ "haz_type": "TC",
+ "pool": None,
+ "intensity": sparse.csr_matrix(
+ [[0.2, 0.3, 0.4], [0.1, 0.1, 0.01], [4.3, 2.1, 1.0], [5.3, 0.2, 0.0]]
+ ),
+ "fraction": sparse.csr_matrix(
+ [[0.02, 0.03, 0.04], [0.01, 0.01, 0.01], [0.3, 0.1, 0.0], [0.3, 0.2, 0.0]]
+ ),
+ "centroids": Centroids(lat=np.array([1, 3, 5]), lon=np.array([2, 4, 6])),
+ "event_id": np.array([1, 2, 3, 4]),
+ "event_name": ["ev1", "ev2", "ev3", "ev4"],
+ "date": np.array([1, 2, 3, 4]),
+ "orig": np.array([True, False, False, True]),
+ "frequency": np.array([0.1, 0.5, 0.5, 0.2]),
+ "frequency_unit": "1/week",
+ "units": "m/s",
+ }
+
+
def dummy_hazard():
- fraction = sparse.csr_matrix(
- [[0.02, 0.03, 0.04], [0.01, 0.01, 0.01], [0.3, 0.1, 0.0], [0.3, 0.2, 0.0]]
- )
- intensity = sparse.csr_matrix(
- [[0.2, 0.3, 0.4], [0.1, 0.1, 0.01], [4.3, 2.1, 1.0], [5.3, 0.2, 0.0]]
- )
-
- return Hazard(
- "TC",
- intensity=intensity,
- fraction=fraction,
- centroids=Centroids(lat=np.array([1, 3, 5]), lon=np.array([2, 4, 6])),
- event_id=np.array([1, 2, 3, 4]),
- event_name=["ev1", "ev2", "ev3", "ev4"],
- date=np.array([1, 2, 3, 4]),
- orig=np.array([True, False, False, True]),
- frequency=np.array([0.1, 0.5, 0.5, 0.2]),
- frequency_unit="1/week",
- units="m/s",
- )
+ return Hazard(**hazard_kwargs())
class TestLoader(unittest.TestCase):
diff --git a/climada/hazard/test/test_forecast.py b/climada/hazard/test/test_forecast.py
new file mode 100644
index 0000000000..26f26de4b1
--- /dev/null
+++ b/climada/hazard/test/test_forecast.py
@@ -0,0 +1,318 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Tests for Hazard Forecast.
+"""
+
+import numpy as np
+import numpy.testing as npt
+import pandas as pd
+import pytest
+from scipy.sparse import csr_matrix
+
+from climada.hazard.base import Hazard
+from climada.hazard.forecast import HazardForecast
+from climada.hazard.test.test_base import hazard_kwargs
+
+
+@pytest.fixture
+def haz_kwargs():
+ return hazard_kwargs()
+
+
+@pytest.fixture
+def hazard(haz_kwargs):
+ return Hazard(**haz_kwargs)
+
+
+@pytest.fixture
+def lead_time(haz_kwargs):
+ return pd.timedelta_range("1h", periods=len(haz_kwargs["event_id"])).to_numpy()
+
+
+@pytest.fixture
+def member(haz_kwargs):
+ return np.arange(len(haz_kwargs["event_id"]))
+
+
+@pytest.fixture
+def haz_fc(lead_time, member, haz_kwargs):
+ return HazardForecast(
+ lead_time=lead_time,
+ member=member,
+ **haz_kwargs,
+ )
+
+
+def assert_hazard_kwargs(hazard: Hazard, **kwargs):
+ for key, value in kwargs.items():
+ attr = getattr(hazard, key)
+ if isinstance(value, (np.ndarray, list)):
+ npt.assert_array_equal(attr, value)
+ elif isinstance(value, csr_matrix):
+ npt.assert_array_equal(attr.todense(), value.todense())
+ else:
+ assert attr == value
+
+
+def test_init_hazard_forecast(haz_fc, member, lead_time, haz_kwargs):
+ assert isinstance(haz_fc, HazardForecast)
+ npt.assert_array_equal(haz_fc.lead_time, lead_time)
+ assert haz_fc.lead_time.dtype == lead_time.dtype
+ npt.assert_array_equal(haz_fc.member, member)
+ assert_hazard_kwargs(haz_fc, **haz_kwargs)
+
+
+def test_init_hazard_forecast_error(hazard, member, lead_time, haz_kwargs):
+ with pytest.raises(ValueError, match="Forecast.lead_time"):
+ HazardForecast(lead_time=lead_time[:-2], member=member, **haz_kwargs)
+ with pytest.raises(ValueError, match="Forecast.member"):
+ HazardForecast.from_hazard(hazard, lead_time=lead_time, member=member[1:])
+
+
+def test_from_hazard(lead_time, member, hazard, haz_kwargs):
+ haz_fc_from_haz = HazardForecast.from_hazard(
+ hazard, lead_time=lead_time, member=member
+ )
+ assert isinstance(haz_fc_from_haz, HazardForecast)
+ npt.assert_array_equal(haz_fc_from_haz.lead_time, lead_time)
+ npt.assert_array_equal(haz_fc_from_haz.member, member)
+ assert_hazard_kwargs(haz_fc_from_haz, **haz_kwargs)
+
+
+class TestHazardForecastConcat:
+
+ def test_concat(self, haz_fc, lead_time, member, haz_kwargs):
+ haz_fc1 = haz_fc.select(event_id=[3])
+ haz_fc2 = HazardForecast(
+ haz_type=haz_kwargs["haz_type"], frequency_unit=haz_kwargs["frequency_unit"]
+ ) # Empty hazard
+ haz_fc3 = haz_fc.select(event_id=[1, 2])
+ haz_fc_concat = HazardForecast.concat([haz_fc1, haz_fc2, haz_fc3])
+ assert isinstance(haz_fc_concat, HazardForecast)
+ assert haz_fc_concat.size == 3
+ npt.assert_array_equal(
+ haz_fc_concat.lead_time, np.concatenate((lead_time[2:3], lead_time[0:2]))
+ )
+ npt.assert_array_equal(
+ haz_fc_concat.member, np.concatenate((member[2:3], member[0:2]))
+ )
+ npt.assert_array_equal(haz_fc_concat.event_id, [3, 1, 2])
+
+ def test_empty_list(self):
+ haz_concat = HazardForecast.concat([])
+ assert isinstance(haz_concat, HazardForecast)
+ assert haz_concat.size == 0
+ npt.assert_array_equal(haz_concat.lead_time, [])
+ npt.assert_array_equal(haz_concat.event_id, [])
+
+ def test_type_fail(self, haz_fc, hazard):
+ with pytest.raises(TypeError, match="different classes"):
+ HazardForecast.concat([haz_fc, hazard])
+ with pytest.raises(TypeError, match="different classes"):
+ Hazard.concat([haz_fc, hazard])
+
+
+class TestSelect:
+
+ @pytest.mark.parametrize(
+ "var, var_select",
+ [("event_id", "event_id"), ("event_name", "event_names"), ("date", "date")],
+ )
+ def test_base_class_select(
+ self, haz_fc, lead_time, member, haz_kwargs, var, var_select
+ ):
+ """Check if Hazard.select works on the derived class"""
+
+ select_mask = np.array([3, 2])
+ ordered_select_mask = np.array([3, 2])
+ if var == "date":
+ # Date needs to be a valid delta
+ select_mask = np.array([2, 3])
+ ordered_select_mask = np.array([2, 3])
+
+ var_value = np.array(haz_kwargs[var])[select_mask]
+ # event_name is a list, convert to numpy array for indexing
+ haz_fc_sel = haz_fc.select(**{var_select: var_value})
+ # Note: order is preserved
+ npt.assert_array_equal(
+ haz_fc_sel.event_id,
+ haz_fc.event_id[ordered_select_mask],
+ )
+ npt.assert_array_equal(
+ haz_fc_sel.event_name,
+ np.array(haz_fc.event_name)[ordered_select_mask],
+ )
+ npt.assert_array_equal(haz_fc_sel.date, haz_fc.date[ordered_select_mask])
+ npt.assert_array_equal(
+ haz_fc_sel.frequency, haz_fc.frequency[ordered_select_mask]
+ )
+ npt.assert_array_equal(haz_fc_sel.member, member[ordered_select_mask])
+ npt.assert_array_equal(haz_fc_sel.lead_time, lead_time[ordered_select_mask])
+ npt.assert_array_equal(
+ haz_fc_sel.intensity.todense(),
+ haz_fc.intensity.todense()[ordered_select_mask],
+ )
+ npt.assert_array_equal(
+ haz_fc_sel.fraction.todense(),
+ haz_fc.fraction.todense()[ordered_select_mask],
+ )
+
+ assert haz_fc_sel.centroids == haz_fc.centroids
+
+ def test_derived_select_single(self, haz_fc, lead_time, member):
+ haz_fc_select = haz_fc.select(member=[3, 0])
+ idx = np.array([0, 3])
+ npt.assert_array_equal(haz_fc_select.event_id, haz_fc.event_id[idx])
+ npt.assert_array_equal(haz_fc_select.member, member[idx])
+ npt.assert_array_equal(haz_fc_select.lead_time, lead_time[idx])
+
+ haz_fc_select = haz_fc.select(lead_time=lead_time[np.array([3, 0])])
+ npt.assert_array_equal(haz_fc_select.event_id, haz_fc.event_id[idx])
+ npt.assert_array_equal(haz_fc_select.member, member[idx])
+ npt.assert_array_equal(haz_fc_select.lead_time, lead_time[idx])
+
+ def test_derived_select_intersections(self, haz_fc, lead_time, member, haz_kwargs):
+ haz_fc_select = haz_fc.select(event_id=[1, 4], member=[0, 1, 2])
+ npt.assert_array_equal(haz_fc_select.event_id, haz_fc.event_id[np.array([0])])
+
+ haz_fc_select = haz_fc.select(
+ event_id=[1, 2, 4], member=[0, 1, 2], lead_time=lead_time[1:3]
+ )
+ npt.assert_array_equal(haz_fc_select.event_id, haz_fc.event_id[np.array([1])])
+
+ # Test "outer"
+ haz_fc2 = HazardForecast(
+ lead_time=lead_time, member=np.zeros_like(member, dtype="int"), **haz_kwargs
+ )
+ haz_fc_select = haz_fc2.select(event_id=[1, 2, 4], member=[0])
+ npt.assert_array_equal(haz_fc_select.event_id, [1, 2, 4])
+ npt.assert_array_equal(haz_fc_select.member, [0, 0, 0])
+
+ def test_derived_select_null(self, haz_fc, haz_kwargs):
+ haz_fc_select = haz_fc.select()
+ assert_hazard_kwargs(haz_fc_select, **haz_kwargs)
+
+ with pytest.raises(IndexError):
+ haz_fc.select(event_id=[-1])
+ with pytest.raises(IndexError):
+ haz_fc.select(member=[-1])
+ with pytest.raises(IndexError):
+ haz_fc.select(
+ lead_time=[np.timedelta64("2", "Y").astype("timedelta64[ns]")]
+ )
+
+
+def test_write_read_hazard_forecast(haz_fc, tmp_path):
+
+ file_name = tmp_path / "test_hazard_forecast.h5"
+
+ haz_fc.write_hdf5(file_name)
+ haz_fc_read = HazardForecast.from_hdf5(file_name)
+
+ assert haz_fc_read.lead_time.dtype.kind == np.dtype("timedelta64").kind
+
+ for key in haz_fc.__dict__.keys():
+ if key in ["intensity", "fraction"]:
+ (haz_fc.__dict__[key] != haz_fc_read.__dict__[key]).nnz == 0
+ else:
+ # npt.assert_array_equal also works for comparing int, float or list
+ npt.assert_array_equal(haz_fc.__dict__[key], haz_fc_read.__dict__[key])
+
+
+@pytest.mark.parametrize("attr", ["min", "mean", "max"])
+def test_hazard_forecast_mean_min_max(haz_fc, attr):
+ """Check mean, min, and max methods for ImpactForecast"""
+ haz_fcst_reduced = getattr(haz_fc, attr)()
+
+ # Assert sparse matrices
+ npt.assert_array_equal(
+ haz_fcst_reduced.intensity.todense(),
+ getattr(haz_fc.intensity.todense(), attr)(axis=0),
+ )
+ npt.assert_array_equal(
+ haz_fcst_reduced.fraction.todense(),
+ getattr(haz_fc.fraction.todense(), attr)(axis=0),
+ )
+
+ # Check that attributes where reduced correctly
+ npt.assert_array_equal(np.isnat(haz_fcst_reduced.lead_time), [True])
+ npt.assert_array_equal(haz_fcst_reduced.member, [-1])
+ npt.assert_array_equal(haz_fcst_reduced.event_name, [attr])
+ npt.assert_array_equal(haz_fcst_reduced.event_id, [0])
+ npt.assert_array_equal(haz_fcst_reduced.frequency, [1])
+ npt.assert_array_equal(haz_fcst_reduced.date, [0])
+ npt.assert_array_equal(haz_fcst_reduced.orig, [True])
+
+
+@pytest.mark.parametrize("quantile", [0.3, 0.6, 0.8])
+def test_hazard_forecast_quantile(haz_fc, quantile):
+ """Check quantile method for HazardForecast"""
+ haz_fcst_quantile = haz_fc.quantile(q=quantile)
+
+ # assert intensity
+ npt.assert_array_equal(
+ haz_fcst_quantile.intensity.toarray().squeeze(),
+ np.quantile(haz_fc.intensity.toarray(), quantile, axis=0),
+ )
+ # assert fraction
+ npt.assert_array_equal(
+ haz_fcst_quantile.fraction.toarray().squeeze(),
+ np.quantile(haz_fc.fraction.toarray(), quantile, axis=0),
+ )
+
+ # check that attributes where reduced correctly
+ npt.assert_array_equal(
+ haz_fcst_quantile.lead_time, np.array([np.timedelta64("NaT")])
+ )
+ npt.assert_array_equal(haz_fcst_quantile.member, np.array([-1]))
+ npt.assert_array_equal(
+ haz_fcst_quantile.event_name, np.array([f"quantile_{quantile}"])
+ )
+ npt.assert_array_equal(haz_fcst_quantile.event_id, np.array([0]))
+ npt.assert_array_equal(haz_fcst_quantile.frequency, np.array([1]))
+ npt.assert_array_equal(haz_fcst_quantile.date, np.array([0]))
+ npt.assert_array_equal(haz_fcst_quantile.orig, np.array([True]))
+
+
+def test_median(haz_fc):
+ haz_fcst_median = haz_fc.median()
+ haz_fcst_quantile = haz_fc.quantile(q=0.5)
+ npt.assert_array_equal(
+ haz_fcst_median.intensity.todense(), haz_fcst_quantile.intensity.todense()
+ )
+ npt.assert_array_equal(
+ haz_fcst_median.intensity.todense(),
+ np.median(haz_fc.intensity.todense(), axis=0),
+ )
+ npt.assert_array_equal(
+ haz_fcst_median.fraction.todense(), haz_fcst_quantile.fraction.todense()
+ )
+ npt.assert_array_equal(
+ haz_fcst_median.fraction.todense(),
+ np.median(haz_fc.fraction.todense(), axis=0),
+ )
+
+ # check that attributes where reduced correctly
+ npt.assert_array_equal(haz_fcst_median.member, np.array([-1]))
+ npt.assert_array_equal(haz_fcst_median.lead_time, np.array([np.timedelta64("NaT")]))
+ npt.assert_array_equal(haz_fcst_median.event_id, np.array([0]))
+ npt.assert_array_equal(haz_fcst_median.event_name, np.array(["median"]))
+ npt.assert_array_equal(haz_fcst_median.frequency, np.array([1]))
+ npt.assert_array_equal(haz_fcst_median.date, np.array([0]))
+ npt.assert_array_equal(haz_fcst_median.orig, np.array([True]))
diff --git a/climada/util/forecast.py b/climada/util/forecast.py
new file mode 100644
index 0000000000..94b9751a8b
--- /dev/null
+++ b/climada/util/forecast.py
@@ -0,0 +1,94 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Define Forecast base class.
+"""
+
+import numpy as np
+
+
+class Forecast:
+ """Mixin class for forecast data.
+
+ Attributes
+ ----------
+ lead_time : np.ndarray
+ Array of forecast lead times, given as timedelta64 objects.
+ Represents the lead times of the forecasts.
+ member : np.ndarray
+ Array of ensemble member identifiers, given as integers.
+ Represents different forecast ensemble members.
+ """
+
+ def __init__(
+ self,
+ lead_time: np.ndarray | None = None,
+ member: np.ndarray | None = None,
+ **kwargs,
+ ):
+ """Initialize Forecast.
+
+ Parameters
+ ----------
+ lead_time : np.ndarray or None, optional
+ Forecast lead times. Default is empty array.
+ member : np.ndarray or None, optional
+ Ensemble member identifiers. Default is empty array.
+ """
+
+ self.lead_time = (
+ np.asarray(lead_time)
+ if lead_time is not None
+ else np.array([], dtype="timedelta64[ns]")
+ )
+ self.member = (
+ np.asarray(member) if member is not None else np.array([], dtype="int")
+ )
+ super().__init__(**kwargs)
+
+ def idx_member(self, member: np.ndarray) -> np.ndarray:
+ """Return boolean array where self.member == member using numpy.isin()
+
+ Parameters
+ ----------
+ member : np.ndarray
+ Array of ensemble members (ints) for which to return an indexer
+
+ Returns
+ -------
+ np.ndarray
+ Boolean array where self.member is in member.
+ """
+
+ return np.isin(self.member, member)
+
+ def idx_lead_time(self, lead_time: np.ndarray) -> np.ndarray:
+ """Return boolean array where self.lead_time == lead_time using numpy.isin()
+
+ Parameters
+ ----------
+ lead_time : np.ndarray
+ Array of lead times (numpy.timedelta64) for which to return an indexer
+
+ Returns
+ -------
+ np.ndarray
+ Boolean array where self.lead_time is in lead_time.
+ """
+
+ return np.isin(self.lead_time, lead_time)
diff --git a/climada/util/test/test_forecast.py b/climada/util/test/test_forecast.py
new file mode 100644
index 0000000000..8f1fcab0e5
--- /dev/null
+++ b/climada/util/test/test_forecast.py
@@ -0,0 +1,95 @@
+"""
+This file is part of CLIMADA.
+
+Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
+
+CLIMADA is free software: you can redistribute it and/or modify it under the
+terms of the GNU General Public License as published by the Free
+Software Foundation, version 3.
+
+CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with CLIMADA. If not, see .
+
+---
+
+Tests for Forecast base class.
+"""
+
+import numpy as np
+import numpy.testing as npt
+import pandas as pd
+
+from climada.util.forecast import Forecast
+
+
+def test_forecast_init():
+ """Test initialization of Forecast class."""
+ forecast = Forecast()
+ npt.assert_array_equal(forecast.lead_time, np.array([]))
+ npt.assert_array_equal(forecast.member, np.array([]))
+
+ forecast = Forecast(member=np.array([1, 2]))
+ npt.assert_array_equal(forecast.member, np.array([1, 2]), strict=True)
+
+ forecast = Forecast(lead_time=np.array([6, 12], dtype="timedelta64[h]"))
+ npt.assert_array_equal(
+ forecast.lead_time, np.array([6, 12], dtype="timedelta64[h]"), strict=True
+ )
+
+ forecast = Forecast(lead_time=np.array([1, 2]), member=[3, 4])
+ npt.assert_array_equal(forecast.lead_time, np.array([1, 2]), strict=True)
+ npt.assert_array_equal(forecast.member, np.array([3, 4]), strict=True)
+ assert isinstance(forecast.member, np.ndarray)
+
+ # Test with datetime64 including seconds
+ lead_times_seconds = pd.timedelta_range(start="1 day", periods=4).to_numpy()
+ forecast = Forecast(lead_time=lead_times_seconds, member=[1, 2, 3])
+ npt.assert_array_equal(forecast.lead_time, lead_times_seconds, strict=True)
+ assert forecast.lead_time.dtype == np.dtype("timedelta64[ns]")
+
+
+def test_idx_member():
+ """Test idx_member method of Forecast class."""
+ forecast = Forecast(member=np.array([1, 2, 3, 4]))
+
+ idx = forecast.idx_member(1)
+ npt.assert_array_equal(idx, np.array([True, False, False, False]), strict=True)
+
+ idx = forecast.idx_member(np.array([2, 4]))
+ npt.assert_array_equal(idx, np.array([False, True, False, True]), strict=True)
+
+ idx = forecast.idx_member([2, 4])
+ npt.assert_array_equal(idx, np.array([False, True, False, True]), strict=True)
+
+ idx = forecast.idx_member(None)
+ npt.assert_array_equal(idx, np.array([False, False, False, False]), strict=True)
+
+ # Try once with inconsitent types
+ forecast = Forecast(member=np.array(["1", -2, np.nan]))
+ npt.assert_array_equal(
+ forecast.idx_member([np.nan, "1"]), np.array([True, False, True]), strict=True
+ )
+
+
+def test_idx_lead_time():
+ """Test idx_lead_time method of Forecast class."""
+ forecast = Forecast(
+ lead_time=pd.timedelta_range(start="1 day", periods=4).to_numpy()
+ )
+
+ idx = forecast.idx_lead_time(
+ pd.timedelta_range(start="1 day", periods=4).to_numpy()[::2]
+ )
+ npt.assert_array_equal(idx, np.array([True, False, True, False]), strict=True)
+
+ idx = forecast.idx_lead_time(
+ pd.timedelta_range(start="1 day", periods=4).to_numpy()[0]
+ )
+ npt.assert_array_equal(idx, np.array([True, False, False, False]), strict=True)
+
+ idx = forecast.idx_lead_time(None)
+ npt.assert_array_equal(idx, np.array([False, False, False, False]), strict=True)