diff --git a/src/mightypy/ml/_ensemble.py b/src/mightypy/ml/_ensemble.py index 4bee7c5..f76b409 100644 --- a/src/mightypy/ml/_ensemble.py +++ b/src/mightypy/ml/_ensemble.py @@ -1,6 +1,7 @@ """ Ensemble methods for Machine Learning """ + from __future__ import annotations from typing import Union, Tuple, List, Optional import numpy as np diff --git a/src/mightypy/ml/_linear.py b/src/mightypy/ml/_linear.py index 7b740c9..28ae440 100644 --- a/src/mightypy/ml/_linear.py +++ b/src/mightypy/ml/_linear.py @@ -11,6 +11,7 @@ __copyright__ = "Nishant Baheti" __license__ = "MIT" + class LinearRegression: """Linear Regression Model Class @@ -23,8 +24,7 @@ class LinearRegression: """ def __init__(self, alpha: float = 0.01, iterations: int = 10000): - """Constructor - """ + """Constructor""" self.alpha = alpha self.iterations = iterations self._theta = None @@ -127,13 +127,15 @@ def predict(self, X: np.ndarray) -> np.ndarray: else: raise Warning("Model is not trained yet. Theta is None.") - def train(self, - X: np.ndarray, - y: np.ndarray, - verbose: bool = True, - method: str = "SGD", - theta_precision: float = 0.001, - batch_size: int = 30) -> None: + def train( + self, + X: np.ndarray, + y: np.ndarray, + verbose: bool = True, + method: str = "SGD", + theta_precision: float = 0.001, + batch_size: int = 30, + ) -> None: """train model /theta estimator Args: @@ -203,8 +205,8 @@ def train(self, # creating batch for this iteration # X_batch = np.take(self._X, indices, axis=0) # y_batch = np.take(self._y, indices, axis=0) - X_batch = self._X[indices,:] - y_batch = self._y[indices,:] + X_batch = self._X[indices, :] + y_batch = self._y[indices, :] # calculate y_pred y_pred = self.predict(X_batch) @@ -247,8 +249,7 @@ class RidgeRegression: """ def __init__(self, alpha: float = 0.01, iterations: int = 10000): - """Constructor - """ + """Constructor""" self.alpha = alpha self.iterations = iterations self._theta = None @@ -351,14 +352,16 @@ def predict(self, X: np.ndarray) -> np.ndarray: else: raise Warning("Model is not trained yet. 
Theta is None.") - def train(self, - X: np.ndarray, - y: np.ndarray, - verbose: bool = True, - method: str = "SGD", - theta_precision: float = 0.001, - penalty: Union[float, int] = 1.0, - batch_size: int = 30) -> None: + def train( + self, + X: np.ndarray, + y: np.ndarray, + verbose: bool = True, + method: str = "SGD", + theta_precision: float = 0.001, + penalty: Union[float, int] = 1.0, + batch_size: int = 30, + ) -> None: """train model /theta estimator Args: @@ -406,7 +409,7 @@ def train(self, # theta_0 will not be effected by penalty new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) # type: ignore # rest of theta's will be effected by it - new_theta_rest = self._theta[:, range(1, self._n)] * (1 - (penalty/self._m)) - (self.alpha * gradient[1:]) # type: ignore + new_theta_rest = self._theta[:, range(1, self._n)] * (1 - (penalty / self._m)) - (self.alpha * gradient[1:]) # type: ignore new_theta = np.hstack((new_theta_0, new_theta_rest)) @@ -434,8 +437,8 @@ def train(self, # X_batch = np.take(self._X, indices, axis=0) # y_batch = np.take(self._y, indices, axis=0) - X_batch = self._X[indices,:] - y_batch = self._y[indices,:] + X_batch = self._X[indices, :] + y_batch = self._y[indices, :] # calculate y_pred y_pred = self.predict(X_batch) @@ -444,11 +447,10 @@ def train(self, # simultaneous operation gradient = np.mean((y_pred - y_batch) * X_batch, axis=0) # type: ignore - new_theta_0 = self._theta[:,[0]] - (self.alpha * gradient[0]) # type: ignore - new_theta_rest = self._theta[:,range(1,self._n)] * (1 - (penalty/self._m) ) - (self.alpha * gradient[1:]) # type: ignore - - new_theta = np.hstack((new_theta_0,new_theta_rest)) + new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) # type: ignore + new_theta_rest = self._theta[:, range(1, self._n)] * (1 - (penalty / self._m)) - (self.alpha * gradient[1:]) # type: ignore + new_theta = np.hstack((new_theta_0, new_theta_rest)) if np.isnan(np.sum(new_theta)) or np.isinf(np.sum(new_theta)): print("breaking. found inf or nan.") @@ -464,8 +466,11 @@ def train(self, self._theta_history.append(self._theta[0]) elif method == "NORMAL": - self._theta = np.linalg.inv( - self._X.T @ self._X + (penalty * np.identity(self._n))) @ self._X.T @ self._y + self._theta = ( + np.linalg.inv(self._X.T @ self._X + (penalty * np.identity(self._n))) + @ self._X.T + @ self._y + ) else: print("No Method Defined.") @@ -483,8 +488,7 @@ class LassoRegression: """ def __init__(self, alpha: float = 0.01, iterations: int = 10000): - """Constructor - """ + """Constructor""" self.alpha = alpha self.iterations = iterations self._theta = None @@ -587,14 +591,16 @@ def predict(self, X: np.ndarray) -> np.ndarray: else: raise Warning("Model is not trained yet. 
Theta is None.") - def train(self, - X: np.ndarray, - y: np.ndarray, - verbose: bool = True, - method: str = "SGD", - theta_precision: float = 0.001, - penalty: Union[int, float] = 1.0, - batch_size: int = 30) -> None: + def train( + self, + X: np.ndarray, + y: np.ndarray, + verbose: bool = True, + method: str = "SGD", + theta_precision: float = 0.001, + penalty: Union[int, float] = 1.0, + batch_size: int = 30, + ) -> None: """train model /theta estimator Args: @@ -638,7 +644,7 @@ def train(self, gradient = np.mean((y_pred - self._y) * self._X, axis=0) # type: ignore new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) # type: ignore - new_theta_rest = self._theta[:, range(1, self._n)] - (self.alpha * gradient[1:]) - (penalty/self._m) # type: ignore + new_theta_rest = self._theta[:, range(1, self._n)] - (self.alpha * gradient[1:]) - (penalty / self._m) # type: ignore new_theta = np.hstack((new_theta_0, new_theta_rest)) @@ -679,7 +685,7 @@ def train(self, gradient = np.mean((y_pred - y_batch) * X_batch, axis=0) # type: ignore new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) # type: ignore - new_theta_rest = self._theta[:, range(1, self._n)] - (self.alpha * gradient[1:]) - (penalty/self._m) # type: ignore + new_theta_rest = self._theta[:, range(1, self._n)] - (self.alpha * gradient[1:]) - (penalty / self._m) # type: ignore new_theta = np.hstack((new_theta_0, new_theta_rest)) @@ -815,15 +821,17 @@ def predict(self, X: np.ndarray) -> np.ndarray: else: raise Warning("Model is not trained yet. Theta is None.") - def train(self, - X: np.ndarray, - y: np.ndarray, - verbose: bool = True, - method: str = "SGD", - theta_precision: float = 0.001, - batch_size: int = 30, - regularization: bool = False, - penalty: Union[float, int] = 1.0) -> None: + def train( + self, + X: np.ndarray, + y: np.ndarray, + verbose: bool = True, + method: str = "SGD", + theta_precision: float = 0.001, + batch_size: int = 30, + regularization: bool = False, + penalty: Union[float, int] = 1.0, + ) -> None: """train theta / estimator Args: @@ -837,7 +845,7 @@ def train(self, "SGD"(Stochastic Gradient Descent) theta_precision (float, optional): theta initialization value precision. Defaults to 0.001. - batch_size (int, optional): batch size only for BGD. Defaults to 30. + batch_size (int, optional): batch size only for BGD. Defaults to 30. regularization (bool, optional): Apply Regularization. Defaults to False. penalty (Union[float, int], optional): regularization penalty only works for regularization=True. Defaults to 1.0. 
""" @@ -869,8 +877,9 @@ def train(self, if regularization: gradient = np.mean((y_pred - self._y) * self._X, axis=0) new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) - new_theta_rest = self._theta[:, range( - 1, self._n)] * (1 - (penalty/self._m)) - (self.alpha * gradient[1:]) + new_theta_rest = self._theta[:, range(1, self._n)] * ( + 1 - (penalty / self._m) + ) - (self.alpha * gradient[1:]) new_theta = np.hstack((new_theta_0, new_theta_rest)) else: @@ -915,8 +924,9 @@ def train(self, if regularization: gradient = np.mean((y_pred - y_batch) * X_batch, axis=0) new_theta_0 = self._theta[:, [0]] - (self.alpha * gradient[0]) - new_theta_rest = self._theta[:, range( - 1, self._n)] * (1 - (penalty/self._m)) - (self.alpha * gradient[1:]) + new_theta_rest = self._theta[:, range(1, self._n)] * ( + 1 - (penalty / self._m) + ) - (self.alpha * gradient[1:]) new_theta = np.hstack((new_theta_0, new_theta_rest)) else: @@ -940,7 +950,9 @@ def train(self, print("No Method Defined.") -def polynomial_regression(x: np.ndarray, y: np.ndarray, degree: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: +def polynomial_regression( + x: np.ndarray, y: np.ndarray, degree: int +) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ fit Regression line with polynomial degree. @@ -963,15 +975,17 @@ def polynomial_regression(x: np.ndarray, y: np.ndarray, degree: int) -> Tuple[np >>> plt.show() """ a = np.polynomial.Polynomial.fit(x, y, deg=degree).convert().coef - + if len(a) == 1: slope = a resid = np.array([0]) else: slope = a[1:] resid = a[0] - fit_line = np.array([(x**(degree - i))*slope[i] - for i in range(0, degree)]).sum(axis=0) + resid + fit_line = ( + np.array([(x ** (degree - i)) * slope[i] for i in range(0, degree)]).sum(axis=0) + + resid + ) return slope, resid, fit_line @@ -984,10 +998,10 @@ def trend(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndar y (np.ndarray): dependent variable. Returns: - Tuple[np.ndarray, np.ndarray, np.ndarray]: slope, residual, trendline. + Tuple[np.ndarray, np.ndarray, np.ndarray]: slope, residual, trendline. Examples; - >>> import matplotlib.pyplot as plt + >>> import matplotlib.pyplot as plt >>> x = np.array([1, 2, 3, 4, 5, 6, 7, 8]) >>> y = np.array([1, 2, 3, 3, 4, 5, 7, 10]) >>> s, r, t = trend(x, y) @@ -1006,18 +1020,17 @@ def trend(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndar x = np.array([1, 2, 3, 4, 5, 6, 7, 8]) y = np.array([1, 2, 3, 3, 4, 5, 7, 10]) s, r, t = trend(x, y) - plt.plot(x, y, 'o', label='original', alpha=0.6) - plt.plot(x, t, '.-', label='regression line') + plt.plot(x, y, "o", label="original", alpha=0.6) + plt.plot(x, t, ".-", label="regression line") plt.legend() plt.show(block=True) - x = np.arange(1, 10) y = x**2 + x**3 s, r, l = polynomial_regression(x, y, 1) - plt.plot(x, y, 'ko', label='original', alpha=0.6) - plt.plot(x, l, '.-', label='regression line') + plt.plot(x, y, "ko", label="original", alpha=0.6) + plt.plot(x, l, ".-", label="regression line") plt.legend() plt.show() diff --git a/src/mightypy/ml/_tree.py b/src/mightypy/ml/_tree.py index 692623e..3282ff3 100644 --- a/src/mightypy/ml/_tree.py +++ b/src/mightypy/ml/_tree.py @@ -13,9 +13,13 @@ class Question: header (str): column/header name. 
""" - def __init__(self, column_index: int, value: Union[int, str, float, np.int64, np.float64], header: str): - """Constructor - """ + def __init__( + self, + column_index: int, + value: Union[int, str, float, np.int64, np.float64], + header: str, + ): + """Constructor""" self.column_index = column_index self.value = value self.header = header @@ -57,9 +61,16 @@ class Node: leaf_value (Union[dict,int,float], optional): Leaf node/final node's value. Defaults to None. """ - def __init__(self, question: Question = None, true_branch: Node = None, false_branch: Node = None, uncertainty: float = None, *, leaf_value: Union[dict, int, float] = None): - """Constructor - """ + def __init__( + self, + question: Question = None, + true_branch: Node = None, + false_branch: Node = None, + uncertainty: float = None, + *, + leaf_value: Union[dict, int, float] = None, + ): + """Constructor""" self.question = question self.true_branch = true_branch self.false_branch = false_branch @@ -84,9 +95,10 @@ class DecisionTreeClassifier: criteria (str, optional): what criteria to use for information. Defaults to 'gini'. available 'gini','entropy'. """ - def __init__(self, max_depth: int = 100, min_samples_split: int = 2, criteria: str = 'gini'): - """Constructor - """ + def __init__( + self, max_depth: int = 100, min_samples_split: int = 2, criteria: str = "gini" + ): + """Constructor""" self._X = None self._y = None self._feature_names = None @@ -156,7 +168,9 @@ def _uncertainty(self, a: np.ndarray) -> float: value = self._gini_impurity(a) return value - def _partition(self, rows: np.ndarray, question: Union[Question, None]) -> Tuple[list, list]: + def _partition( + self, rows: np.ndarray, question: Union[Question, None] + ) -> Tuple[list, list]: """partition the rows based on the question Args: @@ -174,7 +188,9 @@ def _partition(self, rows: np.ndarray, question: Union[Question, None]) -> Tuple false_idx.append(idx) return true_idx, false_idx - def _info_gain(self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float) -> float: + def _info_gain( + self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float + ) -> float: """Calculate information gain after splitting Args: @@ -190,19 +206,22 @@ def _info_gain(self, left: np.ndarray, right: np.ndarray, parent_uncertainty: fl pr = left.shape[0] / (left.shape[0] + right.shape[0]) # calcualte child uncertainity - child_uncertainty = pr * \ - self._uncertainty(left) - (1 - pr) * self._uncertainty(right) + child_uncertainty = pr * self._uncertainty(left) - (1 - pr) * self._uncertainty( + right + ) # calculate information gain info_gain_value = parent_uncertainty - child_uncertainty return info_gain_value - def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, Union[Question, None], float]: + def _find_best_split( + self, X: np.ndarray, y: np.ndarray + ) -> Tuple[float, Union[Question, None], float]: """method to find best split possible for the sample - + Args: X (np.ndarray): Feature matrix. y (np.ndarray): target matrix. - + Returns: Tuple[float,Union[Question,None],float]: maximum gain from the split, best question of it, and parent node uncertainty. 
""" @@ -217,12 +236,14 @@ def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, Union[Q for col_index in range(n_labels): # iterate over feature columns # get unique values from the feature unique_values = np.unique(X[:, col_index]) - for val in unique_values: # check for every value and find maximum info gain + for ( + val + ) in unique_values: # check for every value and find maximum info gain ques = Question( column_index=col_index, value=val, - header=self._feature_names[col_index] + header=self._feature_names[col_index], ) t_idx, f_idx = self._partition(X, ques) @@ -245,7 +266,7 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: """Recursive funtion to build tree. Args: - X (np.ndarray): input features matrix. + X (np.ndarray): input features matrix. y (np.ndarray): target matrix. depth (int, optional): depth count of the recursion. Defaults to 0. @@ -257,7 +278,11 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: # if depth is greater than max depth defined or labels/features are left to 1 # or number of samples are less than the minimum size of samples to split then # stop recursion and return a node - if (depth > self.max_depth or n_labels == 1 or m_samples < self.min_samples_split): + if ( + depth > self.max_depth + or n_labels == 1 + or m_samples < self.min_samples_split + ): return Node(leaf_value=self._count_dict(y)) gain, ques, uncertainty = self._find_best_split(X, y) @@ -269,17 +294,25 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: t_idx, f_idx = self._partition(X, ques) # get partition idxs true_branch = self._build_tree( - X[t_idx, :], y[t_idx, :], depth + 1) # recog true branch samples + X[t_idx, :], y[t_idx, :], depth + 1 + ) # recog true branch samples false_branch = self._build_tree( - X[f_idx, :], y[f_idx, :], depth + 1) # recog false branch samples + X[f_idx, :], y[f_idx, :], depth + 1 + ) # recog false branch samples return Node( question=ques, true_branch=true_branch, false_branch=false_branch, - uncertainty=uncertainty + uncertainty=uncertainty, ) - def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_name: list = None, target_name: list = None) -> None: + def train( + self, + X: Union[np.ndarray, list], + y: Union[np.ndarray, list], + feature_name: list = None, + target_name: list = None, + ) -> None: """Train the model Args: @@ -289,10 +322,12 @@ def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_ target_name (list, optional): target name list. Defaults to None. 
""" - X = np.array(X, dtype='O') if not isinstance( - X, (np.ndarray)) else X # converting to numpy array - y = np.array(y, dtype='O') if not isinstance( - y, (np.ndarray)) else y # converting to numpy array + X = ( + np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X + ) # converting to numpy array + y = ( + np.array(y, dtype="O") if not isinstance(y, (np.ndarray)) else y + ) # converting to numpy array # reshaping to vectors self._X = X.reshape(-1, 1) if len(X.shape) == 1 else X @@ -300,17 +335,15 @@ def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_ # creating feature names if not mentioned self._feature_names = feature_name or [ - f"C_{i}" for i in range(self._X.shape[1])] + f"C_{i}" for i in range(self._X.shape[1]) + ] # creating target name if not mentioned - self._target_name = target_name or ['target'] + self._target_name = target_name or ["target"] # BOOOM # building the tree - self._tree = self._build_tree( - X=self._X, - y=self._y - ) + self._tree = self._build_tree(X=self._X, y=self._y) def print_tree(self, node: Union[Node, None] = None, spacing: str = "|-") -> None: """print the tree @@ -327,15 +360,21 @@ def print_tree(self, node: Union[Node, None] = None, spacing: str = "|-") -> Non return # Print the question at this node - print(spacing + str(node.question) + - " | " + self.criteria + " :" + str(node.uncertainty)) + print( + spacing + + str(node.question) + + " | " + + self.criteria + + " :" + + str(node.uncertainty) + ) # Call this function recursively on the true branch - print(spacing + '--> True:') + print(spacing + "--> True:") self.print_tree(node.true_branch, " " + spacing + "-") # Call this function recursively on the false branch - print(spacing + '--> False:') + print(spacing + "--> False:") self.print_tree(node.false_branch, " " + spacing + "-") def _classification(self, row: np.ndarray, node: Union[Node, None]) -> Union[dict]: @@ -384,7 +423,7 @@ def predict(self, X: Union[np.ndarray, list]) -> np.ndarray: np.ndarray: results of classification. """ if isinstance(X, (np.ndarray, list)): - X = np.array(X, dtype='O') if not isinstance(X, (np.ndarray)) else X + X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X if len(X.shape) == 1: max_result = 0 @@ -393,7 +432,7 @@ def predict(self, X: Union[np.ndarray, list]) -> np.ndarray: for key in result_dict: if result_dict[key] > max_result: result = key - return np.array([[result]], dtype='O') + return np.array([[result]], dtype="O") else: leaf_value = [] # get maximum caterigorical value from all catergories @@ -405,11 +444,13 @@ def predict(self, X: Union[np.ndarray, list]) -> np.ndarray: if result_dict[key] > max_result: result = key leaf_value.append([result]) - return np.array(leaf_value, dtype='O') + return np.array(leaf_value, dtype="O") else: raise ValueError("X should be list or numpy array") - def predict_probability(self, X: Union[np.ndarray, list]) -> Union[np.ndarray, dict]: + def predict_probability( + self, X: Union[np.ndarray, list] + ) -> Union[np.ndarray, dict]: """predict classfication probabilities Args: @@ -422,16 +463,23 @@ def predict_probability(self, X: Union[np.ndarray, list]) -> Union[np.ndarray, d Union[np.ndarray, dict]: probabity results of classification. 
""" if isinstance(X, (np.ndarray, list)): - X = np.array(X, dtype='O') if not isinstance(X, (np.ndarray)) else X + X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X if len(X.shape) == 1: - return self._leaf_probabilities(self._classification(row=X, node=self._tree)) + return self._leaf_probabilities( + self._classification(row=X, node=self._tree) + ) else: leaf_value = [] for row in X: - leaf_value.append([self._leaf_probabilities( - self._classification(row=row, node=self._tree))]) - return np.array(leaf_value, dtype='O') + leaf_value.append( + [ + self._leaf_probabilities( + self._classification(row=row, node=self._tree) + ) + ] + ) + return np.array(leaf_value, dtype="O") else: raise ValueError("X should be list or numpy array") @@ -445,9 +493,13 @@ class DecisionTreeRegressor: criteria (str, optional): criteria for best info gain. Defaults to 'variance'. """ - def __init__(self, max_depth: int = 10, min_samples_split: int = 3, criteria: str = 'variance'): - """constructor - """ + def __init__( + self, + max_depth: int = 10, + min_samples_split: int = 3, + criteria: str = "variance", + ): + """constructor""" self._X = None self._y = None self._feature_names = None @@ -469,7 +521,9 @@ def _mean_leaf_value(self, a: np.ndarray) -> float: return float(np.mean(a)) - def _partition(self, rows: np.ndarray, question: Union[Question, None]) -> Tuple[list, list]: + def _partition( + self, rows: np.ndarray, question: Union[Question, None] + ) -> Tuple[list, list]: """partition the rows based on the question Args: @@ -503,7 +557,9 @@ def _uncertainty(self, a: np.ndarray) -> float: value = np.var(a) return float(value) - def _info_gain(self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float) -> float: + def _info_gain( + self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float + ) -> float: """Calculate information gain after splitting Args: @@ -517,12 +573,15 @@ def _info_gain(self, left: np.ndarray, right: np.ndarray, parent_uncertainty: fl pr = left.shape[0] / (left.shape[0] + right.shape[0]) - child_uncertainty = pr * \ - self._uncertainty(left) - (1 - pr) * self._uncertainty(right) + child_uncertainty = pr * self._uncertainty(left) - (1 - pr) * self._uncertainty( + right + ) info_gain_value = parent_uncertainty - child_uncertainty return info_gain_value - def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, Union[Question, None], float]: + def _find_best_split( + self, X: np.ndarray, y: np.ndarray + ) -> Tuple[float, Union[Question, None], float]: """method to find best split possible for the sample Args: @@ -543,12 +602,14 @@ def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, Union[Q for col_index in range(n_labels): # iterate over feature columns # get unique values from the feature unique_values = np.unique(X[:, col_index]) - for val in unique_values: # check for every value and find maximum info gain + for ( + val + ) in unique_values: # check for every value and find maximum info gain ques = Question( column_index=col_index, value=val, - header=self._feature_names[col_index] + header=self._feature_names[col_index], ) t_idx, f_idx = self._partition(X, ques) @@ -570,7 +631,7 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: """Recursive funtion to build tree Args: - X (np.ndarray): input features matrix. + X (np.ndarray): input features matrix. y (np.ndarray): target matrix. depth (int, optional): depth count of the recursion. Defaults to 0. 
@@ -582,7 +643,11 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: # if depth is greater than max depth defined or labels/features are left to 1 # or number of samples are less than the minimum size of samples to split then # stop recursion and return a node - if (depth > self.max_depth or n_labels == 1 or m_samples < self.min_samples_split): + if ( + depth > self.max_depth + or n_labels == 1 + or m_samples < self.min_samples_split + ): return Node(leaf_value=y) gain, ques, uncertainty = self._find_best_split(X, y) @@ -593,17 +658,25 @@ def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node: t_idx, f_idx = self._partition(X, ques) true_branch = self._build_tree( - X[t_idx, :], y[t_idx, :], depth + 1) # get true samples + X[t_idx, :], y[t_idx, :], depth + 1 + ) # get true samples false_branch = self._build_tree( - X[f_idx, :], y[f_idx, :], depth + 1) # get false samples + X[f_idx, :], y[f_idx, :], depth + 1 + ) # get false samples return Node( question=ques, true_branch=true_branch, false_branch=false_branch, - uncertainty=uncertainty + uncertainty=uncertainty, ) - def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_name: list = None, target_name: list = None) -> None: + def train( + self, + X: Union[np.ndarray, list], + y: Union[np.ndarray, list], + feature_name: list = None, + target_name: list = None, + ) -> None: """Train the model Args: @@ -613,10 +686,12 @@ def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_ target_name (list, optional): target name list. Defaults to None. """ - X = np.array(X, dtype='O') if not isinstance( - X, (np.ndarray)) else X # converting to numpy array - y = np.array(y, dtype='O') if not isinstance( - y, (np.ndarray)) else y # converting to numpy array + X = ( + np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X + ) # converting to numpy array + y = ( + np.array(y, dtype="O") if not isinstance(y, (np.ndarray)) else y + ) # converting to numpy array # reshaping to vectors self._X = X.reshape(-1, 1) if len(X.shape) == 1 else X @@ -624,19 +699,22 @@ def train(self, X: Union[np.ndarray, list], y: Union[np.ndarray, list], feature_ # creating feature names if not mentioned self._feature_names = feature_name or [ - f"C_{i}" for i in range(self._X.shape[1])] + f"C_{i}" for i in range(self._X.shape[1]) + ] # creating target name if not mentioned - self._target_name = target_name or ['target'] + self._target_name = target_name or ["target"] # BOOOM # building the tree - self._tree = self._build_tree( - X=self._X, - y=self._y - ) - - def print_tree(self, node: Union[Node, None] = None, spacing: str = "|-", mean_preds: bool = True) -> None: + self._tree = self._build_tree(X=self._X, y=self._y) + + def print_tree( + self, + node: Union[Node, None] = None, + spacing: str = "|-", + mean_preds: bool = True, + ) -> None: """print the tree Args: @@ -651,29 +729,37 @@ def print_tree(self, node: Union[Node, None] = None, spacing: str = "|-", mean_p if mean_preds: print(spacing, " Predict :", self._mean_leaf_value(node.leaf_value)) else: - print(spacing, " Predict :", node.leaf_value[...,-1]) + print(spacing, " Predict :", node.leaf_value[..., -1]) return # Print the question at this node - print(spacing + str(node.question) + - " | " + self.criteria + " :" + str(node.uncertainty)) + print( + spacing + + str(node.question) + + " | " + + self.criteria + + " :" + + str(node.uncertainty) + ) # Call this function recursively on the true branch - print(spacing + 
'--> True:') + print(spacing + "--> True:") self.print_tree(node.true_branch, " " + spacing + "-", mean_preds) # Call this function recursively on the false branch - print(spacing + '--> False:') + print(spacing + "--> False:") self.print_tree(node.false_branch, " " + spacing + "-", mean_preds) - def _regression(self, row: np.ndarray, node: Union[Node, None], mean_preds: bool) -> float: + def _regression( + self, row: np.ndarray, node: Union[Node, None], mean_preds: bool + ) -> float: """regression recursive method Args: row (np.ndarray): input matrix. node (Union[Node,None]): node to start with. mostly root node. rest will be handled by recursion. mean_preds (bool): do the mean of prediction values. - + Returns: float: regression result. """ @@ -682,7 +768,7 @@ def _regression(self, row: np.ndarray, node: Union[Node, None], mean_preds: bool if mean_preds: return self._mean_leaf_value(node.leaf_value) else: - return node.leaf_value[...,-1] + return node.leaf_value[..., -1] if node.question.match(row): return self._regression(row, node.true_branch, mean_preds) @@ -700,18 +786,18 @@ def predict(self, X: np.ndarray, mean_preds: bool = True) -> np.ndarray: np.ndarray: regression prediction. """ if isinstance(X, (np.ndarray, list)): - X = np.array(X, dtype='O') if not isinstance(X, (np.ndarray)) else X + X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X if len(X.shape) == 1: result = self._regression(row=X, node=self._tree, mean_preds=mean_preds) - return np.array([[result]], dtype='O') + return np.array([[result]], dtype="O") else: leaf_value = [] for row in X: - result = self._regression(row=row, node=self._tree, mean_preds=mean_preds) + result = self._regression( + row=row, node=self._tree, mean_preds=mean_preds + ) leaf_value.append([result]) - return np.array(leaf_value, dtype='O') + return np.array(leaf_value, dtype="O") else: raise ValueError("X should be list or numpy array") - - diff --git a/src/mightypy/ml/_utils.py b/src/mightypy/ml/_utils.py index 699de41..b97ba1a 100644 --- a/src/mightypy/ml/_utils.py +++ b/src/mightypy/ml/_utils.py @@ -7,7 +7,7 @@ def sigmoid(val: np.ndarray) -> np.ndarray: """Sigmoid function - + .. math:: f(z) = \\frac{1}{1 + e^{-z}} @@ -43,22 +43,22 @@ def moving_window_matrix(arr: np.ndarray, window: int, lag: int = 1) -> np.ndarr >>> print(moving_window_matrix(a, 20, 2)) """ - assert len(np.shape(arr)) == 1, 'input array shape should be 1D like (m,).' + assert len(np.shape(arr)) == 1, "input array shape should be 1D like (m,)." size = arr.shape[0] - assert size > window and size > lag, \ - 'length of array should be greater than window size and lag.' + assert ( + size > window and size > lag + ), "length of array should be greater than window size and lag." 
frame_width = size - window + 1 new_frame_width = int(np.ceil(frame_width / lag)) new_frame = np.empty(shape=(window, new_frame_width)) for row in range(0, window): - new_frame[row] = arr[row: row+frame_width][::lag] + new_frame[row] = arr[row : row + frame_width][::lag] return new_frame.T - if __name__ == "__main__": # a = np.random.rand(100) @@ -75,7 +75,6 @@ def moving_window_matrix(arr: np.ndarray, window: int, lag: int = 1) -> np.ndarr # plt.legend() # plt.show() - # x = np.arange(-10, 10) # y = x**2 + x**3 # s, r, l = polynomial_regression(x, y, 3) diff --git a/src/mightypy/signal_processing/__init__.py b/src/mightypy/signal_processing/__init__.py index d7660db..f53a8c8 100644 --- a/src/mightypy/signal_processing/__init__.py +++ b/src/mightypy/signal_processing/__init__.py @@ -4,11 +4,9 @@ """ -from mightypy.signal_processing._fft import ( - DenoiseFFT -) +from mightypy.signal_processing._fft import PSDDenoiser __all__ = [ - 'DenoiseFFT' + 'PSDDenoiser' ] \ No newline at end of file diff --git a/src/mightypy/signal_processing/_fft.py b/src/mightypy/signal_processing/_fft.py index 9c0224f..1a5c771 100644 --- a/src/mightypy/signal_processing/_fft.py +++ b/src/mightypy/signal_processing/_fft.py @@ -1,112 +1,146 @@ """ -Signal Processing module. - - -author : Nishant Baheti +FFT Denoiser +------------- """ +# Authors: Nishant Baheti +from typing import Optional, Union import numpy as np -class FFTMixins: - @staticmethod - def psd(f_hat, l_signal): - """Power Spectral Density.""" - return ((f_hat * np.conjugate(f_hat)) / l_signal).real # type: ignore - - @staticmethod - def magnitude(f_hat, l_signal): - """Magnitude.""" - return (np.abs(f_hat) / l_signal).real # type: ignore +class PSDDenoiser: +    """PSD (Power Spectral Density) Based Denoiser -class DenoiseFFT(FFTMixins): - """ - Denoise signals with Fast Fourier method. - - Args: - method (str): method. accepted values are psd, mag. - threshold (float): threshold for cleanup. + This method takes the FFT of the signal and calculates its PSD. Based on + the PSD and a cutoff threshold, the signal is filtered, and an inverse FFT + is applied to regenerate the denoised signal. 
- References: https://machinelearningexploration.readthedocs.io/en/latest/MathExploration/SignalProcessingFFT.html + Parameters + ---------- + threshold : Optional[Union[int, float, str]], optional + threshold used to build the cutoff mask. A precalculated numeric + threshold can be passed directly, or one of the auto methods, by default auto-mean + { auto-mean, auto-max } - Examples: + Examples + -------- + >>> import numpy as np >>> import matplotlib.pyplot as plt - >>> from mightypy.make import sine_wave_from_timesteps - >>> time_step = 0.001 - >>> wave1, time1, freqs1 = sine_wave_from_timesteps(signal_freq=50, time_step=time_step) - >>> wave2, time2, freqs2 = sine_wave_from_timesteps(signal_freq=70, time_step=time_step) - >>> original_signal = wave1 + wave2 - >>> N = len(original_signal) - >>> noisy_signal = original_signal + 2.5 * np.random.randn(N) + 2.8 * np.random.randn(N) # adding random noise here - >>> model = DenoiseFFT('psd', 100) - >>> cleaned_signal = model.transform(noisy_signal) - >>> plt.plot(original_signal, label='original') - >>> plt.plot(noisy_signal, label='noisy') + >>> from mightypy.signal_processing import PSDDenoiser + >>> rng = np.random.default_rng() + >>> fs = 10e3 + >>> N = 100 + >>> amp = 2 * np.sqrt(2) + >>> freq = 1234.0 + >>> noise_power = 0.001 * fs / 2 + >>> time = np.arange(N) / fs + >>> X = amp * np.sin(2 * np.pi * freq * time) + >>> X += rng.normal(scale=np.sqrt(noise_power), size=time.shape) + + >>> denoiser = PSDDenoiser() + >>> cleaned_signal = denoiser.transform(X) + >>> plt.plot(X, label="noisy") + >>> plt.plot(cleaned_signal, label="cleaned") + >>> plt.title(f"Threshold : {denoiser.threshold}") + >>> plt.legend(loc="best") + >>> plt.show() + + >>> denoiser = PSDDenoiser(10) + >>> cleaned_signal = denoiser.transform(X) + >>> plt.plot(X, label='noisy') >>> plt.plot(cleaned_signal, label='cleaned') + >>> plt.title(f"Threshold : {denoiser.threshold}") >>> plt.legend(loc='best') >>> plt.show() """ - def __init__(self, method: str, threshold: float) -> None: - assert method.lower() in ('psd', 'mag'), "denoise signal method should be in psd, mag." - self._method = method.lower() - self._threshold = threshold - super().__init__() - - def transform(self, signal: np.ndarray) -> np.ndarray: - """ - Perform denoising operation on signal. - - Args: - signal (np.ndarray): signal. - - Returns: - np.ndarray: cleaned signal. 
+ def __init__( + self, threshold: Optional[Union[int, float, str]] = "auto-mean" + ) -> None: + self._threshold = self.__init_threshold(threshold) + self._f_hat = None + self._pxx = None + self._cutoff_mask = None + self._filtered_f_hat = None + self._denoised_X = None + + def __init_threshold(self, threshold): + if isinstance(threshold, (str,)): + threshold = threshold.lower() + assert threshold in ( + "auto-mean", + "auto-max", + ), "available auto threshold methods { auto-mean, auto-max }" + return threshold + + def _reshape_X(self, X: np.ndarray): + if len(X.shape) == 1: + X = X.reshape(-1, 1) + return X + + def psd(self, f_hat: np.ndarray, tau: int) -> np.ndarray: + """Power Spectral Density + + Parameters + ---------- + f_hat : np.ndarray + Signal in Frequency Domain + tau : int + Number of samples (FFT length) + + Returns + ------- + np.ndarray + Power spectrum """ - self.signal = signal - self._l_signal = len(self.signal) - self._f_hat = np.fft.fft(self.signal, self._l_signal) - - if self._method == 'psd': - x = super().psd(self._f_hat, self._l_signal) - else: - x = super().magnitude(self._f_hat, self._l_signal) - - above_thresh_flag = x > self._threshold - cleaned_f_hat = self._f_hat * above_thresh_flag - - return np.fft.ifft(cleaned_f_hat).real # type: ignore - - -if __name__ == '__main__': + return ((f_hat * np.conjugate(f_hat)) / tau).real - import matplotlib.pyplot as plt - from mightypy.make import sine_wave_from_timesteps + def transform(self, X: np.ndarray) -> np.ndarray: + """Apply PSD-based denoising - time_step = 0.001 - wave1, time1, freqs1 = sine_wave_from_timesteps(signal_freq=50, time_step=time_step) - wave2, time2, freqs2 = sine_wave_from_timesteps(signal_freq=70, time_step=time_step) - original_signal = wave1 + wave2 + Parameters + ---------- + X : np.ndarray + Input matrix (the signal, in IoT terms). - N = len(original_signal) - - noisy_signal = original_signal + 2.5 * np.random.randn(N) + 2.8 * np.random.randn(N) # adding random noise here - - model = DenoiseFFT('psd', 100) - cleaned_signal = model.transform(noisy_signal) - - plt.plot(original_signal, label='original') - plt.plot(noisy_signal, label='noisy') - plt.plot(cleaned_signal, label='cleaned') - plt.legend(loc='best') - plt.show() + Returns + ------- + np.ndarray + Denoised Signal """ + X = self._reshape_X(X) + tau = len(X) + self._f_hat = np.fft.fft(X, tau, axis=0) + self._pxx = self.psd(self._f_hat, tau) + + if isinstance(self._threshold, str): + agg_func = getattr(np, self._threshold.split("-")[1]) + self._threshold = agg_func(self._pxx[np.int16(np.floor(tau / 2)) :]) + self._cutoff_mask = self._pxx > self._threshold + self._filtered_f_hat = self._f_hat * self._cutoff_mask + self._denoised_X = np.fft.ifft(self._filtered_f_hat, axis=0).real + return self._denoised_X + + @property + def threshold(self) -> Union[str, float, int]: + """Threshold calculated by the process + + For the auto methods, the values in the second half of the power spectrum + are aggregated and used as the threshold to cut off insignificant frequencies. 
+ + Returns + ------- + Union[str, float, int] + cutoff threshold value + """ + return self._threshold - model = DenoiseFFT('mag', 0.2) + @property - cleaned_signal = model.transform(noisy_signal) + def f_hat(self) -> Optional[np.ndarray]: + """FFT of input signal""" + return self._f_hat - plt.plot(original_signal, label='original') - plt.plot(noisy_signal, label='noisy') - plt.plot(cleaned_signal, label='cleaned') - plt.legend(loc='best') - plt.show() + @property + def filtered_f_hat(self) -> Optional[np.ndarray]: + """filtered FFT of input signal""" + return self._filtered_f_hat \ No newline at end of file diff --git a/src/mightypy/stats/_data_drift.py b/src/mightypy/stats/_data_drift.py index d6ecadb..765da04 100644 --- a/src/mightypy/stats/_data_drift.py +++ b/src/mightypy/stats/_data_drift.py @@ -3,12 +3,15 @@ ================== """ + from typing import Union import numpy as np import pandas as pd -def population_stability_index(expected: Union[list, np.ndarray], actual: Union[list, np.ndarray], data_type: str) -> pd.DataFrame: +def population_stability_index( + expected: Union[list, np.ndarray], actual: Union[list, np.ndarray], data_type: str +) -> pd.DataFrame: """ Population Stability Index. @@ -26,7 +29,7 @@ def population_stability_index(expected: Union[list, np.ndarray], actual: Union[ Examples: >>> import numpy as np >>> from mightypy.stats import population_stability_index - + continuous data >>> expected_continuous = np.random.normal(size=(500,)) >>> actual_continuous = np.random.normal(size=(500,)) >>> psi_df = population_stability_index(expected_continuous, actual_continuous, data_type='continuous') >>> psi_df.psi.sum() discrete data >>> expected_discrete = np.random.randint(0,10, size=(500,)) >>> actual_discrete = np.random.randint(0,10, size=(500,)) >>> psi_df = population_stability_index(expected_discrete, actual_discrete, data_type='discrete') >>> psi_df.psi.sum() """ - if data_type == 'continuous': + if data_type == "continuous": max_val: Union[int, float] = np.max(expected) min_val: Union[int, float] = np.min(expected) ranges = np.linspace(min_val, max_val, 11)[1:-1] bins = [-np.inf, *ranges, np.inf] labels = [ - f"{idx+1} | {i[0]:.2f} to {i[1]:.2f}" for idx, i in enumerate(zip(bins[:-1], bins[1:])) + f"{idx+1} | {i[0]:.2f} to {i[1]:.2f}" + for idx, i in enumerate(zip(bins[:-1], bins[1:])) ] expected_cuts = pd.cut(expected, bins=bins, labels=labels).value_counts() actual_cuts = pd.cut(actual, bins=bins, labels=labels).value_counts() - elif data_type == 'discrete': + elif data_type == "discrete": expected_cuts = pd.Series(expected).value_counts() actual_cuts = pd.Series(actual).value_counts() else: - raise NotImplementedError(f"Method {data_type} is not implemented, or correct one. Try continuous, discrete.") - - calc_df = pd.concat([expected_cuts, actual_cuts], axis=1, - keys=['expected', 'actual']).sort_index() - calc_df[['expected %', 'actual %']] = ( - calc_df[['expected', 'actual']]/calc_df[['expected', 'actual']].sum(axis=0)) - calc_df['diff'] = calc_df['actual %'] - calc_df['expected %'] - calc_df['log(actual %/ expected %)'] = np.log(calc_df['actual %'] / - calc_df['expected %']) - calc_df['psi'] = calc_df['diff'] * calc_df['log(actual %/ expected %)'] + raise NotImplementedError( + f"data_type {data_type} is not implemented. Try continuous or discrete."
+ ) + + calc_df = pd.concat( + [expected_cuts, actual_cuts], axis=1, keys=["expected", "actual"] + ).sort_index() + calc_df[["expected %", "actual %"]] = calc_df[["expected", "actual"]] / calc_df[ + ["expected", "actual"] + ].sum(axis=0) + calc_df["diff"] = calc_df["actual %"] - calc_df["expected %"] + calc_df["log(actual %/ expected %)"] = np.log( + calc_df["actual %"] / calc_df["expected %"] + ) + calc_df["psi"] = calc_df["diff"] * calc_df["log(actual %/ expected %)"] return calc_df @@ -74,9 +83,17 @@ def population_stability_index(expected: Union[list, np.ndarray], actual: Union[ expected_continuous = np.random.normal(size=(500,)) actual_continuous = np.random.normal(size=(500,)) - expected_discrete = np.random.randint(0,10, size=(500,)) - actual_discrete = np.random.randint(0,10, size=(500,)) + expected_discrete = np.random.randint(0, 10, size=(500,)) + actual_discrete = np.random.randint(0, 10, size=(500,)) - print(population_stability_index(expected_continuous, actual_continuous, data_type='continuous')) + print( + population_stability_index( + expected_continuous, actual_continuous, data_type="continuous" + ) + ) - print(population_stability_index(expected_discrete, actual_discrete, data_type='discrete')) + print( + population_stability_index( + expected_discrete, actual_discrete, data_type="discrete" + ) + ) diff --git a/src/mightypy/stats/_feature_importance.py b/src/mightypy/stats/_feature_importance.py index c9cfc65..8302b91 100644 --- a/src/mightypy/stats/_feature_importance.py +++ b/src/mightypy/stats/_feature_importance.py @@ -31,7 +31,7 @@ class WOE_IV: Examples: >>> from sklearn.datasets import load_breast_cancer - >>> from mightypy.stats import WOE_IV + >>> from mightypy.stats import WOE_IV >>> dataset = load_breast_cancer(as_frame=True) >>> df = dataset.frame[['mean radius', 'target']] @@ -46,26 +46,34 @@ class WOE_IV: >>> fig.tight_layout() >>> fig.show() - or directly + or directly >>> fig, ax = obj.plot(df) >>> fig.show() """ - def __init__(self, event: str, non_event: str, target_col: str, bucket_col: str, - value_col: Optional[str] = None, agg_func: Callable = np.count_nonzero, - bucket_col_type: str = 'continuous', n_buckets: int = 10): + def __init__( + self, + event: str, + non_event: str, + target_col: str, + bucket_col: str, + value_col: Optional[str] = None, + agg_func: Callable = np.count_nonzero, + bucket_col_type: str = "continuous", + n_buckets: int = 10, + ): self._event = event self._non_event = non_event self._target_col = target_col self._bucket_col = bucket_col - self._bucket_col_name = f'buckets_{bucket_col}' + self._bucket_col_name = f"buckets_{bucket_col}" self._value_col = value_col self._agg_func = agg_func self._bucket_col_type = bucket_col_type self._n_buckets = n_buckets - self._perc_event_col_name: str = f'%_event_{event}' - self._perc_non_event_col_name: str = f'%_non_event_{non_event}' + self._perc_event_col_name: str = f"%_event_{event}" + self._perc_non_event_col_name: str = f"%_non_event_{non_event}" self._df: pd.DataFrame = None # type: ignore self._cal_df: pd.DataFrame = None # type: ignore self._iv: float = None # type: ignore @@ -84,51 +92,64 @@ def _calculate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, float]: Tuple[pd.DataFrame, float]: calculated dataframe and information value. 
""" if self._value_col is None: - self._value_col = 'values' + self._value_col = "values" self._df = df[[self._target_col, self._bucket_col]].copy() - self._df.insert(loc=0, column=self._value_col, value='x') + self._df.insert(loc=0, column=self._value_col, value="x") else: self._df = df[[self._target_col, self._bucket_col, self._value_col]].copy() - if self._bucket_col_type == 'continuous': - quantiles = np.linspace(0, 1, self._n_buckets+1) + if self._bucket_col_type == "continuous": + quantiles = np.linspace(0, 1, self._n_buckets + 1) self._df.insert( loc=0, column=self._bucket_col_name, - value=pd.qcut(self._df[self._bucket_col].values, q=quantiles, duplicates='raise', - retbins=False) + value=pd.qcut( + self._df[self._bucket_col].values, + q=quantiles, + duplicates="raise", + retbins=False, + ), + ) + elif self._bucket_col_type == "discrete": + self._df.insert( + loc=0, column=self._bucket_col_name, value=df[self._bucket_col] ) - elif self._bucket_col_type == 'discrete': - self._df.insert(loc=0, column=self._bucket_col_name, - value=df[self._bucket_col]) else: raise NotImplementedError - self._cal_df = pd.pivot_table(self._df, index=[self._bucket_col_name], columns=[self._target_col], - values=self._value_col, aggfunc=self._agg_func) + self._cal_df = pd.pivot_table( + self._df, + index=[self._bucket_col_name], + columns=[self._target_col], + values=self._value_col, + aggfunc=self._agg_func, + ) self._cal_df.fillna(value=0, inplace=True) - self._cal_df[['adj_event', 'adj_non_event']] = self._cal_df[ - [self._event, self._non_event]].apply(lambda x: (x+0.5) if (x[0] == 0 or x[1] == 0) else x, axis=1) + self._cal_df[["adj_event", "adj_non_event"]] = self._cal_df[ + [self._event, self._non_event] + ].apply(lambda x: (x + 0.5) if (x[0] == 0 or x[1] == 0) else x, axis=1) event_sum = self._cal_df[self._event].sum() - self._cal_df[self._perc_event_col_name] = self._cal_df['adj_event'] / event_sum + self._cal_df[self._perc_event_col_name] = self._cal_df["adj_event"] / event_sum non_event_sum = self._cal_df[self._non_event].sum() - self._cal_df[self._perc_non_event_col_name] = self._cal_df['adj_non_event'] / non_event_sum + self._cal_df[self._perc_non_event_col_name] = ( + self._cal_df["adj_non_event"] / non_event_sum + ) - self._cal_df['woe'] = np.log( - self._cal_df[self._perc_non_event_col_name] / - self._cal_df[self._perc_event_col_name] + self._cal_df["woe"] = np.log( + self._cal_df[self._perc_non_event_col_name] + / self._cal_df[self._perc_event_col_name] ) - self._cal_df['iv'] = ( - self._cal_df[self._perc_non_event_col_name] - - self._cal_df[self._perc_event_col_name] - ) * self._cal_df['woe'] + self._cal_df["iv"] = ( + self._cal_df[self._perc_non_event_col_name] + - self._cal_df[self._perc_event_col_name] + ) * self._cal_df["woe"] - self._iv: float = self._cal_df['iv'].sum() + self._iv: float = self._cal_df["iv"].sum() return self._cal_df, self._iv def values(self, df: Optional[pd.DataFrame] = None) -> Tuple[pd.DataFrame, float]: @@ -149,7 +170,8 @@ def values(self, df: Optional[pd.DataFrame] = None) -> Tuple[pd.DataFrame, float if df is None: if self._df is None: raise ValueError( - "dataframe doesn't exist. Please insert dataframe.") + "dataframe doesn't exist. Please insert dataframe." + ) else: self._calculate(self._df) else: @@ -175,7 +197,8 @@ def plot(self, df: Optional[pd.DataFrame] = None, figsize=(10, 5)) -> plt.Figure if df is None: if self._df is None: raise ValueError( - "dataframe doesn't exist. Please insert dataframe.") + "dataframe doesn't exist. 
Please insert dataframe." + ) else: self._calculate(self._df) else: @@ -185,36 +208,54 @@ def plot(self, df: Optional[pd.DataFrame] = None, figsize=(10, 5)) -> plt.Figure ranges = np.arange(0, self._n_buckets, step=1) fig, _ax = plt.subplots(1, 2, figsize=figsize) - _ax[0].set_xlim(left=self._cal_df['woe'].min() - 2, # type: ignore - right=self._cal_df['woe'].max() + 2) - _ax[0].barh(y=idxs, width=self._cal_df['woe'], # type: ignore - color='blue', alpha=0.6) # type: ignore + _ax[0].set_xlim( + left=self._cal_df["woe"].min() - 2, # type: ignore + right=self._cal_df["woe"].max() + 2, + ) + _ax[0].barh( + y=idxs, width=self._cal_df["woe"], color="blue", alpha=0.6 # type: ignore + ) # type: ignore for i in _ax[0].containers: # type: ignore - _ax[0].bar_label(i, fmt='%.3f', padding=5) # type: ignore + _ax[0].bar_label(i, fmt="%.3f", padding=5) # type: ignore _ax[0].grid(alpha=0.2) # type: ignore _ax[0].set_xlabel(None) # type: ignore _ax[0].set_ylabel(None) # type: ignore - _ax[0].set_title('Weight Of Evidence') # type: ignore - - _ax[1].barh(y=ranges-0.2, width=self._cal_df[self._event], # type: ignore - color='red', alpha=0.6, label=self._event, height=0.4) - _ax[1].barh(y=ranges+0.2, width=self._cal_df[self._non_event], # type: ignore - color='green', alpha=0.6, label=self._non_event, height=0.4) + _ax[0].set_title("Weight Of Evidence") # type: ignore + + _ax[1].barh( + y=ranges - 0.2, + width=self._cal_df[self._event], # type: ignore + color="red", + alpha=0.6, + label=self._event, + height=0.4, + ) + _ax[1].barh( + y=ranges + 0.2, + width=self._cal_df[self._non_event], # type: ignore + color="green", + alpha=0.6, + label=self._non_event, + height=0.4, + ) _ax[1].set_yticks(ranges) # type: ignore _ax[1].set_yticklabels(idxs) # type: ignore for i in _ax[1].containers: # type: ignore - _ax[1].bar_label(i, fmt='%.0f', padding=5) # type: ignore + _ax[1].bar_label(i, fmt="%.0f", padding=5) # type: ignore _ax[1].grid(alpha=0.2) # type: ignore _ax[1].set_ylabel(None) # type: ignore - _ax[1].set_title('Deciles') # type: ignore - _ax[1].legend(bbox_to_anchor=(1.5, 1), loc='upper right') # type: ignore + _ax[1].set_title("Deciles") # type: ignore + _ax[1].legend(bbox_to_anchor=(1.5, 1), loc="upper right") # type: ignore - fig.suptitle(f""" + fig.suptitle( + f""" {self._bucket_col} ======================================= Information Value : {self._iv:.3f} --------------------------------------- - """, fontsize=12) + """, + fontsize=12, + ) return fig
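For reference, the update rule the gradient-descent hunks above reformat is plain batch gradient descent on mean squared error. A minimal standalone NumPy sketch (illustrative variable names, not the library's API; the bias column is folded into X, as the _X/_theta shapes in the diff imply):

import numpy as np

rng = np.random.default_rng(0)
m = 100
x = rng.normal(size=(m, 1))
y = 3.0 * x + 2.0 + 0.1 * rng.normal(size=(m, 1))
X = np.hstack([np.ones((m, 1)), x])         # prepend a bias column
theta = rng.uniform(0, 0.001, size=(1, 2))  # small random init, theta_precision-style
alpha = 0.1
for _ in range(1000):
    y_pred = X @ theta.T                          # plays the role of predict()
    gradient = np.mean((y_pred - y) * X, axis=0)  # the "simultaneous operation"
    theta = theta - alpha * gradient              # new_theta
print(theta)  # converges near [[2.0, 3.0]]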
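The ridge and lasso training hunks differ only in how the penalty enters the non-bias update: ridge scales the remaining weights by (1 - penalty / m), while the lasso code subtracts a constant penalty / m. A one-step sketch with made-up numbers, mirroring the expressions in the diff:

import numpy as np

alpha, penalty, m, n = 0.01, 1.0, 100, 3
theta = np.array([[0.5, 0.4, 0.3]])   # theta[:, [0]] is the bias term
gradient = np.array([0.1, 0.2, 0.3])

new_theta_0 = theta[:, [0]] - alpha * gradient[0]   # bias is never penalized
# ridge: multiplicative shrinkage by (1 - penalty / m)
ridge_rest = theta[:, range(1, n)] * (1 - penalty / m) - alpha * gradient[1:]
# lasso (as written in the diff): a constant penalty / m is subtracted
lasso_rest = theta[:, range(1, n)] - alpha * gradient[1:] - penalty / m

print(np.hstack((new_theta_0, ridge_rest)))
print(np.hstack((new_theta_0, lasso_rest)))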
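The "NORMAL" branch of RidgeRegression.train is the closed-form ridge solution. A quick NumPy check of that expression (note the identity matrix spans all n columns, bias included, exactly as written in the diff):

import numpy as np

rng = np.random.default_rng(1)
X = np.hstack([np.ones((50, 1)), rng.normal(size=(50, 2))])  # bias + 2 features
y = X @ np.array([[1.0], [2.0], [-3.0]]) + 0.01 * rng.normal(size=(50, 1))
penalty = 1.0
theta = np.linalg.inv(X.T @ X + penalty * np.identity(3)) @ X.T @ y
print(theta.ravel())  # near [1, 2, -3], shrunk slightly by the penalty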
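For the decision-tree hunks, the split search reduces to an impurity measure plus a probability-weighted child uncertainty. A small worked sketch of the gini variant (hypothetical labels, standalone functions rather than the class methods):

import numpy as np

def gini(labels: np.ndarray) -> float:
    # 1 minus the sum of squared class probabilities
    _, counts = np.unique(labels, return_counts=True)
    p = counts / labels.shape[0]
    return 1.0 - float(np.sum(p ** 2))

parent = np.array(["a", "a", "a", "b", "b", "b"])
left, right = parent[:4], parent[4:]   # one candidate partition
pr = left.shape[0] / parent.shape[0]
child_uncertainty = pr * gini(left) + (1 - pr) * gini(right)
info_gain = gini(parent) - child_uncertainty
print(round(info_gain, 3))  # 0.25: the split removes half of the parent's impurity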
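A worked example for moving_window_matrix: each output row is a length-window slice of the series, and lag controls how far consecutive windows advance. Reproducing the function's slicing by hand:

import numpy as np

arr = np.arange(10)
window, lag = 4, 2
frame_width = arr.shape[0] - window + 1   # 7 candidate windows
rows = [arr[r : r + frame_width][::lag] for r in range(window)]
print(np.array(rows).T)
# [[0 1 2 3]
#  [2 3 4 5]
#  [4 5 6 7]
#  [6 7 8 9]]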
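The core of PSDDenoiser.transform in plain NumPy, using a hand-picked cutoff instead of the auto threshold (a sketch of the technique, not the class itself):

import numpy as np

rng = np.random.default_rng(2)
t = np.arange(0, 1, 0.001)
clean = np.sin(2 * np.pi * 50 * t) + np.sin(2 * np.pi * 70 * t)
noisy = clean + 2.5 * rng.normal(size=t.shape)

tau = len(noisy)
f_hat = np.fft.fft(noisy, tau)
pxx = ((f_hat * np.conjugate(f_hat)) / tau).real  # power spectral density
cutoff_mask = pxx > 100                           # keep only dominant frequencies
denoised = np.fft.ifft(f_hat * cutoff_mask).real

print(np.abs(noisy - clean).mean() > np.abs(denoised - clean).mean())  # True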
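Per bucket, population_stability_index accumulates (actual % - expected %) * log(actual % / expected %). A tiny hand computation with illustrative bucket shares:

import numpy as np

expected_pct = np.array([0.5, 0.3, 0.2])  # distribution at training time
actual_pct = np.array([0.4, 0.4, 0.2])    # distribution in production
psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
print(round(psi, 4))  # ~0.0511; a common rule of thumb reads PSI < 0.1 as stable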
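The WOE_IV math fits in one small frame: WOE is log(% non-event / % event) per bucket, and IV weights that by the share difference. Counts and column names below are made up for illustration, not the class's internals:

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"event": [10, 40, 50], "non_event": [60, 30, 10]},
    index=["bucket_1", "bucket_2", "bucket_3"],
)
pct_event = df["event"] / df["event"].sum()
pct_non_event = df["non_event"] / df["non_event"].sum()
df["woe"] = np.log(pct_non_event / pct_event)
df["iv"] = (pct_non_event - pct_event) * df["woe"]
print(df)
print("IV:", df["iv"].sum())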