Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions examples/20-kalman_filter_simulator.ipynb

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion s2generator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

__version__ = "0.0.5"
__version__ = "0.0.6"

__all__ = [
"Node",
Expand All @@ -16,6 +16,7 @@
"print_ascii",
"print_hello",
"excitation",
"simulator",
"utils",
"params",
]
Expand All @@ -35,6 +36,9 @@
# Generic interface for generating stimulus time series data
from .excitation import Excitation

# # The simulators for generating time series data
# from .simulator import ARIMASimulator, WienerFilterSimulator

# Visualize the generated S2 object
from .utils.visualization import plot_series

Expand Down
3 changes: 3 additions & 0 deletions s2generator/simulator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
Created on 2026/02/13 13:04:42
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""

from .arima import ARIMASimulator

from .wiener_filter import WienerFilterSimulator
272 changes: 272 additions & 0 deletions s2generator/simulator/wiener_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
"""
Created on 2026/02/18 14:28:15
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""

from typing import Optional

import numpy as np
from scipy import signal
from scipy.linalg import toeplitz
from statsmodels.tsa.stattools import acf

from s2generator.utils._tools import yule_walker


class WienerFilterSimulator(object):
    """
    Simulate new time series using a Wiener optimal filter fitted to the input series.

    Any stationary signal can be viewed as the response obtained by exciting a
    linear system with white noise. Inspired by parametric spectral estimation
    methods, this module constructs a learnable Wiener filter for modeling time
    series. It learns a filter that transforms a white noise sequence into a
    sequence with statistical properties similar to the input sequence, so that
    the autocorrelation and power spectral density of the output exhibit shapes
    analogous to those of the input sequence.

    Suppose the filter coefficients are a = [1, a1, a2, ..., aN], the input white
    noise sequence is w[n], and the output simulated sequence is y[n]. The filter
    implements the all-pole (AR) transfer function::

                            1
        Y(z) = -------------------------------- W(z)
                          -1              -N
               a[0] + a[1]z   + ... + a[N]z

    The coefficients a1~aN and the noise variance σ² are obtained by solving the
    Yule-Walker equations, exactly as in AR-model spectral estimation.

    From the perspective of modern signal processing, this module is more akin to
    the parametric spectral estimation method of the AR model than to the Wiener
    filter: it corresponds only to the whitening stage of the causal Wiener
    filter (the de-whitening stage is not learned). The name "Wiener filter" is
    kept because it is more familiar in current deep-learning based time series
    analysis than AR spectral estimation theory.
    """

    def __init__(
        self,
        filter_order: int = 6,
        revin: Optional[bool] = True,
        random_state: Optional[int] = 42,
    ) -> None:
        """
        :param filter_order: The order of the Wiener filter (including a0=1).
        :param revin: Whether to perform reversible normalization (keep the mean
            and variance of the input sequence unchanged in the output).
        :param random_state: The random seed for reproducibility.

        :return: None
        """
        # Filter order (including a0=1)
        self.filter_order = filter_order
        # Default maximum lag used when estimating the autocorrelation function
        self.lag_max = filter_order * 2

        # Autocorrelation function values of the input sequence
        self.acf_vals = None

        # Autocorrelation matrix R:
        # a Toeplitz matrix of shape [filter_order, filter_order]
        self.R = None

        # Noise variance σ² estimated from the Yule-Walker equations
        self.sigma_sq = None

        # Wiener filter coefficients [filter_order, ], with leading a0 == 1
        self._coeffs = None

        # Reversible normalization flag
        # (keeps the mean and variance of the input sequence constant)
        self.revin = revin
        self.mean, self.std = None, None

        # Record the currently used random seed and create a random number generator.
        self.random_state = random_state
        self.rng = np.random.RandomState(seed=random_state)

        # Original input time series
        self.time_series = None
        # The time series after truncation of the original sequence
        # (removing the first filter_order points to avoid edge effects).
        self.time_series_trimmed = None

        # Fitting residuals of the model
        self.residuals = None

        # Time series generated by the most recent simulation
        self.simulated_series = None

    def fit(self, time_series: np.ndarray) -> None:
        """
        Fit a Wiener filter and calculate the filter coefficients.

        The parameters are obtained with the Yule-Walker method.

        :param time_series: The input time series to fit the Wiener filter,
            with shape 1D [seq_len, ] or 2D [num_samples, seq_len].

        :return: None
        """

        # Check the format and length of the input time series.
        time_series = self.check_inputs(time_series)
        # Record the original input sequence
        self.time_series = time_series

        # Optionally normalize so the filter is fitted on a zero-mean,
        # unit-variance sequence (reversed again in `transform`).
        if self.revin:
            self.mean = np.mean(time_series)
            self.std = np.std(time_series)
            # Guard against a constant input (std == 0) to avoid a division by
            # zero; the centered sequence is all zeros in that case anyway.
            if self.std == 0:
                self.std = 1.0
            time_series = (time_series - self.mean) / self.std

        self.acf_vals = self.acf(time_series=time_series)

        # Construct the autocorrelation (Toeplitz) matrix
        self.R = toeplitz(self.acf_vals[: self.filter_order])

        # Filter coefficients and noise variance from the Yule-Walker equations
        self._coeffs, self.sigma_sq = yule_walker(A=self.R)

        # Draw noise and compute the in-sample residuals. The extra
        # `filter_order` samples are discarded inside `invoke` to remove the
        # filter's transient (edge) effects.
        white_noise = self.rng.normal(
            0, scale=np.sqrt(self.sigma_sq), size=len(time_series) + self.filter_order
        )
        self.residuals = time_series - self.invoke(white_noise=white_noise)

    def transform(
        self, num_samples: int, seq_len: int, random_state: Optional[int] = None
    ) -> np.ndarray:
        """
        Generate new time series by exciting the fitted Wiener filter with white noise.

        :param num_samples: The number of new time series to generate.
        :param seq_len: The length of each generated time series.
        :param random_state: The random seed for reproducibility. If None, it will use the random state of the class instance.

        :return: The generated time series with shape [num_samples, seq_len].
        """

        # Check if the filter coefficients have been calculated; if not, raise an error.
        if self._coeffs is None:
            raise ValueError(
                "The filter coefficients have not been calculated yet; please call the `fit` method first."
            )

        # Use a dedicated generator when a seed is supplied; otherwise reuse
        # the generator owned by this instance.
        if random_state is not None:
            rng = np.random.RandomState(seed=random_state)
        else:
            rng = self.rng

        # White noise (zero mean, variance σ²) with `filter_order` extra samples
        # per row; the extra samples are discarded inside `invoke` to remove the
        # filter transient (edge effects).
        white_noise = rng.normal(
            0,
            scale=np.sqrt(self.sigma_sq),
            size=(num_samples, seq_len + self.filter_order),
        )

        # Excite the Wiener optimal filter with white noise to generate
        # entirely new sequences.
        simulated_series = np.zeros(shape=(num_samples, seq_len))

        for i in range(num_samples):
            # `invoke` already trims the first `filter_order` transient points,
            # so each returned row has exactly `seq_len` samples. (Do NOT trim
            # again here: a second slice would shrink the output to
            # seq_len - filter_order and break the documented return shape.)
            simulated_series[i, :] = self.invoke(white_noise=white_noise[i, :])

        # If normalization was performed in `fit`, denormalize now so the
        # simulated series share the mean and variance of the original input.
        if self.revin:
            self.simulated_series = simulated_series * self.std + self.mean
        else:
            self.simulated_series = simulated_series

        return self.simulated_series

    def invoke(self, white_noise: np.ndarray) -> np.ndarray:
        """
        Generate directly by calling the filter
        (without generating random noise internally).

        :param white_noise: The input white noise sequence to excite the filter, with shape 1D [seq_len + filter_order, ].

        :return: The generated time series with shape 1D [seq_len, ].
        """
        if self._coeffs is None:
            raise ValueError(
                "The filter coefficients have not been calculated yet; please call the `fit` method first."
            )

        # All-pole filtering: Y(z) = W(z) / A(z). The numerator is passed as a
        # length-1 array rather than a bare scalar, which `scipy.signal.lfilter`
        # expects for its array_like `b` argument.
        simulated_series = signal.lfilter(b=np.ones(1), a=self._coeffs, x=white_noise)

        # Remove the filter transient (first filter_order points)
        return simulated_series[self.filter_order :]

    def acf(
        self,
        time_series: np.ndarray,
        lag_max: Optional[int] = None,
        fft: Optional[bool] = True,
    ) -> np.ndarray:
        """
        Calculate the autocorrelation function (ACF) of the input time series.

        :param time_series: The input time series for which to calculate the ACF, with shape 1D [seq_len, ].
        :param lag_max: The maximum lag for which to calculate the ACF. If None, it defaults to self.lag_max (which is filter_order * 2).
        :param fft: Whether to use FFT to speed up the ACF calculation. Default is True.

        :return: The autocorrelation function values for lags from 0 to lag_max, with shape 1D [lag_max + 1, ].
        """
        return acf(
            time_series, nlags=self.lag_max if lag_max is None else lag_max, fft=fft
        )  # Use FFT to speed up the calculation

    def check_inputs(self, time_series: np.ndarray) -> np.ndarray:
        """
        Check the format and length of the input time series.

        :param time_series: The input time series to check, with shape 1D [seq_len, ] or 2D [num_samples, seq_len].

        :return: The checked and possibly reshaped time series, with shape 1D [seq_len, ].
        """

        # Check if the input is a NumPy array
        if not isinstance(time_series, np.ndarray):
            raise ValueError("The input time series must be a NumPy array.")

        # The input must be 1D [seq_len, ] or 2D [num_samples, seq_len].
        if len(time_series.shape) > 2:
            raise ValueError(
                "The input time series must be 1D with [seq_len, ] or 2D with [num_samples, seq_len], but got shape: {}".format(
                    time_series.shape
                )
            )

        if len(time_series.shape) == 2:
            # Flatten a 2D array into a single 1D sequence
            time_series = time_series.flatten()

        # A sequence shorter than 2 * filter_order cannot support the
        # lag_max = 2 * filter_order ACF estimate used by `fit`.
        if len(time_series) < 2 * self.filter_order:
            raise ValueError(
                f"Input time series length must be at least {2 * self.filter_order} to ensure accurate ACF estimation."
            )

        return time_series

    @property
    def coeffs(self) -> np.ndarray:
        """Get the Wiener filter coefficients after fitting the model."""
        if self._coeffs is None:
            raise ValueError(
                "The filter coefficients have not been calculated yet; please call the `fit` method first."
            )
        return self._coeffs
4 changes: 4 additions & 0 deletions s2generator/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"generate_arma_samples",
"generate_nonstationary_sine",
"eacf_rlike",
"yule_walker",
"fft",
"fftshift",
"ifft",
Expand Down Expand Up @@ -68,6 +69,9 @@
# The EACF function to determine the order of ARMA model
from ._tools import eacf_rlike

# The Yule-Walker method to estimate the parameters of AR model
from ._tools import yule_walker

# Print the Generation Status
from ._print_status import PrintStatus

Expand Down
60 changes: 60 additions & 0 deletions s2generator/utils/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"generate_arma_samples",
"generate_nonstationary_sine",
"eacf_rlike",
"yule_walker",
]

import os
Expand Down Expand Up @@ -527,3 +528,62 @@ def eacf_rlike(
)

return eacf_matrix, threshold, eacf_df


def yule_walker(A: np.ndarray) -> Tuple[np.ndarray, Union[float, np.ndarray]]:
    """
    Solve the Yule-Walker equations to obtain the parameters of an AR model.

    The Yule-Walker equations can be expressed in matrix form as::

        r(0)*1   + r(1)*a[1]   + r(2)*a[2]   + ... + r(n)*a[n]   = σ²
        r(1)*1   + r(0)*a[1]   + r(1)*a[2]   + ... + r(n-1)*a[n] = 0
        r(2)*1   + r(1)*a[1]   + r(0)*a[2]   + ... + r(n-2)*a[n] = 0
        ...
        r(n-1)*1 + r(n-2)*a[1] + r(n-3)*a[2] + ... + r(1)*a[n]   = 0
        r(n)*1   + r(n-1)*a[1] + r(n-2)*a[2] + ... + r(0)*a[n]   = 0

    Where r(k) is the value of the autocorrelation function at lag k,
    a[i] are the coefficients of the AR model, and σ² is the variance of the noise.

    This equation can be derived based on the orthogonality principle,
    representing the relationship between the parameters of the AR model and the
    autocorrelation function of the sequence.

    :param A: The matrix representing the Yule-Walker equations, where the first
        row corresponds to the equation for σ² and the subsequent rows correspond
        to the equations for a1~an.

    :return: A tuple ``(x, sigma_sq)`` where ``x = [1, a1, ..., an]`` is the full
        coefficient vector *including the leading 1* (so it can be used directly
        as the denominator of an all-pole filter), and ``sigma_sq`` is the noise
        variance σ².
    """
    # Check if A is an ndarray and has the correct shape
    if not isinstance(A, np.ndarray):
        raise ValueError("Input A must be a numpy array.")
    if A.shape[0] < 2 or A.shape[1] < 2:
        raise ValueError(
            "Input A must have at least 2 rows and 2 columns to solve for a1~an and σ²."
        )

    # Check if the matrix A is square
    if A.shape[0] != A.shape[1]:
        raise ValueError("Input A must be a square matrix.")

    # Solve for the coefficients using the equations from the second row onwards:
    # B @ [a1, ..., an] = c, obtained by moving the r(k)*1 terms to the right.
    B = A[1:, 1:]  # Coefficient matrix for a1~an
    c = -A[1:, 0]  # Constant terms (negated r(1)..r(n))

    try:
        # Solve for a1~an
        a = np.linalg.solve(B, c)

    except np.linalg.LinAlgError as e:
        print(f"Solution failed: {e}")
        print(
            "Reason: Matrix B is not invertible, and therefore cannot uniquely determine a1~an."
        )

        # If B is not invertible, fall back to a least-squares approximation.
        a = np.linalg.lstsq(B, c, rcond=None)[0]

    # Solve for σ² using the first row of the equations:
    # σ² = r(0)*1 + r(1)*a1 + ... + r(n)*an
    x = np.hstack([1, a])  # Full coefficient vector [1, a1, ..., an]
    sigma_sq = np.dot(A[0], x)

    return x, sigma_sq
Loading