fix: improvements to the doppelganger model
ricardodcpereira committed Sep 3, 2023
1 parent ce75895 commit 6422500
Showing 5 changed files with 247 additions and 77 deletions.
143 changes: 111 additions & 32 deletions src/ydata_synthetic/preprocessing/timeseries/doppelganger_processor.py
@@ -3,11 +3,13 @@
from typing import List, Optional
from dataclasses import dataclass

-from numpy import concatenate, ndarray, zeros, ones, expand_dims, reshape, sum as npsum, repeat, array_split, asarray
+from numpy import concatenate, ndarray, zeros, ones, expand_dims, reshape, sum as npsum, repeat, array_split, asarray, amin, amax, stack
from pandas import DataFrame
from typeguard import typechecked
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

-from ydata_synthetic.preprocessing.regular.processor import RegularDataProcessor
+from ydata_synthetic.preprocessing.base_processor import BaseProcessor


@dataclass
@@ -18,10 +20,11 @@ class ColumnMetadata:
discrete: bool
output_dim: int
name: str
+real: bool = True


@typechecked
-class DoppelGANgerProcessor(RegularDataProcessor):
+class DoppelGANgerProcessor(BaseProcessor):
"""
Main class for the DoppelGANger preprocessing.
It works like any other transformer in scikit-learn with the methods fit, transform and inverse transform.
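The new `real` field on `ColumnMetadata` marks whether an attribute column exists in the input data; the derived per-sequence normalization attributes that `transform` now appends (see further down) set it to `False`. A minimal sketch of the two cases, with column names invented purely for illustration:

```python
from ydata_synthetic.preprocessing.timeseries.doppelganger_processor import ColumnMetadata

# A genuine attribute column taken from the input DataFrame (real defaults to True).
region = ColumnMetadata(discrete=True, output_dim=3, name="region")

# A derived normalization attribute appended during transform (hypothetical name).
addi = ColumnMetadata(discrete=False, output_dim=1, name="addi_traffic_1", real=False)
```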
@@ -38,7 +41,8 @@ class DoppelGANgerProcessor(RegularDataProcessor):
def __init__(self, num_cols: Optional[List[str]] = None,
cat_cols: Optional[List[str]] = None,
measurement_cols: Optional[List[str]] = None,
-sequence_length: Optional[int] = None):
+sequence_length: Optional[int] = None,
+sample_length: Optional[int] = None):
super().__init__(num_cols, cat_cols)

if num_cols is None:
@@ -47,7 +51,15 @@ def __init__(self, num_cols: Optional[List[str]] = None,
cat_cols = []
if measurement_cols is None:
measurement_cols = []

+self._col_order_ = None
self.sequence_length = sequence_length
+self.sample_length = sample_length

+if self.sequence_length is not None and self.sample_length is not None:
+    if self.sequence_length % self.sample_length != 0:
+        raise ValueError("The sequence length must be a multiple of the sample length.")

self._measurement_num_cols = [c for c in self.num_cols if c in measurement_cols]
self._measurement_cat_cols = [c for c in self.cat_cols if c in measurement_cols]
self._attribute_num_cols = [c for c in self.num_cols if c not in measurement_cols]
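Because DoppelGANger generates each sequence in `sample_length`-sized chunks, the constructor now validates divisibility up front instead of failing later in `add_gen_flag` (whose old check is removed below). A quick sketch of the constraint, with column names chosen only for illustration:

```python
# OK: 56 timesteps split evenly into 8-step samples (56 % 8 == 0).
DoppelGANgerProcessor(num_cols=["traffic"], measurement_cols=["traffic"],
                      sequence_length=56, sample_length=8)

# Raises ValueError: 56 is not a multiple of 10.
DoppelGANgerProcessor(num_cols=["traffic"], measurement_cols=["traffic"],
                      sequence_length=56, sample_length=10)
```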
@@ -56,7 +68,8 @@ def __init__(self, num_cols: Optional[List[str]] = None,
self._attribute_cols_metadata = None
self._measurement_one_hot_cat_cols = None
self._attribute_one_hot_cat_cols = None
-self._has_attributes = self._attribute_num_cols or self._attribute_cat_cols
+self._has_attributes = bool(self._attribute_num_cols or self._attribute_cat_cols)
+self._eps = 1e-4

@property
def measurement_cols_metadata(self):
@@ -69,8 +82,6 @@ def attribute_cols_metadata(self):
def add_gen_flag(self, data_features: ndarray, sample_len: int):
num_sample = data_features.shape[0]
length = data_features.shape[1]
-if length % sample_len != 0:
-    raise Exception("length must be a multiple of sample_len")
data_gen_flag = ones((num_sample, length))
data_gen_flag = expand_dims(data_gen_flag, 2)
shift_gen_flag = concatenate(
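For context, `add_gen_flag` appends DoppelGANger's generation flags: a two-channel one-hot per timestep marking whether the step is active, which is how the model handles variable-length sequences (and why `transform` later appends a `gen_flags` metadata entry with `output_dim=2`). A toy numpy sketch of the resulting shape, not the library code:

```python
import numpy as np

# One sequence of length 4 with every timestep active.
flags = np.ones((1, 4, 1))

# Concatenating the complementary "off" channel yields the one-hot pair per timestep.
gen_flags = np.concatenate([flags, 1.0 - flags], axis=2)
print(gen_flags.shape)  # (1, 4, 2)
```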
@@ -92,17 +103,17 @@ def add_gen_flag(self, data_features: ndarray, sample_len: int):

return data_features

-def transform(self, X: DataFrame) -> tuple[ndarray, ndarray]:
-"""Transforms the passed DataFrame with the fit DataProcessor.
+# pylint: disable=W0106
+def fit(self, X: DataFrame) -> DoppelGANgerProcessor:
+"""Fits the data processor to a passed DataFrame.
Args:
X (DataFrame):
DataFrame used to fit the processor parameters.
-Should be aligned with the columns types defined in initialization.
+Should be aligned with the num/cat columns defined in initialization.
Returns:
-transformed (ndarray, ndarray):
-Processed version of the passed DataFrame.
+self (DoppelGANgerProcessor): The fitted data processor.
"""
-self._check_is_fitted()
+self._validate_cols(X.columns)

measurement_cols = self._measurement_num_cols + self._measurement_cat_cols
if not measurement_cols:
@@ -111,51 +122,118 @@ def transform(self, X: DataFrame) -> tuple[ndarray, ndarray]:
raise ValueError("At least one of the supplied measurement columns does not exist in the dataset.")
if self.sequence_length is None:
raise ValueError("The sequence length is mandatory.")

+self._col_order_ = [c for c in X.columns if c in self.num_cols + self.cat_cols]
+self._types = X.dtypes
+self._num_pipeline = Pipeline([
+    ("scaler", MinMaxScaler()),
+])
+self._cat_pipeline = Pipeline([
+    ("encoder", OneHotEncoder(sparse_output=False, handle_unknown='ignore', drop='if_binary')),
+])
+self._num_pipeline.fit(X[self._attribute_num_cols]) if self._attribute_num_cols else zeros([len(X), 0])
+self._cat_pipeline.fit(X[self.cat_cols]) if self.cat_cols else zeros([len(X), 0])

-num_data = DataFrame(self.num_pipeline.transform(X[self.num_cols]) if self.num_cols else zeros([len(X), 0]), columns=self.num_cols)
-one_hot_cat_cols = self.cat_pipeline.get_feature_names_out()
-cat_data = DataFrame(self.cat_pipeline.transform(X[self.cat_cols]) if self.cat_cols else zeros([len(X), 0]), columns=one_hot_cat_cols)
+return self

+def transform(self, X: DataFrame) -> tuple[ndarray, ndarray]:
+"""Transforms the passed DataFrame with the fit DataProcessor.
+Args:
+X (DataFrame):
+DataFrame used to fit the processor parameters.
+Should be aligned with the columns types defined in initialization.
+Returns:
+transformed (ndarray, ndarray):
+Processed version of the passed DataFrame.
+"""
+self._check_is_fitted()

+one_hot_cat_cols = self._cat_pipeline.get_feature_names_out() if self.cat_cols else []
+cat_data = DataFrame(self._cat_pipeline.transform(X[self.cat_cols]) if self.cat_cols else zeros([len(X), 0]), columns=one_hot_cat_cols)

self._measurement_one_hot_cat_cols = [c for c in one_hot_cat_cols if c.split("_")[0] in self._measurement_cat_cols]
-measurement_num_data = num_data[self._measurement_num_cols].to_numpy() if self._measurement_num_cols else zeros([len(X), 0])
-self._measurement_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name=c) for c in self._measurement_num_cols]
+self._measurement_cols_metadata = [ColumnMetadata(discrete=False,
+                                                  output_dim=1,
+                                                  name=c) for c in self._measurement_num_cols]
measurement_cat_data = cat_data[self._measurement_one_hot_cat_cols].to_numpy() if self._measurement_one_hot_cat_cols else zeros([len(X), 0])
-self._measurement_cols_metadata += [ColumnMetadata(discrete=True, output_dim=X[c].nunique(), name=c) for c in self._measurement_cat_cols]
-data_features = concatenate([measurement_num_data, measurement_cat_data], axis=1)
+self._measurement_cols_metadata += [ColumnMetadata(discrete=True,
+                                                   output_dim=X[c].nunique() if X[c].nunique() != 2 else 1,
+                                                   name=c) for c in self._measurement_cat_cols]
+data_features = concatenate([X[self._measurement_num_cols].to_numpy(), measurement_cat_data], axis=1)

if self._has_attributes:
self._attribute_one_hot_cat_cols = [c for c in one_hot_cat_cols if c.split("_")[0] in self._attribute_cat_cols]
-attribute_num_data = num_data[self._attribute_num_cols].to_numpy() if self._attribute_num_cols else zeros([len(X), 0])
-self._attribute_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name=c) for c in self._attribute_num_cols]
+attribute_num_data = self._num_pipeline.transform(X[self._attribute_num_cols]) if self._attribute_num_cols else zeros([len(X), 0])
+self._attribute_cols_metadata = [ColumnMetadata(discrete=False,
+                                                output_dim=1,
+                                                name=c) for c in self._attribute_num_cols]
attribute_cat_data = cat_data[self._attribute_one_hot_cat_cols].to_numpy() if self._attribute_one_hot_cat_cols else zeros([len(X), 0])
-self._attribute_cols_metadata += [ColumnMetadata(discrete=True, output_dim=X[c].nunique(), name=c) for c in self._attribute_cat_cols]
+self._attribute_cols_metadata += [ColumnMetadata(discrete=True,
+                                                 output_dim=X[c].nunique() if X[c].nunique() != 2 else 1,
+                                                 name=c) for c in self._attribute_cat_cols]
data_attributes = concatenate([attribute_num_data, attribute_cat_data], axis=1)
else:
-data_attributes = zeros((data_features.shape[0], 1))
-self._attribute_cols_metadata = [ColumnMetadata(discrete=False, output_dim=1, name="zeros_attribute")]
+data_attributes = zeros((data_features.shape[0], 0))
+self._attribute_one_hot_cat_cols = []
+self._attribute_cols_metadata = []

num_samples = int(X.shape[0] / self.sequence_length)
data_features = asarray(array_split(data_features, num_samples))
-data_attributes = asarray(array_split(data_attributes, num_samples))

-data_features = self.add_gen_flag(data_features, sample_len=self.sequence_length)
+additional_attributes = []
+for ix, col_meta in enumerate(self._measurement_cols_metadata):
+    if not col_meta.discrete:
+        col_data = X[col_meta.name].to_numpy().reshape(num_samples, -1)
+        max_col = amax(col_data, axis=1) + self._eps
+        min_col = amin(col_data, axis=1) - self._eps
+        additional_attributes.append((max_col + min_col) / 2.0)
+        additional_attributes.append((max_col - min_col) / 2.0)
+        self._attribute_cols_metadata += [ColumnMetadata(discrete=False,
+                                                         output_dim=1,
+                                                         name=f"addi_{col_meta.name}_{ix}",
+                                                         real=False) for ix in range(1, 3)]
+        max_col = expand_dims(max_col, axis=1)
+        min_col = expand_dims(min_col, axis=1)
+        data_features[:, :, ix] = (data_features[:, :, ix] - min_col) / (max_col - min_col)

+data_attributes = asarray(array_split(data_attributes, num_samples))
+data_attributes = data_attributes.mean(axis=1)

+if additional_attributes:
+    additional_attributes = stack(additional_attributes, axis=1)
+    data_attributes = concatenate([data_attributes, additional_attributes], axis=1)

+data_features = self.add_gen_flag(data_features, sample_len=self.sample_length)
self._measurement_cols_metadata += [ColumnMetadata(discrete=True, output_dim=2, name="gen_flags")]
-return data_features, data_attributes.mean(axis=1)
+return data_features, data_attributes

-def inverse_transform(self, X_features: ndarray, X_attributes: ndarray) -> list[DataFrame]:
+def inverse_transform(self, X_features: ndarray, X_attributes: ndarray, gen_flags: ndarray) -> list[DataFrame]:
"""Inverts the data transformation pipelines on a passed DataFrame.
Args:
X_features (ndarray):
Numpy array with the measurement data to be brought back to the original format.
X_attributes (ndarray):
Numpy array with the attribute data to be brought back to the original format.
+gen_flags (ndarray):
+Numpy array with the flags indicating the activation of features.
Returns:
result (DataFrame):
DataFrame with all performed transformations inverted.
"""
self._check_is_fitted()

+addi_cols_idx = addi_cols_idx_start = sum([c.output_dim for c in self._attribute_cols_metadata if c.real])
+for m_col_ix in range(len(self._measurement_num_cols)):
+    max_plus_min = X_attributes[:, addi_cols_idx]
+    max_minus_min = X_attributes[:, addi_cols_idx + 1]
+    max_val = expand_dims(max_plus_min + max_minus_min, axis=1)
+    min_val = expand_dims(max_plus_min - max_minus_min, axis=1)
+    X_features[:, :, m_col_ix] = X_features[:, :, m_col_ix] * (max_val - min_val) + min_val
+    addi_cols_idx += 2

+X_features = X_features * expand_dims(gen_flags, axis=2)
+X_attributes = X_attributes[:, :addi_cols_idx_start]

num_samples = X_attributes.shape[0]
if self._has_attributes:
X_attributes = repeat(X_attributes.reshape((num_samples, 1, X_attributes.shape[1])), repeats=X_features.shape[1], axis=1)
@@ -168,9 +246,10 @@ def inverse_transform(self, X_features: ndarray, X_attributes: ndarray) -> list[
samples = []
for i in range(num_samples):
df = DataFrame(generated_data[i], columns=output_cols)
-df_num = self.num_pipeline.inverse_transform(df[self.num_cols]) if self.num_cols else zeros([len(df), 0])
-df_cat = self.cat_pipeline.inverse_transform(df[one_hot_cat_cols].round(0)) if self.cat_cols else zeros([len(df), 0])
-df = DataFrame(concatenate((df_num, df_cat), axis=1), columns=self.num_cols+self.cat_cols)
+df_num_feat = df[self._measurement_num_cols].to_numpy()
+df_num_attr = self._num_pipeline.inverse_transform(df[self._attribute_num_cols]) if self._attribute_num_cols else zeros([len(df), 0])
+df_cat = self._cat_pipeline.inverse_transform(df[one_hot_cat_cols]) if self.cat_cols else zeros([len(df), 0])
+df = DataFrame(concatenate((df_num_feat, df_num_attr, df_cat), axis=1), columns=self._measurement_num_cols+self._attribute_num_cols+self.cat_cols)
df = df.loc[:, self._col_order_]
for col in df.columns:
df[col] = df[col].astype(self._types[col])
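The heart of this change: `transform` now min-max scales each numeric measurement per sequence and stores `(max+min)/2` and `(max-min)/2` as two extra attributes (flagged `real=False`), and `inverse_transform` rebuilds the per-sequence min/max from those attributes before masking with the generation flags. A small numeric sketch of the round trip, reusing the `1e-4` epsilon from the code above:

```python
import numpy as np

eps = 1e-4
x = np.array([2.0, 4.0, 6.0])  # one sequence of one numeric measurement column
mx, mn = x.max() + eps, x.min() - eps

# transform: store the two additional attributes and scale the features.
half_sum, half_diff = (mx + mn) / 2.0, (mx - mn) / 2.0
scaled = (x - mn) / (mx - mn)

# inverse_transform: recover min/max from the attributes and undo the scaling.
max_val, min_val = half_sum + half_diff, half_sum - half_diff
restored = scaled * (max_val - min_val) + min_val
assert np.allclose(restored, x)
```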
9 changes: 6 additions & 3 deletions src/ydata_synthetic/synthesizers/base.py
@@ -39,10 +39,11 @@

_train_parameters = ['cache_prefix', 'label_dim', 'epochs', 'sample_interval',
'labels', 'n_clusters', 'epsilon', 'log_frequency',
-'measurement_cols', 'sequence_length', 'number_sequences']
+'measurement_cols', 'sequence_length', 'number_sequences',
+'sample_length', 'rounds']

ModelParameters = namedtuple('ModelParameters', _model_parameters, defaults=_model_parameters_df)
-TrainParameters = namedtuple('TrainParameters', _train_parameters, defaults=('', None, 300, 50, None, 10, 0.005, True, None, 1, 1))
+TrainParameters = namedtuple('TrainParameters', _train_parameters, defaults=('', None, 300, 50, None, 10, 0.005, True, None, 1, 1, 1, 1))

@typechecked
class BaseModel(ABC):
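The two new fields both default to 1, keeping the defaults tuple aligned with the 13 parameter names. A quick way to inspect the mapping (output shown under the assumption that the field order above is unchanged):

```python
from ydata_synthetic.synthesizers import TrainParameters

print(TrainParameters()._asdict())
# {'cache_prefix': '', 'label_dim': None, 'epochs': 300, 'sample_interval': 50,
#  'labels': None, 'n_clusters': 10, 'epsilon': 0.005, 'log_frequency': True,
#  'measurement_cols': None, 'sequence_length': 1, 'number_sequences': 1,
#  'sample_length': 1, 'rounds': 1}
```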
@@ -190,9 +191,11 @@ def fit(self,
elif self.__MODEL__ == DoppelGANgerProcessor.SUPPORTED_MODEL:
measurement_cols = train_arguments.measurement_cols
sequence_length = train_arguments.sequence_length
+sample_length = train_arguments.sample_length
self.processor = DoppelGANgerProcessor(num_cols=num_cols, cat_cols=cat_cols,
measurement_cols=measurement_cols,
-sequence_length=sequence_length).fit(data)
+sequence_length=sequence_length,
+sample_length=sample_length).fit(data)
else:
print(f'A DataProcessor is not available for the {self.__MODEL__}.')

(Diff for the remaining 3 changed files not shown.)
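Taken together, the new `sample_length` surfaces to users through `TrainParameters` and is forwarded to the processor in `fit`. A hedged usage sketch, with values chosen only for illustration:

```python
from ydata_synthetic.synthesizers import TrainParameters

# sample_length must divide sequence_length, per the processor check above.
train_args = TrainParameters(epochs=100,
                             sequence_length=56,
                             sample_length=8,
                             measurement_cols=["traffic"])
```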
