diff --git a/src/svdrom/pod.py b/src/svdrom/pod.py index 9de5e00..09373c0 100644 --- a/src/svdrom/pod.py +++ b/src/svdrom/pod.py @@ -1 +1,98 @@ -# Module for POD implementation +import dask.array as da +import numpy as np +import xarray as xr + +from svdrom.logger import setup_logger +from svdrom.preprocessing import StandardScaler +from svdrom.svd import TruncatedSVD + +logger = setup_logger("POD", "pod.log") + + +# only reconstructs +class POD(TruncatedSVD): + def __init__( + self, + n_modes: int, + svd_algorithm: str = "tsqr", + compute_modes: bool = True, + compute_time_coeffs: bool = True, + compute_energy_ratio: bool = False, + rechunk: bool = False, + remove_mean: bool = False, + time_dimension: str = "time", + ): + super().__init__( + n_components=n_modes, + algorithm=svd_algorithm, + compute_u=compute_modes, + compute_v=compute_time_coeffs, + compute_var_ratio=compute_energy_ratio, + rechunk=rechunk, + ) + + self._energy: np.ndarray | None = None + self._remove_mean: bool = remove_mean + self._time_dim: str = time_dimension + + @property + def modes(self) -> xr.DataArray | None: + """POD (spatial) modes (read-only).""" + return self._u + + @property + def time_coeffs(self) -> xr.DataArray | None: + """Time coefficients (read-only).""" + return self._v + + @property + def energy(self) -> np.ndarray | None: + """Energy (variance) explained by each POD mode (read-only).""" + if self._s is not None and self._u is not None: + return self._s**2 / self._u.shape[0] + return None + + @property + def explained_energy_ratio(self) -> np.ndarray | da.Array | None: + return self._explained_var_ratio + + def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray: + """Transpose the array if the user-specified time dimension + is not along the columns. Remove the temporal average if + requested by the user. + """ + dims = X.dims + if dims.index(self._time_dim) != 1: + X = X.T + if self._remove_mean: + scaler = StandardScaler() + X = scaler( + X, + dim=self._time_dim, + with_std=False, + ) + assert isinstance(X, xr.DataArray), "Expected DataArray after scaling." + return X + + def fit( + self, + X: xr.DataArray, + **kwargs, + ) -> None: + if self._time_dim not in X.dims: + msg = ( + f"Specified time dimension '{self._time_dim}' " + "is not a dimension of the input array." + ) + raise ValueError(msg) + X = self._preprocess_array(X) + super().fit(X, **kwargs) + + def compute_modes(self) -> None: + super().compute_u() + + def compute_time_coeffs(self) -> None: + super().compute_v() + + def compute_energy_ratio(self) -> None: + super().compute_var_ratio() diff --git a/src/svdrom/svd.py b/src/svdrom/svd.py index 9716c33..37cc4ad 100644 --- a/src/svdrom/svd.py +++ b/src/svdrom/svd.py @@ -94,6 +94,8 @@ def __init__( self._s: np.ndarray | None = None self._v: xr.DataArray | None = None self._explained_var_ratio: np.ndarray | da.Array | None = None + self._original_dims: tuple | None = None + self._original_coords: dict | None = None @property def n_components(self) -> int: @@ -204,7 +206,19 @@ def _singular_vectors_to_dataarray( ) logger.exception(msg) raise ValueError(msg) - return xr.DataArray(singular_vectors, dims=new_dims, coords=coords, name=name) + + new_coords = {} + for coord_name, coord_array in X.coords.items(): + if all(dim in new_dims for dim in coord_array.dims): + new_coords[coord_name] = coord_array + + if "components" in new_dims: + comp_idx = new_dims.index("components") + new_coords["components"] = np.arange(singular_vectors.shape[comp_idx]) + + return xr.DataArray( + singular_vectors, dims=new_dims, coords=new_coords, name=name + ) def fit( self, diff --git a/tests/test_pod.py b/tests/test_pod.py new file mode 100644 index 0000000..157a42d --- /dev/null +++ b/tests/test_pod.py @@ -0,0 +1,201 @@ +import dask.array as da +import numpy as np +import pytest +import xarray as xr + +from svdrom.pod import POD + + +def make_dataarray(matrix_type: str, time_dim_pos: int = 1) -> xr.DataArray: + """Make a Dask-backed DataArray with random data of + specified matrix type. The matrix type can be one of: + - "tall-and-skinny": More samples than features. + - "short-and-fat": More features than samples. + - "square": Equal number of samples and features. + + """ + if matrix_type == "tall-and-skinny": + n_space, n_time = 10_000, 100 + space_chunks, time_chunks = -1, int(n_time / 2) + elif matrix_type == "short-and-fat": + n_space, n_time = 100, 10_000 + space_chunks, time_chunks = int(n_space / 2), -1 + elif matrix_type == "square": + n_space, n_time = 1_000, 1_000 + space_chunks, time_chunks = int(n_space / 2), int(n_time / 2) + else: + msg = ( + "Matrix type not supported. " + "Must be one of: tall-and-skinny, short-and-fat, square." + ) + raise ValueError(msg) + + if time_dim_pos == 1: + shape = (n_space, n_time) + chunks = (space_chunks, time_chunks) + dims = ["space", "time"] + coords = {"space": np.arange(n_space), "time": np.arange(n_time)} + elif time_dim_pos == 0: + shape = (n_time, n_space) + chunks = (time_chunks, space_chunks) + dims = ["time", "space"] + coords = {"time": np.arange(n_time), "space": np.arange(n_space)} + else: + raise ValueError("time_dim_pos must be 0 or 1.") + + X = da.random.random(shape, chunks=chunks).astype("float32") + return xr.DataArray(X, dims=dims, coords=coords) + + +@pytest.mark.parametrize("svd_algorithm", ["tsqr", "randomized"]) +def test_pod_basic_attributes_and_aliases(svd_algorithm): + """Test basic functionality and property aliases of POD.""" + n_modes = 10 + pod = POD( + n_modes=n_modes, + svd_algorithm=svd_algorithm, + compute_energy_ratio=True, + ) + + expected_attrs = ( + "modes", + "time_coeffs", + "energy", + "explained_energy_ratio", + ) + for attr in expected_attrs: + assert hasattr(pod, attr), f"POD should have attribute '{attr}'." + + X = make_dataarray("tall-and-skinny") + pod.fit(X) + + assert isinstance(pod.modes, xr.DataArray) + assert isinstance(pod.modes.data, np.ndarray) + assert isinstance(pod.time_coeffs, xr.DataArray) + assert isinstance(pod.time_coeffs.data, np.ndarray) + assert isinstance(pod.energy, np.ndarray) + assert isinstance(pod.explained_energy_ratio, np.ndarray) + + assert pod.modes is pod.u + assert pod.time_coeffs is pod.v + assert pod.explained_energy_ratio is pod.explained_var_ratio + + +@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat", "square"]) +def test_pod_shapes_and_dims(matrix_type): + """Test that POD modes and time coefficients have the correct shapes and dims.""" + X = make_dataarray(matrix_type, time_dim_pos=1) + n_space, n_time = X.shape + n_modes = 10 + + pod = POD(n_modes=n_modes) + pod.fit(X) + + assert pod.modes.shape == (n_space, n_modes) + assert pod.time_coeffs.shape == (n_modes, n_time) + assert pod.energy.shape == (n_modes,) + + assert pod.modes.dims == ("space", "components") + assert pod.time_coeffs.dims == ("components", "time") + + assert "space" in pod.modes.coords + assert "components" in pod.modes.coords + assert "time" not in pod.modes.coords + + assert "time" in pod.time_coeffs.coords + assert "components" in pod.time_coeffs.coords + assert "space" not in pod.time_coeffs.coords + + +def test_time_dimension_handling(): + """Test that POD correctly handles the `time_dimension` parameter by transposing if necessary.""" + X = make_dataarray("short-and-fat", time_dim_pos=0) + assert X.dims == ("time", "space") + n_time, n_space = X.shape + n_modes = 15 + + pod = POD(n_modes=n_modes, time_dimension="time") + pod.fit(X) + + assert pod.modes.shape == (n_space, n_modes) + assert pod.time_coeffs.shape == (n_modes, n_time) + + assert pod.modes.dims == ("space", "components") + assert "space" in pod.modes.coords + assert pod.time_coeffs.dims == ("components", "time") + assert "time" in pod.time_coeffs.coords + + +def test_remove_mean(): + X = make_dataarray("tall-and-skinny") + n_modes = 5 + + pod = POD(n_modes=n_modes, remove_mean=True, time_dimension="time") + pod.fit(X) + + # (U*S @ V). + reconstructed_fluctuations = (pod.modes.data * pod.s) @ pod.time_coeffs.data + + # *The temporal mean of the reconstructed fluctuations should be close to zero + mean_of_reconstruction = reconstructed_fluctuations.mean(axis=1) + assert np.allclose(mean_of_reconstruction, 0, atol=1e-5) + + +def test_energy_calculation(): + """Test that the `energy` property is calculated correctly.""" + X = make_dataarray("square") + n_modes = 20 + + pod = POD(n_modes=n_modes, compute_energy_ratio=True, remove_mean=True) + pod.fit(X) + + n_samples = pod.modes.shape[0] + expected_energy = pod.s**2 / n_samples + assert np.allclose(pod.energy, expected_energy, atol=1e-6) + + # The total variance is the total energy of the system + X_processed = X - X.mean(dim="time") + total_variance = (X_processed.data**2).sum().compute() / n_samples + + # The explained energy ratio of each mode should be its energy divided by the total system energy (total variance) + assert np.allclose( + pod.explained_energy_ratio, + pod.energy / total_variance, + rtol=1e-2, + ) + + +def test_invalid_time_dimension_error(): + """Test that a ValueError is raised for a non-existent time dimension.""" + X = make_dataarray("tall-and-skinny") + pod = POD(n_modes=5, time_dimension="non_existent_dim") + + with pytest.raises(ValueError, match="is not a dimension of the input array"): + pod.fit(X) + + +def test_compute_methods(): + """Test that the `compute_*` convenience methods work.""" + n_modes = 5 + pod = POD( + n_modes=n_modes, + compute_modes=False, + compute_time_coeffs=False, + compute_energy_ratio=False, + ) + + X = make_dataarray("tall-and-skinny") + pod.fit(X) + + assert isinstance(pod.modes.data, da.Array) + assert isinstance(pod.time_coeffs.data, da.Array) + assert isinstance(pod.explained_energy_ratio, da.Array) + + pod.compute_modes() + assert isinstance(pod.modes.data, np.ndarray) + + pod.compute_time_coeffs() + assert isinstance(pod.time_coeffs.data, np.ndarray) + + pod.compute_energy_ratio() + assert isinstance(pod.explained_energy_ratio, np.ndarray) diff --git a/tests/test_svd.py b/tests/test_svd.py index d6a488c..cec394b 100644 --- a/tests/test_svd.py +++ b/tests/test_svd.py @@ -209,3 +209,69 @@ def test_reconstruct_snapshot(matrix_type): assert X_r.shape == ( tsvd.u.shape[0], ), f"Reconstructed snapshot should have shape ({tsvd.u.shape[0]}), got {X_r.shape}." + + +@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat"]) +@pytest.mark.parametrize("compute_initially", [True, False]) +def test_reconstruct_size_based_computation(matrix_type, compute_initially): + """Test the reconstruct method's size-based computation (NumPy vs Dask).""" + X = make_dataarray(matrix_type) + n_components = 10 + tsvd = TruncatedSVD( + n_components=n_components, + compute_u=compute_initially, + compute_v=compute_initially, + ) + tsvd.fit(X) + + # --- 1. Test NumPy path (in-memory) --- + # Force NumPy path by setting a very high memory limit + X_recon_np = tsvd.reconstruct(memory_limit_bytes=1e12) # 1 TB limit + + assert isinstance( + X_recon_np.data, np.ndarray + ), "Reconstruction should be NumPy-backed for a large memory limit." + assert X_recon_np.shape == X.shape, "Reconstructed shape must match original." + assert X_recon_np.dims == X.dims, "Reconstructed dims must match original." + # Verify coordinates are preserved + assert X_recon_np.coords["samples"].equals(X.coords["samples"]) + assert X_recon_np.coords["time"].equals(X.coords["time"]) + + # --- 2. Test Dask path (out-of-memory) --- + # Force Dask path by setting a very low memory limit + X_recon_da = tsvd.reconstruct(memory_limit_bytes=1) # 1 Byte limit + + assert isinstance( + X_recon_da.data, da.Array + ), "Reconstruction should be Dask-backed for a small memory limit." + assert X_recon_da.shape == X.shape, "Lazy reconstructed shape must match original." + assert X_recon_da.dims == X.dims, "Lazy reconstructed dims must match original." + + # --- 3. Verify correctness of both results --- + # Compute the original and Dask-reconstructed data to compare + X_original_np = X.compute() + X_recon_da_computed = X_recon_da.compute() + + # The NumPy-path result and the computed Dask-path result should be identical + np.testing.assert_allclose( + X_recon_np.data, + X_recon_da_computed, + rtol=1e-5, + err_msg="NumPy and Dask path reconstructions should be nearly identical.", + ) + + # Both reconstructions should be a close approximation of the original data + # We use a loose tolerance because it's a low-rank approximation + reconstruction_error = np.linalg.norm( + X_original_np - X_recon_np.data + ) / np.linalg.norm(X_original_np) + assert reconstruction_error < 0.5, "Reconstruction error is unexpectedly high." + + +def test_reconstruct_before_fit_raises_error(): + """Test that calling reconstruct() before fit() raises a ValueError.""" + tsvd = TruncatedSVD(n_components=5) + with pytest.raises( + ValueError, match="The SVD model must be fitted before calling reconstruct" + ): + tsvd.reconstruct()