Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion src/svdrom/pod.py
Original file line number Diff line number Diff line change
@@ -1 +1,98 @@
# Module for POD implementation
import dask.array as da
import numpy as np
import xarray as xr

from svdrom.logger import setup_logger
from svdrom.preprocessing import StandardScaler
from svdrom.svd import TruncatedSVD

logger = setup_logger("POD", "pod.log")


# NOTE(review): thin POD wrapper over TruncatedSVD — it renames SVD outputs
# into POD vocabulary and adds time-axis/centering preprocessing only.
class POD(TruncatedSVD):
    """Proper Orthogonal Decomposition (POD) built on a truncated SVD.

    Mapping onto the underlying SVD quantities:

    * left singular vectors ``u``  -> spatial :attr:`modes`
    * right singular vectors ``v`` -> :attr:`time_coeffs`
    * ``s**2 / n_rows``            -> :attr:`energy`
    """

    def __init__(
        self,
        n_modes: int,
        svd_algorithm: str = "tsqr",
        compute_modes: bool = True,
        compute_time_coeffs: bool = True,
        compute_energy_ratio: bool = False,
        rechunk: bool = False,
        remove_mean: bool = False,
        time_dimension: str = "time",
    ):
        """Configure the POD.

        Parameters
        ----------
        n_modes : int
            Number of POD modes (SVD components) to retain.
        svd_algorithm : str, default "tsqr"
            Algorithm forwarded to the underlying truncated SVD.
        compute_modes : bool, default True
            Eagerly compute the spatial modes when fitting.
        compute_time_coeffs : bool, default True
            Eagerly compute the time coefficients when fitting.
        compute_energy_ratio : bool, default False
            Eagerly compute the explained-energy ratio when fitting.
        rechunk : bool, default False
            Forwarded to the underlying SVD (allow rechunking the input).
        remove_mean : bool, default False
            Subtract the temporal mean from the data before fitting.
        time_dimension : str, default "time"
            Name of the time dimension of the input ``xarray.DataArray``.
        """
        super().__init__(
            n_components=n_modes,
            algorithm=svd_algorithm,
            compute_u=compute_modes,
            compute_v=compute_time_coeffs,
            compute_var_ratio=compute_energy_ratio,
            rechunk=rechunk,
        )
        # Preprocessing configuration; the decomposition results themselves
        # live on the base class (_u, _s, _v, _explained_var_ratio).
        # (A previously-assigned `_energy` attribute was dead code: the
        # `energy` property always computes from `_s` and `_u` directly.)
        self._remove_mean: bool = remove_mean
        self._time_dim: str = time_dimension

    @property
    def modes(self) -> xr.DataArray | None:
        """POD (spatial) modes (read-only); alias of the SVD ``u``."""
        return self._u

    @property
    def time_coeffs(self) -> xr.DataArray | None:
        """Time coefficients (read-only); alias of the SVD ``v``."""
        return self._v

    @property
    def energy(self) -> np.ndarray | None:
        """Energy (variance) explained by each POD mode (read-only).

        Computed as ``s**2`` normalized by the number of rows of ``u``
        (spatial samples). None until both ``s`` and ``u`` are available.
        NOTE(review): some POD conventions normalize by the number of
        snapshots instead — confirm the intended normalization.
        """
        if self._s is not None and self._u is not None:
            return self._s**2 / self._u.shape[0]
        return None

    @property
    def explained_energy_ratio(self) -> np.ndarray | da.Array | None:
        """Fraction of total energy per mode; alias of the SVD ratio."""
        return self._explained_var_ratio

    def _preprocess_array(self, X: xr.DataArray) -> xr.DataArray:
        """Put time along the columns and optionally remove the temporal mean.

        Parameters
        ----------
        X : xr.DataArray
            2D input array; one dimension must be ``self._time_dim``.

        Returns
        -------
        xr.DataArray
            The (possibly transposed and centered) array.

        Raises
        ------
        ValueError
            If ``X`` is not two-dimensional (``X.T`` would silently
            reverse all dims of a higher-rank array).
        """
        if X.ndim != 2:
            msg = f"Expected a 2D array, got {X.ndim} dimensions."
            raise ValueError(msg)
        # The SVD expects samples (space) in rows and time in columns.
        if X.dims.index(self._time_dim) != 1:
            X = X.T
        if self._remove_mean:
            # with_std=False -> centering only, no variance scaling.
            scaler = StandardScaler()
            X = scaler(
                X,
                dim=self._time_dim,
                with_std=False,
            )
            assert isinstance(X, xr.DataArray), "Expected DataArray after scaling."
        return X

    def fit(
        self,
        X: xr.DataArray,
        **kwargs,
    ) -> None:
        """Fit the POD on a 2D DataArray.

        Raises
        ------
        ValueError
            If ``time_dimension`` is not a dimension of ``X``, or ``X``
            is not 2D.
        """
        if self._time_dim not in X.dims:
            msg = (
                f"Specified time dimension '{self._time_dim}' "
                "is not a dimension of the input array."
            )
            raise ValueError(msg)
        X = self._preprocess_array(X)
        super().fit(X, **kwargs)

    def compute_modes(self) -> None:
        """Materialize the lazy spatial modes (wraps ``compute_u``)."""
        super().compute_u()

    def compute_time_coeffs(self) -> None:
        """Materialize the lazy time coefficients (wraps ``compute_v``)."""
        super().compute_v()

    def compute_energy_ratio(self) -> None:
        """Materialize the lazy energy ratio (wraps ``compute_var_ratio``)."""
        super().compute_var_ratio()
16 changes: 15 additions & 1 deletion src/svdrom/svd.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def __init__(
self._s: np.ndarray | None = None
self._v: xr.DataArray | None = None
self._explained_var_ratio: np.ndarray | da.Array | None = None
self._original_dims: tuple | None = None
self._original_coords: dict | None = None

@property
def n_components(self) -> int:
Expand Down Expand Up @@ -204,7 +206,19 @@ def _singular_vectors_to_dataarray(
)
logger.exception(msg)
raise ValueError(msg)
return xr.DataArray(singular_vectors, dims=new_dims, coords=coords, name=name)

new_coords = {}
for coord_name, coord_array in X.coords.items():
if all(dim in new_dims for dim in coord_array.dims):
new_coords[coord_name] = coord_array

if "components" in new_dims:
comp_idx = new_dims.index("components")
new_coords["components"] = np.arange(singular_vectors.shape[comp_idx])

return xr.DataArray(
singular_vectors, dims=new_dims, coords=new_coords, name=name
)

def fit(
self,
Expand Down
201 changes: 201 additions & 0 deletions tests/test_pod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import dask.array as da
import numpy as np
import pytest
import xarray as xr

from svdrom.pod import POD


def make_dataarray(matrix_type: str, time_dim_pos: int = 1) -> xr.DataArray:
    """Build a Dask-backed DataArray of random float32 data.

    ``matrix_type`` selects the aspect ratio:
    - "tall-and-skinny": more spatial samples than time snapshots.
    - "short-and-fat": more time snapshots than spatial samples.
    - "square": equal counts.

    ``time_dim_pos`` places "time" as the second (1) or first (0) dim.
    """
    # (n_space, n_time), (space_chunks, time_chunks) per matrix type;
    # -1 means "one chunk along that axis".
    specs = {
        "tall-and-skinny": ((10_000, 100), (-1, 50)),
        "short-and-fat": ((100, 10_000), (50, -1)),
        "square": ((1_000, 1_000), (500, 500)),
    }
    if matrix_type not in specs:
        msg = (
            "Matrix type not supported. "
            "Must be one of: tall-and-skinny, short-and-fat, square."
        )
        raise ValueError(msg)
    (n_space, n_time), (space_chunks, time_chunks) = specs[matrix_type]

    if time_dim_pos == 1:
        dims = ["space", "time"]
    elif time_dim_pos == 0:
        dims = ["time", "space"]
    else:
        raise ValueError("time_dim_pos must be 0 or 1.")

    # Per-dimension (size, chunking), assembled in whichever order `dims` says.
    axes = {"space": (n_space, space_chunks), "time": (n_time, time_chunks)}
    shape = tuple(axes[d][0] for d in dims)
    chunks = tuple(axes[d][1] for d in dims)
    coords = {d: np.arange(axes[d][0]) for d in dims}

    data = da.random.random(shape, chunks=chunks).astype("float32")
    return xr.DataArray(data, dims=dims, coords=coords)


@pytest.mark.parametrize("svd_algorithm", ["tsqr", "randomized"])
def test_pod_basic_attributes_and_aliases(svd_algorithm):
    """POD exposes the POD-named attributes and they alias the SVD results."""
    n_modes = 10
    pod = POD(
        n_modes=n_modes,
        svd_algorithm=svd_algorithm,
        compute_energy_ratio=True,
    )

    for attr in ("modes", "time_coeffs", "energy", "explained_energy_ratio"):
        assert hasattr(pod, attr), f"POD should have attribute '{attr}'."

    pod.fit(make_dataarray("tall-and-skinny"))

    # Everything was requested eagerly, so results are numpy-backed.
    assert isinstance(pod.modes, xr.DataArray)
    assert isinstance(pod.modes.data, np.ndarray)
    assert isinstance(pod.time_coeffs, xr.DataArray)
    assert isinstance(pod.time_coeffs.data, np.ndarray)
    assert isinstance(pod.energy, np.ndarray)
    assert isinstance(pod.explained_energy_ratio, np.ndarray)

    # The POD vocabulary is a straight alias of the SVD vocabulary.
    assert pod.modes is pod.u
    assert pod.time_coeffs is pod.v
    assert pod.explained_energy_ratio is pod.explained_var_ratio


@pytest.mark.parametrize("matrix_type", ["tall-and-skinny", "short-and-fat", "square"])
def test_pod_shapes_and_dims(matrix_type):
    """Modes and time coefficients carry the right shapes, dims, and coords."""
    data = make_dataarray(matrix_type, time_dim_pos=1)
    n_space, n_time = data.shape
    n_modes = 10

    pod = POD(n_modes=n_modes)
    pod.fit(data)

    # Shapes: modes are (space, components), coefficients (components, time).
    assert pod.modes.shape == (n_space, n_modes)
    assert pod.time_coeffs.shape == (n_modes, n_time)
    assert pod.energy.shape == (n_modes,)

    assert pod.modes.dims == ("space", "components")
    assert pod.time_coeffs.dims == ("components", "time")

    # Each factor keeps only the coordinates of its own dims.
    assert "space" in pod.modes.coords
    assert "components" in pod.modes.coords
    assert "time" not in pod.modes.coords

    assert "time" in pod.time_coeffs.coords
    assert "components" in pod.time_coeffs.coords
    assert "space" not in pod.time_coeffs.coords


def test_time_dimension_handling():
    """POD transposes the input when the time dimension comes first."""
    data = make_dataarray("short-and-fat", time_dim_pos=0)
    assert data.dims == ("time", "space")
    n_time, n_space = data.shape
    n_modes = 15

    pod = POD(n_modes=n_modes, time_dimension="time")
    pod.fit(data)

    # After the internal transpose, space indexes rows and time columns.
    assert pod.modes.shape == (n_space, n_modes)
    assert pod.time_coeffs.shape == (n_modes, n_time)

    assert pod.modes.dims == ("space", "components")
    assert "space" in pod.modes.coords
    assert pod.time_coeffs.dims == ("components", "time")
    assert "time" in pod.time_coeffs.coords


def test_remove_mean():
    """With remove_mean=True the reconstruction has ~zero temporal mean."""
    data = make_dataarray("tall-and-skinny")
    n_modes = 5

    pod = POD(n_modes=n_modes, remove_mean=True, time_dimension="time")
    pod.fit(data)

    # Rank-r reconstruction of the fluctuation field: (U*S) @ V.
    reconstruction = (pod.modes.data * pod.s) @ pod.time_coeffs.data

    # Averaging over time (axis 1) should be ~0 at every spatial point,
    # since the temporal mean was removed before the decomposition.
    assert np.allclose(reconstruction.mean(axis=1), 0, atol=1e-5)


def test_energy_calculation():
    """`energy` and `explained_energy_ratio` match their definitions."""
    data = make_dataarray("square")
    n_modes = 20

    pod = POD(n_modes=n_modes, compute_energy_ratio=True, remove_mean=True)
    pod.fit(data)

    # Per-mode energy is s**2 normalized by the number of spatial samples.
    n_samples = pod.modes.shape[0]
    assert np.allclose(pod.energy, pod.s**2 / n_samples, atol=1e-6)

    # Total energy of the centered data = total variance.
    centered = data - data.mean(dim="time")
    total_variance = (centered.data**2).sum().compute() / n_samples

    # Each mode's ratio should be its energy over the total energy.
    assert np.allclose(
        pod.explained_energy_ratio,
        pod.energy / total_variance,
        rtol=1e-2,
    )


def test_invalid_time_dimension_error():
    """Fitting with a non-existent time dimension raises ValueError."""
    pod = POD(n_modes=5, time_dimension="non_existent_dim")
    data = make_dataarray("tall-and-skinny")

    with pytest.raises(ValueError, match="is not a dimension of the input array"):
        pod.fit(data)


def test_compute_methods():
    """The `compute_*` methods turn lazy Dask results into numpy arrays."""
    pod = POD(
        n_modes=5,
        compute_modes=False,
        compute_time_coeffs=False,
        compute_energy_ratio=False,
    )
    pod.fit(make_dataarray("tall-and-skinny"))

    # Nothing was computed eagerly, so everything is still Dask-backed.
    assert isinstance(pod.modes.data, da.Array)
    assert isinstance(pod.time_coeffs.data, da.Array)
    assert isinstance(pod.explained_energy_ratio, da.Array)

    # Each compute method materializes exactly its own quantity.
    pod.compute_modes()
    assert isinstance(pod.modes.data, np.ndarray)

    pod.compute_time_coeffs()
    assert isinstance(pod.time_coeffs.data, np.ndarray)

    pod.compute_energy_ratio()
    assert isinstance(pod.explained_energy_ratio, np.ndarray)
Loading