diff --git a/.gitignore b/.gitignore
index bb7fb5c..ba09183 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 docs/assets/
 *.nc
 *.pkl
+TODO.md
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..93d9ca5
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,20 @@
+pytest
+numpy
+scipy
+xarray
+bottleneck
+matplotlib
+seaborn
+jupyter
+pip
+setuptools
+sphinx
+sphinx-autobuild
+sphinx_rtd_theme
+myst-parser
+nbsphinx
+sphinxcontrib-bibtex==1
+sphinx-copybutton
+torch
+derivative
+tqdm
\ No newline at end of file
diff --git a/synthia/copulas/copula_gan.py b/synthia/copulas/copula_gan.py
new file mode 100644
index 0000000..44a44d1
--- /dev/null
+++ b/synthia/copulas/copula_gan.py
@@ -0,0 +1,261 @@
+from typing import Literal, Optional, Union, Annotated
+from .copula import Copula
+from ..generators.gan import GAN
+from derivative import dxdt
+from tqdm import tqdm
+import numpy as np
+import torch
+import zfit
+
+def derivative(
+        x: Annotated[torch.Tensor, 'CDF'],
+        t: Optional[Annotated[torch.Tensor, 'Support']] = None,
+        device: torch.device = torch.device('cpu')
+    ) -> torch.Tensor:
+    """
+    Derivative of the CDF with respect to the support t.
+
+    Args:
+        x (Annotated[torch.Tensor, 'CDF']): CDF of the data.
+        t (Optional[Annotated[torch.Tensor, 'Support']]): Support of the data. Defaults to the interval [0, 1].
+        device (torch.device): Device to use for computation. (Default: torch.device('cpu'))
+    """
+    if t is None:
+        t = torch.linspace(0, 1, x.shape[0], device=device)
+    return dxdt(x.T, t).T
+
+def compute_cdf(kde_model, n_samples=1000):
+    """
+    Computes the CDF of a KDE model using the cumulative trapezoidal rule.
+
+    Args:
+        kde_model: KDE model to compute the CDF of.
+        n_samples: Number of samples to use for computing the CDF. (Default: 1000)
+    """
+    lower_limit = float(kde_model.space.limits[0])
+    upper_limit = float(kde_model.space.limits[1])
+
+    support = torch.linspace(lower_limit, upper_limit, n_samples)
+    pdf_values = torch.Tensor([float(kde_model.pdf(x)) for x in support])
+
+    # Compute the CDF using the cumulative trapezoidal rule
+    cdf_values = torch.cumulative_trapezoid(pdf_values, dx=float(support[1] - support[0]))
+
+    # Normalize the CDF to [0, 1] by dividing by the final value
+    cdf_values /= cdf_values[-1].clone()
+    # Prepend zero so the CDF starts at 0
+    cdf_values = torch.cat((torch.zeros(1), cdf_values))
+
+    return cdf_values, support
+
+def map_cdf(cdf_values, support):
+    """
+    Maps data in the support to its corresponding CDF value.
+
+    Args:
+        cdf_values: CDF values to map to.
+        support: Support of the data.
+    """
+    return lambda x: torch.from_numpy(np.interp(x, support, cdf_values))
+
+def inverse_map_cdf(cdf_values, support):
+    """
+    Computes the inverse CDF of a KDE model using linear interpolation.
+
+    Args:
+        cdf_values: CDF values to invert.
+        support: Support of the data.
+    """
+    return lambda x: np.interp(x, cdf_values, support)
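+
+# Illustrative sketch of the helpers above (illustration only, not part of the
+# module API): with a hand-built uniform CDF, map_cdf sends data to its CDF
+# values and inverse_map_cdf recovers it. The values below are hypothetical.
+#
+#     support = torch.linspace(0.0, 1.0, 100)
+#     cdf_values = support.clone()  # the CDF of Uniform(0, 1) equals its support
+#     u = map_cdf(cdf_values, support)(np.array([0.25, 0.5]))   # tensor([0.25, 0.5])
+#     x = inverse_map_cdf(cdf_values, support)(u.numpy())       # array([0.25, 0.5])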
+ """ + _support = None + _initialized = False + def __init__( + self, + generator_deep_layers: list[int] = [32, 32], + device: Literal['cpu', 'cuda', 'auto'] = 'auto', + optimizer: Literal['adam', 'sgd'] = 'adam', + generator_fake_size: int = 1000, + ) -> None: + super().__init__( + generator_deep_layers, + [], + device, + optimizer, + generator_fake_size + ) + + def initialize(self, X: torch.Tensor, verbose: bool = False, **kwargs): + if not self._initialized: + # Generate marginals and sample space + n_features = X.shape[1] + + # Create target KDEs and CDFs + target_kdes = [] + target_pdfs = [] + cdfs = [] + real_spaces = [] + real_U = [] + for i in range(n_features): + obs_space = zfit.Space('X' + str(i), limits=(X[:, i].min(), X[:, i].max())) + kde = zfit.pdf.KDE1DimGrid( + obs=obs_space, + data=zfit.Data.from_tensor(tensor=X[:, i], obs=obs_space) + ) + cdf, support = compute_cdf(kde, **kwargs) + real_U.append(map_cdf(cdf, support)(X[:,i])) + pdf = torch.from_numpy(kde.pdf( + torch.linspace(X[:,i].min().item(),X[:,i].max().item(),len(support)), + obs_space + ).numpy()) + target_kdes.append(kde) + cdfs.append(cdf) + real_spaces.append(obs_space) + target_pdfs.append(pdf) + + self.real_space = zfit.dimension.combine_spaces(*real_spaces) + self.target_kdes = target_kdes + self.target_cdf = torch.stack(cdfs, dim=1).float().to(self.device) + self.target_pdf = torch.stack(target_pdfs, dim=1).float().to(self.device) + self.real_U = torch.stack(real_U, dim=1).float().to(self.device) + self._initialized = True + else: + tqdm.write("Already initialized") if verbose else None + + @property + def support(self): + if hasattr(self, 'real_space'): + M_samples = self.target_cdf.shape[0] + self._support = torch.stack([ + torch.linspace(float(x_0), float(x_1), M_samples)\ + for x_0, x_1 in zip(self.real_space.limits[0][0],self.real_space.limits[1][0]) + ], dim=1).detach().cpu().numpy() + else: + raise AttributeError("Cannot compute support without real_space. Run initialize() first.") + return self._support + + def construct_discriminator( + self, + n_features: int, + hidden_size: int = 32, + ) -> torch.nn.Module: + """ + Constructs the discriminator for the GAN. It is a GRU that receives samples from CDFs as input + and outputs a single value. + + Input (batch_size, sequence_length, n_features) + Output (batch_size, 1) + + Args: + n_features (int): Number of features for the discriminator. + hidden_size (int): Hidden size for the GRU. (Default: 32) + """ + self.discriminator = GRU(n_features, hidden_size).to(self.device) + self.discriminator.apply(self.init_weights) + return self.discriminator + + def fit( + self, + X: Union[torch.Tensor, np.ndarray], + global_iterations: int, + discriminator_iterations: int = 1, + lr: float = 0.001, + batch_size: int = 32, + generator_kwargs: dict = {}, + discriminator_kwargs: dict = {}, + CDFN: int = 1000, + verbose: bool = True + ): + """ + Uses GAN to learn a Copula flow from vector U sampled from the marginals of an arbitrary dataset. + + If each variable $X_i$ in a dataset has a CDF denoted by $F_i$, then the marginals are defined as $U_i = F_i(X_i)$. + + Args: + X (Union[torch.Tensor, np.ndarray]): Dataset in format (n_samples, n_features). + global_iterations (int): Number of global iterations to train for. + discriminator_iterations (int): Number of discriminator iterations per global iteration. (Default: 1) + lr (float): Learning rate for the optimizer. (Default: 0.001) + batch_size (int): Batch size for training. 
+    """
+    _support = None
+    _initialized = False
+
+    def __init__(
+        self,
+        generator_deep_layers: list[int] = [32, 32],
+        device: Literal['cpu', 'cuda', 'auto'] = 'auto',
+        optimizer: Literal['adam', 'sgd'] = 'adam',
+        generator_fake_size: int = 1000,
+    ) -> None:
+        super().__init__(
+            generator_deep_layers,
+            [],
+            device,
+            optimizer,
+            generator_fake_size
+        )
+
+    def initialize(self, X: torch.Tensor, verbose: bool = False, **kwargs):
+        if not self._initialized:
+            # Generate marginals and sample space
+            n_features = X.shape[1]
+
+            # Create target KDEs and CDFs
+            target_kdes = []
+            target_pdfs = []
+            cdfs = []
+            real_spaces = []
+            real_U = []
+            for i in range(n_features):
+                obs_space = zfit.Space('X' + str(i), limits=(X[:, i].min().item(), X[:, i].max().item()))
+                kde = zfit.pdf.KDE1DimGrid(
+                    obs=obs_space,
+                    data=zfit.Data.from_tensor(tensor=X[:, i], obs=obs_space)
+                )
+                cdf, support = compute_cdf(kde, **kwargs)
+                real_U.append(map_cdf(cdf, support)(X[:, i]))
+                pdf = torch.from_numpy(kde.pdf(
+                    torch.linspace(X[:, i].min().item(), X[:, i].max().item(), len(support)),
+                    obs_space
+                ).numpy())
+                target_kdes.append(kde)
+                cdfs.append(cdf)
+                real_spaces.append(obs_space)
+                target_pdfs.append(pdf)
+
+            self.real_space = zfit.dimension.combine_spaces(*real_spaces)
+            self.target_kdes = target_kdes
+            self.target_cdf = torch.stack(cdfs, dim=1).float().to(self.device)
+            self.target_pdf = torch.stack(target_pdfs, dim=1).float().to(self.device)
+            self.real_U = torch.stack(real_U, dim=1).float().to(self.device)
+            self._initialized = True
+        elif verbose:
+            tqdm.write("Already initialized")
+
+    @property
+    def support(self):
+        if hasattr(self, 'real_space'):
+            M_samples = self.target_cdf.shape[0]
+            self._support = torch.stack([
+                torch.linspace(float(x_0), float(x_1), M_samples)
+                for x_0, x_1 in zip(self.real_space.limits[0][0], self.real_space.limits[1][0])
+            ], dim=1).detach().cpu().numpy()
+        else:
+            raise AttributeError("Cannot compute support without real_space. Run initialize() first.")
+        return self._support
+
+    def construct_discriminator(
+        self,
+        n_features: int,
+        hidden_size: int = 32,
+    ) -> torch.nn.Module:
+        """
+        Constructs the discriminator for the GAN. It is a GRU that receives samples from the CDFs
+        as input and outputs a single value.
+
+        Input: (batch_size, sequence_length, n_features)
+        Output: (batch_size, 1)
+
+        Args:
+            n_features (int): Number of features for the discriminator.
+            hidden_size (int): Hidden size for the GRU. (Default: 32)
+        """
+        self.discriminator = GRU(n_features, hidden_size).to(self.device)
+        self.discriminator.apply(self.init_weights)
+        return self.discriminator
+
+    def fit(
+        self,
+        X: Union[torch.Tensor, np.ndarray],
+        global_iterations: int,
+        discriminator_iterations: int = 1,
+        lr: float = 0.001,
+        batch_size: int = 32,
+        generator_kwargs: dict = {},
+        discriminator_kwargs: dict = {},
+        CDFN: int = 1000,
+        verbose: bool = True
+    ):
+        """
+        Uses a GAN to learn a copula flow from the vector U sampled from the marginals of an arbitrary dataset.
+
+        If each variable $X_i$ in a dataset has a CDF denoted by $F_i$, then the marginals are defined as $U_i = F_i(X_i)$.
+
+        Args:
+            X (Union[torch.Tensor, np.ndarray]): Dataset in format (n_samples, n_features).
+            global_iterations (int): Number of global iterations to train for.
+            discriminator_iterations (int): Number of discriminator iterations per global iteration. (Default: 1)
+            lr (float): Learning rate for the optimizer. (Default: 0.001)
+            batch_size (int): Batch size for training. (Default: 32)
+            generator_kwargs (dict): Keyword arguments to pass to the generator. (Default: {})
+            discriminator_kwargs (dict): Keyword arguments to pass to the discriminator. (Default: {})
+            CDFN (int): Number of samples to use for computing the CDF. (Default: 1000)
+            verbose (bool): Whether to show a progress bar and print status. (Default: True)
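+
+        Example (illustrative sketch; ``X`` is a hypothetical (n_samples, n_features) array):
+            >>> model = CopulaGAN()
+            >>> model.fit(X, global_iterations=1000, CDFN=500)
+            >>> U = model.real_U  # empirical marginals U_i = F_i(X_i), one column per feature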
+        """
+        if isinstance(X, np.ndarray):
+            X = torch.from_numpy(X).float()
+        elif isinstance(X, torch.Tensor):
+            X = X.float()
+        else:
+            raise TypeError(f"Invalid type {type(X)} for X. Use torch.Tensor or np.ndarray.")
+
+        if verbose:
+            tqdm.write("Initializing")
+        self.initialize(X, n_samples=CDFN, verbose=verbose)
+
+        self.construct_generator(X.shape[1], activation='sigmoid', **generator_kwargs)
+        self.construct_discriminator(X.shape[1], **discriminator_kwargs)
+
+        with self.device:
+
+            self.generator_optimizer = self.get_optimizer(self.opt, self.generator.parameters(), lr)
+            self.discriminator_optimizer = self.get_optimizer(self.opt, self.discriminator.parameters(), lr)
+
+            # Train
+            for _ in tqdm(range(global_iterations)):
+                for __ in range(discriminator_iterations):
+                    # Train the discriminator
+                    rand = torch.rand(batch_size, self.latent_dimension, device=self.device)
+                    # Detach so the discriminator update does not backpropagate into the generator
+                    fake_U = self.generator(rand).detach().unsqueeze(0)
+                    real_U = self.real_U[torch.randperm(self.real_U.shape[0], device=self.device)[:batch_size]].unsqueeze(0)
+                    real_discriminant = self.discriminator(real_U)
+                    fake_discriminant = self.discriminator(fake_U)
+                    self.discriminator.zero_grad()
+                    (
+                        self.loss(real_discriminant, torch.ones_like(real_discriminant)) +
+                        self.loss(fake_discriminant, torch.zeros_like(fake_discriminant))
+                    ).backward()
+                    self.discriminator_optimizer.step()
+                # Train the generator
+                self.generator.zero_grad()
+                rand = torch.rand(batch_size, self.latent_dimension, device=self.device)
+                fake_U = self.generator(rand).unsqueeze(0)
+                fake_discriminant = self.discriminator(fake_U)
+                loss = self.loss(fake_discriminant, torch.ones_like(fake_discriminant))
+                loss.backward()
+                self.generator_optimizer.step()
+
+    def sample_copula(self, n_samples: int, **kws) -> torch.Tensor:
+        return super().sample(n_samples, **kws)
+
+    def sample(self, n_samples: int, **kws) -> np.ndarray:
+        """
+        Samples from the learned copula.
+
+        Args:
+            n_samples (int): Number of samples to generate.
+        """
+        U = self.sample_copula(n_samples, **kws).detach().cpu().numpy()
+
+        target_cdf = self.target_cdf.detach().cpu().numpy()
+
+        # Map each copula sample back to data space via the per-feature inverse CDF
+        return np.vstack([
+            inverse_map_cdf(target_cdf[:, i], self.support[:, i])(U[:, i])
+            for i in range(U.shape[1])
+        ]).T
+
+    def generate(self, n_samples: int, **kws) -> np.ndarray:
+        return self.sample(n_samples, **kws)
\ No newline at end of file
diff --git a/synthia/generators/gan.py b/synthia/generators/gan.py
new file mode 100644
index 0000000..a690eb7
--- /dev/null
+++ b/synthia/generators/gan.py
@@ -0,0 +1,234 @@
+from typing import Literal, Union
+import torch
+from torch.nn import Module
+import numpy as np
+import os
+os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
+
+class GAN(Module):
+    """
+    PyTorch implementation of a Generative Adversarial Network.
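+
+    Example (illustrative sketch; the data below is hypothetical and is squashed
+    with tanh to satisfy the [-1, 1] requirement of :func:`fit`):
+
+        >>> import numpy as np
+        >>> X = np.tanh(np.random.randn(1000, 4))
+        >>> gan = GAN(device='cpu')
+        >>> gan.fit(X, global_iterations=500)
+        >>> samples = gan.sample(100)  # tensor of shape (100, 4)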
+ """ + #Create Discriminator + self.deep_dis_layers = [n_features] + self.n_dis + [2] + self.deep_dis_layers = list(zip(self.deep_dis_layers[:-1], self.deep_dis_layers[1:])) + self.discriminator = torch.nn.Sequential( + *[x for i,j in self.deep_dis_layers for x in [torch.nn.Linear(i, j), torch.nn.LeakyReLU()]], + torch.nn.Linear(2,1) + ) + #Send to device + self.discriminator.to(self.device) + #Initialize weights + self.discriminator.apply(self.init_weights) + + def fit( + self, + X: Union[torch.Tensor, np.array], + global_iterations: int, + discriminator_iterations: int = 1, + lr: float = 0.001, + batch_size: int = 32, + generator_kwargs: dict = {}, + discriminator_kwargs: dict = {} + ) -> tuple[torch.Tensor, torch.Tensor]: + """ + Fits the model to data. + + Args: + X (Union[torch.Tensor, np.array]): Data to fit the model to. + global_iterations (int): Number of global iterations for training. + discriminator_iterations (int): Number of discriminator iterations for training. + lr (float): Learning rate for the optimizer. + batch_size (int): Batch size for training. + generator_kwargs (dict): Keyword arguments for the generator. See :func:`construct_generator`. + discriminator_kwargs (dict): Keyword arguments for the discriminator. See :func:`construct_discriminator`. + + Returns: + tuple[torch.Tensor, torch.Tensor]: Generator and discriminator loss. + """ + + #Check X values + assert X.max() <= 1 and X.min() >= -1, "Values must be within [-1,1]. Try using np.tanh first." + + self.construct_discriminator(X.shape[1], **discriminator_kwargs) + self.construct_generator(X.shape[1], **generator_kwargs) + + with self.device: + if isinstance(X, np.ndarray): + X = torch.from_numpy(X).float() + elif isinstance(X, torch.Tensor): + X = X.float() + else: + raise TypeError(f"Invalid type {type(X)} for X. Use torch.Tensor or np.array.") + X = X.to(self.device) + self.n_features = X.shape[1] + + self.generator_optimizer = self.get_optimizer(self.opt, lr) + self.discriminator_optimizer = self.get_optimizer(self.opt, lr) + + #Train GAN + for _ in range(global_iterations): + for i in range(discriminator_iterations): + + #Sample from X + real = X[torch.randperm(X.shape[0])[:batch_size]] + #At the end the batch size might be smaller than the specified batch size + actual_batch_size = real.shape[0] + fake = self.generator(torch.randn(actual_batch_size, self.latent_dimension)) + + self.discriminator.zero_grad() + disc_real = self.discriminator(real) + disc_fake = self.discriminator(fake) + + loss_real = self.loss(disc_real, torch.ones(actual_batch_size, 1)) + #Inserting 1 for y effectively calculates -log(D(x)), as the other term on + #the loss vanishes as it is proportional to y + + loss_fake = self.loss(disc_fake, torch.zeros(actual_batch_size, 1)) + #In a similar fashion, inserting 0 for y yields -log(1-D(G(z))) + loss_discriminator = loss_real + loss_fake + loss_discriminator.backward() + self.discriminator_optimizer.step() + + self.generator.zero_grad() + new_fake = self.generator(torch.randn(self.fake_batch_size, self.latent_dimension)) + new_disc_fake = self.discriminator(new_fake) + loss_generator = self.loss(new_disc_fake, torch.ones(self.fake_batch_size, 1)) + #Inserting 0 for y calculates -log(D(G(z))) + loss_generator.backward() + self.generator_optimizer.step() + + return loss_discriminator, loss_generator + + def sample(self, n_samples: int) -> torch.Tensor: + """ + Generates samples from the copula. + + Args: + n_samples (int): Number of samples to generate. 
+        """
+        self.latent_dimension = latent_dimension
+        match activation:
+            case 'tanh':
+                activation = torch.nn.Tanh()
+            case 'sigmoid':
+                activation = torch.nn.Sigmoid()
+            case _:
+                raise ValueError(f"Invalid activation {activation}. Use 'tanh' or 'sigmoid'.")
+        # Create the generator
+        self.deep_gen_layers = [latent_dimension] + self.n_gen + [n_features]
+        self.deep_gen_layers = list(zip(self.deep_gen_layers[:-1], self.deep_gen_layers[1:]))
+        self.generator = torch.nn.Sequential(
+            *[x for i, j in self.deep_gen_layers for x in [torch.nn.Linear(i, j), torch.nn.LeakyReLU()]],
+            torch.nn.Dropout(dropout),
+            torch.nn.Linear(self.deep_gen_layers[-1][1], self.deep_gen_layers[-1][1]),
+            activation
+        )
+        # Send to device
+        self.generator.to(self.device)
+        # Initialize weights
+        self.generator.apply(self.init_weights)
+        return self.generator
+
+    def construct_discriminator(
+        self,
+        n_features: int
+    ) -> torch.nn.Sequential:
+        """
+        Constructs the discriminator for the GAN.
+
+        Args:
+            n_features (int): Number of features for the discriminator.
+        """
+        # Create the discriminator
+        self.deep_dis_layers = [n_features] + self.n_dis + [2]
+        self.deep_dis_layers = list(zip(self.deep_dis_layers[:-1], self.deep_dis_layers[1:]))
+        self.discriminator = torch.nn.Sequential(
+            *[x for i, j in self.deep_dis_layers for x in [torch.nn.Linear(i, j), torch.nn.LeakyReLU()]],
+            torch.nn.Linear(2, 1)
+        )
+        # Send to device
+        self.discriminator.to(self.device)
+        # Initialize weights
+        self.discriminator.apply(self.init_weights)
+        return self.discriminator
+
+    def fit(
+        self,
+        X: Union[torch.Tensor, np.ndarray],
+        global_iterations: int,
+        discriminator_iterations: int = 1,
+        lr: float = 0.001,
+        batch_size: int = 32,
+        generator_kwargs: dict = {},
+        discriminator_kwargs: dict = {}
+    ) -> tuple[torch.Tensor, torch.Tensor]:
+        """
+        Fits the model to data.
+
+        Args:
+            X (Union[torch.Tensor, np.ndarray]): Data to fit the model to.
+            global_iterations (int): Number of global iterations for training.
+            discriminator_iterations (int): Number of discriminator iterations per global iteration.
+            lr (float): Learning rate for the optimizer.
+            batch_size (int): Batch size for training.
+            generator_kwargs (dict): Keyword arguments for the generator. See :func:`construct_generator`.
+            discriminator_kwargs (dict): Keyword arguments for the discriminator. See :func:`construct_discriminator`.
+
+        Returns:
+            tuple[torch.Tensor, torch.Tensor]: Discriminator and generator loss from the final iteration.
+        """
+
+        # Check X values
+        assert X.max() <= 1 and X.min() >= -1, "Values must be within [-1, 1]. Try using np.tanh first."
+
+        self.construct_discriminator(X.shape[1], **discriminator_kwargs)
+        self.construct_generator(X.shape[1], **generator_kwargs)
+
+        with self.device:
+            if isinstance(X, np.ndarray):
+                X = torch.from_numpy(X).float()
+            elif isinstance(X, torch.Tensor):
+                X = X.float()
+            else:
+                raise TypeError(f"Invalid type {type(X)} for X. Use torch.Tensor or np.ndarray.")
+            X = X.to(self.device)
+            self.n_features = X.shape[1]
+
+            self.generator_optimizer = self.get_optimizer(self.opt, self.generator.parameters(), lr)
+            self.discriminator_optimizer = self.get_optimizer(self.opt, self.discriminator.parameters(), lr)
+
+            # Train the GAN
+            for _ in range(global_iterations):
+                for i in range(discriminator_iterations):
+
+                    # Sample from X
+                    real = X[torch.randperm(X.shape[0])[:batch_size]]
+                    # At the end of the data the batch may be smaller than the specified batch size
+                    actual_batch_size = real.shape[0]
+                    # Detach so the discriminator update does not backpropagate into the generator
+                    fake = self.generator(torch.randn(actual_batch_size, self.latent_dimension)).detach()
+
+                    self.discriminator.zero_grad()
+                    disc_real = self.discriminator(real)
+                    disc_fake = self.discriminator(fake)
+
+                    loss_real = self.loss(disc_real, torch.ones(actual_batch_size, 1))
+                    # Inserting 1 for y effectively calculates -log(D(x)), as the other term of
+                    # the loss vanishes because it is proportional to (1 - y)
+
+                    loss_fake = self.loss(disc_fake, torch.zeros(actual_batch_size, 1))
+                    # In a similar fashion, inserting 0 for y yields -log(1 - D(G(z)))
+                    loss_discriminator = loss_real + loss_fake
+                    loss_discriminator.backward()
+                    self.discriminator_optimizer.step()
+
+                self.generator.zero_grad()
+                new_fake = self.generator(torch.randn(self.fake_batch_size, self.latent_dimension))
+                new_disc_fake = self.discriminator(new_fake)
+                loss_generator = self.loss(new_disc_fake, torch.ones(self.fake_batch_size, 1))
+                # Inserting 1 for y calculates -log(D(G(z)))
+                loss_generator.backward()
+                self.generator_optimizer.step()
+
+        return loss_discriminator, loss_generator
+
+    def sample(self, n_samples: int) -> torch.Tensor:
+        """
+        Generates samples from the generator.
+
+        Args:
+            n_samples (int): Number of samples to generate.
+
+        Returns:
+            torch.Tensor: Samples from the generator.
+        """
+        with torch.no_grad():
+            with self.device:
+                return self.generator(torch.randn(n_samples, self.latent_dimension))
\ No newline at end of file