Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions s2generator/augmentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,34 @@
@url: https://github.com/wwhenxuan/S2Generator
"""

__all__ = [
    "amplitude_modulation",
    "censor_augmentation",
    "empirical_mode_modulation",
    "frequency_perturbation",
    "spike_injection",
    "wiener_filter",
    "add_linear_trend",
    "time_series_mixup",
]

# Import the amplitude modulation function
from ._amplitude_modulation import amplitude_modulation

# Import the censoring augmentation function
from ._censor_augmentation import censor_augmentation

# Import the empirical mode modulation function
# NOTE: the module file is `_empirical_mode_modulation.py` and it defines
# `empirical_mode_modulation` — the previous "model" spelling raised ImportError.
from ._empirical_mode_modulation import empirical_mode_modulation

# Import the frequency perturbation function
from ._frequency_perturbation import frequency_perturbation

# Import the spike injection function
from ._spike_injection import spike_injection

# Import the wiener filter function
from ._wiener_filter import wiener_filter

# Import the time transformation functions
from ._time_transformation import add_linear_trend, time_series_mixup
103 changes: 103 additions & 0 deletions s2generator/augmentation/_amplitude_modulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""
Created on 2026/03/04 22:52:40
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""

import numpy as np

from s2generator.utils._tools import (
linear_interpolation,
cubic_spline_interpolation,
lagrange_interpolation,
)


def amplitude_modulation(
    time_series: np.ndarray,
    num_changepoints: int = 5,
    amplitude_mean: float = 1.0,
    amplitude_variation: float = 1.0,
    interpolation_method: str = "linear",
    rng: np.random.RandomState = None,
    seed: int = 42,
) -> tuple:
    """
    Perform amplitude modulation on the input time series.

    This augmentation introduces scale trends and change points into the time series
    by multiplying the signal with a piecewise modulation trend. The trend is
    generated by sampling random change-point locations, drawing a random amplitude
    at each change point, and interpolating amplitudes between them.

    :param time_series: Input time series, a 1D numpy array.
    :param num_changepoints: Number of change points in the modulation trend (>= 2,
        and at most the length of the series; the first and last samples are always
        change points).
    :param amplitude_mean: The mean amplitude of the modulation trend.
    :param amplitude_variation: The standard deviation of the amplitude around the mean.
    :param interpolation_method: The method used to interpolate the modulation trend.
        Options are "linear", "cubic", or "lagrange".
    :param rng: Optional random number generator for reproducibility.
        If None, a new RNG is created from ``seed``.
    :param seed: Random seed used when ``rng`` is not provided.

    :return: A tuple ``(modulated_series, modulation_trend)`` — the amplitude-modulated
        time series and the trend it was multiplied by, both 1D numpy arrays of the
        same length as the input series.

    :raises ValueError: If the input is not 1D, the interpolation method is unknown,
        or ``num_changepoints`` is out of range.
    """
    # Validate the input time series
    time_series = np.asarray(time_series)
    if time_series.ndim != 1:
        raise ValueError("Input time_series must be a 1D array.")

    # Validate interpolation method
    if interpolation_method not in ["linear", "cubic", "lagrange"]:
        raise ValueError(
            "interpolation_method must be one of 'linear', 'cubic', or 'lagrange'."
        )

    # Get the length of the time series
    n = len(time_series)

    # Validate num_changepoints: at least the two endpoints, at most one per sample
    if num_changepoints < 2:
        raise ValueError(
            "num_changepoints must be at least 2 to create a modulation trend."
        )
    if num_changepoints > n:
        raise ValueError(
            "num_changepoints cannot exceed the length of the time series."
        )

    # Initialize random number generator
    if rng is None:
        rng = np.random.RandomState(seed)

    # Sample interior change points (without replacement, shifted off the endpoints)
    # and pin the first and last samples as change points.
    changepoints = np.hstack(
        [
            0,
            np.sort(rng.choice(n - 2, size=num_changepoints - 2, replace=False) + 1),
            n - 1,
        ]
    )

    # Generate random amplitudes for each change point
    amplitude = rng.normal(
        loc=amplitude_mean, scale=amplitude_variation, size=num_changepoints
    )

    # Interpolate the modulation trend across the entire time series
    if interpolation_method == "linear":
        modulation_trend = linear_interpolation(
            x_known=changepoints, y_known=amplitude, x_new=np.arange(n)
        )
    elif interpolation_method == "cubic":
        modulation_trend = cubic_spline_interpolation(
            x_known=changepoints, y_known=amplitude, x_new=np.arange(n)
        )
    elif interpolation_method == "lagrange":
        modulation_trend = lagrange_interpolation(
            x_known=changepoints, y_known=amplitude, x_new=np.arange(n)
        )

    # Apply the modulation trend to the original time series
    modulated_series = time_series * modulation_trend

    return modulated_series, np.array(modulation_trend)
74 changes: 74 additions & 0 deletions s2generator/augmentation/_censor_augmentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
Created on 2026/03/05 00:42:51
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""

import numpy as np


def censor_augmentation(
    time_series: np.ndarray,
    upper_quantile: float = 0.65,
    lower_quantile: float = 0.35,
    bernoulli_p: float = 0.8,
    rng: np.random.RandomState = None,
    seed: int = 42,
) -> np.ndarray:
    """
    Perform censoring augmentation on the input time series.

    This augmentation censors (clips) each sample either from below or from
    above, depending on a randomly sampled direction. The per-sample clipping
    threshold is determined by drawing a quantile uniformly from
    ``[lower_quantile, upper_quantile]`` and evaluating it on the empirical
    distribution of the signal.

    :param time_series: Input time series, a 1D numpy array (not modified).
    :param upper_quantile: Upper bound of the sampled quantile, default 0.65.
    :param lower_quantile: Lower bound of the sampled quantile, default 0.35.
    :param bernoulli_p: Probability that a sample is censored from below
        (direction 1 clips values up to the threshold; direction 0 clips values
        down to it). The default is 0.8.
    :param rng: Optional random number generator for reproducibility.
        If None, a new RNG is created from ``seed``.
    :param seed: Random seed used when ``rng`` is not provided.

    :return: Censored time series, a new 1D float numpy array of the same
        length as the input series.

    :raises ValueError: If the input is not 1D, ``bernoulli_p`` is outside
        [0, 1], or the quantile bounds are invalid.
    """
    # Work on a float copy so the caller's array is never mutated and
    # float thresholds are not truncated into an integer buffer.
    time_series = np.array(time_series, dtype=float)
    if time_series.ndim != 1:
        raise ValueError("Input time_series must be a 1D array.")

    # Validate bernoulli_p
    if not (0 <= bernoulli_p <= 1):
        raise ValueError("bernoulli_p must be in the range [0, 1].")

    # Validate the quantile bounds (np.quantile requires values in [0, 1])
    if not (0 <= lower_quantile <= upper_quantile <= 1):
        raise ValueError(
            "Quantiles must satisfy 0 <= lower_quantile <= upper_quantile <= 1."
        )

    # Get the length of the time series
    length = time_series.shape[0]

    # Set random seed for reproducibility
    if rng is None:
        rng = np.random.RandomState(seed)

    # Randomly sample a quantile threshold for each time step
    quantile_threshold = rng.uniform(lower_quantile, upper_quantile, size=length)

    # Compute the per-sample threshold value from the empirical distribution
    threshold_value = np.quantile(time_series, quantile_threshold)

    # Sample the censor direction from a Bernoulli distribution:
    # 1 -> censor from below (clip low values up), 0 -> censor from above.
    censor_direction = rng.binomial(n=1, p=bernoulli_p, size=length)

    # Vectorized clipping, equivalent to the per-sample max/min loop
    return np.where(
        censor_direction == 1,
        np.maximum(time_series, threshold_value),
        np.minimum(time_series, threshold_value),
    )
126 changes: 126 additions & 0 deletions s2generator/augmentation/_empirical_mode_modulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
"""
Created on 2026/03/05 11:05:45
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""
from typing import Optional

import numpy as np

from pysdkit import EMD


def empirical_mode_modulation(
    time_series: np.ndarray,
    min_scale_factor: float = 0.5,
    max_scale_factor: float = 2.0,
    low_frequency_enhancement: bool = True,
    spline_kind: str = "cubic",
    extrema_detection: str = "parabol",
    max_imfs: Optional[int] = None,
    rng: Optional[np.random.RandomState] = None,
    seed: int = 42,
) -> np.ndarray:
    """
    Perform empirical mode modulation on the input time series.

    This augmentation decomposes the time series into intrinsic mode functions
    (IMFs) using Empirical Mode Decomposition (EMD), rescales each IMF by a
    random factor, and reconstructs the signal from the rescaled IMFs to
    introduce non-linear trends and variations.

    :param time_series: Input time series, a 1D numpy array.
    :param min_scale_factor: The minimum scaling factor to apply to each IMF.
    :param max_scale_factor: The maximum scaling factor to apply to each IMF.
    :param low_frequency_enhancement: Whether to enhance the low-frequency components.
        If True, the sampled scale factors are sorted ascending so later IMFs
        receive larger factors, suppressing high-frequency noise relative to
        the low-frequency part (assumes EMD emits IMFs from high to low
        frequency — the standard EMD convention).
    :param spline_kind: The kind of spline to use for interpolation, one of
        "akima", "cubic", "pchip", "cubic_hermite", "slinear", "quadratic", "linear".
    :param extrema_detection: The method for detecting extrema in the EMD process,
        either "parabol" or "simple".
    :param max_imfs: The maximum number of IMFs to extract; if None, all IMFs
        are extracted until the residue is a monotonic function.
    :param rng: Optional random number generator for reproducibility.
        If None, a new RNG is created from ``seed``.
    :param seed: Random seed used when ``rng`` is not provided.

    :return: Empirical mode modulated time series, a 1D numpy array of the
        same length as the input series, restored to the input's scale.
    """
    # Validate the input time series
    time_series = np.asarray(time_series)
    if time_series.ndim != 1:
        raise ValueError("Input time_series must be a 1D array.")

    # Validate the spline kind
    assert spline_kind in [
        "akima",
        "cubic",
        "pchip",
        "cubic_hermite",
        "slinear",
        "quadratic",
        "linear",
    ], "spline_kind must be one of 'akima', 'cubic', 'pchip', 'cubic_hermite', 'slinear', 'quadratic', 'linear'."

    # Validate the extrema detection method
    assert extrema_detection in [
        "parabol",
        "simple",
    ], "extrema_detection must be one of 'parabol' or 'simple'."

    # Validate the low_frequency_enhancement parameter
    assert isinstance(
        low_frequency_enhancement, bool
    ), "low_frequency_enhancement must be a boolean value."

    # Normalize the input time series to zero mean and unit variance;
    # the small epsilon avoids division by zero for constant inputs.
    mean, std = np.mean(time_series), np.std(time_series)
    time_series = (time_series - mean) / (std + 1e-8)

    # A max_imfs of -1 tells EMD to keep extracting components until the
    # residue no longer satisfies the definition of an IMF.
    if max_imfs is None:
        max_imfs = -1

    # Initialize random number generator
    if rng is None:
        rng = np.random.RandomState(seed=seed)

    # Perform Empirical Mode Decomposition
    emd = EMD(
        max_imfs=max_imfs, spline_kind=spline_kind, extrema_detection=extrema_detection
    )
    imfs = emd.fit_transform(signal=time_series)

    # Get the number of IMFs extracted
    num_imfs = imfs.shape[0]

    # Randomly select a scaling factor for each IMF
    scale_factor = rng.uniform(
        low=min_scale_factor, high=max_scale_factor, size=num_imfs
    )

    if low_frequency_enhancement is True:
        # Sort the factors ascending so the later (lower-frequency) IMFs
        # are scaled more strongly than the earlier (higher-frequency) ones.
        scale_factor = np.sort(scale_factor)

    # Rescale every IMF and reconstruct the signal from the modified IMFs
    modulated_time_series = np.sum(imfs * scale_factor[:, np.newaxis], axis=0)

    # Re-standardize the reconstruction, then restore the original scale
    modulated_time_series = (modulated_time_series - np.mean(modulated_time_series)) / (
        np.std(modulated_time_series) + 1e-8
    )
    modulated_time_series = modulated_time_series * (std + 1e-8) + mean

    return modulated_time_series
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def sample_random_perturbation(


def frequency_perturbation(
series: np.ndarray,
time_series: np.ndarray,
min_alpha: float,
max_alpha: float,
r: float = 0.5,
Expand All @@ -55,17 +55,33 @@ def frequency_perturbation(
This method adds random perturbations to the frequency components of the time series,
which can help to enhance the diversity of the data and improve the robustness of models trained on it.

:param series: Input time series, a 1D numpy array
:param time_series: Input time series, a 1D numpy array
:param min_alpha: Minimum absolute value of the random perturbation added to the frequency components
:param max_alpha: Maximum absolute value of the random perturbation added to the frequency components
:param r: Proportion of frequency components to perturb (default is 0.5, meaning 50% of the frequency components will be perturbed)
:param rng: Optional random number generator, if not provided, the global numpy random number generator will be used.

:return: Perturbed time series, a 1D numpy array of the same length as the input series.
"""
f = fft.rfft(series)
# Validate the input parameters
assert 0 <= r <= 1, "The proportion r must be between 0 and 1."
assert min_alpha >= 0, "min_alpha must be non-negative."
assert (
max_alpha >= min_alpha
), "max_alpha must be greater than or equal to min_alpha."

# Validate that the input time series is ndarray
if isinstance(time_series, list):
time_series = np.array(time_series)

# Validate that the input time series is 1D
if time_series.ndim != 1:
raise ValueError("Input time series must be a 1D array.")

# Perform Fast Fourier Transform to convert the time series to the frequency domain
f = fft.rfft(time_series)
f_perturbed = f.copy()
frequencies = fft.fftfreq(len(series))
frequencies = fft.fftfreq(len(time_series))

# Calculate the number of frequency domain components that can be perturbed
K = int(len(frequencies) * r)
Expand Down
Loading
Loading