From 942c520ce8f090b1a423612e5f57a1b322101bed Mon Sep 17 00:00:00 2001 From: robertvava Date: Wed, 20 Aug 2025 16:35:42 +0100 Subject: [PATCH 1/6] fixed mypy issues --- src/svdrom/pod.py | 202 +++++++++++++++++++++++++++++++++++++++++++++- src/svdrom/svd.py | 14 +++- tests/test_pod.py | 147 +++++++++++++++++++++++++++++++++ 3 files changed, 361 insertions(+), 2 deletions(-) create mode 100644 tests/test_pod.py diff --git a/src/svdrom/pod.py b/src/svdrom/pod.py index 9de5e00..6a88c42 100644 --- a/src/svdrom/pod.py +++ b/src/svdrom/pod.py @@ -1 +1,201 @@ -# Module for POD implementation +# src/svdrom/pod.py + +import numpy as np +import xarray as xr + +from svdrom.logger import setup_logger +from svdrom.svd import TruncatedSVD + +logger = setup_logger("POD", "pod.log") + +# LSV - modes +# RSV - Time coefficients # the POD modes +# return SV squared / energy +# flag for remove mean along temporal dimension in fit or class init? +# important to remove time average + +# The shape of the array matters when you feed it to SVD. +# Whether you have spatial dim along rows or columns + +# if you call modes, you're essentially returning the RSV +# time coefficients you return LSV +# The algo should figure out which is the spatioal and temporal dims! + + +# Inherit TruncatedSVD maybe! +class POD: + """ + Proper Orthogonal Decomposition (POD) + + Uses SVD to compute the POD modes (spatial modes), + temporal coefficients, and modal energies of a given dataset. + + Parameters + ---------- + n_modes : int + The number of POD modes to compute and keep. This corresponds to + the number of components in the truncated SVD. + algorithm : str, {'tsqr', 'randomized'}, (default 'tsqr') + SVD algorithm to use via the `TruncatedSVD` class. + See `svdrom.svd.TruncatedSVD` for more details. + **svd_kwargs : dict + Additional keyword arguments to be passed to the `TruncatedSVD` + constructor (e.g., `compute_u`, `compute_v`, `rechunk`). + """ + + def __init__(self, n_modes: int, algorithm: str = "tsqr"): + self._n_modes = n_modes + self._algorithm = algorithm + self._svd_solver = TruncatedSVD(n_components=n_modes, algorithm=algorithm) + self._modes: xr.DataArray | None = None + self._coeffs: xr.DataArray | None = None + self._energy: np.ndarray | None = None + self._s: np.ndarray | None = None + self._mean_field: xr.DataArray | None = None + self._n_snapshots: int | None = None + + @property + def n_modes(self) -> int: + return self._n_modes + + @property + def algorithm(self) -> str: + return self._algorithm + + @property + def modes(self) -> xr.DataArray | None: + return self._modes + + @property + def coeffs(self) -> xr.DataArray | None: + return self._coeffs + + @property + def energy(self) -> np.ndarray | None: + return self._energy + + @property + def s(self) -> np.ndarray | None: + return self._s + + @property + def mean_field(self) -> xr.DataArray | None: + return self._mean_field + + @property + def n_snapshots(self) -> int | None: + return self._n_snapshots + + def fit(self, X: xr.DataArray, dim: str, **svd_kwargs) -> None: + """ + X : xr.DataArray, shape (n_samples, n_features) + The input data. + dim : str + The name of the dimension in `X` that represents the snapshots. + """ + if dim not in X.dims: + msg = ( + f"Dimension '{dim}' not present in the input array with dims {X.dims}." + ) + logger.exception(msg) + raise ValueError(msg) + + self._n_snapshots = X.sizes[dim] + logger.info(f"Computing mean field along dimension '{dim}'...") + # Remove mean is turned on by default - user warning + self._mean_field = X.mean( + dim=dim + ).compute() # We could just pass mean; could use StandardScaler instead + X_fluc = X - self._mean_field + + D = X_fluc / np.sqrt(self._n_snapshots) + logger.info(f"Calling TruncatedSVD with {self.n_modes} modes...") + self._svd_solver.fit(D, **svd_kwargs) + + assert self._svd_solver.s is not None + + self._s = self._svd_solver.s + self._energy = self._s**2 + + assert self._svd_solver.u is not None + assert self._svd_solver.v is not None + + u = self._svd_solver.u + v = self._svd_solver.v + + self._modes = v.T.rename({"components": "modes"}) + self._coeffs = (u * self._s).rename({"components": "modes"}) + + def transform(self, X: xr.DataArray) -> xr.DataArray: + """Project new data onto the computed POD basis.""" + if self.modes is None or self.mean_field is None or self.n_snapshots is None: + msg = "The fit() method must be called before calling transform." + logger.exception(msg) + raise RuntimeError(msg) + + X_fluc = X - self.mean_field + coeffs_data = (X_fluc.data @ self.modes.data) / np.sqrt(self.n_snapshots) + coeffs_computed = coeffs_data.compute() + + time_dim_name = X.dims[0] + new_coords = { + time_dim_name: X.coords[time_dim_name], + "modes": self.modes.coords["modes"], + } + + return xr.DataArray( + coeffs_computed, + dims=[time_dim_name, "modes"], + coords=new_coords, + name="transformed_coeffs", + ) + + def reconstruct(self, n_modes: int | None = None) -> xr.DataArray: + """Reconstruct the full data field using a specified number of modes.""" + if ( + self.coeffs is None + or self.mean_field is None + or self.modes is None + or self.n_snapshots is None + ): + msg = "The fit() method must be called before reconstruction." + logger.exception(msg) + raise RuntimeError(msg) + + if n_modes is None: + n_modes = self._n_modes + elif n_modes > self._n_modes: + logger.warning( + f"Requested {n_modes} modes, but only {self._n_modes} are " + f"available. Using all {self._n_modes} modes." + ) + n_modes = self._n_modes + + logger.info(f"Reconstructing field using the first {n_modes} modes...") + coeffs_subset = self.coeffs.sel(modes=slice(0, n_modes)) + + X_fluc_recon_data = (coeffs_subset.data @ self.modes.data.T) * np.sqrt( + self.n_snapshots + ) + time_dim, _ = coeffs_subset.dims + space_dim, _ = self.modes.dims + recon_coords = { + time_dim: coeffs_subset.coords[time_dim], + space_dim: self.modes.coords[space_dim], + } + X_fluc_recon = xr.DataArray( + X_fluc_recon_data, + dims=[time_dim, space_dim], + coords=recon_coords, + ) + + X_recon_data = X_fluc_recon.data + self.mean_field.data + X_recon_computed = X_recon_data + logger.info("Reconstruction complete.") + + return xr.DataArray( + X_recon_computed, + dims=X_fluc_recon.dims, + coords=X_fluc_recon.coords, + name="reconstructed_field", + ) diff --git a/src/svdrom/svd.py b/src/svdrom/svd.py index 9716c33..3771342 100644 --- a/src/svdrom/svd.py +++ b/src/svdrom/svd.py @@ -204,7 +204,19 @@ def _singular_vectors_to_dataarray( ) logger.exception(msg) raise ValueError(msg) - return xr.DataArray(singular_vectors, dims=new_dims, coords=coords, name=name) + + new_coords = {} + for coord_name, coord_array in X.coords.items(): + if all(dim in new_dims for dim in coord_array.dims): + new_coords[coord_name] = coord_array + + if "components" in new_dims: + comp_idx = new_dims.index("components") + new_coords["components"] = np.arange(singular_vectors.shape[comp_idx]) + + return xr.DataArray( + singular_vectors, dims=new_dims, coords=new_coords, name=name + ) def fit( self, diff --git a/tests/test_pod.py b/tests/test_pod.py new file mode 100644 index 0000000..743e818 --- /dev/null +++ b/tests/test_pod.py @@ -0,0 +1,147 @@ +import dask.array as da +import numpy as np +import pytest +import xarray as xr +from test_svd import make_dataarray + +from svdrom.pod import POD + + +@pytest.fixture() +def pod_data(): + """ + Provides an xarray.DataArray with a clear low-rank structure, + making it suitable for testing reconstruction. + """ + n_snapshots = 200 + n_points = 1000 + n_rank = 10 + + modes_np = np.random.randn(n_points, n_rank).astype("float32") + coeffs_np = np.sin(np.linspace(0, 4 * np.pi, n_snapshots))[:, None] * np.cos( + np.linspace(0, np.pi, n_rank) + )[None, :].astype("float32") + low_rank_data = coeffs_np @ modes_np.T + + noise = (np.random.rand(n_snapshots, n_points).astype("float32") - 0.5) * 0.1 + data = low_rank_data + noise + + return xr.DataArray( + data, + dims=["time", "space"], + coords={"time": np.arange(n_snapshots), "space": np.arange(n_points)}, + ) + + +def test_pod_initialization(): + """Test that the POD class initializes correctly.""" + pod = POD(n_modes=10, algorithm="randomized") + assert pod.n_modes == 10 + assert pod.algorithm == "randomized" + assert pod.modes is None + assert pod.coeffs is None + assert pod.energy is None + assert pod.mean_field is None + assert pod.s is None + + +@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat"]) +@pytest.mark.parametrize("algorithm", ["tsqr", "randomized"]) +def test_pod_fit_attributes_and_shapes(matrix_type, algorithm): + """Test the types, shapes, and metadata of POD attributes after fitting.""" + X = make_dataarray(matrix_type) + n_samples, n_features = X.shape + time_dim, space_dim = X.dims + n_modes = 15 + + pod = POD(n_modes, algorithm=algorithm) + pod.fit(X, dim=time_dim) + + assert isinstance(pod.mean_field, xr.DataArray) + assert isinstance(pod.modes, xr.DataArray) + assert isinstance(pod.coeffs, xr.DataArray) + assert isinstance(pod.energy, np.ndarray) + assert isinstance(pod.s, np.ndarray) + + assert pod.mean_field.shape == (n_features,) + assert pod.modes.shape == (n_features, n_modes) + assert pod.coeffs.shape == (n_samples, n_modes) + assert pod.energy.shape == (n_modes,) + assert pod.s.shape == (n_modes,) + + assert pod.coeffs.dims == (time_dim, "modes") + assert pod.modes.dims == (space_dim, "modes") + assert "modes" in pod.coeffs.coords and time_dim in pod.coeffs.coords + assert "modes" in pod.modes.coords and space_dim in pod.modes.coords + + +@pytest.mark.parametrize("algorithm", ["tsqr", "randomized"]) +def test_pod_calculation_correctness(pod_data, algorithm): + """Verify the POD results against the reference NumPy implementation.""" + X_xr = pod_data.chunk({"time": -1, "space": "auto"}) + X_np = X_xr.values + n_snapshots, n_features = X_np.shape + n_modes = 20 + + X_mean_np = np.mean(X_np, axis=0) + X_fluc_np = X_np - X_mean_np + X_scaled_np = X_fluc_np / np.sqrt(n_snapshots) + u_np, s_np, vh_np = np.linalg.svd(X_scaled_np, full_matrices=False) + + modes_np = vh_np.T[:, :n_modes] + coeffs_np = u_np[:, :n_modes] * s_np[:n_modes] + + svd_kwargs = {} + if algorithm == "randomized": + svd_kwargs["n_power_iter"] = 8 + + pod = POD(n_modes, algorithm=algorithm) + pod.fit(X_xr, dim="time") + + rtol = 1e-3 + + recon_fluc_pod = pod.coeffs.values @ pod.modes.values.T + recon_fluc_np = coeffs_np @ modes_np.T + + assert np.allclose( + recon_fluc_pod, recon_fluc_np, rtol=rtol + ), "Reconstructed fluctuations from POD do not match the reference NumPy calculation." + + +def test_pod_reconstruction_and_transform(pod_data): + """Test data reconstruction and transformation onto the POD basis.""" + X = pod_data.chunk({"time": -1, "space": "auto"}) + n_modes = 15 + pod = POD(n_modes, algorithm="randomized") + pod.fit(X, dim="time") + + coeffs_transformed = pod.transform(X) + assert isinstance( + coeffs_transformed, xr.DataArray + ), "Transform result should be a DataArray" + assert ( + coeffs_transformed.shape == pod.coeffs.shape + ), "Shape of transformed coeffs should match fitted coeffs" + + assert np.allclose( + np.abs(coeffs_transformed.values), np.abs(pod.coeffs.values), rtol=1e-3 + ), "Transformed coefficients do not match fitted coefficients within tolerance." + + X_reconstructed = pod.reconstruct(n_modes=n_modes) + assert isinstance( + X_reconstructed, xr.DataArray + ), "Reconstruction result should be a DataArray" + + assert ( + X_reconstructed.dims == X.dims + ), f"Expected dims {X.dims} but got {X_reconstructed.dims}" + assert ( + X_reconstructed.shape == X.shape + ), "Shape of reconstructed data should match original" + assert list(X_reconstructed.coords.keys()) == list( + X.coords.keys() + ), "Coordinates should match original" + + error = da.linalg.norm(X.data - X_reconstructed.data) / da.linalg.norm(X.data) + computed_error = error.compute() + assert computed_error < 0.1, f"Reconstruction error {computed_error} was too high." From b4ed3c2657dc35c139c81838101ada271ae3b9b8 Mon Sep 17 00:00:00 2001 From: David Salvador Jasin Date: Fri, 22 Aug 2025 18:26:34 +0100 Subject: [PATCH 2/6] POD class inheriting from TruncatedSVD --- src/svdrom/pod_inhereted.py | 97 +++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 src/svdrom/pod_inhereted.py diff --git a/src/svdrom/pod_inhereted.py b/src/svdrom/pod_inhereted.py new file mode 100644 index 0000000..8bccd6b --- /dev/null +++ b/src/svdrom/pod_inhereted.py @@ -0,0 +1,97 @@ +import dask.array as da +import numpy as np +import xarray as xr + +from svdrom.logger import setup_logger +from svdrom.preprocessing import StandardScaler +from svdrom.svd import TruncatedSVD + +logger = setup_logger("POD", "pod.log") + + +class POD(TruncatedSVD): + def __init__( + self, + n_components: int, + algorithm: str = "tsqr", + compute_modes: bool = True, + compute_time_coeffs: bool = True, + compute_energy_ratio: bool = False, + rechunk: bool = False, + remove_mean: bool = False, + time_dimension: str = "time", + ): + super().__init__( + n_components=n_components, + algorithm=algorithm, + compute_u=compute_modes, + compute_v=compute_time_coeffs, + compute_var_ratio=compute_energy_ratio, + rechunk=rechunk, + ) + + self._energy: np.ndarray | None = None + self._remove_mean: bool = remove_mean + self._time_dim: str = time_dimension + + @property + def modes(self) -> xr.DataArray | None: + """POD (spatial) modes (read-only).""" + return super().u + + @property + def time_coeffs(self) -> xr.DataArray | None: + """Time coefficients (read-only).""" + return super().v + + @property + def energy(self) -> np.ndarray | None: + """Energy (variance) explained by each POD mode (read-only).""" + if self._s is not None and self._u is not None: + return self._s**2 / self._u.shape[0] + return None + + @property + def explained_energy_ratio(self) -> np.ndarray | da.Array | None: + return super().explained_var_ratio + + def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray: + """Transpose the array if the user-specified time dimension + is not along the columns. Remove the temporal average if + requested by the user. + """ + dims = X.dims + if dims.index(self._time_dim) != 1: + X = X.T + if self._remove_mean: + scaler = StandardScaler() + X = scaler( + X, + dim=self._time_dim, + with_std=False, + ) + assert isinstance(X, xr.DataArray), "Expected DataArray after scaling." + return X + + def fit( + self, + X: xr.DataArray, + **kwargs, + ) -> None: + if self._time_dim not in X.dims: + msg = ( + f"Specified time dimension '{self._time_dim}' " + "is not a dimension of the input array." + ) + raise ValueError(msg) + X = self._preprocess_array(X) + super().fit(X, **kwargs) + + def compute_modes(self) -> None: + super().compute_u() + + def compute_time_coeffs(self) -> None: + super().compute_v() + + def compute_energy_ratio(self) -> None: + super().compute_var_ratio() From 3116f0ea75746d29e890c4900b2811366ee107d2 Mon Sep 17 00:00:00 2001 From: David Salvador Jasin Date: Sat, 23 Aug 2025 18:39:09 +0100 Subject: [PATCH 3/6] Rename parameter from n_components to n_modes in POD class constructor --- src/svdrom/pod_inhereted.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/svdrom/pod_inhereted.py b/src/svdrom/pod_inhereted.py index 8bccd6b..d8e7a76 100644 --- a/src/svdrom/pod_inhereted.py +++ b/src/svdrom/pod_inhereted.py @@ -12,7 +12,7 @@ class POD(TruncatedSVD): def __init__( self, - n_components: int, + n_modes: int, algorithm: str = "tsqr", compute_modes: bool = True, compute_time_coeffs: bool = True, @@ -22,7 +22,7 @@ def __init__( time_dimension: str = "time", ): super().__init__( - n_components=n_components, + n_components=n_modes, algorithm=algorithm, compute_u=compute_modes, compute_v=compute_time_coeffs, From d4cb6f4f84cc6eac51547842162bb38dd42e255f Mon Sep 17 00:00:00 2001 From: David Salvador Jasin Date: Sat, 23 Aug 2025 18:41:13 +0100 Subject: [PATCH 4/6] Rename algorithm parameter to svd_algorithm in POD class constructor --- src/svdrom/pod_inhereted.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/svdrom/pod_inhereted.py b/src/svdrom/pod_inhereted.py index d8e7a76..fe6c60a 100644 --- a/src/svdrom/pod_inhereted.py +++ b/src/svdrom/pod_inhereted.py @@ -13,7 +13,7 @@ class POD(TruncatedSVD): def __init__( self, n_modes: int, - algorithm: str = "tsqr", + svd_algorithm: str = "tsqr", compute_modes: bool = True, compute_time_coeffs: bool = True, compute_energy_ratio: bool = False, @@ -23,7 +23,7 @@ def __init__( ): super().__init__( n_components=n_modes, - algorithm=algorithm, + algorithm=svd_algorithm, compute_u=compute_modes, compute_v=compute_time_coeffs, compute_var_ratio=compute_energy_ratio, From ad586f833c78af1fa3617374613d8eb065cafa2d Mon Sep 17 00:00:00 2001 From: David Salvador Jasin Date: Sat, 23 Aug 2025 18:43:28 +0100 Subject: [PATCH 5/6] Refactor POD class properties to use instance variables instead of super() calls --- src/svdrom/pod_inhereted.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/svdrom/pod_inhereted.py b/src/svdrom/pod_inhereted.py index fe6c60a..a43046e 100644 --- a/src/svdrom/pod_inhereted.py +++ b/src/svdrom/pod_inhereted.py @@ -37,12 +37,12 @@ def __init__( @property def modes(self) -> xr.DataArray | None: """POD (spatial) modes (read-only).""" - return super().u + return self._u @property def time_coeffs(self) -> xr.DataArray | None: """Time coefficients (read-only).""" - return super().v + return self._v @property def energy(self) -> np.ndarray | None: @@ -53,7 +53,7 @@ def energy(self) -> np.ndarray | None: @property def explained_energy_ratio(self) -> np.ndarray | da.Array | None: - return super().explained_var_ratio + return self._explained_var_ratio def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray: """Transpose the array if the user-specified time dimension From 861a35cd2f187a88d84b1360390fe4fb79742024 Mon Sep 17 00:00:00 2001 From: robertvava Date: Wed, 8 Oct 2025 15:01:46 +0100 Subject: [PATCH 6/6] linting fix commit --- src/svdrom/pod.py | 241 +++++++++--------------------- src/svdrom/pod_inhereted.py | 97 ------------ src/svdrom/svd.py | 2 + tests/test_pod.py | 284 +++++++++++++++++++++--------------- tests/test_svd.py | 66 +++++++++ 5 files changed, 306 insertions(+), 384 deletions(-) delete mode 100644 src/svdrom/pod_inhereted.py diff --git a/src/svdrom/pod.py b/src/svdrom/pod.py index 6a88c42..09373c0 100644 --- a/src/svdrom/pod.py +++ b/src/svdrom/pod.py @@ -1,201 +1,98 @@ -# src/svdrom/pod.py - +import dask.array as da import numpy as np import xarray as xr from svdrom.logger import setup_logger +from svdrom.preprocessing import StandardScaler from svdrom.svd import TruncatedSVD logger = setup_logger("POD", "pod.log") -# LSV - modes -# RSV - Time coefficients # the POD modes -# return SV squared / energy -# flag for remove mean along temporal dimension in fit or class init? -# important to remove time average - -# The shape of the array matters when you feed it to SVD. -# Whether you have spatial dim along rows or columns - -# if you call modes, you're essentially returning the RSV -# time coefficients you return LSV -# The algo should figure out which is the spatioal and temporal dims! - - -# Inherit TruncatedSVD maybe! -class POD: - """ - Proper Orthogonal Decomposition (POD) - Uses SVD to compute the POD modes (spatial modes), - temporal coefficients, and modal energies of a given dataset. - - Parameters - ---------- - n_modes : int - The number of POD modes to compute and keep. This corresponds to - the number of components in the truncated SVD. - algorithm : str, {'tsqr', 'randomized'}, (default 'tsqr') - SVD algorithm to use via the `TruncatedSVD` class. - See `svdrom.svd.TruncatedSVD` for more details. - **svd_kwargs : dict - Additional keyword arguments to be passed to the `TruncatedSVD` - constructor (e.g., `compute_u`, `compute_v`, `rechunk`). - """ +# only reconstructs +class POD(TruncatedSVD): + def __init__( + self, + n_modes: int, + svd_algorithm: str = "tsqr", + compute_modes: bool = True, + compute_time_coeffs: bool = True, + compute_energy_ratio: bool = False, + rechunk: bool = False, + remove_mean: bool = False, + time_dimension: str = "time", + ): + super().__init__( + n_components=n_modes, + algorithm=svd_algorithm, + compute_u=compute_modes, + compute_v=compute_time_coeffs, + compute_var_ratio=compute_energy_ratio, + rechunk=rechunk, + ) - def __init__(self, n_modes: int, algorithm: str = "tsqr"): - self._n_modes = n_modes - self._algorithm = algorithm - self._svd_solver = TruncatedSVD(n_components=n_modes, algorithm=algorithm) - self._modes: xr.DataArray | None = None - self._coeffs: xr.DataArray | None = None self._energy: np.ndarray | None = None - self._s: np.ndarray | None = None - self._mean_field: xr.DataArray | None = None - self._n_snapshots: int | None = None - - @property - def n_modes(self) -> int: - return self._n_modes - - @property - def algorithm(self) -> str: - return self._algorithm + self._remove_mean: bool = remove_mean + self._time_dim: str = time_dimension @property def modes(self) -> xr.DataArray | None: - return self._modes + """POD (spatial) modes (read-only).""" + return self._u @property - def coeffs(self) -> xr.DataArray | None: - return self._coeffs + def time_coeffs(self) -> xr.DataArray | None: + """Time coefficients (read-only).""" + return self._v @property def energy(self) -> np.ndarray | None: - return self._energy + """Energy (variance) explained by each POD mode (read-only).""" + if self._s is not None and self._u is not None: + return self._s**2 / self._u.shape[0] + return None @property - def s(self) -> np.ndarray | None: - return self._s + def explained_energy_ratio(self) -> np.ndarray | da.Array | None: + return self._explained_var_ratio - @property - def mean_field(self) -> xr.DataArray | None: - return self._mean_field - - @property - def n_snapshots(self) -> int | None: - return self._n_snapshots - - def fit(self, X: xr.DataArray, dim: str, **svd_kwargs) -> None: - """ - X : xr.DataArray, shape (n_samples, n_features) - The input data. - dim : str - The name of the dimension in `X` that represents the snapshots. + def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray: + """Transpose the array if the user-specified time dimension + is not along the columns. Remove the temporal average if + requested by the user. """ - if dim not in X.dims: + dims = X.dims + if dims.index(self._time_dim) != 1: + X = X.T + if self._remove_mean: + scaler = StandardScaler() + X = scaler( + X, + dim=self._time_dim, + with_std=False, + ) + assert isinstance(X, xr.DataArray), "Expected DataArray after scaling." + return X + + def fit( + self, + X: xr.DataArray, + **kwargs, + ) -> None: + if self._time_dim not in X.dims: msg = ( - f"Dimension '{dim}' not present in the input array with dims {X.dims}." + f"Specified time dimension '{self._time_dim}' " + "is not a dimension of the input array." ) - logger.exception(msg) raise ValueError(msg) + X = self._preprocess_array(X) + super().fit(X, **kwargs) - self._n_snapshots = X.sizes[dim] - logger.info(f"Computing mean field along dimension '{dim}'...") - # Remove mean is turned on by default - user warning - self._mean_field = X.mean( - dim=dim - ).compute() # We could just pass mean; could use StandardScaler instead - X_fluc = X - self._mean_field - - D = X_fluc / np.sqrt(self._n_snapshots) - logger.info(f"Calling TruncatedSVD with {self.n_modes} modes...") - self._svd_solver.fit(D, **svd_kwargs) + def compute_modes(self) -> None: + super().compute_u() - assert self._svd_solver.s is not None + def compute_time_coeffs(self) -> None: + super().compute_v() - self._s = self._svd_solver.s - self._energy = self._s**2 - - assert self._svd_solver.u is not None - assert self._svd_solver.v is not None - - u = self._svd_solver.u - v = self._svd_solver.v - - self._modes = v.T.rename({"components": "modes"}) - self._coeffs = (u * self._s).rename({"components": "modes"}) - - def transform(self, X: xr.DataArray) -> xr.DataArray: - """Project new data onto the computed POD basis.""" - if self.modes is None or self.mean_field is None or self.n_snapshots is None: - msg = "The fit() method must be called before calling transform." - logger.exception(msg) - raise RuntimeError(msg) - - X_fluc = X - self.mean_field - coeffs_data = (X_fluc.data @ self.modes.data) / np.sqrt(self.n_snapshots) - coeffs_computed = coeffs_data.compute() - - time_dim_name = X.dims[0] - new_coords = { - time_dim_name: X.coords[time_dim_name], - "modes": self.modes.coords["modes"], - } - - return xr.DataArray( - coeffs_computed, - dims=[time_dim_name, "modes"], - coords=new_coords, - name="transformed_coeffs", - ) - - def reconstruct(self, n_modes: int | None = None) -> xr.DataArray: - """Reconstruct the full data field using a specified number of modes.""" - if ( - self.coeffs is None - or self.mean_field is None - or self.modes is None - or self.n_snapshots is None - ): - msg = "The fit() method must be called before reconstruction." - logger.exception(msg) - raise RuntimeError(msg) - - if n_modes is None: - n_modes = self._n_modes - elif n_modes > self._n_modes: - logger.warning( - f"Requested {n_modes} modes, but only {self._n_modes} are " - f"available. Using all {self._n_modes} modes." - ) - n_modes = self._n_modes - - logger.info(f"Reconstructing field using the first {n_modes} modes...") - coeffs_subset = self.coeffs.sel(modes=slice(0, n_modes)) - - X_fluc_recon_data = (coeffs_subset.data @ self.modes.data.T) * np.sqrt( - self.n_snapshots - ) - time_dim, _ = coeffs_subset.dims - space_dim, _ = self.modes.dims - recon_coords = { - time_dim: coeffs_subset.coords[time_dim], - space_dim: self.modes.coords[space_dim], - } - X_fluc_recon = xr.DataArray( - X_fluc_recon_data, - dims=[time_dim, space_dim], - coords=recon_coords, - ) - - X_recon_data = X_fluc_recon.data + self.mean_field.data - X_recon_computed = X_recon_data - logger.info("Reconstruction complete.") - - return xr.DataArray( - X_recon_computed, - dims=X_fluc_recon.dims, - coords=X_fluc_recon.coords, - name="reconstructed_field", - ) + def compute_energy_ratio(self) -> None: + super().compute_var_ratio() diff --git a/src/svdrom/pod_inhereted.py b/src/svdrom/pod_inhereted.py deleted file mode 100644 index a43046e..0000000 --- a/src/svdrom/pod_inhereted.py +++ /dev/null @@ -1,97 +0,0 @@ -import dask.array as da -import numpy as np -import xarray as xr - -from svdrom.logger import setup_logger -from svdrom.preprocessing import StandardScaler -from svdrom.svd import TruncatedSVD - -logger = setup_logger("POD", "pod.log") - - -class POD(TruncatedSVD): - def __init__( - self, - n_modes: int, - svd_algorithm: str = "tsqr", - compute_modes: bool = True, - compute_time_coeffs: bool = True, - compute_energy_ratio: bool = False, - rechunk: bool = False, - remove_mean: bool = False, - time_dimension: str = "time", - ): - super().__init__( - n_components=n_modes, - algorithm=svd_algorithm, - compute_u=compute_modes, - compute_v=compute_time_coeffs, - compute_var_ratio=compute_energy_ratio, - rechunk=rechunk, - ) - - self._energy: np.ndarray | None = None - self._remove_mean: bool = remove_mean - self._time_dim: str = time_dimension - - @property - def modes(self) -> xr.DataArray | None: - """POD (spatial) modes (read-only).""" - return self._u - - @property - def time_coeffs(self) -> xr.DataArray | None: - """Time coefficients (read-only).""" - return self._v - - @property - def energy(self) -> np.ndarray | None: - """Energy (variance) explained by each POD mode (read-only).""" - if self._s is not None and self._u is not None: - return self._s**2 / self._u.shape[0] - return None - - @property - def explained_energy_ratio(self) -> np.ndarray | da.Array | None: - return self._explained_var_ratio - - def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray: - """Transpose the array if the user-specified time dimension - is not along the columns. Remove the temporal average if - requested by the user. - """ - dims = X.dims - if dims.index(self._time_dim) != 1: - X = X.T - if self._remove_mean: - scaler = StandardScaler() - X = scaler( - X, - dim=self._time_dim, - with_std=False, - ) - assert isinstance(X, xr.DataArray), "Expected DataArray after scaling." - return X - - def fit( - self, - X: xr.DataArray, - **kwargs, - ) -> None: - if self._time_dim not in X.dims: - msg = ( - f"Specified time dimension '{self._time_dim}' " - "is not a dimension of the input array." - ) - raise ValueError(msg) - X = self._preprocess_array(X) - super().fit(X, **kwargs) - - def compute_modes(self) -> None: - super().compute_u() - - def compute_time_coeffs(self) -> None: - super().compute_v() - - def compute_energy_ratio(self) -> None: - super().compute_var_ratio() diff --git a/src/svdrom/svd.py b/src/svdrom/svd.py index 3771342..37cc4ad 100644 --- a/src/svdrom/svd.py +++ b/src/svdrom/svd.py @@ -94,6 +94,8 @@ def __init__( self._s: np.ndarray | None = None self._v: xr.DataArray | None = None self._explained_var_ratio: np.ndarray | da.Array | None = None + self._original_dims: tuple | None = None + self._original_coords: dict | None = None @property def n_components(self) -> int: diff --git a/tests/test_pod.py b/tests/test_pod.py index 743e818..157a42d 100644 --- a/tests/test_pod.py +++ b/tests/test_pod.py @@ -2,146 +2,200 @@ import numpy as np import pytest import xarray as xr -from test_svd import make_dataarray from svdrom.pod import POD -@pytest.fixture() -def pod_data(): - """ - Provides an xarray.DataArray with a clear low-rank structure, - making it suitable for testing reconstruction. +def make_dataarray(matrix_type: str, time_dim_pos: int = 1) -> xr.DataArray: + """Make a Dask-backed DataArray with random data of + specified matrix type. The matrix type can be one of: + - "tall-and-skinny": More samples than features. + - "short-and-fat": More features than samples. + - "square": Equal number of samples and features. + """ - n_snapshots = 200 - n_points = 1000 - n_rank = 10 - - modes_np = np.random.randn(n_points, n_rank).astype("float32") - coeffs_np = np.sin(np.linspace(0, 4 * np.pi, n_snapshots))[:, None] * np.cos( - np.linspace(0, np.pi, n_rank) - )[None, :].astype("float32") - low_rank_data = coeffs_np @ modes_np.T - - noise = (np.random.rand(n_snapshots, n_points).astype("float32") - 0.5) * 0.1 - data = low_rank_data + noise - - return xr.DataArray( - data, - dims=["time", "space"], - coords={"time": np.arange(n_snapshots), "space": np.arange(n_points)}, + if matrix_type == "tall-and-skinny": + n_space, n_time = 10_000, 100 + space_chunks, time_chunks = -1, int(n_time / 2) + elif matrix_type == "short-and-fat": + n_space, n_time = 100, 10_000 + space_chunks, time_chunks = int(n_space / 2), -1 + elif matrix_type == "square": + n_space, n_time = 1_000, 1_000 + space_chunks, time_chunks = int(n_space / 2), int(n_time / 2) + else: + msg = ( + "Matrix type not supported. " + "Must be one of: tall-and-skinny, short-and-fat, square." + ) + raise ValueError(msg) + + if time_dim_pos == 1: + shape = (n_space, n_time) + chunks = (space_chunks, time_chunks) + dims = ["space", "time"] + coords = {"space": np.arange(n_space), "time": np.arange(n_time)} + elif time_dim_pos == 0: + shape = (n_time, n_space) + chunks = (time_chunks, space_chunks) + dims = ["time", "space"] + coords = {"time": np.arange(n_time), "space": np.arange(n_space)} + else: + raise ValueError("time_dim_pos must be 0 or 1.") + + X = da.random.random(shape, chunks=chunks).astype("float32") + return xr.DataArray(X, dims=dims, coords=coords) + + +@pytest.mark.parametrize("svd_algorithm", ["tsqr", "randomized"]) +def test_pod_basic_attributes_and_aliases(svd_algorithm): + """Test basic functionality and property aliases of POD.""" + n_modes = 10 + pod = POD( + n_modes=n_modes, + svd_algorithm=svd_algorithm, + compute_energy_ratio=True, ) + expected_attrs = ( + "modes", + "time_coeffs", + "energy", + "explained_energy_ratio", + ) + for attr in expected_attrs: + assert hasattr(pod, attr), f"POD should have attribute '{attr}'." -def test_pod_initialization(): - """Test that the POD class initializes correctly.""" - pod = POD(n_modes=10, algorithm="randomized") - assert pod.n_modes == 10 - assert pod.algorithm == "randomized" - assert pod.modes is None - assert pod.coeffs is None - assert pod.energy is None - assert pod.mean_field is None - assert pod.s is None - - -@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat"]) -@pytest.mark.parametrize("algorithm", ["tsqr", "randomized"]) -def test_pod_fit_attributes_and_shapes(matrix_type, algorithm): - """Test the types, shapes, and metadata of POD attributes after fitting.""" - X = make_dataarray(matrix_type) - n_samples, n_features = X.shape - time_dim, space_dim = X.dims - n_modes = 15 - - pod = POD(n_modes, algorithm=algorithm) - pod.fit(X, dim=time_dim) + X = make_dataarray("tall-and-skinny") + pod.fit(X) - assert isinstance(pod.mean_field, xr.DataArray) assert isinstance(pod.modes, xr.DataArray) - assert isinstance(pod.coeffs, xr.DataArray) + assert isinstance(pod.modes.data, np.ndarray) + assert isinstance(pod.time_coeffs, xr.DataArray) + assert isinstance(pod.time_coeffs.data, np.ndarray) assert isinstance(pod.energy, np.ndarray) - assert isinstance(pod.s, np.ndarray) + assert isinstance(pod.explained_energy_ratio, np.ndarray) + + assert pod.modes is pod.u + assert pod.time_coeffs is pod.v + assert pod.explained_energy_ratio is pod.explained_var_ratio + + +@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat", "square"]) +def test_pod_shapes_and_dims(matrix_type): + """Test that POD modes and time coefficients have the correct shapes and dims.""" + X = make_dataarray(matrix_type, time_dim_pos=1) + n_space, n_time = X.shape + n_modes = 10 - assert pod.mean_field.shape == (n_features,) - assert pod.modes.shape == (n_features, n_modes) - assert pod.coeffs.shape == (n_samples, n_modes) + pod = POD(n_modes=n_modes) + pod.fit(X) + + assert pod.modes.shape == (n_space, n_modes) + assert pod.time_coeffs.shape == (n_modes, n_time) assert pod.energy.shape == (n_modes,) - assert pod.s.shape == (n_modes,) - assert pod.coeffs.dims == (time_dim, "modes") - assert pod.modes.dims == (space_dim, "modes") - assert "modes" in pod.coeffs.coords and time_dim in pod.coeffs.coords - assert "modes" in pod.modes.coords and space_dim in pod.modes.coords + assert pod.modes.dims == ("space", "components") + assert pod.time_coeffs.dims == ("components", "time") + assert "space" in pod.modes.coords + assert "components" in pod.modes.coords + assert "time" not in pod.modes.coords -@pytest.mark.parametrize("algorithm", ["tsqr", "randomized"]) -def test_pod_calculation_correctness(pod_data, algorithm): - """Verify the POD results against the reference NumPy implementation.""" - X_xr = pod_data.chunk({"time": -1, "space": "auto"}) - X_np = X_xr.values - n_snapshots, n_features = X_np.shape - n_modes = 20 + assert "time" in pod.time_coeffs.coords + assert "components" in pod.time_coeffs.coords + assert "space" not in pod.time_coeffs.coords - X_mean_np = np.mean(X_np, axis=0) - X_fluc_np = X_np - X_mean_np - X_scaled_np = X_fluc_np / np.sqrt(n_snapshots) - u_np, s_np, vh_np = np.linalg.svd(X_scaled_np, full_matrices=False) - modes_np = vh_np.T[:, :n_modes] - coeffs_np = u_np[:, :n_modes] * s_np[:n_modes] +def test_time_dimension_handling(): + """Test that POD correctly handles the `time_dimension` parameter by transposing if necessary.""" + X = make_dataarray("short-and-fat", time_dim_pos=0) + assert X.dims == ("time", "space") + n_time, n_space = X.shape + n_modes = 15 - svd_kwargs = {} - if algorithm == "randomized": - svd_kwargs["n_power_iter"] = 8 + pod = POD(n_modes=n_modes, time_dimension="time") + pod.fit(X) - pod = POD(n_modes, algorithm=algorithm) - pod.fit(X_xr, dim="time") + assert pod.modes.shape == (n_space, n_modes) + assert pod.time_coeffs.shape == (n_modes, n_time) - rtol = 1e-3 + assert pod.modes.dims == ("space", "components") + assert "space" in pod.modes.coords + assert pod.time_coeffs.dims == ("components", "time") + assert "time" in pod.time_coeffs.coords - recon_fluc_pod = pod.coeffs.values @ pod.modes.values.T - recon_fluc_np = coeffs_np @ modes_np.T - assert np.allclose( - recon_fluc_pod, recon_fluc_np, rtol=rtol - ), "Reconstructed fluctuations from POD do not match the reference NumPy calculation." +def test_remove_mean(): + X = make_dataarray("tall-and-skinny") + n_modes = 5 + pod = POD(n_modes=n_modes, remove_mean=True, time_dimension="time") + pod.fit(X) -def test_pod_reconstruction_and_transform(pod_data): - """Test data reconstruction and transformation onto the POD basis.""" - X = pod_data.chunk({"time": -1, "space": "auto"}) - n_modes = 15 - pod = POD(n_modes, algorithm="randomized") - pod.fit(X, dim="time") + # (U*S @ V). + reconstructed_fluctuations = (pod.modes.data * pod.s) @ pod.time_coeffs.data + + # *The temporal mean of the reconstructed fluctuations should be close to zero + mean_of_reconstruction = reconstructed_fluctuations.mean(axis=1) + assert np.allclose(mean_of_reconstruction, 0, atol=1e-5) - coeffs_transformed = pod.transform(X) - assert isinstance( - coeffs_transformed, xr.DataArray - ), "Transform result should be a DataArray" - assert ( - coeffs_transformed.shape == pod.coeffs.shape - ), "Shape of transformed coeffs should match fitted coeffs" +def test_energy_calculation(): + """Test that the `energy` property is calculated correctly.""" + X = make_dataarray("square") + n_modes = 20 + + pod = POD(n_modes=n_modes, compute_energy_ratio=True, remove_mean=True) + pod.fit(X) + + n_samples = pod.modes.shape[0] + expected_energy = pod.s**2 / n_samples + assert np.allclose(pod.energy, expected_energy, atol=1e-6) + + # The total variance is the total energy of the system + X_processed = X - X.mean(dim="time") + total_variance = (X_processed.data**2).sum().compute() / n_samples + + # The explained energy ratio of each mode should be its energy divided by the total system energy (total variance) assert np.allclose( - np.abs(coeffs_transformed.values), np.abs(pod.coeffs.values), rtol=1e-3 - ), "Transformed coefficients do not match fitted coefficients within tolerance." - - X_reconstructed = pod.reconstruct(n_modes=n_modes) - assert isinstance( - X_reconstructed, xr.DataArray - ), "Reconstruction result should be a DataArray" - - assert ( - X_reconstructed.dims == X.dims - ), f"Expected dims {X.dims} but got {X_reconstructed.dims}" - assert ( - X_reconstructed.shape == X.shape - ), "Shape of reconstructed data should match original" - assert list(X_reconstructed.coords.keys()) == list( - X.coords.keys() - ), "Coordinates should match original" - - error = da.linalg.norm(X.data - X_reconstructed.data) / da.linalg.norm(X.data) - computed_error = error.compute() - assert computed_error < 0.1, f"Reconstruction error {computed_error} was too high." + pod.explained_energy_ratio, + pod.energy / total_variance, + rtol=1e-2, + ) + + +def test_invalid_time_dimension_error(): + """Test that a ValueError is raised for a non-existent time dimension.""" + X = make_dataarray("tall-and-skinny") + pod = POD(n_modes=5, time_dimension="non_existent_dim") + + with pytest.raises(ValueError, match="is not a dimension of the input array"): + pod.fit(X) + + +def test_compute_methods(): + """Test that the `compute_*` convenience methods work.""" + n_modes = 5 + pod = POD( + n_modes=n_modes, + compute_modes=False, + compute_time_coeffs=False, + compute_energy_ratio=False, + ) + + X = make_dataarray("tall-and-skinny") + pod.fit(X) + + assert isinstance(pod.modes.data, da.Array) + assert isinstance(pod.time_coeffs.data, da.Array) + assert isinstance(pod.explained_energy_ratio, da.Array) + + pod.compute_modes() + assert isinstance(pod.modes.data, np.ndarray) + + pod.compute_time_coeffs() + assert isinstance(pod.time_coeffs.data, np.ndarray) + + pod.compute_energy_ratio() + assert isinstance(pod.explained_energy_ratio, np.ndarray) diff --git a/tests/test_svd.py b/tests/test_svd.py index d6a488c..cec394b 100644 --- a/tests/test_svd.py +++ b/tests/test_svd.py @@ -209,3 +209,69 @@ def test_reconstruct_snapshot(matrix_type): assert X_r.shape == ( tsvd.u.shape[0], ), f"Reconstructed snapshot should have shape ({tsvd.u.shape[0]}), got {X_r.shape}." + + +@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat"]) +@pytest.mark.parametrize("compute_initially", [True, False]) +def test_reconstruct_size_based_computation(matrix_type, compute_initially): + """Test the reconstruct method's size-based computation (NumPy vs Dask).""" + X = make_dataarray(matrix_type) + n_components = 10 + tsvd = TruncatedSVD( + n_components=n_components, + compute_u=compute_initially, + compute_v=compute_initially, + ) + tsvd.fit(X) + + # --- 1. Test NumPy path (in-memory) --- + # Force NumPy path by setting a very high memory limit + X_recon_np = tsvd.reconstruct(memory_limit_bytes=1e12) # 1 TB limit + + assert isinstance( + X_recon_np.data, np.ndarray + ), "Reconstruction should be NumPy-backed for a large memory limit." + assert X_recon_np.shape == X.shape, "Reconstructed shape must match original." + assert X_recon_np.dims == X.dims, "Reconstructed dims must match original." + # Verify coordinates are preserved + assert X_recon_np.coords["samples"].equals(X.coords["samples"]) + assert X_recon_np.coords["time"].equals(X.coords["time"]) + + # --- 2. Test Dask path (out-of-memory) --- + # Force Dask path by setting a very low memory limit + X_recon_da = tsvd.reconstruct(memory_limit_bytes=1) # 1 Byte limit + + assert isinstance( + X_recon_da.data, da.Array + ), "Reconstruction should be Dask-backed for a small memory limit." + assert X_recon_da.shape == X.shape, "Lazy reconstructed shape must match original." + assert X_recon_da.dims == X.dims, "Lazy reconstructed dims must match original." + + # --- 3. Verify correctness of both results --- + # Compute the original and Dask-reconstructed data to compare + X_original_np = X.compute() + X_recon_da_computed = X_recon_da.compute() + + # The NumPy-path result and the computed Dask-path result should be identical + np.testing.assert_allclose( + X_recon_np.data, + X_recon_da_computed, + rtol=1e-5, + err_msg="NumPy and Dask path reconstructions should be nearly identical.", + ) + + # Both reconstructions should be a close approximation of the original data + # We use a loose tolerance because it's a low-rank approximation + reconstruction_error = np.linalg.norm( + X_original_np - X_recon_np.data + ) / np.linalg.norm(X_original_np) + assert reconstruction_error < 0.5, "Reconstruction error is unexpectedly high." + + +def test_reconstruct_before_fit_raises_error(): + """Test that calling reconstruct() before fit() raises a ValueError.""" + tsvd = TruncatedSVD(n_components=5) + with pytest.raises( + ValueError, match="The SVD model must be fitted before calling reconstruct" + ): + tsvd.reconstruct()