diff --git a/pydbsmgr/health.py b/pydbsmgr/health.py
index 1c38859..4c2080e 100644
--- a/pydbsmgr/health.py
+++ b/pydbsmgr/health.py
@@ -1,6 +1,6 @@
 import os
 from abc import abstractmethod
-from typing import List
+from typing import List, Optional, Union
 
 import missingno as msno
 import numpy as np
@@ -11,16 +11,21 @@
 from pydbsmgr.main import check_dtypes, clean_transform, drop_empty_columns, intersection_cols
 
+# Configure logging globally
+LOG_FILE = "report_{time}.log"
+logger.add(LOG_FILE, rotation="100 KB")
+
 
 class FrameCheck:
     """Class for checking and transforming a `DataFrame`/dataframes"""
 
-    def __init__(self, _df: DataFrame | List[DataFrame], df_names: str | List[str] = None) -> None:
+    def __init__(
+        self, _df: Union[DataFrame, List[DataFrame]], df_names: Optional[Union[str, List[str]]] = None
+    ) -> None:
         self.df_names = df_names
+
         if isinstance(self.df_names, str):
-            logger.add(self.df_names + "_{time}.log", rotation="100 KB")
-        else:
-            logger.add("report_{time}.log", rotation="100 KB")
+            logger.add(f"{self.df_names}_{{time}}.log", rotation="100 KB")
 
         if isinstance(_df, list):
             self._dfs = _df
@@ -80,45 +85,34 @@ def generate_report(
         self.df_files_info = pd.DataFrame()
         self.yaml_name = yaml_name
         self.database_name = database_name
+
         if not os.path.exists(directory_name):
             os.mkdir(directory_name)
-            warning_type = "UserWarning"
-            msg = "The directory {%s} was created" % directory_name
-            print(f"{warning_type}: {msg}")
-            logger.info(f"The {directory_name} directory has been created.")
+            logger.warning(f"The {directory_name} directory has been created.")
 
         if concat_vertically:
             self._dfs = intersection_cols(self._dfs)
             self._dfs = [pd.concat(self._dfs, axis=0)]
 
         for j, df in enumerate(self._dfs):
-            ax = msno.matrix(self._dfs[j])
-            ax.get_figure().savefig("./" + directory_name + f"/{j}_msno_report.png", dpi=300)
-            info = []
-            for col in df.columns:
-                try:
-                    if col.find("unnamed") == -1:
-                        nrows = df.isnull().sum()[col] + df[col].count()
-                        nrows_missing = df.isnull().sum()[col]
-                        percentage = nrows_missing / nrows
-                        datatype = df.dtypes[col]
-                        unique_vals = len(df[col].unique())
-                        info.append(
-                            [
-                                col,
-                                datatype,
-                                self.df_names[j],
-                                nrows,
-                                nrows_missing,
-                                percentage,
-                                unique_vals,
-                            ]
-                        )
-
-                except:
-                    pass  # column doesn't exist in this dataframe
-            info = np.array(info).reshape((-1, 7))
-            info = pd.DataFrame(
+            ax = msno.matrix(df)
+            ax.get_figure().savefig(f"./{directory_name}/{j}_msno_report.png", dpi=300)
+
+            info = [
+                [
+                    col,
+                    str(df.dtypes[col]),
+                    self.df_names[j],
+                    len(df),
+                    df.isnull().sum()[col],
+                    df.isnull().sum()[col] / len(df),
+                    len(df[col].unique()),
+                ]
+                for col in df.columns
+                if "unnamed" not in col.lower()
+            ]
+
+            info_df = pd.DataFrame(
                 info,
                 columns=[
                     "column name",
@@ -130,50 +124,45 @@
                     "unique values",
                 ],
             )
-            info["# missing rows (percentage)"] = info["# missing rows (percentage)"].apply(
-                lambda x: "{:.2%}".format(float(x))
-            )
-            info["# rows"] = info["# rows"].apply(lambda x: "{:,}".format(int(x)))
-            info["# missing rows"] = info["# missing rows"].apply(lambda x: "{:,}".format(int(x)))
-            info["unique values"] = info["unique values"].apply(lambda x: "{:,}".format(int(x)))
+            self._format_info_df(info_df)
 
             logger.info(f"DataFrame '{self.df_names[j]}' has been processed")
 
-            self.df_files_info = pd.concat([self.df_files_info, info])
+            self.df_files_info = pd.concat([self.df_files_info, info_df])
 
         self.df_files_info.to_html(report_name, index=False, encoding=encoding)
 
         logger.info(f"A report has been created under the name '{report_name}'")
 
-
-        self.df_files_info["data type"] = [
-            str(_type) for _type in self.df_files_info["data type"].to_list()
-        ]
-        self.df_files_info["sql name"] = [
-            col_name.replace(" ", "_") for col_name in self.df_files_info["column name"]
-        ]
         self._create_yaml_tree()
 
+    def _format_info_df(self, df: DataFrame) -> None:
+        """Apply human-readable number formatting to the report columns in place."""
+        df["# missing rows (percentage)"] = df["# missing rows (percentage)"].apply(
+            lambda x: f"{x:.2%}"
+        )
+        df["# rows"] = df["# rows"].apply(lambda x: f"{int(x):,}")
+        df["# missing rows"] = df["# missing rows"].apply(lambda x: f"{int(x):,}")
+        df["unique values"] = df["unique values"].apply(lambda x: f"{int(x):,}")
+
     @abstractmethod
-    def _ops_dtypes(self, df, count) -> DataFrame:
+    def _ops_dtypes(self, df: DataFrame, count: int) -> DataFrame:
         """Processing of data types"""
         df = check_dtypes(df, df.dtypes)
         logger.info(f"{count+1}) The data type has been verified.")
         df = df.replace(r"(nan|Nan)", np.nan, regex=True)
         logger.info(f"{count+1}) The `nan` strings have been replaced by `np.nan`.")
-        df = df.loc[:, ~df.columns.str.contains("^unnamed")]
+        df = df.loc[:, ~df.columns.str.contains("^unnamed", case=False)]
         logger.info(f"{count+1}) Only the named columns have been retained.")
         return df
 
     def _create_yaml_tree(self) -> None:
         """Function that creates a `yaml` configuration file for database data type validation."""
-        data = {}
-        for col_name, data_type, sql_name in zip(
-            self.df_files_info["column name"].to_list(),
-            self.df_files_info["data type"],
-            self.df_files_info["sql name"].to_list(),
-        ):
-            data[col_name] = {"type": [data_type], "sql_name": [sql_name]}
-
-        yaml_data = yaml.dump({self.database_name: data})
+        data = {
+            col_name: {"type": [data_type], "sql_name": [col_name.replace(" ", "_")]}
+            for col_name, data_type in zip(
+                self.df_files_info["column name"].to_list(), self.df_files_info["data type"]
+            )
+        }
 
         with open(self.yaml_name, "w") as file:
-            file.write(yaml_data)
+            yaml.dump({self.database_name: data}, file)
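
Below is a minimal usage sketch of the refactored class. The full `generate_report` signature is not visible in this diff, so the keyword names (`report_name`, `directory_name`, `concat_vertically`, `yaml_name`, `database_name`, `encoding`) are assumptions inferred from the attributes its body references:

    import pandas as pd
    from pydbsmgr.health import FrameCheck

    # A small frame with missing values; None promotes the int column to float64
    df = pd.DataFrame({"customer id": [1, 2, None], "name": ["a", None, "c"]})

    checker = FrameCheck([df], df_names=["customers"])
    # Keyword names below are assumed, not confirmed by the diff
    checker.generate_report(
        report_name="customers_report.html",
        directory_name="reports",
        concat_vertically=False,
        yaml_name="customers_schema.yaml",
        database_name="example_db",
        encoding="utf-8",
    )

With `_create_yaml_tree()` still invoked at the end of `generate_report`, a run like the above would write a YAML tree shaped roughly as follows (PyYAML's `yaml.dump` emits block style and sorts keys by default):

    example_db:
      customer id:
        sql_name:
        - customer_id
        type:
        - float64
      name:
        sql_name:
        - name
        type:
        - object

Note also that the module now registers the global `report_{time}.log` sink at import time; when `df_names` is a string, `__init__` adds a second, name-specific sink, so those records are written to both log files.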