def sample_random_perturbation(
    K: int, min_alpha: float, max_alpha: float, rng: np.random.RandomState = None
) -> np.ndarray:
    """
    Randomly sample K numbers in the interval
    [-max_alpha, -min_alpha] U [min_alpha, max_alpha].
    The purpose of this sampling is to construct random perturbations in the frequency domain.

    :param K: Number of random numbers to sample
    :param min_alpha: Minimum absolute value of the random numbers
    :param max_alpha: Maximum absolute value of the random numbers
    :param rng: Optional random number generator, if not provided, the global numpy random number generator will be used

    :return: A numpy array containing K random numbers whose magnitudes are drawn uniformly
             from [min_alpha, max_alpha) and whose signs are chosen uniformly from {-1, 1}
    """
    # The ``numpy.random`` module exposes the same ``uniform``/``choice`` functions as a
    # ``RandomState`` instance, so it can stand in as the global fallback source.
    source = np.random if rng is None else rng

    # Draw the magnitudes first and the signs second (the order fixes the random stream).
    magnitudes = source.uniform(min_alpha, max_alpha, K)
    signs = source.choice([-1, 1], size=K)

    return magnitudes * signs


def frequency_perturbation(
    series: np.ndarray,
    min_alpha: float,
    max_alpha: float,
    r: float = 0.5,
    rng: np.random.RandomState = None,
) -> np.ndarray:
    """
    Perform frequency domain perturbation on the input time series.
    This method adds random complex offsets to a random subset of the one-sided (real FFT)
    frequency components of the series and transforms the result back to the time domain.

    :param series: Input time series, a 1D numpy array
    :param min_alpha: Minimum absolute value of the random perturbation added to the frequency components
    :param max_alpha: Maximum absolute value of the random perturbation added to the frequency components
    :param r: Proportion of the ``rfft`` frequency components to perturb, in [0, 1]
              (default is 0.5, meaning 50% of the frequency components will be perturbed)
    :param rng: Optional random number generator, if not provided, the global numpy random number generator will be used.
                When provided, every random draw (offsets and selected components) comes from it.

    :return: Perturbed time series, a 1D numpy array of the same length as the input series.

    :raises ValueError: If ``r`` is not within [0, 1].
    """
    if not 0.0 <= r <= 1.0:
        raise ValueError(f"`r` must be within [0, 1], but got {r}.")

    n_samples = len(series)

    spectrum = fft.rfft(series)
    perturbed_spectrum = spectrum.copy()

    # The one-sided spectrum only holds n // 2 + 1 components, so the number of components
    # to perturb must be derived from it (not from the n-point two-sided frequency grid);
    # otherwise sampling without replacement fails for r > ~0.5.
    K = int(len(perturbed_spectrum) * r)

    # Sample random perturbations for the real and imaginary parts of the frequency components
    alpha_real = sample_random_perturbation(
        K=K, min_alpha=min_alpha, max_alpha=max_alpha, rng=rng
    )
    alpha_imag = sample_random_perturbation(
        K=K, min_alpha=min_alpha, max_alpha=max_alpha, rng=rng
    )

    # Select the K components with the same generator so a seeded `rng` gives reproducible output
    source = np.random if rng is None else rng
    indices = source.choice(len(perturbed_spectrum), size=K, replace=False)
    perturbed_spectrum[indices] += alpha_real + 1j * alpha_imag

    # Pass the original length explicitly: without `n`, irfft always returns an even-length
    # signal and an odd-length input would come back one sample short.
    return fft.irfft(perturbed_spectrum, n=n_samples)
def set_coeffs(self, coeffs: np.ndarray) -> None:
    """
    Manually set the Wiener filter coefficients (for testing purposes).

    Method of ``WienerFilterSimulator``; reads ``self.filter_order`` and writes ``self._coeffs``.

    :param coeffs: The Wiener filter coefficients to set, with shape 1D [filter_order, ].

    :return: None

    :raises AssertionError: If ``coeffs`` is not a 1D NumPy array of length ``filter_order``.
    """
    # Raise explicitly instead of using `assert` so the checks survive `python -O`.
    if not isinstance(coeffs, np.ndarray):
        raise AssertionError("Coefficients must be a NumPy array.")
    if coeffs.ndim != 1:
        raise AssertionError("Coefficients must be a 1D array.")
    if len(coeffs) != self.filter_order:
        raise AssertionError(
            f"Length of coefficients must be equal to filter_order ({self.filter_order})."
        )
    self._coeffs = coeffs


def set_sigma_sq(self, sigma_sq: float) -> None:
    """
    Manually set the noise variance σ² (for testing purposes).

    Method of ``WienerFilterSimulator``; writes ``self._sigma_sq`` as a 0-d float64 array.

    :param sigma_sq: The noise variance σ² to set, a positive scalar
                     (Python number, NumPy real scalar or 0-d real NumPy array).

    :return: None

    :raises AssertionError: If ``sigma_sq`` is not numeric, not a scalar, or not positive.
    """
    # Check if sigma_sq is a numeric value (NumPy scalars such as float32 count as numeric)
    if not isinstance(sigma_sq, (int, float, np.integer, np.floating, np.ndarray)):
        raise AssertionError("Noise variance σ² must be a numeric value.")

    if isinstance(sigma_sq, np.ndarray):
        # The shape must be validated before the positivity test: comparing a
        # multi-element array with 0 has no single truth value and would raise ValueError.
        if sigma_sq.shape != ():
            raise AssertionError("Noise variance σ² must be a scalar value.")
        if sigma_sq.dtype.kind not in "iuf":
            raise AssertionError("Noise variance σ² must be a numeric value.")

    # Check if sigma_sq is a positive value (`not x > 0` also rejects NaN)
    if not sigma_sq > 0:
        raise AssertionError("Noise variance σ² must be a positive float.")

    self._sigma_sq = np.asarray(sigma_sq, dtype=np.float64)


@property
def sigma_sq(self) -> float:
    """
    Get the noise variance σ².

    Property of ``WienerFilterSimulator``; returns ``self._sigma_sq``, which is assigned
    by ``fit`` or by ``set_sigma_sq``.

    :raises ValueError: If the noise variance has not been set yet.
    """
    if self._sigma_sq is None:
        raise ValueError(
            "The noise variance has not been calculated yet; please call the `fit` method first."
        )
    return self._sigma_sq
class TestDataAugmentation(unittest.TestCase):
    """Testing the data augmentation module for time series data"""

    def setUp(self) -> None:
        """Create a fresh seeded generator per test so results do not depend on test order."""
        self.rng = np.random.RandomState(42)

    def test_sample_random_perturbation(self) -> None:
        """Test the function for sampling random perturbations in the frequency domain"""
        K = 10
        min_alpha = 0.1
        max_alpha = 0.5

        random_perturbations = sample_random_perturbation(
            K=K, min_alpha=min_alpha, max_alpha=max_alpha, rng=self.rng
        )

        # Check the length of the output
        self.assertEqual(
            len(random_perturbations),
            K,
            msg="Wrong length of random perturbations in `test_sample_random_perturbation` method",
        )

        # Check the value range of the output: every magnitude must lie in [min_alpha, max_alpha]
        magnitudes = np.abs(random_perturbations)
        self.assertTrue(
            bool(np.all((magnitudes >= min_alpha) & (magnitudes <= max_alpha))),
            msg="Random perturbation value out of range in `test_sample_random_perturbation` method",
        )

    def test_frequency_perturbation(self) -> None:
        """Test the function for performing frequency domain perturbation on time series data"""
        # Generate a simple noisy sine wave for testing; the noise comes from the seeded
        # generator so the fixture is identical on every run.
        t = np.linspace(0, 1, 100)
        series = np.sin(2 * np.pi * 5 * t) + 0.5 * self.rng.normal(size=100)

        min_alpha = 0.1
        max_alpha = 0.5
        r = 0.3

        perturbed_series = frequency_perturbation(
            series=series, min_alpha=min_alpha, max_alpha=max_alpha, r=r, rng=self.rng
        )

        # Check that the output has the same length as the input
        self.assertEqual(
            len(perturbed_series),
            len(series),
            msg="Output length does not match input length in `test_frequency_perturbation` method",
        )

        # Check that the output is different from the input (since we added perturbations)
        self.assertFalse(
            np.array_equal(perturbed_series, series),
            msg="Perturbed series is identical to original series in `test_frequency_perturbation` method",
        )
class TestWienerFilterSimulator(unittest.TestCase):
    """The Unittest for WienerFilterSimulator class."""

    def setUp(self) -> None:
        """Create a seeded generator so every fixture is reproducible and independent of global state."""
        self.rng = np.random.RandomState(0)

    def test_create_instance(self) -> None:
        """Test the creation of a WienerFilterSimulator instance."""

        # Test with different filter orders
        for filter_order in [1, 3, 5, 7, 9, 25]:
            # Test with different combinations of revin and random_state parameters
            for revin in [True, False]:
                for random_state in [None, 0, 42]:
                    # Use subTest to test different combinations of parameters
                    with self.subTest(
                        filter_order=filter_order,
                        revin=revin,
                        random_state=random_state,
                    ):
                        # Create an instance of WienerFilterSimulator with the specified parameters
                        simulator = WienerFilterSimulator(
                            filter_order=filter_order,
                            revin=revin,
                            random_state=random_state,
                        )
                        self.assertIsInstance(simulator, WienerFilterSimulator)

    def test_fit_transform(self) -> None:
        """Test the fit and transform methods of WienerFilterSimulator."""

        # Create a WienerFilterSimulator instance
        simulator = WienerFilterSimulator(filter_order=5)

        # Generate a reproducible random input signal for fitting and transforming
        time_series = self.rng.rand(100)

        # Fit the model with the input signal
        simulator.fit(time_series)

        # Check that the fitted residuals are stored as a NumPy array
        self.assertIsInstance(simulator.residuals, np.ndarray)

        # Check the shape of the residuals (should be the same as the input signal)
        self.assertEqual(simulator.residuals.shape, time_series.shape)

        # Generate simulated series from the fitted model
        simulation = simulator.transform(num_samples=5, seq_len=100)

        # Check the shape of the simulated data (should be (num_samples, seq_len))
        self.assertEqual(simulation.shape, (5, 100))

    def test_check_inputs(self) -> None:
        """Test the check_inputs method of WienerFilterSimulator."""

        # Create a WienerFilterSimulator instance
        simulator = WienerFilterSimulator()

        # Inputs that are not NumPy arrays must be rejected
        for wrong_input_signal in [
            1,
            "hello, world!",
            True,
            [1, 2, 3],
            {"input_signal": [1, 2, 3]},
        ]:
            with self.subTest(wrong_input_signal=wrong_input_signal):
                with self.assertRaises(ValueError):
                    simulator.check_inputs(wrong_input_signal)

        # Generate a reproducible random input signal
        time_series = self.rng.rand(100)

        # Test the valid 1D time series
        result = simulator.check_inputs(time_series)
        self.assertEqual(result.shape, time_series.shape)

        # Test a 2D array input signal (expected to come back flattened)
        time_series_2d = self.rng.rand(2, 100)
        result = simulator.check_inputs(time_series_2d)
        self.assertEqual(result.shape, time_series_2d.flatten().shape)

    def test_coeffs(self) -> None:
        """The test for the coeffs property of WienerFilterSimulator."""

        for filter_order in [3, 5, 7, 9]:
            with self.subTest(filter_order=filter_order):
                # Create a WienerFilterSimulator instance with different filter orders
                simulator = WienerFilterSimulator(filter_order=filter_order)

                # Try to access the coeffs property before fitting the model (should raise an error)
                with self.assertRaises(ValueError):
                    _ = simulator.coeffs

                time_series = self.rng.rand(100)
                simulator.fit(time_series)

                # Check through the public property that one coefficient per filter tap is available
                self.assertEqual(len(simulator.coeffs), filter_order)

    def test_sigma_sq(self) -> None:
        """Test the sigma_sq property of WienerFilterSimulator."""

        # Create a WienerFilterSimulator instance
        simulator = WienerFilterSimulator(filter_order=5)

        # Try to access the sigma_sq property before fitting the model (should raise an error)
        with self.assertRaises(ValueError):
            _ = simulator.sigma_sq

        # Generate a reproducible random input signal for fitting
        time_series = self.rng.rand(100)
        simulator.fit(time_series)

        # After fitting, the noise variance must be available (no longer None)
        self.assertIsNotNone(simulator.sigma_sq)

    def test_set_coeffs(self) -> None:
        """Test the set_coeffs method of WienerFilterSimulator."""

        for filter_order in [3, 5, 7, 9]:
            with self.subTest(filter_order=filter_order):
                # Create a WienerFilterSimulator instance with different filter orders
                simulator = WienerFilterSimulator(filter_order=filter_order)

                # Generate random coefficients with the correct shape
                coeffs = self.rng.rand(filter_order)

                # Set the coefficients using the set_coeffs method
                simulator.set_coeffs(coeffs=coeffs)

                # Check if the coefficients have the expected length
                self.assertEqual(len(simulator.coeffs), filter_order)

                # Check if the coefficients are set correctly
                self.assertTrue(np.allclose(simulator.coeffs, coeffs))

        # Create a WienerFilterSimulator instance
        simulator = WienerFilterSimulator(filter_order=5)

        # Test wrong coefficients: wrong length or not a NumPy array
        for wrong_coeffs in [self.rng.rand(4), [1, 2, 3, 4, 5, 6], "hello, world!"]:
            with self.subTest(wrong_coeffs=wrong_coeffs):
                with self.assertRaises(AssertionError):
                    simulator.set_coeffs(wrong_coeffs)

    def test_set_sigma_sq(self) -> None:
        """Test the set_sigma_sq method of WienerFilterSimulator."""

        # Create a WienerFilterSimulator instance
        simulator = WienerFilterSimulator(filter_order=5)

        # Use a fixed, strictly positive value: a random draw from [0, 1) could in
        # principle be exactly 0.0, which set_sigma_sq rejects.
        sigma_sq = 0.37

        # Set the sigma_sq using the set_sigma_sq method
        simulator.set_sigma_sq(sigma_sq=sigma_sq)

        # Check if the sigma_sq is set correctly
        self.assertEqual(simulator.sigma_sq, sigma_sq)

        # Test wrong sigma_sq values: non-numeric inputs and non-positive numbers
        for wrong_sigma_sq in [
            "hello, world!",
            [1, 2, 3],
            {"sigma_sq": sigma_sq},
            -0.5,
            0,
        ]:
            with self.subTest(wrong_sigma_sq=wrong_sigma_sq):
                with self.assertRaises(AssertionError):
                    simulator.set_sigma_sq(wrong_sigma_sq)

    def test_yule_walker(self) -> None:
        """Test the yule_walker function used in WienerFilterSimulator."""

        # Generate a reproducible random time series
        time_series = self.rng.rand(100)

        # Test the yule_walker function with different filter orders
        for filter_order in [5, 7, 9]:
            with self.subTest(filter_order=filter_order):
                simulator = WienerFilterSimulator(filter_order=filter_order)
                A = toeplitz(simulator.acf(time_series)[:filter_order])
                coeffs, sigma_sq = yule_walker(A=A)

                # Check if the coefficients and sigma_sq are returned correctly
                self.assertEqual(len(coeffs), filter_order)
                self.assertIsInstance(sigma_sq, (float, np.ndarray))