Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ venv/
/docs/auto_examples/*.md5

/docs/_build/

.vscode/
.vscode
1 change: 0 additions & 1 deletion s2generator/augmentation/_empirical_mode_modulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def empirical_mode_modulation(
emd = EMD(
max_imfs=max_imfs, spline_kind=spline_kind, extrema_detection=extrema_detection
)
print(type(time_series))
imfs = emd.fit_transform(signal=time_series)

# Get the number of IMFs extracted
Expand Down
25 changes: 21 additions & 4 deletions s2generator/augmentation/_time_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@


def add_linear_trend(
time_series: np.ndarray, trend_strength: float = 1.0, direction: str = "upward"
time_series: np.ndarray,
trend_strength: float = 1.0,
direction: str = "upward",
normalize: bool = True,
) -> np.ndarray:
"""
Perform linear trend augmentation on the input time series.
Expand All @@ -19,6 +22,7 @@ def add_linear_trend(
:param time_series: Input time series, a 1D numpy array
:param trend_strength: The strength of the linear trend to be added, default is 1.0.
:param direction: The direction of the linear trend, either "upward" or "downward", default is "upward".
:param normalize: Whether to normalize the output time series to maintain the same scale as the input, default is True.

:return: Augmented time series with a linear trend, a 1D numpy array of the same length as the input series.
"""
Expand All @@ -31,17 +35,30 @@ def add_linear_trend(

# Create a linear trend
if direction == "upward":
trend = np.linspace(0, trend_strength * seq_length, seq_length)
trend = np.linspace(0, 1, seq_length)
elif direction == "downward":
trend = np.linspace(0, -trend_strength * seq_length, seq_length)
trend = np.linspace(0, -1, seq_length)
else:
raise ValueError("direction must be either 'upward' or 'downward'")

# Scale the trend to have the same energy as the original time series
trend_energy = np.mean(trend**2)

if trend_energy > 0:
trend = trend * np.sqrt(original_energy / trend_energy)
# Scale the trend to have the same energy as the original time series, and then apply the trend strength factor
trend = trend * np.sqrt(original_energy / trend_energy) * trend_strength
else:
# If the trend energy is zero (which can happen if the trend is constant),
# we set the trend to zero to avoid division by zero
trend = np.zeros_like(trend)

if normalize:
augmented_series = time_series + trend
# Normalize the augmented series to maintain the same energy as the original time series
augmented_series = (augmented_series - np.mean(augmented_series)) / np.std(
augmented_series
) * np.std(time_series) + np.mean(time_series)
return augmented_series

# Average the original signal and the trend to maintain the overall scale
return (time_series + trend) / 2
Expand Down
6 changes: 3 additions & 3 deletions s2generator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ def val_diff(self, xs: ndarray, deterministic: Optional[bool] = True) -> ndarray
if xs.ndim > 1:
# For multivariate case, keep other dimensions constant
x_uniform_input = np.tile(np.mean(xs, axis=0), (n_integration_points, 1))
x_uniform_input[:, 0] = (
x_uniform # Replace first dimension with uniform grid
)
x_uniform_input[
:, 0
] = x_uniform # Replace first dimension with uniform grid
else:
x_uniform_input = x_uniform.reshape(-1, 1) # Ensure 2D array for val method

Expand Down
185 changes: 185 additions & 0 deletions s2generator/simulator/gaussia_mixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from typing import Optional

import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
from scipy import stats


class GaussianMixtureSimulator(object):
def __init__(
self,
n_components=3,
covariance_type="full",
tol: float = 1e-3,
reg_covarfloat: float = 1e-6,
max_iter: int = 100,
n_init: int = 1,
init_params: str = "kmeans",
random_state=42,
) -> None:
"""
:param n_components: int, default=3. The number of mixture components.
:param covariance_type: str, default='full'. The type of covariance parameters to use. Must be one of 'full', 'tied', 'diag', 'spherical'.
:param tol: float, default=1e-3. The convergence threshold. EM iterations will stop when the lower bound average gain is below this threshold.
:param reg_covarfloat: float, default=1e-6. Non-negative regularization added to the diagonal of covariance. Allows to assure that the covariance matrices are all positive.
:param max_iter: int, default=100. The number of EM iterations to perform.
:param n_init: int, default=1. The number of initializations to perform. The best results are kept.
:param init_params: str, default='kmeans'. The method used to initialize the weights, the means and the precisions. String must be one of 'kmeans', 'k-means++', 'random', 'random_from_data'.
:param random_state: int, default=42. The seed used by the random number generator.
"""
self.n_components = n_components
self.covariance_type = covariance_type
self.random_state = random_state

# Validate the covariance_type
if covariance_type not in ["full", "tied", "diag", "spherical"]:
raise ValueError(
"Invalid covariance_type. Must be one of 'full', 'tied', 'diag', 'spherical'."
)

# Validate the init_params
if init_params not in ["kmeans", "k-means++", "random", "random_from_data"]:
raise ValueError(
"Invalid init_params. Must be one of 'kmeans', 'k-means++', 'random', 'random_from_data'."
)

# Initialize the GaussianMixture model
self.model = GaussianMixture(
n_components=n_components,
covariance_type=covariance_type,
tol=tol,
reg_covar=reg_covarfloat,
max_iter=max_iter,
n_init=n_init,
init_params=init_params,
random_state=random_state,
)

def fit(self, time_series: np.ndarray) -> None:
"""
Fit the Gaussian Mixture Model to the provided time series data.
:param time_series: 1D np.ndarray. The input time series data.
"""

# Check if the input time series is 1D numpy array
time_series = self._check_inputs(time_series)

# Reshape the time series data to fit the GMM input requirements
time_series = time_series.reshape(-1, 1)

self.model.fit(time_series)

def transform(
self, num_samples: int, seq_len: int, random_state: Optional[int] = None
) -> np.ndarray:
"""
Transform the model to generate new samples.
:param num_samples: int. The number of samples to generate.
:param seq_len: int. The length of each generated sequence.
:param random_state: Optional[int]. The random state for reproducibility.
:return: Generated time series data.
"""

# 根据GMM的组件权重,随机选择每个样本属于哪个高斯组件
component_indices = np.random.choice(
self.n_components, size=(num_samples, seq_len), p=self.model.weights_
)

# 从选中的高斯组件中采样生成数据
generated_series = np.zeros(shape=(num_samples, seq_len))
for i in range(num_samples):
for j in range(seq_len):
comp = component_indices[i, j]
mean = self.model.means_[comp, 0]
cov = self.covariance(comp)
generated_series[i, j] = np.random.normal(loc=mean, scale=np.sqrt(cov))

return generated_series

def _check_inputs(self, time_series: np.ndarray) -> np.ndarray:
"""
Check if the input time series is a 1D numpy array and reshape it to fit the GMM input requirements.
:param time_series: 1D np.ndarray. The input time series data.
:return: Reshaped time series data.
"""
if not isinstance(time_series, np.ndarray):
raise ValueError("Input time_series must be a numpy array.")
if time_series.ndim != 1:
raise ValueError("Input time_series must be a 1D array.")

return time_series.reshape(-1, 1)

def weight(self, component_index: int) -> float:
"""
Get the weight of a specific component in the GMM.
:param component_index: int. The index of the component.
:return: The weight of the specified component.
"""
if component_index < 0 or component_index >= self.n_components:
raise ValueError(
f"Component index must be between 0 and {self.n_components - 1}."
)

return self.model.weights_[component_index]

def weights(self) -> np.ndarray:
"""
Get the weights of all components in the GMM.
:return: The weights of all components.
"""
return self.model.weights_

def mean(self, component_index: int) -> float:
"""
Get the mean of a specific component in the GMM.
:param component_index: int. The index of the component.
:return: The mean of the specified component.
"""
if component_index < 0 or component_index >= self.n_components:
raise ValueError(
f"Component index must be between 0 and {self.n_components - 1}."
)

return self.model.means_[component_index][0]

def means(self) -> np.ndarray:
"""
Get the means of all components in the GMM.
:return: The means of all components.
"""
return self.model.means_.flatten()

def covariance(self, component_index: int) -> float:
"""
Get the covariance of a specific component in the GMM.
:param component_index: int. The index of the component.
:return: The covariance of the specified component.
"""
if component_index < 0 or component_index >= self.n_components:
raise ValueError(
f"Component index must be between 0 and {self.n_components - 1}."
)

if self.covariance_type == "full":
return self.model.covariances_[component_index][0][0]
elif self.covariance_type == "tied":
return self.model.covariances_[0][0][0]
elif self.covariance_type == "diag":
return self.model.covariances_[component_index][0]
elif self.covariance_type == "spherical":
return self.model.covariances_[component_index]

def covariances(self) -> np.ndarray:
"""
Get the covariances of all components in the GMM.
:return: The covariances of all components.
"""
if self.covariance_type == "full":
return np.array([cov[0][0] for cov in self.model.covariances_])
elif self.covariance_type == "tied":
return np.array([self.model.covariances_[0][0][0]] * self.n_components)
elif self.covariance_type == "diag":
return self.model.covariances_.flatten()
elif self.covariance_type == "spherical":
return self.model.covariances_
11 changes: 11 additions & 0 deletions s2generator/simulator/kalman_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
"""
Created on 2026/03/06 23:04:59
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/S2Generator
"""

from typing import Optional

import numpy as np
Loading