diff --git a/examples/timeseries/stock_doppelganger.py b/examples/timeseries/stock_doppelganger.py new file mode 100644 index 00000000..ec8d6bae --- /dev/null +++ b/examples/timeseries/stock_doppelganger.py @@ -0,0 +1,34 @@ + +# Importing necessary libraries +from ydata_synthetic.synthesizers.timeseries import TimeSeriesSynthesizer +from ydata_synthetic.preprocessing.timeseries import processed_stock +from ydata_synthetic.synthesizers import ModelParameters, TrainParameters +import pandas as pd +from os import path + +# Read the data +stock_data = processed_stock(path='../../data/stock_data.csv', seq_len=24) +stock_data = [pd.DataFrame(sd, columns = ["Open", "High", "Low", "Close", "Adj_Close", "Volume"]) for sd in stock_data] +stock_data = pd.concat(stock_data).reset_index(drop=True) + +# Define model parameters +model_args = ModelParameters(batch_size=100, + lr=0.001, + betas=(0.5, 0.9), + latent_dim=3, + gp_lambda=10, + pac=10) + +train_args = TrainParameters(epochs=500, sequence_length=24, + measurement_cols=["Open", "High", "Low", "Close", "Adj_Close", "Volume"]) + +# Training the DoppelGANger synthesizer +if path.exists('doppelganger_stock'): + model_dop_gan = TimeSeriesSynthesizer.load('doppelganger_stock') +else: + model_dop_gan = TimeSeriesSynthesizer(modelname='doppelganger', model_parameters=model_args) + model_dop_gan.fit(stock_data, train_args, num_cols=["Open", "High", "Low", "Close", "Adj_Close", "Volume"]) + +# Generating new synthetic samples +synth_data = model_dop_gan.sample(n_samples=500) +print(synth_data[0]) diff --git a/examples/timeseries/stock_timegan.py b/examples/timeseries/stock_timegan.py index 611952f1..49b20fed 100644 --- a/examples/timeseries/stock_timegan.py +++ b/examples/timeseries/stock_timegan.py @@ -4,62 +4,53 @@ # Importing necessary libraries from os import path -import pandas as pd +from ydata_synthetic.synthesizers.timeseries import TimeSeriesSynthesizer +from ydata_synthetic.preprocessing.timeseries import processed_stock +from ydata_synthetic.synthesizers import ModelParameters, TrainParameters import numpy as np +import pandas as pd import matplotlib.pyplot as plt -from ydata_synthetic.synthesizers import ModelParameters -from ydata_synthetic.preprocessing.timeseries import processed_stock -from ydata_synthetic.synthesizers.timeseries import TimeGAN - # Define model parameters -seq_len=24 -n_seq = 6 -hidden_dim=24 -gamma=1 - -noise_dim = 32 -dim = 128 -batch_size = 128 +gan_args = ModelParameters(batch_size=128, + lr=5e-4, + noise_dim=32, + layers_dim=128, + latent_dim=24, + gamma=1) -log_step = 100 -learning_rate = 5e-4 - -gan_args = ModelParameters(batch_size=batch_size, - lr=learning_rate, - noise_dim=noise_dim, - layers_dim=dim) +train_args = TrainParameters(epochs=50000, + sequence_length=24, + number_sequences=6) # Read the data -stock_data = processed_stock(path='../../data/stock_data.csv', seq_len=seq_len) -print(len(stock_data),stock_data[0].shape) +stock_data = pd.read_csv("../../data/stock_data.csv") +cols = list(stock_data.columns) # Training the TimeGAN synthesizer if path.exists('synthesizer_stock.pkl'): - synth = TimeGAN.load('synthesizer_stock.pkl') + synth = TimeSeriesSynthesizer.load('synthesizer_stock.pkl') else: - synth = TimeGAN(model_parameters=gan_args, hidden_dim=24, seq_len=seq_len, n_seq=n_seq, gamma=1) - synth.train(stock_data, train_steps=50000) + synth = TimeSeriesSynthesizer(modelname='timegan', model_parameters=gan_args) + synth.fit(stock_data, train_args, num_cols=cols) synth.save('synthesizer_stock.pkl') # 
Generating new synthetic samples -synth_data = synth.sample(len(stock_data)) -print(synth_data.shape) - -# Reshaping the data -cols = ['Open','High','Low','Close','Adj Close','Volume'] +stock_data_blocks = processed_stock(path='../../data/stock_data.csv', seq_len=24) +synth_data = synth.sample(n_samples=len(stock_data_blocks)) +print(synth_data[0].shape) # Plotting some generated samples. Both Synthetic and Original data are still standardized with values between [0,1] fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 10)) axes=axes.flatten() time = list(range(1,25)) -obs = np.random.randint(len(stock_data)) +obs = np.random.randint(len(stock_data_blocks)) for j, col in enumerate(cols): - df = pd.DataFrame({'Real': stock_data[obs][:, j], - 'Synthetic': synth_data[obs][:, j]}) + df = pd.DataFrame({'Real': stock_data_blocks[obs][:, j], + 'Synthetic': synth_data[obs].iloc[:, j]}) df.plot(ax=axes[j], title = col, secondary_y='Synthetic data', style=['-', '--']) -fig.tight_layout() \ No newline at end of file +fig.tight_layout() diff --git a/src/ydata_synthetic/preprocessing/regular/processor.py b/src/ydata_synthetic/preprocessing/regular/processor.py index cdbabb97..cf7716a4 100644 --- a/src/ydata_synthetic/preprocessing/regular/processor.py +++ b/src/ydata_synthetic/preprocessing/regular/processor.py @@ -62,7 +62,7 @@ def fit(self, X: DataFrame) -> RegularDataProcessor: ("scaler", MinMaxScaler()), ]) self._cat_pipeline = Pipeline([ - ("encoder", OneHotEncoder(sparse=False, handle_unknown='ignore')), + ("encoder", OneHotEncoder(sparse_output=False, handle_unknown='ignore')), ]) self.num_pipeline.fit(X[self.num_cols]) if self.num_cols else zeros([len(X), 0]) diff --git a/src/ydata_synthetic/preprocessing/timeseries/doppelganger_processor.py b/src/ydata_synthetic/preprocessing/timeseries/doppelganger_processor.py new file mode 100644 index 00000000..4794ab48 --- /dev/null +++ b/src/ydata_synthetic/preprocessing/timeseries/doppelganger_processor.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +from typing import List, Optional +from dataclasses import dataclass + +from numpy import concatenate, ndarray, zeros, ones, expand_dims, reshape, sum as npsum, repeat, array_split, asarray +from pandas import DataFrame +from typeguard import typechecked + +from ydata_synthetic.preprocessing.regular.processor import RegularDataProcessor + + +@dataclass +class ColumnMetadata: + """ + Dataclass that stores the metadata of each column. + """ + discrete: bool + output_dim: int + name: str + + +@typechecked +class DoppelGANgerProcessor(RegularDataProcessor): + """ + Main class for the DoppelGANger preprocessing. + It works like any other transformer in scikit-learn with the methods fit, transform and inverse transform. + Args: + num_cols (list of strings): + List of names of numerical columns. + cat_cols (list of strings): + List of names of categorical columns. + measurement_cols (list of strings): + List of measurement columns. + sequence_length (int): + Sequence length.
+ """ + SUPPORTED_MODEL = 'DoppelGANger' + + def __init__(self, num_cols: Optional[List[str]] = None, + cat_cols: Optional[List[str]] = None, + measurement_cols: Optional[List[str]] = None, + sequence_length: Optional[int] = None): + super().__init__(num_cols, cat_cols) + + if num_cols is None: + num_cols = [] + if cat_cols is None: + cat_cols = [] + if measurement_cols is None: + measurement_cols = [] + self.sequence_length = sequence_length + self._measurement_num_cols = [c for c in self.num_cols if c in measurement_cols] + self._measurement_cat_cols = [c for c in self.cat_cols if c in measurement_cols] + self._attribute_num_cols = [c for c in self.num_cols if c not in measurement_cols] + self._attribute_cat_cols = [c for c in self.cat_cols if c not in measurement_cols] + self._measurement_cols_metadata = None + self._attribute_cols_metadata = None + self._measurement_one_hot_cat_cols = None + self._attribute_one_hot_cat_cols = None + self._has_attributes = self._attribute_num_cols or self._attribute_cat_cols + + @property + def measurement_cols_metadata(self): + return self._measurement_cols_metadata + + @property + def attribute_cols_metadata(self): + return self._attribute_cols_metadata + + def add_gen_flag(self, data_features: ndarray, sample_len: int): + num_sample = data_features.shape[0] + length = data_features.shape[1] + if length % sample_len != 0: + raise Exception("length must be a multiple of sample_len") + data_gen_flag = ones((num_sample, length)) + data_gen_flag = expand_dims(data_gen_flag, 2) + shift_gen_flag = concatenate( + [data_gen_flag[:, 1:, :], + zeros((data_gen_flag.shape[0], 1, 1))], + axis=1) + data_gen_flag_t = reshape( + data_gen_flag, + [num_sample, int(length / sample_len), sample_len]) + data_gen_flag_t = npsum(data_gen_flag_t, 2) + data_gen_flag_t = data_gen_flag_t > 0.5 + data_gen_flag_t = repeat(data_gen_flag_t, sample_len, axis=1) + data_gen_flag_t = expand_dims(data_gen_flag_t, 2) + data_features = concatenate( + [data_features, + shift_gen_flag, + (1 - shift_gen_flag) * data_gen_flag_t], + axis=2) + + return data_features + + def transform(self, X: DataFrame) -> tuple[ndarray, ndarray]: + """Transforms the passed DataFrame with the fit DataProcessor. + Args: + X (DataFrame): + DataFrame used to fit the processor parameters. + Should be aligned with the columns types defined in initialization. + Returns: + transformed (ndarray, ndarray): + Processed version of the passed DataFrame. 
+ """ + self._check_is_fitted() + + measurement_cols = self._measurement_num_cols + self._measurement_cat_cols + if not measurement_cols: + raise ValueError("At least one measurement column must be supplied.") + if not all(c in self.num_cols + self.cat_cols for c in measurement_cols): + raise ValueError("At least one of the supplied measurement columns does not exist in the dataset.") + if self.sequence_length is None: + raise ValueError("The sequence length is mandatory.") + + num_data = DataFrame(self.num_pipeline.transform(X[self.num_cols]) if self.num_cols else zeros([len(X), 0]), columns=self.num_cols) + one_hot_cat_cols = self.cat_pipeline.get_feature_names_out() + cat_data = DataFrame(self.cat_pipeline.transform(X[self.cat_cols]) if self.cat_cols else zeros([len(X), 0]), columns=one_hot_cat_cols) + + self._measurement_one_hot_cat_cols = [c for c in one_hot_cat_cols if c.split("_")[0] in self._measurement_cat_cols] + measurement_num_data = num_data[self._measurement_num_cols].to_numpy() if self._measurement_num_cols else zeros([len(X), 0]) + self._measurement_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name=c) for c in self._measurement_num_cols] + measurement_cat_data = cat_data[self._measurement_one_hot_cat_cols].to_numpy() if self._measurement_one_hot_cat_cols else zeros([len(X), 0]) + self._measurement_cols_metadata += [ColumnMetadata(discrete=True, output_dim=X[c].nunique(), name=c) for c in self._measurement_cat_cols] + data_features = concatenate([measurement_num_data, measurement_cat_data], axis=1) + + if self._has_attributes: + self._attribute_one_hot_cat_cols = [c for c in one_hot_cat_cols if c.split("_")[0] in self._attribute_cat_cols] + attribute_num_data = num_data[self._attribute_num_cols].to_numpy() if self._attribute_num_cols else zeros([len(X), 0]) + self._attribute_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name=c) for c in self._attribute_num_cols] + attribute_cat_data = cat_data[self._attribute_one_hot_cat_cols].to_numpy() if self._attribute_one_hot_cat_cols else zeros([len(X), 0]) + self._attribute_cols_metadata += [ColumnMetadata(discrete=True, output_dim=X[c].nunique(), name=c) for c in self._attribute_cat_cols] + data_attributes = concatenate([attribute_num_data, attribute_cat_data], axis=1) + else: + self._attribute_one_hot_cat_cols = [] + data_attributes = zeros((data_features.shape[0], 1)) + self._attribute_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name="zeros_attribute")] + + num_samples = int(X.shape[0] / self.sequence_length) + data_features = asarray(array_split(data_features, num_samples)) + data_attributes = asarray(array_split(data_attributes, num_samples)) + + data_features = self.add_gen_flag(data_features, sample_len=self.sequence_length) + self._measurement_cols_metadata += [ColumnMetadata(discrete=True, output_dim=2, name="gen_flags")] + return data_features, data_attributes.mean(axis=1) + + def inverse_transform(self, X_features: ndarray, X_attributes: ndarray) -> list[DataFrame]: + """Inverts the data transformation pipelines on a passed DataFrame. + Args: + X_features (ndarray): + Numpy array with the measurement data to be brought back to the original format. + X_attributes (ndarray): + Numpy array with the attribute data to be brought back to the original format. + Returns: + result (DataFrame): + DataFrame with all performed transformations inverted. 
+ """ + self._check_is_fitted() + + num_samples = X_attributes.shape[0] + if self._has_attributes: + X_attributes = repeat(X_attributes.reshape((num_samples, 1, X_attributes.shape[1])), repeats=X_features.shape[1], axis=1) + generated_data = concatenate((X_features, X_attributes), axis=2) + else: + generated_data = X_features + output_cols = self._measurement_num_cols + self._measurement_one_hot_cat_cols + self._attribute_num_cols + self._attribute_one_hot_cat_cols + one_hot_cat_cols = self._measurement_one_hot_cat_cols + self._attribute_one_hot_cat_cols + + samples = [] + for i in range(num_samples): + df = DataFrame(generated_data[i], columns=output_cols) + df_num = self.num_pipeline.inverse_transform(df[self.num_cols]) if self.num_cols else zeros([len(df), 0]) + df_cat = self.cat_pipeline.inverse_transform(df[one_hot_cat_cols].round(0)) if self.cat_cols else zeros([len(df), 0]) + df = DataFrame(concatenate((df_num, df_cat), axis=1), columns=self.num_cols+self.cat_cols) + df = df.loc[:, self._col_order_] + for col in df.columns: + df[col] = df[col].astype(self._types[col]) + samples.append(df) + + return samples diff --git a/src/ydata_synthetic/synthesizers/base.py b/src/ydata_synthetic/synthesizers/base.py index aa9275fc..9e4e8358 100644 --- a/src/ydata_synthetic/synthesizers/base.py +++ b/src/ydata_synthetic/synthesizers/base.py @@ -26,21 +26,23 @@ from ydata_synthetic.preprocessing.timeseries.timeseries_processor import ( TimeSeriesDataProcessor, TimeSeriesModels) from ydata_synthetic.preprocessing.regular.ctgan_processor import CTGANDataProcessor +from ydata_synthetic.preprocessing.timeseries.doppelganger_processor import DoppelGANgerProcessor from ydata_synthetic.synthesizers.saving_keras import make_keras_picklable _model_parameters = ['batch_size', 'lr', 'betas', 'layers_dim', 'noise_dim', 'n_cols', 'seq_len', 'condition', 'n_critic', 'n_features', 'tau_gs', 'generator_dims', 'critic_dims', 'l2_scale', - 'latent_dim', 'gp_lambda', 'pac'] + 'latent_dim', 'gp_lambda', 'pac', 'gamma'] _model_parameters_df = [128, 1e-4, (None, None), 128, 264, None, None, None, 1, None, 0.2, [256, 256], - [256, 256], 1e-6, 128, 10.0, 10] + [256, 256], 1e-6, 128, 10.0, 10, 1] _train_parameters = ['cache_prefix', 'label_dim', 'epochs', 'sample_interval', - 'labels', 'n_clusters', 'epsilon', 'log_frequency'] + 'labels', 'n_clusters', 'epsilon', 'log_frequency', + 'measurement_cols', 'sequence_length', 'number_sequences'] ModelParameters = namedtuple('ModelParameters', _model_parameters, defaults=_model_parameters_df) -TrainParameters = namedtuple('TrainParameters', _train_parameters, defaults=('', None, 300, 50, None, 10, 0.005, True)) +TrainParameters = namedtuple('TrainParameters', _train_parameters, defaults=('', None, 300, 50, None, 10, 0.005, True, None, 1, 1)) @typechecked class BaseModel(ABC): @@ -185,6 +187,12 @@ def fit(self, epsilon = train_arguments.epsilon self.processor = CTGANDataProcessor(n_clusters=n_clusters, epsilon=epsilon, num_cols=num_cols, cat_cols=cat_cols).fit(data) + elif self.__MODEL__ == DoppelGANgerProcessor.SUPPORTED_MODEL: + measurement_cols = train_arguments.measurement_cols + sequence_length = train_arguments.sequence_length + self.processor = DoppelGANgerProcessor(num_cols=num_cols, cat_cols=cat_cols, + measurement_cols=measurement_cols, + sequence_length=sequence_length).fit(data) else: print(f'A DataProcessor is not available for the {self.__MODEL__}.') diff --git a/src/ydata_synthetic/synthesizers/timeseries/__init__.py 
b/src/ydata_synthetic/synthesizers/timeseries/__init__.py index a3523536..0309d113 100644 --- a/src/ydata_synthetic/synthesizers/timeseries/__init__.py +++ b/src/ydata_synthetic/synthesizers/timeseries/__init__.py @@ -1,5 +1,5 @@ -from ydata_synthetic.synthesizers.timeseries.timegan.model import TimeGAN +from ydata_synthetic.synthesizers.timeseries.model import TimeSeriesSynthesizer __all__ = [ - 'TimeGAN', + 'TimeSeriesSynthesizer' ] diff --git a/src/ydata_synthetic/synthesizers/timeseries/doppelganger/__init__.py b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/ydata_synthetic/synthesizers/timeseries/doppelganger/doppelganger.py b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/doppelganger.py new file mode 100644 index 00000000..3706c254 --- /dev/null +++ b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/doppelganger.py @@ -0,0 +1,569 @@ +import tensorflow as tf +import numpy as np +from tqdm import tqdm +import math +from joblib import dump + + +class DoppelGANgerNetwork(object): + """ + Adapted from https://github.com/fjxmlzn/DoppelGANger/blob/master/gan/doppelganger.py. + """ + def __init__(self, + sess, + epoch, + batch_size, + data_feature, + data_attribute, + sample_len, + generator, + discriminator, + d_gp_coe, + num_packing, + attr_discriminator=None, + attr_d_gp_coe=None, + g_attr_d_coe=None, + attribute_latent_dim=5, + feature_latent_dim=5, + fix_feature_network=False, + g_lr=0.001, + g_beta1=0.5, + d_lr=0.001, + d_beta1=0.5, + attr_d_lr=0.001, + attr_d_beta1=0.5): + """Constructor of DoppelGANger + Args: + sess: A tensorflow session + epoch: Number of training epochs + batch_size: Training batch size + data_feature: Training features, in numpy float32 array format. + The size is [(number of training samples) x (maximum length) x + (total dimension of features)]. + data_attribute: Training attributes, in numpy float32 array format. + The size is [(number of training samples) x (total dimension + of attributes)] + sample_len: The time series batch size + generator: An instance of network.DoppelGANgerGenerator + discriminator: An instance of network.Discriminator + d_gp_coe: Weight of gradient penalty loss in Wasserstein GAN + num_packing: Packing degree in PacGAN (a method for solving mode + collapse in NeurIPS 2018, see https://arxiv.org/abs/1712.04086) + attr_discriminator: An instance of network.AttrDiscriminator. 
None + if you do not want to use this auxiliary discriminator + attr_d_gp_coe: Weight of gradient penalty loss in Wasserstein GAN + for the auxiliary discriminator + g_attr_d_coe: Weight of the auxiliary discriminator in the + generator's loss + attribute_latent_dim: The dimension of noise for generating + attributes + feature_latent_dim: The dimension of noise for generating + features + fix_feature_network: Whether to fix the feature network during + training + g_lr: The learning rate in Adam for training the generator + g_beta1: The beta1 in Adam for training the generator + d_lr: The learning rate in Adam for training the discriminator + d_beta1: The beta1 in Adam for training the discriminator + attr_d_lr: The learning rate in Adam for training the auxiliary + discriminator + attr_d_beta1: The beta1 in Adam for training the auxiliary + discriminator + """ + self.sess = sess + self.epoch = epoch + self.batch_size = batch_size + self.data_feature = data_feature + self.data_attribute = data_attribute + self.sample_len = sample_len + self.generator = generator + self.discriminator = discriminator + self.attr_discriminator = attr_discriminator + self.d_gp_coe = d_gp_coe + self.attr_d_gp_coe = attr_d_gp_coe + self.g_attr_d_coe = g_attr_d_coe + self.num_packing = num_packing + self.attribute_latent_dim = attribute_latent_dim + self.feature_latent_dim = feature_latent_dim + self.fix_feature_network = fix_feature_network + self.g_lr = g_lr + self.g_beta1 = g_beta1 + self.d_lr = d_lr + self.d_beta1 = d_beta1 + self.attr_d_lr = attr_d_lr + self.attr_d_beta1 = attr_d_beta1 + + if self.data_feature is not None: + if self.data_feature.shape[1] % self.sample_len != 0: + raise Exception("Length must be a multiple of sample_len.") + self.sample_time = int(self.data_feature.shape[1] / self.sample_len) + self.sample_feature_dim = self.data_feature.shape[2] + if self.data_attribute is not None: + self.sample_attribute_dim = self.data_attribute.shape[1] + + self.EPS = 1e-8 + + def build(self): + self.build_connection() + self.build_loss() + + def build_connection(self): + # build connections for train-fake + self.g_feature_input_noise_train_pl_l = [] + for i in range(self.num_packing): + self.g_feature_input_noise_train_pl_l.append( + tf.compat.v1.placeholder( + tf.float32, + [None, self.sample_time, self.feature_latent_dim], + name="g_feature_input_noise_train_{}".format(i))) + self.g_real_attribute_input_noise_train_pl_l = [] + for i in range(self.num_packing): + self.g_real_attribute_input_noise_train_pl_l.append( + tf.compat.v1.placeholder( + tf.float32, + [None, self.attribute_latent_dim], + name="g_real_attribute_input_noise_train_{}".format(i))) + self.g_addi_attribute_input_noise_train_pl_l = [] + for i in range(self.num_packing): + self.g_addi_attribute_input_noise_train_pl_l.append( + tf.compat.v1.placeholder( + tf.float32, + [None, self.attribute_latent_dim], + name=("g_addi_attribute_input_noise_train_{}".format(i)))) + self.g_feature_input_data_train_pl_l = [] + for i in range(self.num_packing): + self.g_feature_input_data_train_pl_l.append( + tf.compat.v1.placeholder( + tf.float32, + [None, self.sample_len * self.sample_feature_dim], + name="g_feature_input_data_train_{}".format(i))) + + self.g_output_feature_train_tf_l = [] + self.g_output_attribute_train_tf_l = [] + self.g_output_gen_flag_train_tf_l = [] + self.g_output_length_train_tf_l = [] + self.g_output_argmax_train_tf_l = [] + for i in range(self.num_packing): + (g_output_feature_train_tf, g_output_attribute_train_tf, + 
g_output_gen_flag_train_tf, g_output_length_train_tf, + g_output_argmax_train_tf) = \ + self.generator.build( + self.g_real_attribute_input_noise_train_pl_l[i], + self.g_addi_attribute_input_noise_train_pl_l[i], + self.g_feature_input_noise_train_pl_l[i], + self.g_feature_input_data_train_pl_l[i]) + + if self.fix_feature_network: + g_output_feature_train_tf = tf.zeros_like( + g_output_feature_train_tf) + g_output_gen_flag_train_tf = tf.zeros_like( + g_output_gen_flag_train_tf) + + self.g_output_feature_train_tf_l.append( + g_output_feature_train_tf) + self.g_output_attribute_train_tf_l.append( + g_output_attribute_train_tf) + self.g_output_gen_flag_train_tf_l.append( + g_output_gen_flag_train_tf) + self.g_output_length_train_tf_l.append( + g_output_length_train_tf) + self.g_output_argmax_train_tf_l.append( + g_output_argmax_train_tf) + self.g_output_feature_train_tf = tf.concat( + self.g_output_feature_train_tf_l, + axis=1) + self.g_output_attribute_train_tf = tf.concat( + self.g_output_attribute_train_tf_l, + axis=1) + + self.d_fake_train_tf = self.discriminator.build( + self.g_output_feature_train_tf, + self.g_output_attribute_train_tf) + + if self.attr_discriminator is not None: + self.attr_d_fake_train_tf = self.attr_discriminator.build( + self.g_output_attribute_train_tf) + + self.real_feature_pl_l = [] + for i in range(self.num_packing): + real_feature_pl = tf.compat.v1.placeholder( + tf.float32, + [None, + self.sample_time * self.sample_len, + self.sample_feature_dim], + name="real_feature_{}".format(i)) + if self.fix_feature_network: + real_feature_pl = tf.zeros_like( + real_feature_pl) + self.real_feature_pl_l.append(real_feature_pl) + self.real_attribute_pl_l = [] + for i in range(self.num_packing): + real_attribute_pl = tf.compat.v1.placeholder( + tf.float32, + [None, self.sample_attribute_dim], + name="real_attribute_{}".format(i)) + self.real_attribute_pl_l.append(real_attribute_pl) + self.real_feature_pl = tf.concat( + self.real_feature_pl_l, + axis=1) + self.real_attribute_pl = tf.concat( + self.real_attribute_pl_l, + axis=1) + + self.d_real_train_tf = self.discriminator.build( + self.real_feature_pl, + self.real_attribute_pl) + self.d_real_test_tf = self.discriminator.build( + self.real_feature_pl, + self.real_attribute_pl) + + if self.attr_discriminator is not None: + self.attr_d_real_train_tf = self.attr_discriminator.build( + self.real_attribute_pl) + + self.g_real_attribute_input_noise_test_pl = tf.compat.v1.placeholder( + tf.float32, + [None, self.attribute_latent_dim], + name="g_real_attribute_input_noise_test") + self.g_addi_attribute_input_noise_test_pl = tf.compat.v1.placeholder( + tf.float32, + [None, self.attribute_latent_dim], + name="g_addi_attribute_input_noise_test") + self.g_feature_input_noise_test_pl = tf.compat.v1.placeholder( + tf.float32, + [None, None, self.feature_latent_dim], + name="g_feature_input_noise_test") + + self.g_feature_input_data_test_teacher_pl = tf.compat.v1.placeholder( + tf.float32, + [None, None, self.sample_len * self.sample_feature_dim], + name="g_feature_input_data_test_teacher") + (self.g_output_feature_test_teacher_tf, + self.g_output_attribute_test_teacher_tf, + self.g_output_gen_flag_test_teacher_tf, + self.g_output_length_test_teacher_tf, _) = \ + self.generator.build( + self.g_real_attribute_input_noise_test_pl, + self.g_addi_attribute_input_noise_test_pl, + self.g_feature_input_noise_test_pl, + self.g_feature_input_data_test_teacher_pl) + + self.g_feature_input_data_test_free_pl = tf.compat.v1.placeholder( + tf.float32, + 
[None, self.sample_len * self.sample_feature_dim], + name="g_feature_input_data_test_free") + (self.g_output_feature_test_free_tf, + self.g_output_attribute_test_free_tf, + self.g_output_gen_flag_test_free_tf, + self.g_output_length_test_free_tf, _) = \ + self.generator.build( + self.g_real_attribute_input_noise_test_pl, + self.g_addi_attribute_input_noise_test_pl, + self.g_feature_input_noise_test_pl, + self.g_feature_input_data_test_free_pl) + + self.given_attribute_attribute_pl = tf.compat.v1.placeholder( + tf.float32, + [None, self.sample_attribute_dim], + name="given_attribute") + (self.g_output_feature_given_attribute_test_free_tf, + self.g_output_attribute_given_attribute_test_free_tf, + self.g_output_gen_flag_given_attribute_test_free_tf, + self.g_output_length_given_attribute_test_free_tf, _) = \ + self.generator.build( + None, + self.g_addi_attribute_input_noise_test_pl, + self.g_feature_input_noise_test_pl, + self.g_feature_input_data_test_free_pl, + attribute=self.given_attribute_attribute_pl) + + def build_loss(self): + batch_size = tf.shape(input=self.g_feature_input_noise_train_pl_l[0])[0] + + self.g_loss_d = -tf.reduce_mean(input_tensor=self.d_fake_train_tf) + if self.attr_discriminator is not None: + self.g_loss_attr_d = -tf.reduce_mean(input_tensor=self.attr_d_fake_train_tf) + self.g_loss = (self.g_loss_d + + self.g_attr_d_coe * self.g_loss_attr_d) + else: + self.g_loss = self.g_loss_d + + self.d_loss_fake = tf.reduce_mean(input_tensor=self.d_fake_train_tf) + self.d_loss_fake_unflattened = self.d_fake_train_tf + self.d_loss_real = -tf.reduce_mean(input_tensor=self.d_real_train_tf) + self.d_loss_real_unflattened = -self.d_real_train_tf + alpha_dim2 = tf.random.uniform( + shape=[batch_size, 1], + minval=0., + maxval=1.) + alpha_dim3 = tf.expand_dims(alpha_dim2, 2) + differences_input_feature = (self.g_output_feature_train_tf - + self.real_feature_pl) + interpolates_input_feature = (self.real_feature_pl + + alpha_dim3 * differences_input_feature) + differences_input_attribute = (self.g_output_attribute_train_tf - + self.real_attribute_pl) + interpolates_input_attribute = (self.real_attribute_pl + + (alpha_dim2 * + differences_input_attribute)) + gradients = tf.gradients( + ys=self.discriminator.build( + interpolates_input_feature, + interpolates_input_attribute), + xs=[interpolates_input_feature, interpolates_input_attribute]) + slopes1 = tf.reduce_sum(input_tensor=tf.square(gradients[0]), + axis=[1, 2]) + slopes2 = tf.reduce_sum(input_tensor=tf.square(gradients[1]), + axis=[1]) + slopes = tf.sqrt(slopes1 + slopes2 + self.EPS) + self.d_loss_gp = tf.reduce_mean(input_tensor=(slopes - 1.)**2) + self.d_loss_gp_unflattened = (slopes - 1.)**2 + + self.d_loss = (self.d_loss_fake + + self.d_loss_real + + self.d_gp_coe * self.d_loss_gp) + + self.d_loss_unflattened = (self.d_loss_fake_unflattened + + self.d_loss_real_unflattened + + self.d_gp_coe * self.d_loss_gp_unflattened) + + if self.attr_discriminator is not None: + self.attr_d_loss_fake = tf.reduce_mean(input_tensor=self.attr_d_fake_train_tf) + self.attr_d_loss_fake_unflattened = self.attr_d_fake_train_tf + self.attr_d_loss_real = -tf.reduce_mean(input_tensor=self.attr_d_real_train_tf) + self.attr_d_loss_real_unflattened = -self.attr_d_real_train_tf + alpha_dim2 = tf.random.uniform( + shape=[batch_size, 1], + minval=0., + maxval=1.) 
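+            # Gradient penalty for the auxiliary attribute discriminator: interpolate between real and generated attributes and penalize critic gradient norms that deviate from 1.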
+ differences_input_attribute = (self.g_output_attribute_train_tf - + self.real_attribute_pl) + interpolates_input_attribute = (self.real_attribute_pl + + (alpha_dim2 * + differences_input_attribute)) + gradients = tf.gradients( + ys=self.attr_discriminator.build( + interpolates_input_attribute), + xs=[interpolates_input_attribute]) + slopes1 = tf.reduce_sum(input_tensor=tf.square(gradients[0]), + axis=[1]) + slopes = tf.sqrt(slopes1 + self.EPS) + self.attr_d_loss_gp = tf.reduce_mean(input_tensor=(slopes - 1.)**2) + self.attr_d_loss_gp_unflattened = (slopes - 1.)**2 + + self.attr_d_loss = (self.attr_d_loss_fake + + self.attr_d_loss_real + + self.attr_d_gp_coe * self.attr_d_loss_gp) + + self.attr_d_loss_unflattened = \ + (self.attr_d_loss_fake_unflattened + + self.attr_d_loss_real_unflattened + + self.attr_d_gp_coe * self.attr_d_loss_gp_unflattened) + + self.g_op = \ + tf.compat.v1.train.AdamOptimizer(self.g_lr, self.g_beta1)\ + .minimize( + self.g_loss, + var_list=self.generator.trainable_vars) + + self.d_op = \ + tf.compat.v1.train.AdamOptimizer(self.d_lr, self.d_beta1)\ + .minimize( + self.d_loss, + var_list=self.discriminator.trainable_vars) + + if self.attr_discriminator is not None: + self.attr_d_op = \ + tf.compat.v1.train.AdamOptimizer(self.attr_d_lr, self.attr_d_beta1)\ + .minimize( + self.attr_d_loss, + var_list=self.attr_discriminator.trainable_vars) + + def sample_from(self, real_attribute_input_noise, + addi_attribute_input_noise, feature_input_noise, + feature_input_data, given_attribute=None, + return_gen_flag_feature=False): + features = [] + attributes = [] + gen_flags = [] + lengths = [] + round_ = int( + math.ceil(float(feature_input_noise.shape[0]) / self.batch_size)) + for i in range(round_): + if given_attribute is None: + if feature_input_data.ndim == 2: + (sub_features, sub_attributes, sub_gen_flags, + sub_lengths) = self.sess.run( + [self.g_output_feature_test_free_tf, + self.g_output_attribute_test_free_tf, + self.g_output_gen_flag_test_free_tf, + self.g_output_length_test_free_tf], + feed_dict={ + self.g_real_attribute_input_noise_test_pl: + real_attribute_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_addi_attribute_input_noise_test_pl: + addi_attribute_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_noise_test_pl: + feature_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_data_test_free_pl: + feature_input_data[ + i * self.batch_size: + (i + 1) * self.batch_size]}) + else: + (sub_features, sub_attributes, sub_gen_flags, + sub_lengths) = self.sess.run( + [self.g_output_feature_test_teacher_tf, + self.g_output_attribute_test_teacher_tf, + self.g_output_gen_flag_test_teacher_tf, + self.g_output_length_test_teacher_tf], + feed_dict={ + self.g_real_attribute_input_noise_test_pl: + real_attribute_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_addi_attribute_input_noise_test_pl: + addi_attribute_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_noise_test_pl: + feature_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_data_test_teacher_pl: + feature_input_data[ + i * self.batch_size: + (i + 1) * self.batch_size]}) + else: + (sub_features, sub_attributes, sub_gen_flags, + sub_lengths) = self.sess.run( + [self.g_output_feature_given_attribute_test_free_tf, + self.g_output_attribute_given_attribute_test_free_tf, + 
self.g_output_gen_flag_given_attribute_test_free_tf, + self.g_output_length_given_attribute_test_free_tf], + feed_dict={ + self.g_addi_attribute_input_noise_test_pl: + addi_attribute_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_noise_test_pl: + feature_input_noise[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.g_feature_input_data_test_free_pl: + feature_input_data[ + i * self.batch_size: + (i + 1) * self.batch_size], + self.given_attribute_attribute_pl: + given_attribute[ + i * self.batch_size: + (i + 1) * self.batch_size]}) + features.append(sub_features) + attributes.append(sub_attributes) + gen_flags.append(sub_gen_flags) + lengths.append(sub_lengths) + + features = np.concatenate(features, axis=0) + attributes = np.concatenate(attributes, axis=0) + gen_flags = np.concatenate(gen_flags, axis=0) + lengths = np.concatenate(lengths, axis=0) + + if not return_gen_flag_feature: + features = np.delete(features, [features.shape[2] - 2, features.shape[2] - 1], axis=2) + + assert len(gen_flags.shape) == 3 + assert gen_flags.shape[2] == 1 + gen_flags = gen_flags[:, :, 0] + + return features, attributes, gen_flags, lengths + + def gen_attribute_input_noise(self, num_sample): + return np.random.normal( + size=[num_sample, self.attribute_latent_dim]) + + def gen_feature_input_noise(self, num_sample, length=1): + return np.random.normal( + size=[num_sample, length, self.feature_latent_dim]) + + def gen_feature_input_data_free(self, num_sample): + return np.zeros( + [num_sample, self.sample_len * self.sample_feature_dim], + dtype=np.float32) + + def train(self): + tf.compat.v1.global_variables_initializer().run() + + batch_num = self.data_feature.shape[0] // self.batch_size + + for _ in tqdm(range(self.epoch)): + data_id = np.random.choice( + self.data_feature.shape[0], + size=(self.data_feature.shape[0], self.num_packing)) + + for batch_id in range(batch_num): + feed_dict = {} + for i in range(self.num_packing): + batch_data_id = data_id[batch_id * self.batch_size: + (batch_id + 1) * self.batch_size, + i] + batch_data_feature = self.data_feature[batch_data_id] + batch_data_attribute = self.data_attribute[batch_data_id] + + batch_real_attribute_input_noise = \ + self.gen_attribute_input_noise(self.batch_size) + batch_addi_attribute_input_noise = \ + self.gen_attribute_input_noise(self.batch_size) + batch_feature_input_noise = \ + self.gen_feature_input_noise( + self.batch_size, self.sample_time) + batch_feature_input_data = \ + self.gen_feature_input_data_free(self.batch_size) + + feed_dict[self.real_feature_pl_l[i]] = \ + batch_data_feature + feed_dict[self.real_attribute_pl_l[i]] = \ + batch_data_attribute + feed_dict[self. + g_real_attribute_input_noise_train_pl_l[i]] = \ + batch_real_attribute_input_noise + feed_dict[self. 
+ g_addi_attribute_input_noise_train_pl_l[i]] = \ + batch_addi_attribute_input_noise + feed_dict[self.g_feature_input_noise_train_pl_l[i]] = \ + batch_feature_input_noise + feed_dict[self.g_feature_input_data_train_pl_l[i]] = \ + batch_feature_input_data + + self.sess.run(self.d_op, feed_dict=feed_dict) + if self.attr_discriminator is not None: + self.sess.run(self.attr_d_op, feed_dict=feed_dict) + + self.sess.run(self.g_op, feed_dict=feed_dict) + + def save(self, path): + dump({ + "epoch": self.epoch, + "batch_size": self.batch_size, + "sample_len": self.sample_len, + "d_gp_coe": self.d_gp_coe, + "attr_d_gp_coe": self.attr_d_gp_coe, + "g_attr_d_coe": self.g_attr_d_coe, + "num_packing": self.num_packing, + "attribute_latent_dim": self.attribute_latent_dim, + "feature_latent_dim": self.feature_latent_dim, + "fix_feature_network": self.fix_feature_network, + "g_lr": self.g_lr, + "g_beta1": self.g_beta1, + "d_lr": self.d_lr, + "d_beta1": self.d_beta1, + "attr_d_lr": self.attr_d_lr, + "attr_d_beta1": self.attr_d_beta1, + "sample_time": self.sample_time, + "sample_feature_dim": self.sample_feature_dim, + "sample_attribute_dim": self.sample_attribute_dim + }, path) diff --git a/src/ydata_synthetic/synthesizers/timeseries/doppelganger/model.py b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/model.py new file mode 100644 index 00000000..a86a2d48 --- /dev/null +++ b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/model.py @@ -0,0 +1,188 @@ +from pandas import DataFrame +import tensorflow as tf +import os +from joblib import dump, load + +from ydata_synthetic.synthesizers.timeseries.doppelganger.network import DoppelGANgerGenerator, AttrDiscriminator, Discriminator +from ydata_synthetic.synthesizers.timeseries.doppelganger.doppelganger import DoppelGANgerNetwork +from ydata_synthetic.synthesizers.base import BaseGANModel, ModelParameters, TrainParameters +from ydata_synthetic.preprocessing.timeseries.doppelganger_processor import DoppelGANgerProcessor + +class DoppelGANger(BaseGANModel): + """ + DoppelGANger model. + Based on the paper https://dl.acm.org/doi/pdf/10.1145/3419394.3423643. + + Args: + model_parameters: Parameters used to create the DoppelGANger model. + """ + __MODEL__ = 'DoppelGANger' + + def __init__(self, model_parameters: ModelParameters): + super().__init__(model_parameters) + self._model_parameters = model_parameters + self._gan_model = None + self._tf_session = None + self._sequence_length = None + tf.compat.v1.disable_eager_execution() + + def fit(self, data: DataFrame, + train_arguments: TrainParameters, + num_cols: list[str] | None = None, + cat_cols: list[str] | None = None): + """ + Fits the DoppelGANger model. + + Args: + data: A pandas DataFrame with the data to be synthesized. + train_arguments: DoppelGANger training arguments. 
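+                Both sequence_length and measurement_cols are required; columns passed in num_cols/cat_cols but not listed in measurement_cols are treated as per-sequence attributes.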
+            num_cols: List of columns to be handled as numerical + cat_cols: List of columns to be handled as categorical + """ + super().fit(data=data, num_cols=num_cols, cat_cols=cat_cols, train_arguments=train_arguments) + + self._sequence_length = train_arguments.sequence_length + if data.shape[0] % self._sequence_length != 0: + raise ValueError("The number of samples must be a multiple of the sequence length.") + + data_features, data_attributes = self.processor.transform(data) + measurement_cols_metadata = self.processor.measurement_cols_metadata + attribute_cols_metadata = self.processor.attribute_cols_metadata + + generator = DoppelGANgerGenerator( + feed_back=False, + noise=True, + measurement_cols_metadata=measurement_cols_metadata, + attribute_cols_metadata=attribute_cols_metadata, + sample_len=self._sequence_length) + discriminator = Discriminator() + attr_discriminator = AttrDiscriminator() + + self._tf_session = tf.compat.v1.Session() + with self._tf_session.as_default() as sess: + self._gan_model = DoppelGANgerNetwork( + sess=sess, + epoch=train_arguments.epochs, + batch_size=self.batch_size, + data_feature=data_features, + data_attribute=data_attributes, + sample_len=self._sequence_length, + generator=generator, + discriminator=discriminator, + attr_discriminator=attr_discriminator, + d_gp_coe=self.gp_lambda, + attr_d_gp_coe=self.gp_lambda, + g_attr_d_coe=self.gp_lambda, + num_packing=self.pac, + attribute_latent_dim=self.latent_dim, + feature_latent_dim=self.latent_dim, + fix_feature_network=False, + g_lr=self.g_lr, + g_beta1=self.beta_1, + d_lr=self.d_lr, + d_beta1=self.beta_1, + attr_d_lr=self.d_lr, + attr_d_beta1=self.beta_1) + self._gan_model.build() + self._gan_model.train() + + def sample(self, n_samples: int): + """ + Samples new data from the DoppelGANger. + + Args: + n_samples: Number of samples to be generated. + """ + if n_samples <= 0: + raise ValueError("Invalid number of samples.") + + real_attribute_input_noise = self._gan_model.gen_attribute_input_noise(n_samples) + addi_attribute_input_noise = self._gan_model.gen_attribute_input_noise(n_samples) + feature_input_noise = self._gan_model.gen_feature_input_noise(n_samples) + input_data = self._gan_model.gen_feature_input_data_free(n_samples) + + with self._tf_session.as_default() as sess: + self._gan_model.sess = sess + data_features, data_attributes, _, _ = self._gan_model.sample_from( + real_attribute_input_noise, addi_attribute_input_noise, + feature_input_noise, input_data) + + return self.processor.inverse_transform(data_features, data_attributes) + + def save(self, path): + """ + Save the DoppelGANger model in a directory. + + Args: + path: Path of the directory where the files will be saved. + """ + saver = tf.compat.v1.train.Saver() + with self._tf_session.as_default() as sess: + saver.save(sess, os.path.join(path, "doppelganger"), write_meta_graph=False) + self._gan_model.save(os.path.join(path, "doppelganger_network.pkl")) + dump({ + "processor": self.processor.__dict__, + "measurement_cols_metadata": self.processor.measurement_cols_metadata, + "attribute_cols_metadata": self.processor.attribute_cols_metadata + }, os.path.join(path, "doppelganger_metadata.pkl")) + + @staticmethod + def load(path): + """ + Load the DoppelGANger model from a directory. + Only the required components to sample new data are loaded. + + Args: + path: Path of the directory where the files were saved.
+ """ + dp_model = DoppelGANger(ModelParameters()) + dp_network_parms = load(os.path.join(path, "doppelganger_network.pkl")) + dp_metadata = load(os.path.join(path, "doppelganger_metadata.pkl")) + + dp_model.processor = DoppelGANgerProcessor() + dp_model.processor.__dict__ = dp_metadata["processor"] + + generator = DoppelGANgerGenerator( + feed_back=False, + noise=True, + measurement_cols_metadata=dp_metadata["measurement_cols_metadata"], + attribute_cols_metadata=dp_metadata["attribute_cols_metadata"], + sample_len=dp_network_parms["sample_len"]) + discriminator = Discriminator() + attr_discriminator = AttrDiscriminator() + + with tf.compat.v1.Session().as_default() as sess: + dp_model._gan_model = DoppelGANgerNetwork( + sess=sess, + epoch=dp_network_parms["epoch"], + batch_size=dp_network_parms["batch_size"], + data_feature=None, + data_attribute=None, + sample_len=dp_network_parms["sample_len"], + generator=generator, + discriminator=discriminator, + attr_discriminator=attr_discriminator, + d_gp_coe=dp_network_parms["d_gp_coe"], + attr_d_gp_coe=dp_network_parms["attr_d_gp_coe"], + g_attr_d_coe=dp_network_parms["g_attr_d_coe"], + num_packing=dp_network_parms["num_packing"], + attribute_latent_dim=dp_network_parms["attribute_latent_dim"], + feature_latent_dim=dp_network_parms["feature_latent_dim"], + fix_feature_network=dp_network_parms["fix_feature_network"], + g_lr=dp_network_parms["g_lr"], + g_beta1=dp_network_parms["g_beta1"], + d_lr=dp_network_parms["d_lr"], + d_beta1=dp_network_parms["d_beta1"], + attr_d_lr=dp_network_parms["attr_d_lr"], + attr_d_beta1=dp_network_parms["attr_d_beta1"]) + + dp_model._gan_model.sample_time = dp_network_parms["sample_time"] + dp_model._gan_model.sample_feature_dim = dp_network_parms["sample_feature_dim"] + dp_model._gan_model.sample_attribute_dim = dp_network_parms["sample_attribute_dim"] + dp_model._gan_model.build() + + saver = tf.compat.v1.train.Saver() + saver.restore(sess, tf.compat.v1.train.latest_checkpoint(path)) + dp_model._tf_session = sess + + return dp_model diff --git a/src/ydata_synthetic/synthesizers/timeseries/doppelganger/network.py b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/network.py new file mode 100644 index 00000000..774f909a --- /dev/null +++ b/src/ydata_synthetic/synthesizers/timeseries/doppelganger/network.py @@ -0,0 +1,382 @@ +import tensorflow as tf +import numpy as np + + +class Network(object): + """ + Adapted from https://github.com/fjxmlzn/DoppelGANger/blob/master/gan/network.py. + """ + def __init__(self, scope_name): + self.scope_name = scope_name + + @property + def trainable_vars(self): + return tf.compat.v1.get_collection( + tf.compat.v1.GraphKeys.TRAINABLE_VARIABLES, + scope=self.scope_name) + + +class Discriminator(Network): + """ + Adapted from https://github.com/fjxmlzn/DoppelGANger/blob/master/gan/network.py. 
+ """ + def __init__(self, + num_layers=5, num_units=200, + scope_name="discriminator", *args, **kwargs): + super(Discriminator, self).__init__( + scope_name=scope_name, *args, **kwargs) + self.num_layers = num_layers + self.num_units = num_units + + def build(self, input_feature, input_attribute): + with tf.compat.v1.variable_scope(self.scope_name, reuse=tf.compat.v1.AUTO_REUSE): + input_feature = tf.keras.layers.Flatten()(input_feature) + input_attribute = tf.keras.layers.Flatten()(input_attribute) + input_ = tf.concat([input_feature, input_attribute], 1) + layers = [input_feature, input_attribute, input_] + for i in range(self.num_layers - 1): + with tf.compat.v1.variable_scope("layer{}".format(i)): + layers.append(tf.keras.layers.Dense(units=self.num_units)(layers[-1])) + layers.append(tf.nn.relu(layers[-1])) + with tf.compat.v1.variable_scope("layer{}".format(self.num_layers - 1)): + layers.append(tf.keras.layers.Dense(units=1)(layers[-1])) + layers.append(tf.squeeze(layers[-1], 1)) + return layers[-1] + + +class AttrDiscriminator(Network): + """ + Adapted from https://github.com/fjxmlzn/DoppelGANger/blob/master/gan/network.py. + """ + def __init__(self, + num_layers=5, num_units=200, + scope_name="attrDiscriminator", *args, **kwargs): + super(AttrDiscriminator, self).__init__( + scope_name=scope_name, *args, **kwargs) + self.num_layers = num_layers + self.num_units = num_units + + def build(self, input_attribute): + with tf.compat.v1.variable_scope(self.scope_name, reuse=tf.compat.v1.AUTO_REUSE): + input_attribute = tf.keras.layers.Flatten()(input_attribute) + layers = [input_attribute] + for i in range(self.num_layers - 1): + with tf.compat.v1.variable_scope("layer{}".format(i)): + layers.append(tf.keras.layers.Dense(units=self.num_units)(layers[-1])) + layers.append(tf.nn.relu(layers[-1])) + with tf.compat.v1.variable_scope("layer{}".format(self.num_layers - 1)): + layers.append(tf.keras.layers.Dense(units=1)(layers[-1])) + layers.append(tf.squeeze(layers[-1], 1)) + return layers[-1] + + +class DoppelGANgerGenerator(Network): + """ + Adapted from https://github.com/fjxmlzn/DoppelGANger/blob/master/gan/network.py. 
+ """ + def __init__(self, feed_back, noise, + measurement_cols_metadata, attribute_cols_metadata, sample_len, + attribute_num_units=100, attribute_num_layers=3, + feature_num_units=100, feature_num_layers=1, + scope_name="DoppelGANgerGenerator", *args, **kwargs): + super(DoppelGANgerGenerator, self).__init__( + scope_name=scope_name, *args, **kwargs) + self.feed_back = feed_back + self.noise = noise + self.attribute_num_units = attribute_num_units + self.attribute_num_layers = attribute_num_layers + self.feature_num_units = feature_num_units + self.measurement_cols_metadata = measurement_cols_metadata + self.attribute_cols_metadata = attribute_cols_metadata + self.feature_num_layers = feature_num_layers + self.sample_len = sample_len + self.feature_out_dim = (np.sum([t.output_dim for t in measurement_cols_metadata]) * + self.sample_len) + self.attribute_out_dim = np.sum([t.output_dim for t in attribute_cols_metadata]) + if not self.noise and not self.feed_back: + raise Exception("noise and feed_back should have at least " + "one True") + + self.real_attribute_outputs = self.attribute_cols_metadata # [] + self.addi_attribute_outputs = [] + self.real_attribute_out_dim = sum([c.output_dim for c in self.attribute_cols_metadata]) + self.addi_attribute_out_dim = 0 + + self.gen_flag_id = len(self.measurement_cols_metadata) - 1 + self.STR_REAL = "real" + self.STR_ADDI = "addi" + + # noqa: MC0001 + def build(self, attribute_input_noise, addi_attribute_input_noise, + feature_input_noise, feature_input_data, attribute=None): + with tf.compat.v1.variable_scope(self.scope_name, reuse=tf.compat.v1.AUTO_REUSE): + batch_size = tf.shape(input=feature_input_noise)[0] + + if attribute is None: + all_attribute = [] + all_discrete_attribute = [] + if len(self.addi_attribute_outputs) > 0: + all_attribute_input_noise = \ + [attribute_input_noise, + addi_attribute_input_noise] + all_attribute_outputs = \ + [self.real_attribute_outputs, + self.addi_attribute_outputs] + all_attribute_part_name = \ + [self.STR_REAL, self.STR_ADDI] + all_attribute_out_dim = \ + [self.real_attribute_out_dim, + self.addi_attribute_out_dim] + else: + all_attribute_input_noise = [attribute_input_noise] + all_attribute_outputs = [self.real_attribute_outputs] + all_attribute_part_name = [self.STR_REAL] + all_attribute_out_dim = [self.real_attribute_out_dim] + else: + all_attribute = [attribute] + all_discrete_attribute = [attribute] + if len(self.addi_attribute_outputs) > 0: + all_attribute_input_noise = \ + [addi_attribute_input_noise] + all_attribute_outputs = \ + [self.addi_attribute_outputs] + all_attribute_part_name = \ + [self.STR_ADDI] + all_attribute_out_dim = [self.addi_attribute_out_dim] + else: + all_attribute_input_noise = [] + all_attribute_outputs = [] + all_attribute_part_name = [] + all_attribute_out_dim = [] + + for part_i, _ in enumerate(all_attribute_input_noise): + with tf.compat.v1.variable_scope( + "attribute_{}".format(all_attribute_part_name[part_i]), + reuse=tf.compat.v1.AUTO_REUSE): + + if len(all_discrete_attribute) > 0: + layers = [tf.concat( + [all_attribute_input_noise[part_i]] + + all_discrete_attribute, + axis=1)] + else: + layers = [all_attribute_input_noise[part_i]] + + for i in range(self.attribute_num_layers - 1): + with tf.compat.v1.variable_scope("layer{}".format(i)): + layers.append(tf.keras.layers.Dense(units=self.attribute_num_units)(layers[-1])) + layers.append(tf.nn.relu(layers[-1])) + layers.append(tf.keras.layers.BatchNormalization(momentum=0.9, + epsilon=1e-5, + scale=True)(layers[-1])) + with 
tf.compat.v1.variable_scope( + "layer{}".format(self.attribute_num_layers - 1), + reuse=tf.compat.v1.AUTO_REUSE): + part_attribute = [] + part_discrete_attribute = [] + for i in range(len(all_attribute_outputs[part_i])): + with tf.compat.v1.variable_scope("output{}".format(i), + reuse=tf.compat.v1.AUTO_REUSE): + output = all_attribute_outputs[part_i][i] + + sub_output_ori = tf.keras.layers.Dense(units=output.output_dim)(layers[-1]) + if output.discrete: + sub_output = tf.nn.softmax(sub_output_ori) + sub_output_discrete = tf.one_hot( + tf.argmax(input=sub_output, axis=1), + output.output_dim) + else: + sub_output = tf.nn.sigmoid(sub_output_ori) + sub_output_discrete = sub_output + part_attribute.append(sub_output) + part_discrete_attribute.append( + sub_output_discrete) + part_attribute = tf.concat(part_attribute, axis=1) + part_discrete_attribute = tf.concat( + part_discrete_attribute, axis=1) + part_attribute = tf.reshape( + part_attribute, + [batch_size, all_attribute_out_dim[part_i]]) + part_discrete_attribute = tf.reshape( + part_discrete_attribute, + [batch_size, all_attribute_out_dim[part_i]]) + # batch_size * dim + + part_discrete_attribute = tf.stop_gradient( + part_discrete_attribute) + + all_attribute.append(part_attribute) + all_discrete_attribute.append(part_discrete_attribute) + + all_attribute = tf.concat(all_attribute, axis=1) + all_discrete_attribute = tf.concat(all_discrete_attribute, axis=1) + all_attribute = tf.reshape( + all_attribute, + [batch_size, self.attribute_out_dim]) + all_discrete_attribute = tf.reshape( + all_discrete_attribute, + [batch_size, self.attribute_out_dim]) + + with tf.compat.v1.variable_scope("feature", reuse=tf.compat.v1.AUTO_REUSE): + all_cell = [] + for i in range(self.feature_num_layers): + with tf.compat.v1.variable_scope("unit{}".format(i), + reuse=tf.compat.v1.AUTO_REUSE): + cell = tf.keras.layers.LSTMCell(units=self.feature_num_units) + all_cell.append(cell) + rnn_network = tf.keras.layers.StackedRNNCells(all_cell) + + feature_input_data_dim = \ + len(feature_input_data.get_shape().as_list()) + if feature_input_data_dim == 3: + feature_input_data_reshape = tf.transpose( + a=feature_input_data, perm=[1, 0, 2]) + feature_input_noise_reshape = tf.transpose( + a=feature_input_noise, perm=[1, 0, 2]) + + initial_state = tf.random.normal( + shape=(self.feature_num_layers, + 2, + batch_size, + self.feature_num_units), + mean=0.0, stddev=1.0) + initial_state = tf.unstack(initial_state, axis=0) + initial_state = tuple( + [tf.compat.v1.nn.rnn_cell.LSTMStateTuple( + initial_state[idx][0], initial_state[idx][1]) + for idx in range(self.feature_num_layers)]) + + time = feature_input_noise.get_shape().as_list()[1] + if time is None: + time = tf.shape(input=feature_input_noise)[1] + + def compute(i, state, last_output, all_output, + gen_flag, all_gen_flag, all_cur_argmax, + last_cell_output): + input_all = [all_discrete_attribute] + if self.noise: + input_all.append(feature_input_noise_reshape[i]) + if self.feed_back: + if feature_input_data_dim == 3: + input_all.append(feature_input_data_reshape[i]) + else: + input_all.append(last_output) + input_all = tf.concat(input_all, axis=1) + + cell_new_output, new_state = rnn_network(input_all, state) + new_output_all = [] + id_ = 0 + for j in range(self.sample_len): + for k, _ in enumerate(self.measurement_cols_metadata): + with tf.compat.v1.variable_scope("output{}".format(id_), + reuse=tf.compat.v1.AUTO_REUSE): + output = self.measurement_cols_metadata[k] + sub_output = 
tf.keras.layers.Dense(units=output.output_dim)(cell_new_output) + if output.discrete: + sub_output = tf.nn.softmax(sub_output) + else: + sub_output = tf.nn.sigmoid(sub_output) + new_output_all.append(sub_output) + id_ += 1 + new_output = tf.concat(new_output_all, axis=1) + + for j in range(self.sample_len): + all_gen_flag = all_gen_flag.write( + i * self.sample_len + j, gen_flag) + cur_gen_flag = tf.cast(tf.equal(tf.argmax( + input=new_output_all[(j * len(self.measurement_cols_metadata) + + self.gen_flag_id)], + axis=1), 0), dtype=tf.float32) + cur_gen_flag = tf.reshape(cur_gen_flag, [-1, 1]) + all_cur_argmax = all_cur_argmax.write( + i * self.sample_len + j, + tf.argmax( + input=new_output_all[(j * len(self.measurement_cols_metadata) + + self.gen_flag_id)], + axis=1)) + gen_flag = gen_flag * cur_gen_flag + + return (i + 1, + new_state, + new_output, + all_output.write(i, new_output), + gen_flag, + all_gen_flag, + all_cur_argmax, + cell_new_output) + + (i, _, _, feature, _, gen_flag, cur_argmax, _) = \ + tf.while_loop( + cond=lambda a, b, c, d, e, f, g, h: + tf.logical_and(a < time, + tf.equal(tf.reduce_max(input_tensor=e), 1)), + body=compute, + loop_vars=(0, + initial_state, + feature_input_data if feature_input_data_dim == 2 + else feature_input_data_reshape[0], + tf.TensorArray(tf.float32, time), + tf.ones((batch_size, 1)), + tf.TensorArray(tf.float32, time * self.sample_len), + tf.TensorArray(tf.int64, time * self.sample_len), + tf.zeros((batch_size, self.feature_num_units)))) + + def fill_rest(i, all_output, all_gen_flag, all_cur_argmax): + all_output = all_output.write( + i, tf.zeros((batch_size, self.feature_out_dim))) + + for j in range(self.sample_len): + all_gen_flag = all_gen_flag.write( + i * self.sample_len + j, + tf.zeros((batch_size, 1))) + all_cur_argmax = all_cur_argmax.write( + i * self.sample_len + j, + tf.zeros((batch_size,), dtype=tf.int64)) + return (i + 1, + all_output, + all_gen_flag, + all_cur_argmax) + + _, feature, gen_flag, cur_argmax = tf.while_loop( + cond=lambda a, b, c, d: a < time, + body=fill_rest, + loop_vars=(i, feature, gen_flag, cur_argmax)) + + feature = feature.stack() + # time * batch_size * (dim * sample_len) + gen_flag = gen_flag.stack() + # (time * sample_len) * batch_size * 1 + cur_argmax = cur_argmax.stack() + + gen_flag = tf.transpose(a=gen_flag, perm=[1, 0, 2]) + # batch_size * (time * sample_len) * 1 + cur_argmax = tf.transpose(a=cur_argmax, perm=[1, 0]) + # batch_size * (time * sample_len) + length = tf.reduce_sum(input_tensor=gen_flag, axis=[1, 2]) + # batch_size + + feature = tf.transpose(a=feature, perm=[1, 0, 2]) + # batch_size * time * (dim * sample_len) + gen_flag_t = tf.reshape( + gen_flag, + [batch_size, time, self.sample_len]) + # batch_size * time * sample_len + gen_flag_t = tf.reduce_sum(input_tensor=gen_flag_t, axis=[2]) + # batch_size * time + gen_flag_t = tf.cast(gen_flag_t > 0.5, dtype=tf.float32) + gen_flag_t = tf.expand_dims(gen_flag_t, 2) + # batch_size * time * 1 + gen_flag_t = tf.tile( + gen_flag_t, + [1, 1, self.feature_out_dim]) + # batch_size * time * (dim * sample_len) + # zero out the parts after sequence ends + feature = feature * gen_flag_t + feature = tf.reshape( + feature, + [batch_size, + time * self.sample_len, + self.feature_out_dim / self.sample_len]) + # batch_size * (time * sample_len) * dim + + return feature, all_attribute, gen_flag, length, cur_argmax diff --git a/src/ydata_synthetic/synthesizers/timeseries/model.py b/src/ydata_synthetic/synthesizers/timeseries/model.py new file mode 100644 index 
00000000..1b985313 --- /dev/null +++ b/src/ydata_synthetic/synthesizers/timeseries/model.py @@ -0,0 +1,51 @@ +""" + Main time-series synthesizer class +""" +from enum import Enum, unique +import os +from joblib import load + +from tensorflow import config as tfconfig + +from ydata_synthetic.synthesizers.timeseries.timegan.model import TimeGAN +from ydata_synthetic.synthesizers.timeseries.doppelganger.model import DoppelGANger + + +@unique +class Model(Enum): + TIMEGAN = 'timegan' + DOPPELGANGER = 'doppelganger' + + __MAPPING__ = { + TIMEGAN : TimeGAN, + DOPPELGANGER: DoppelGANger + } + + @property + def function(self): + return self.__MAPPING__[self.value] + +class TimeSeriesSynthesizer(): + "Abstraction class " + def __new__(cls, modelname: str, model_parameters=None, **kwargs): + return Model(modelname).function(model_parameters, **kwargs) + + @staticmethod + def load(path): + """ + ### Description: + Loads a saved synthesizer from a pickle. + + ### Args: + `path` (str): Path to read the synthesizer pickle from. + """ + gpu_devices = tfconfig.list_physical_devices('GPU') + if len(gpu_devices) > 0: + try: + tfconfig.experimental.set_memory_growth(gpu_devices[0], True) + except (ValueError, RuntimeError): + # Invalid device or cannot modify virtual devices once initialized. + pass + if os.path.isdir(path): + return DoppelGANger.load(path) + return load(path) diff --git a/src/ydata_synthetic/synthesizers/timeseries/timegan/model.py b/src/ydata_synthetic/synthesizers/timeseries/timegan/model.py index 60464566..4648b479 100644 --- a/src/ydata_synthetic/synthesizers/timeseries/timegan/model.py +++ b/src/ydata_synthetic/synthesizers/timeseries/timegan/model.py @@ -2,10 +2,11 @@ TimeGAN class implemented accordingly with: Original code can be found here: https://bitbucket.org/mvdschaar/mlforhealthlabpub/src/master/alg/timegan/ """ -from tqdm import tqdm, trange +from tqdm import tqdm import numpy as np +from pandas import DataFrame -from tensorflow import function, GradientTape, sqrt, abs, reduce_mean, ones_like, zeros_like, convert_to_tensor,float32 +from tensorflow import function, GradientTape, sqrt, abs, reduce_mean, ones_like, zeros_like, convert_to_tensor, float32 from tensorflow import data as tfdata from tensorflow import nn from keras import (Model, Sequential, Input) @@ -13,7 +14,8 @@ from keras.optimizers import Adam from keras.losses import (BinaryCrossentropy, MeanSquaredError) -from ....synthesizers.base import BaseGANModel +from ydata_synthetic.synthesizers.base import BaseGANModel, ModelParameters, TrainParameters +from ydata_synthetic.preprocessing.timeseries.utils import real_data_loading def make_net(model, n_layers, hidden_units, output_units, net_type='GRU'): if net_type=='GRU': @@ -35,14 +37,51 @@ def make_net(model, n_layers, hidden_units, output_units, net_type='GRU'): class TimeGAN(BaseGANModel): - __MODEL__='TimeGAN' + __MODEL__ = 'TimeGAN' - def __init__(self, model_parameters, hidden_dim, seq_len, n_seq, gamma): - self.seq_len=seq_len - self.n_seq=n_seq - self.hidden_dim=hidden_dim - self.gamma=gamma + def __init__(self, model_parameters: ModelParameters): super().__init__(model_parameters) + self.seq_len = None + self.n_seq = None + self.hidden_dim = model_parameters.latent_dim + self.gamma = model_parameters.gamma + self.num_cols = None + + def fit(self, data: DataFrame, + train_arguments: TrainParameters, + num_cols: list[str] | None = None, + cat_cols: list[str] | None = None): + """ + Fits the TimeGAN model. 
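+        The numerical columns are converted into sequences of length sequence_length (via real_data_loading) before the networks are trained.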
+ + Args: + data: A pandas DataFrame with the data to be synthesized. + train_arguments: TimeGAN training arguments. + num_cols: List of columns to be handled as numerical + cat_cols: List of columns to be handled as categorical + """ + super().fit(data=data, num_cols=num_cols, cat_cols=cat_cols, train_arguments=train_arguments) + if cat_cols: + raise NotImplementedError("TimeGAN does not support categorical features.") + self.num_cols = num_cols + self.seq_len = train_arguments.sequence_length + self.n_seq = train_arguments.number_sequences + processed_data = real_data_loading(data[self.num_cols].values, seq_len=self.seq_len) + self.train(data=processed_data, train_steps=train_arguments.epochs) + + def sample(self, n_samples: int): + """ + Samples new data from the TimeGAN. + + Args: + n_samples: Number of samples to be generated. + """ + Z_ = next(self.get_batch_noise(size=n_samples)) + records = self.generator(Z_) + data = [] + for i in range(records.shape[0]): + data.append(DataFrame(records[i], columns=self.num_cols)) + return data def define_gan(self): self.generator_aux=Generator(self.hidden_dim).build() @@ -220,9 +259,9 @@ def _generate_noise(self): while True: yield np.random.uniform(low=0, high=1, size=(self.seq_len, self.n_seq)) - def get_batch_noise(self): + def get_batch_noise(self, size=None): return iter(tfdata.Dataset.from_generator(self._generate_noise, output_types=float32) - .batch(self.batch_size) + .batch(self.batch_size if size is None else size) .repeat()) def train(self, data, train_steps): @@ -270,15 +309,6 @@ def train(self, data, train_steps): if step_d_loss > 0.15: step_d_loss = self.train_discriminator(X_, Z_, discriminator_opt) - def sample(self, n_samples): - steps = n_samples // self.batch_size + 1 - data = [] - for _ in trange(steps, desc='Synthetic data generation'): - Z_ = next(self.get_batch_noise()) - records = self.generator(Z_) - data.append(records) - return np.array(np.vstack(data)) - class Generator(Model): def __init__(self, hidden_dim, net_type='GRU'):
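Note: below is a minimal usage sketch of the unified TimeSeriesSynthesizer API introduced in this change, exercising the DoppelGANger path with a per-sequence attribute column. The dataset, the column names ("traffic", "isp"), the sizes and the 'doppelganger_demo' directory are hypothetical and only illustrate the call sequence; they are not part of the patch.

import os
import numpy as np
import pandas as pd
from ydata_synthetic.synthesizers import ModelParameters, TrainParameters
from ydata_synthetic.synthesizers.timeseries import TimeSeriesSynthesizer

# Hypothetical data: 100 sequences of 24 steps with one measurement ("traffic")
# and one categorical attribute ("isp") that is constant within each sequence.
seq_len, n_sequences = 24, 100
df = pd.DataFrame({
    "traffic": np.random.rand(n_sequences * seq_len),
    "isp": np.repeat(np.random.choice(["A", "B"], n_sequences), seq_len),
})

model_args = ModelParameters(batch_size=50, lr=1e-3, betas=(0.5, 0.9),
                             latent_dim=3, gp_lambda=10, pac=10)
train_args = TrainParameters(epochs=100, sequence_length=seq_len,
                             measurement_cols=["traffic"])

synth = TimeSeriesSynthesizer(modelname='doppelganger', model_parameters=model_args)
synth.fit(df, train_args, num_cols=["traffic"], cat_cols=["isp"])

os.makedirs('doppelganger_demo', exist_ok=True)
synth.save('doppelganger_demo')          # DoppelGANger models are saved to a directory

synth = TimeSeriesSynthesizer.load('doppelganger_demo')   # load() dispatches on the directory
samples = synth.sample(n_samples=10)     # list of DataFrames, one per generated sequence
print(samples[0].head())

Columns passed to fit but absent from measurement_cols become DoppelGANger attributes; the TimeGAN path, by contrast, only accepts numerical measurement columns and raises NotImplementedError when cat_cols are given.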