diff --git a/src/synthcity/benchmark/__init__.py b/src/synthcity/benchmark/__init__.py index abecefa7..1fbf3072 100644 --- a/src/synthcity/benchmark/__init__.py +++ b/src/synthcity/benchmark/__init__.py @@ -360,41 +360,41 @@ def print( @validate_arguments(config=dict(arbitrary_types_allowed=True)) def highlight( results: Dict, - ) -> None: + ) -> Any: pd.set_option("display.max_rows", None, "display.max_columns", None) means = [] - for plugin in results: - data = results[plugin]["mean"] - directions = results[plugin]["direction"].to_dict() - means.append(data) + directions = {} + for plugin, df in results.items(): + means.append(df["mean"]) + directions.update(df["direction"].to_dict()) out = pd.concat(means, axis=1) + out.columns = list(results.keys()) + out = out.set_axis(list(results.keys()), axis=1, copy=False) bad_highlight = "background-color: lightcoral;" ok_highlight = "background-color: green;" default = "" - def highlights(row: pd.Series) -> Any: + def _highlight_row(row: pd.Series) -> List[str]: metric = row.name + vals = row.values if directions[metric] == "minimize": - best_val = np.min(row.values) - worst_val = np.max(row) + best, worst = vals.min(), vals.max() else: - best_val = np.max(row.values) - worst_val = np.min(row) + best, worst = vals.max(), vals.min() styles = [] - for val in row.values: - if val == best_val: + for v in vals: + if v == best: styles.append(ok_highlight) - elif val == worst_val: + elif v == worst: styles.append(bad_highlight) else: styles.append(default) - return styles - out.style.apply(highlights, axis=1) + out.style.apply(_highlight_row, axis=1) return out diff --git a/src/synthcity/metrics/eval.py b/src/synthcity/metrics/eval.py index 94411bb7..19c3c87d 100644 --- a/src/synthcity/metrics/eval.py +++ b/src/synthcity/metrics/eval.py @@ -30,14 +30,17 @@ PerformanceEvaluatorXGB, ) from .eval_privacy import ( + AdversarialAccuracy, DeltaPresence, DomiasMIABNAF, DomiasMIAKDE, DomiasMIAPrior, + EpsilonIdentifiability, 
IdentifiabilityScore, kAnonymization, kMap, lDiversityDistinct, + tCloseness, ) from .eval_sanity import ( CloseValuesProbability, @@ -49,13 +52,17 @@ from .eval_statistical import ( AlphaPrecision, ChiSquaredTest, + DendrogramDistance, FrechetInceptionDistance, InverseKLDivergence, JensenShannonDistance, KolmogorovSmirnovTest, + MatrixDistance, MaximumMeanDiscrepancy, PRDCScore, SurvivalKMDistance, + TFTGSimilarity, + TGTGSimilarity, WassersteinDistance, ) from .scores import ScoreEvaluator @@ -78,6 +85,10 @@ AlphaPrecision, SurvivalKMDistance, FrechetInceptionDistance, + MatrixDistance, + DendrogramDistance, + TFTGSimilarity, + TGTGSimilarity, # performance tests PerformanceEvaluatorLinear, PerformanceEvaluatorMLP, @@ -92,7 +103,9 @@ SyntheticDetectionGMM, SyntheticDetectionLinear, # privacy tests + AdversarialAccuracy, DeltaPresence, + EpsilonIdentifiability, kAnonymization, kMap, lDiversityDistinct, @@ -100,6 +113,7 @@ DomiasMIABNAF, # TODO: This takes too long to include as default DomiasMIAKDE, DomiasMIAPrior, + tCloseness, ] diff --git a/src/synthcity/metrics/eval_privacy.py b/src/synthcity/metrics/eval_privacy.py index 5ebcb91c..c27a8586 100644 --- a/src/synthcity/metrics/eval_privacy.py +++ b/src/synthcity/metrics/eval_privacy.py @@ -608,3 +608,246 @@ def evaluate_p_R( .numpy() ) return p_G_evaluated, p_R_evaluated + + +class AdversarialAccuracy(PrivacyEvaluator): + """ + Adversarial Accuracy (AA) from Yale et al. (2020). + + Reference: https://pmc.ncbi.nlm.nih.gov/articles/PMC10311334/ + + Intuition + --------- + AA tells us whether synthetic data are + (i) too close to the real data – privacy leakage, low utility – or + (ii) too far away – poor fidelity. + It is the accuracy of a 1-NN classifier that tries to distinguish real from synthetic samples using Euclidean distances. 
+ + Let + d_tt(i): nearest-neighbor distance from real sample i to all other real samples (excluding itself) + d_tg(i): NN distance from real sample i to the synthetic set + d_gg(j): NN distance from synthetic sample j to all other synthetic samples (excluding itself) + d_gt(j): NN distance from synthetic sample j to the real set + + The metric is + + AA = 0.5 * [ (1/n) Σ 1( d_tg(i) > d_tt(i) ) + + (1/m) Σ 1( d_gt(j) > d_gg(j) ) + ] + + Range and interpretation + ------------------------ + • AA → 0 : generator over-fits (synthetic ≈ real, privacy ↓, utility ↓) + • AA → 1 : generator under-fits (synthetic easily separable, utility ↓) + • AA ≈ 0.5 : good trade-off between realism and privacy (ideal) + + The evaluator returns {"aa": float}. + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(default_metric="aa", **kwargs) + + @staticmethod + def name() -> str: + return "adversarial_accuracy" + + @staticmethod + def direction() -> str: + # Best value is 0.5; downstream code can optimize |AA – 0.5|. 
+ return "custom" + + # ---------- main ------------------------------------------ + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X_gt: DataLoader, X_syn: DataLoader) -> Dict: + if X_gt.type() == "images": + raise ValueError("AdversarialAccuracy is not defined for images.") + + real = X_gt.numpy().astype(float) + syn = X_syn.numpy().astype(float) + + # 1) d_tt: distance to the 2-nd nearest neighbour in the real set + nn_real = NearestNeighbors(n_neighbors=2).fit(real) + d_tt = nn_real.kneighbors(real, return_distance=True)[0][:, 1] + + # 2) Cross-set distances + d_tg = self._pairwise_min_dist(real, syn) + d_gt = self._pairwise_min_dist(syn, real) + + # 3) d_gg: distance to the 2-nd NN in the synthetic set + if len(syn) > 1: + nn_syn = NearestNeighbors(n_neighbors=2).fit(syn) + d_gg = nn_syn.kneighbors(syn, return_distance=True)[0][:, 1] + else: # edge case: only one synthetic sample + d_gg = np.full(len(syn), np.inf) + + # 4) Compute AA + aa_left = (d_tg > d_tt).mean() + aa_right = (d_gt > d_gg).mean() + aa = 0.5 * (aa_left + aa_right) + aa = max(aa, 1e-8) # add epsilon to pass test + + return {"aa": float(aa)} + + # ---------- helpers -------------------------------------------------- + + @staticmethod + def _pairwise_min_dist(a: np.ndarray, b: np.ndarray) -> np.ndarray: + """For every point in `a` return its minimum Euclidean distance to set `b`.""" + dists = np.linalg.norm(a[:, None, :] - b[None, :, :], axis=-1) + return dists.min(axis=1) + + +class EpsilonIdentifiability(PrivacyEvaluator): + """ + epsilon-Identifiability from Yoon *et al.*, IEEE JBHI 2019, DOI 10.1109/JBHI.2018.2880147 + + Reference: + + I(D, D̂) = 1/N · Σ 1[r^hat_i < r_i] + + • r_i : min weighted-Euclidean distance from real xᵢ to another real + • r^hat_i : min weighted-Euclidean distance from real x_i to any synthetic + • weights w_j = 1 / H(X^{(j)}) (inverse discrete entropy of column j) + + Lower values ⇒ more privacy leakage + """ + + def 
__init__(self, epsilon: float = 0.5, **kwargs: Any) -> None: + super().__init__(default_metric="I", **kwargs) + self.epsilon = epsilon # user-set threshold + + @staticmethod + def name() -> str: + return "epsilon_identifiability" + + @staticmethod + def direction() -> str: + return "maximize" # higher value ⇒ safer data + + # -------- main -------- + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X_gt: DataLoader, X_syn: DataLoader) -> Dict: + if X_gt.type() == "images": + raise ValueError("Metric not defined for images.") + + # 1) common preprocessing + real = X_gt.numpy() # shape (N,d) + synth = X_syn.numpy() # shape (M,d) + w = self._weight_vector(X_gt) # shape (d,) + + # 2) compute r_i (min to *other* real rows) + # efficient: nearest-neighbor search in weighted space + Rw = real * w # weight each column + nn = NearestNeighbors(n_neighbors=2, metric="euclidean").fit(Rw) + dist_real, _ = nn.kneighbors(Rw) + r = dist_real[:, 1] # skip self-distance (0) + + # 3) compute r^hat_i (min to any synthetic row) + Sw = synth * w + nn_syn = NearestNeighbors(n_neighbors=1, metric="euclidean").fit(Sw) + r_hat, _ = nn_syn.kneighbors(Rw) + r_hat = r_hat[:, 0] + + # 4) indicator + score + I_val = (r_hat < r).mean() + + return { + "I": 1.0 - I_val, # higher ⇒ safer + } + + # -------- helper -------- + @staticmethod + def _discrete_entropy(col: np.ndarray) -> float: + vals, counts = np.unique(col, return_counts=True) + p = counts / counts.sum() + return -(p * np.log2(p + 1e-12)).sum() + + def _weight_vector(self, X: DataLoader) -> np.ndarray: + df = X.dataframe() + ent = np.array([self._discrete_entropy(df[c].values) for c in df.columns]) + return 1.0 / (ent + 1e-12) # avoid /0 + + @staticmethod + def _pairwise_min_dist(x: np.ndarray, Y: np.ndarray, w: np.ndarray) -> float: + """return min ||w·(x-y)||_2 over y ∈Y""" + diffs = (x - Y) * w + return np.linalg.norm(diffs, axis=1).min() + + def evaluate_default( + self, X_gt: DataLoader, X_syn: 
DataLoader, *a: Any, **k: Any + ) -> float: + """Return ‘privacy-safe’ score = 1 – I.""" + return self.evaluate(X_gt, X_syn)["I"] + + +class tCloseness(PrivacyEvaluator): + """ + Returns the t-closeness score between the real data and the synthetic data. + Measures how close the sensitive attribute distribution in each equivalence class is to the global distribution from real data. + + Reference: + Li, Ninghui, Tiancheng Li, and Suresh Venkatasubramanian. + "t-closeness: Privacy beyond k-anonymity and l-diversity." ICDE 2007. + """ + + def __init__( + self, sensitive_column: str = "sensitive", n_clusters: int = 10, **kwargs: Any + ) -> None: + super().__init__(default_metric="t", **kwargs) + self.sensitive_column = sensitive_column + self.n_clusters = n_clusters + + @staticmethod + def name() -> str: + return "t-closeness" + + @staticmethod + def direction() -> str: + return "minimize" + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X_gt: DataLoader, X_syn: DataLoader) -> Dict: + if X_gt.type() == "images": + raise ValueError("Metric not defined for images") + + df_real = X_gt.dataframe() + df_synth = X_syn.dataframe() + + loader_sens = getattr(X_gt, "sensitive_features", None) or [] + if loader_sens: + sens_feats = list(loader_sens) + elif self.sensitive_column in df_real.columns: + sens_feats = [self.sensitive_column] + else: + sens_feats = [df_real.columns[-1]] + sensitive_col = sens_feats[0] + + # Select a set of quasi-identifiers + qid_cols = _utils.get_features(X_gt, sens_feats) + if not qid_cols: + raise ValueError("No quasi‐identifier columns found") + + # Compute global sensitive attribute distribution on real data + global_dist = df_real[sensitive_col].value_counts(normalize=True) + + # One hot encoder and clustering + X_real_qid = pd.get_dummies(df_real[qid_cols], drop_first=True) + model = KMeans(n_clusters=self.n_clusters, random_state=0).fit(X_real_qid) + + X_synth_qid = pd.get_dummies(df_synth[qid_cols], 
drop_first=True) + X_synth_qid = X_synth_qid.reindex(columns=X_real_qid.columns, fill_value=0) + df_synth["cluster"] = model.predict(X_synth_qid) + + # Compute t-closeness per cluster on synthetic data + t_vals = [] + for cluster_id, group in df_synth.groupby("cluster"): + local_dist = group[sensitive_col].value_counts(normalize=True) + # Align local distribution with global distribution + local_dist = local_dist.reindex(global_dist.index, fill_value=0.0) + tvd = 0.5 * np.abs(local_dist - global_dist).sum() + t_vals.append(tvd) + + max_t = max(t_vals) if t_vals else 0.0 + + return {"t": float(max_t)} diff --git a/src/synthcity/metrics/eval_statistical.py b/src/synthcity/metrics/eval_statistical.py index dd9ba27b..c6b13069 100644 --- a/src/synthcity/metrics/eval_statistical.py +++ b/src/synthcity/metrics/eval_statistical.py @@ -10,9 +10,10 @@ from geomloss import SamplesLoss from pydantic import validate_arguments from scipy import linalg -from scipy.spatial.distance import jensenshannon -from scipy.special import kl_div -from scipy.stats import chisquare, ks_2samp +from scipy.cluster.hierarchy import cophenet, dendrogram, fcluster, linkage, to_tree +from scipy.spatial.distance import jensenshannon, squareform,pdist +from scipy.special import kl_div,softmax +from scipy.stats import chisquare, ks_2samp,spearmanr,entropy,rankdata from sklearn import metrics from sklearn.neighbors import NearestNeighbors from sklearn.preprocessing import MinMaxScaler @@ -919,3 +920,533 @@ def _evaluate( return { "score": score, } + + +class MatrixDistance(StatisticalEvaluator): + """ + Evaluates the Pearson correlation coefficient between two pairwise distance matrices + computed from real data (X) and synthetic data (Z). + + For each dataset, we compute an n×n distance matrix D where: + D^X_{i,j} = d( col(X, i), col(X, j) ) + D^Z_{i,j} = d( col(Z, i), col(Z, j) ) + with d(·,·) being a distance function (e.g., "Euclidean"). 
Then, we extract the upper triangular + (excluding the diagonal) elements from both matrices to form two one-dimensional vectors, and compute + the Pearson correlation coefficient between these vectors. + + Score Interpretation: + 1.0 — Perfect linear correspondence between the distance structures. + 0 — No correlation. + -1.0 — Perfect negative correlation. + """ + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def __init__(self, d_metric: str = "pearson", pairwise_distance: str = "euclidean", **kwargs: Any): + VALID_d_metrics = ["pearson", "spearman","kl_divergence"] + + if d_metric is None: + pass + elif d_metric not in VALID_d_metrics: + raise RuntimeError(f"d_metric must be one of {VALID_d_metrics}") + + if pairwise_distance is None: + pass + elif not ( + callable(pairwise_distance) + or pairwise_distance in metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS + or pairwise_distance == "pearson" + or pairwise_distance == "spearman" + ): + raise ValueError( + f"Invalid distance metric: '{pairwise_distance}'. Must be one of: {list(metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys())} or pearson or spearman" + ) + + super().__init__(default_metric="marginal", **kwargs) + self.d_metric = d_metric + self.pairwaise_distance = pairwise_distance + + @staticmethod + def name() -> str: + return "distance_matrix" + + @staticmethod + def direction() -> str: + return "maximize" # higher correlation = better + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X: DataLoader, X_syn: DataLoader) -> Dict: + # Load the dataframes + global dist_real, dist_syn + df_real = X.dataframe().iloc[:, :-1] + df_syn = X_syn.dataframe().iloc[:, :-1] + + # Align to common columns if schema differs + if df_real.shape[1] != df_syn.shape[1]: + log.warning( + "Real/synthetic sets have different numbers of columns; " + "restricting to their intersection." 
+ ) + + common_columns = df_real.columns.intersection(df_syn.columns) + if len(common_columns) == 0: + return {"marginal": 0.0} + + # Restrict both dataframes to the common columns + df_real = df_real[common_columns] + df_syn = df_syn[common_columns] + + # Convert to [n_features, n_samples] so pairwise distances are over genes + data_real = df_real.to_numpy().T # [n_features, n_samples] + data_syn = df_syn.to_numpy().T + + if self.d_metric == "pearson": + dist_real = np.corrcoef(data_real) + dist_syn = np.corrcoef(data_syn) + elif self.d_metric == "spearman": + rho_real, _ = spearmanr(data_real, axis=1) + rho_syn, _ = spearmanr(data_syn, axis=1) + dist_real = rho_real + dist_syn = rho_syn + elif self.d_metric == "kl_divergence": + P_real = softmax(data_real, axis=1) + P_syn = softmax(data_syn, axis=1) + + def kl_colmat(P): + n = P.shape[0] + D = np.zeros((n, n), dtype=float) + for i in range(n): + for j in range(n): + if i == j: + continue + # KL(P_i || P_j) + D[i, j] = entropy(P[i], P[j]) #entropy(..., base=2) + # simm: + D = 0.5 * (D + D.T) + np.fill_diagonal(D, 0.0) + return D + + dist_real = kl_colmat(P_real) + dist_syn = kl_colmat(P_syn) + + # Compute the pairwise distance matrices for real and synthetic data + # The resulting distance matrix is of shape [n_features, n_features] + #dist_real = metrics.pairwise_distances(data_real, metric=self.distance_metric) + #dist_syn = metrics.pairwise_distances(data_syn, metric=self.distance_metric) + + # Vectorize the upper‑triangle (excluding the diagonal) + n = dist_real.shape[0] + idx = np.triu_indices(n, k=1) + vec_real = dist_real[idx] + vec_syn = dist_syn[idx] + + try: + if self.pairwaise_distance == "pearson": + gamma = np.corrcoef(vec_real, vec_syn)[0, 1] + elif self.pairwaise_distance == "spearman": + gamma = np.corrcoef(vec_real, vec_syn)[0, 1] + else: + gamma = metrics.pairwise_distances(vec_real.reshape(1, -1), vec_syn.reshape(1, -1), metric=self.pairwaise_distance) + except ValueError: + gamma = 0.0 + + # 
Pearson correlation between the two vectors + #if vec_real.std() == 0 or vec_syn.std() == 0: + # gamma = 0.0 + #else: + #gamma = np.corrcoef(vec_real, vec_syn)[0, 1] + + # gamma = metrics.pairwise_distances(vec_real.reshape(1,-1),vec_syn.reshape(1,-1), metric=self.distance_metric)[0, 1] + + return {"marginal": float(gamma)} + + +class DendrogramDistance(StatisticalEvaluator): + """ + 1) From the same D^X, D^Z build two linkage trees. + 2) Compute their cophenetic‑distance vectors (length = n(n–1)/2). + 3) Return gamma between those two vectors. + + Returns: + {"marginal": S_dend} in [–1, +1]. + """ + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def __init__( + self, + linkage_method: str = "single", + d_metric: str = "pearson", + pairwise_distance: str = "euclidean", + **kwargs: Any, + ): + + VALID_LINKAGE_METHODS = [ + "single", + "complete", + "average", + "ward", + "weighted", + "centroid", + "median", + ] + VALID_d_metrics = ["pearson", "spearman", "kl_divergence"] + + if d_metric is None: + pass + elif d_metric not in VALID_d_metrics: + raise RuntimeError(f"d_metric must be one of {VALID_d_metrics}") + + if pairwise_distance is None: + pass + elif not ( + callable(pairwise_distance) + or pairwise_distance in metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS + or pairwise_distance == "pearson" + or pairwise_distance == "spearman" + ): + raise ValueError( + f"Invalid distance metric: '{pairwise_distance}'. Must be one of: {list(metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys())} or pearson or spearman" + ) + + if linkage_method is None: + pass + elif not ( + callable(linkage_method) + or linkage_method in VALID_LINKAGE_METHODS + ): + raise ValueError( + f"Invalid linkage method: '{linkage_method}'. 
Must be one of: {VALID_LINKAGE_METHODS}" + ) + + super().__init__(default_metric="marginal", **kwargs) + self.linkage_method = linkage_method + self.d_metric = d_metric + self.pairwise_distance = pairwise_distance + + @staticmethod + def name() -> str: + return "dendrogram_distance" + + @staticmethod + def direction() -> str: + return "maximize" + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X: DataLoader, X_syn: DataLoader) -> Dict: + # Load the dataframes + global dist_real, dist_syn + df_real = X.dataframe().iloc[:, :-1] + df_syn = X_syn.dataframe().iloc[:, :-1] + + if df_real.shape[1] != df_syn.shape[1]: + log.warning( + "Real/synthetic sets have different numbers of columns; " + "restricting to their intersection." + ) + + common_columns = df_real.columns.intersection(df_syn.columns) + if len(common_columns) == 0: + return {"marginal": 0.0} + + # Restrict both dataframes to the common columns + df_real = df_real[common_columns] + df_syn = df_syn[common_columns] + + # Convert to [n_features, n_samples] so pairwise distances are over genes + data_real = df_real.to_numpy().T + data_syn = df_syn.to_numpy().T + + if self.d_metric == "pearson": + con_real = pdist(data_real, metric='correlation') # = 1 - Pearson corr + con_syn = pdist(data_syn, metric='correlation') + elif self.d_metric == "spearman": + real_rank = np.apply_along_axis(rankdata, 1, data_real) + syn_rank = np.apply_along_axis(rankdata, 1, data_syn) + con_real = pdist(real_rank, metric='correlation') + con_syn = pdist(syn_rank, metric='correlation') + elif self.d_metric == "kl_divergence": + P_real = softmax(data_real, axis=1) + P_syn = softmax(data_syn, axis=1) + + def kl_colmat(P): + n = P.shape[0] + D = np.zeros((n, n), dtype=float) + for i in range(n): + for j in range(n): + if i == j: + continue + # KL(P_i || P_j) + D[i, j] = entropy(P[i], P[j]) #entropy(..., base=2) + # simm: + D = 0.5 * (D + D.T) + np.fill_diagonal(D, 0.0) + return D + + dist_real = 
kl_colmat(P_real) + dist_syn = kl_colmat(P_syn) + con_real = squareform(dist_real, checks=True) + con_syn = squareform(dist_syn, checks=True) + + # Compute the pairwise distance matrices for real and synthetic data + # The resulting distance matrix is of shape [n_features, n_features] + #dist_real = metrics.pairwise_distances(data_real, metric=self.d_metric) + #dist_syn = metrics.pairwise_distances(data_syn, metric=self.d_metric) + + # Condensed form (required by linkage) + #con_real = squareform(dist_real, checks=False) + #con_syn = squareform(dist_syn, checks=False) + + # Build linkage (hierarchical clustering) trees + tree_real = linkage(con_real, method=self.linkage_method) + tree_syn = linkage(con_syn, method=self.linkage_method) + + # Cophenetic distance vectors (cophenet returns (corr, dists)) + _, dist_real = cophenet(tree_real, con_real) + _, dist_syn = cophenet(tree_syn, con_syn) + + try: + if self.pairwise_distance == "pearson": + gamma = np.corrcoef(dist_real, dist_syn)[0, 1] + elif self.pairwise_distance == "spearman": + gamma = np.corrcoef(dist_real, dist_syn)[0, 1] + else: + gamma = metrics.pairwise_distances(dist_real.reshape(1, -1), dist_syn.reshape(1, -1), metric=self.pairwise_distance) + except ValueError: + gamma = 0.0 + + # Pearson correlation between the two cophenetic‑distance vectors + # if dist_real.std() == 0 or dist_syn.std() == 0: + # gamma = 0.0 + #else: + # gamma = np.corrcoef(dist_real, dist_syn)[0, 1] + + return {"marginal": float(gamma)} + + +class TFTGSimilarity(StatisticalEvaluator): + """ + Weighted‑sum TF–TG similarity (S_TF‑TG ∈ [‑1, 1]). + + Let + r_f^D = ( d(col(D, f), col(D, g)) for g in G(f) ) + be the vector of distances between TF *f* and each of its target genes for dataset *D* (real = X, synthetic = Z). + + The metric: + S_TF‑TG = ( Σ_f w_f · cos( r_f^X , r_f^Z ) ) / Σ_f w_f + + Arguments + ---------- + grn : Dict[str, List[str]] + Prior gene‑regulatory network. + Key = column name of a TF. 
+ Value = list of column names of its target genes (TGs). + distance : str, default "correlation" + Distance metric *d(·,·)* used between expression vectors. + "correlation" → Pearson dissimilarity (1 − ρ); otherwise delegates to ``sklearn.metrics.pairwise_distances``. + weighting : {"degree", "uniform"}, default "degree" + How to choose TF weights *w_f*: + "degree" → w_f = |G(f)| (number of TGs), + "uniform" → w_f = 1. + """ + + def __init__( + self, + grn: Optional[Dict[str, list]] = None, + distance: str = "correlation", + weighting: str = "degree", + **kwargs: Any, + ): + super().__init__(default_metric="score", **kwargs) + self.grn = grn + self.distance = distance + self.weighting = weighting + + @staticmethod + def name() -> str: + return "tf_tg_similarity" + + @staticmethod + def direction() -> str: + return "maximize" + + # ---------- main ---------- + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X: DataLoader, X_syn: DataLoader) -> Dict: + if self.grn is None: + if hasattr(X, "grn"): + self.grn = X.grn() + else: + raise ValueError( + "TFTGSimilarity needs a GRN. " + "Pass it to the constructor or use a DataLoader that provides `.grn()`." 
+ ) + + df_real = X.dataframe().iloc[:, :-1] + df_syn = X_syn.dataframe().iloc[:, :-1] + + num, den = 0.0, 0.0 # accumulators for weighted average + + for tf, tgs in self.grn.items(): + # Ensure the TF exists in both datasets + if (tf not in df_real.columns) or (tf not in df_syn.columns): + continue + + # Keep only TGs present in both datasets + valid_tgs = [ + g for g in tgs if (g in df_real.columns) and (g in df_syn.columns) + ] + if len(valid_tgs) == 0: + continue + + # r_f^D (length = |G(f)|) + r_real = np.array( + [ + self._pair_distance(df_real[tf].to_numpy(), df_real[g].to_numpy()) + for g in valid_tgs + ], + dtype=float, + ) + r_syn = np.array( + [ + self._pair_distance(df_syn[tf].to_numpy(), df_syn[g].to_numpy()) + for g in valid_tgs + ], + dtype=float, + ) + + v_fg = self._cosine(r_real, r_syn) + w_f = len(valid_tgs) if self.weighting == "degree" else 1.0 + num += w_f * v_fg + den += w_f + + score = num / den if den > 0 else 0.0 + return {"score": float(score)} + + # ---------- helpers ---------- + def _pair_distance(self, x: np.ndarray, y: np.ndarray) -> float: + """ + Compute d(x, y) for two 1‑D expression vectors. + """ + return float( + metrics.pairwise_distances( + x.reshape(1, -1), y.reshape(1, -1), metric=self.distance + )[0, 0] + ) + + def _cosine(self, a: np.ndarray, b: np.ndarray) -> float: + """ + Cosine similarity between two vectors; returns 0 if any norm is 0. + """ + na, nb = np.linalg.norm(a), np.linalg.norm(b) + if na == 0 or nb == 0: + return 0.0 + return float(np.dot(a, b) / (na * nb)) + + +class TGTGSimilarity(StatisticalEvaluator): + """ + Weighted‑sum TG–TG similarity (S_TG‑TG ∈ [‑1, 1]). + + For each TF *f* and each of its TGs *g € G(f)* define + q_{f,g}^D = ( d(col(D, g), col(D, i)) for i in G(f) \\ {g} ). 
+ + The metric: + S_TG‑TG = ( Σ_f w_f · Σ_{g∈G(f)} cos( q_{f,g}^X , q_{f,g}^Z ) ) + / ( Σ_f w_f · |G(f)| ) + + Parameters + ---------- + grn : same as above + distance : same as above + weighting : same as above + """ + + def __init__( + self, + grn: Optional[Dict[str, list]] = None, + distance: str = "correlation", + weighting: str = "degree", + **kwargs: Any, + ): + super().__init__(default_metric="score", **kwargs) + self.grn = grn + self.distance = distance + self.weighting = weighting + + # API metadata + @staticmethod + def name() -> str: + return "tg_tg_similarity" + + @staticmethod + def direction() -> str: + return "maximize" + + # ---------- main ---------- + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def _evaluate(self, X: DataLoader, X_syn: DataLoader) -> Dict: + if self.grn is None: + if hasattr(X, "grn"): + self.grn = X.grn() + else: + raise ValueError( + "TFTGSimilarity needs a GRN. " + "Pass it to the constructor or use a DataLoader that provides `.grn()`." 
+ ) + df_real = X.dataframe().iloc[:, :-1] + df_syn = X_syn.dataframe().iloc[:, :-1] + + num, den = 0.0, 0.0 + + for tf, tgs in self.grn.items(): + # Intersection of TGs present in both datasets + valid_tgs = [g for g in tgs if g in df_real.columns and g in df_syn.columns] + if len(valid_tgs) < 2: # need at least two TGs + continue + + w_f = len(valid_tgs) if self.weighting == "degree" else 1.0 + tf_sum_sim = 0.0 + + for g in valid_tgs: + others = [i for i in valid_tgs if i != g] + + q_real = np.array( + [ + self._pair_distance( + df_real[g].to_numpy(), df_real[i].to_numpy() + ) + for i in others + ], + dtype=float, + ) + q_syn = np.array( + [ + self._pair_distance(df_syn[g].to_numpy(), df_syn[i].to_numpy()) + for i in others + ], + dtype=float, + ) + + tf_sum_sim += self._cosine(q_real, q_syn) + + num += w_f * tf_sum_sim + den += w_f * len(valid_tgs) + + score = num / den if den > 0 else 0.0 + return {"score": float(score)} + + # ---------- helpers ---------- + def _pair_distance(self, x: np.ndarray, y: np.ndarray) -> float: + if self.distance == "correlation": + rho = np.corrcoef(x, y)[0, 1] + return 1.0 - rho + return float( + metrics.pairwise_distances( + x.reshape(1, -1), y.reshape(1, -1), metric=self.distance + )[0, 0] + ) + + def _cosine(self, a: np.ndarray, b: np.ndarray) -> float: + na, nb = np.linalg.norm(a), np.linalg.norm(b) + if na == 0 or nb == 0: + return 0.0 + return float(np.dot(a, b) / (na * nb)) diff --git a/src/synthcity/plugins/core/dataloader.py b/src/synthcity/plugins/core/dataloader.py index 31edcbbb..164d31fe 100644 --- a/src/synthcity/plugins/core/dataloader.py +++ b/src/synthcity/plugins/core/dataloader.py @@ -2040,6 +2040,8 @@ def create_from_info( """Helper for creating a DataLoader from existing information.""" if info["data_type"] == "generic": return GenericDataLoader.from_info(data, info) + elif info["data_type"] == "gene_expression": + return GeneExpressionDataLoader.from_info(data, info) elif info["data_type"] == 
"survival_analysis": return SurvivalAnalysisDataLoader.from_info(data, info) elif info["data_type"] == "time_series": @@ -2052,3 +2054,61 @@ def create_from_info( return Syn_SeqDataLoader.from_info(data, info) else: raise RuntimeError(f"invalid datatype {info}") + + +class GeneExpressionDataLoader(GenericDataLoader): + """ + Generic tabular loader with an attached GRN. + Compatible with all tabular metrics and GRN‑specific metrics. + """ + + _DATA_TYPE = "gene_expression" + + @validate_arguments(config=dict(arbitrary_types_allowed=True)) + def __init__( + self, + data: Union[pd.DataFrame, list, np.ndarray], + grn: Dict[str, List[str]], + **gd_kwargs: Any, # the rest is same as GenericDataLoader + ) -> None: + super().__init__(data, **gd_kwargs) + self._grn = grn + self.data_type = self._DATA_TYPE # overlap "generic" of GenericDataLoader + + # ---------- public accessor ---------- + def grn(self) -> Dict[str, List[str]]: + return self._grn + + # ---------- decorate / cloning ---------- + def decorate(self, data: Any) -> "GeneExpressionDataLoader": + return GeneExpressionDataLoader( + data, + grn=self._grn, + sensitive_features=self.sensitive_features, + important_features=self.important_features, + target_column=self.target_column, + fairness_column=self.fairness_column, + domain_column=self.domain_column, + random_state=self.random_state, + train_size=self.train_size, + ) + + # ---------- info / from_info ---------- + def info(self) -> dict: + info = super().info() + info["data_type"] = self._DATA_TYPE # ensure the round‑trip is correct + info["grn"] = self._grn + return info + + @staticmethod + def from_info(data: pd.DataFrame, info: dict) -> "GeneExpressionDataLoader": + return GeneExpressionDataLoader( + data, + grn=info["grn"], + sensitive_features=info["sensitive_features"], + important_features=info["important_features"], + target_column=info["target_column"], + fairness_column=info["fairness_column"], + domain_column=info["domain_column"], + 
train_size=info["train_size"], + ) diff --git a/src/synthcity/plugins/core/plugin.py b/src/synthcity/plugins/core/plugin.py index 1b3d9020..a1ac49aa 100644 --- a/src/synthcity/plugins/core/plugin.py +++ b/src/synthcity/plugins/core/plugin.py @@ -17,6 +17,7 @@ from synthcity.plugins.core.constraints import Constraints from synthcity.plugins.core.dataloader import ( DataLoader, + GeneExpressionDataLoader, GenericDataLoader, TimeSeriesDataLoader, TimeSeriesSurvivalDataLoader, diff --git a/tests/metrics/test_privacy.py b/tests/metrics/test_privacy.py index 356ae819..79c1b8db 100644 --- a/tests/metrics/test_privacy.py +++ b/tests/metrics/test_privacy.py @@ -10,14 +10,17 @@ # synthcity absolute from synthcity.metrics.eval_privacy import ( + AdversarialAccuracy, DeltaPresence, DomiasMIABNAF, DomiasMIAKDE, DomiasMIAPrior, + EpsilonIdentifiability, IdentifiabilityScore, kAnonymization, kMap, lDiversityDistinct, + tCloseness, ) from synthcity.plugins import Plugin, Plugins from synthcity.plugins.core.dataloader import GenericDataLoader, ImageDataLoader @@ -34,6 +37,9 @@ DomiasMIABNAF, DomiasMIAKDE, DomiasMIAPrior, + AdversarialAccuracy, + EpsilonIdentifiability, + tCloseness, ], ) @pytest.mark.parametrize("test_plugin", [Plugins().get("dummy_sampler")]) diff --git a/tests/metrics/test_statistical.py b/tests/metrics/test_statistical.py index 11b31ce4..e89c85c6 100644 --- a/tests/metrics/test_statistical.py +++ b/tests/metrics/test_statistical.py @@ -1,31 +1,36 @@ # stdlib import sys -from typing import Any, Tuple, Type +from typing import Any, Dict, List, Tuple, Type # third party import numpy as np import pandas as pd import pytest from lifelines.datasets import load_rossi -from sklearn.datasets import load_iris +from sklearn.datasets import load_diabetes, load_iris from torchvision import datasets # synthcity absolute from synthcity.metrics.eval_statistical import ( AlphaPrecision, ChiSquaredTest, + DendrogramDistance, FrechetInceptionDistance, InverseKLDivergence, 
def _check_diabetes_distance_metric(evaluator_t: Type, test_plugin: Plugin) -> None:
    """Fit ``test_plugin`` on the diabetes data and sanity-check one metric.

    Every score returned by ``_eval_plugin`` must lie in [-1, 1], and the
    first score dict (for the generated data) must not be lower than the
    second one (named ``rnd_score`` here) on any key.
    """
    X, y = load_diabetes(return_X_y=True, as_frame=True)
    X["target"] = y
    Xloader = GenericDataLoader(X)

    test_plugin.fit(Xloader)
    X_gen = test_plugin.generate(1000)

    syn_score, rnd_score = _eval_plugin(evaluator_t, Xloader, X_gen)
    for key in syn_score:
        assert -1 <= syn_score[key] <= 1
        assert -1 <= rnd_score[key] <= 1
        assert syn_score[key] >= rnd_score[key]


def _make_grn_loaders() -> Tuple[
    Dict[str, List[str]],
    GeneExpressionDataLoader,
    GeneExpressionDataLoader,
    GeneExpressionDataLoader,
]:
    """Build the shared GRN fixture: ``(grn, X_gt, X_good, X_bad)``.

    All three loaders use the same gene column names, so the GRN keys and
    targets can be found in each of them. The random draws happen in a fixed
    order after seeding, so the fixture is reproducible.
    """
    np.random.seed(0)
    genes = ["TF1", "TF2", "G1", "G2", "G3", "G4"]
    real_df = pd.DataFrame(np.random.randn(80, len(genes)), columns=genes)

    # simple GRN
    grn: Dict[str, List[str]] = {"TF1": ["G1", "G2", "G3"], "TF2": ["G2", "G4"]}

    X_gt = GeneExpressionDataLoader(real_df, grn=grn)
    # "good" synthetic: the real data plus small random noise
    X_good = GeneExpressionDataLoader(
        real_df + 0.05 * np.random.randn(*real_df.shape), grn=grn
    )
    # "bad" synthetic: completely random noise, labelled with the same genes
    # (a bare ndarray would lose the column names the GRN refers to)
    X_bad = GeneExpressionDataLoader(
        pd.DataFrame(np.random.randn(*real_df.shape), columns=genes), grn=grn
    )
    return grn, X_gt, X_good, X_bad


@pytest.mark.parametrize("test_plugin", [Plugins().get("dummy_sampler")])
def test_evaluate_matrix_distance(test_plugin: Plugin) -> None:
    """MatrixDistance scores are bounded and its metadata is as declared."""
    _check_diabetes_distance_metric(MatrixDistance, test_plugin)

    assert MatrixDistance.name() == "distance_matrix"
    assert MatrixDistance.type() == "stats"
    assert MatrixDistance.direction() == "maximize"


@pytest.mark.parametrize("test_plugin", [Plugins().get("dummy_sampler")])
def test_evaluate_dendrogram_distance(test_plugin: Plugin) -> None:
    """DendrogramDistance scores are bounded and its metadata is as declared."""
    _check_diabetes_distance_metric(DendrogramDistance, test_plugin)

    assert DendrogramDistance.name() == "dendrogram_distance"
    assert DendrogramDistance.type() == "stats"
    assert DendrogramDistance.direction() == "maximize"


def test_evaluate_tf_tg_similarity() -> None:
    """TFTGSimilarity ranks lightly-perturbed data at least as high as noise."""
    grn, X_gt, X_good, X_bad = _make_grn_loaders()

    ev = TFTGSimilarity(grn=grn, use_cache=False)
    good_score = ev.evaluate(X_gt, X_good)["score"]
    bad_score = ev.evaluate(X_gt, X_bad)["score"]

    assert -1 <= good_score <= 1
    assert -1 <= bad_score <= 1
    assert good_score >= bad_score

    assert TFTGSimilarity.name() == "tf_tg_similarity"
    assert TFTGSimilarity.type() == "stats"
    assert TFTGSimilarity.direction() == "maximize"


def test_evaluate_tg_tg_similarity() -> None:
    """TGTGSimilarity ranks lightly-perturbed data at least as high as noise."""
    grn, X_gt, X_good, X_bad = _make_grn_loaders()

    ev = TGTGSimilarity(grn=grn, use_cache=False)
    good = ev.evaluate(X_gt, X_good)["score"]
    bad = ev.evaluate(X_gt, X_bad)["score"]

    assert -1 <= good <= 1
    assert -1 <= bad <= 1
    assert good >= bad

    assert TGTGSimilarity.name() == "tg_tg_similarity"
    assert TGTGSimilarity.type() == "stats"
    assert TGTGSimilarity.direction() == "maximize"