Skip to content

Commit

Permalink
Update health.py
Browse files Browse the repository at this point in the history
  • Loading branch information
jzsmoreno committed Dec 5, 2024
1 parent bc00097 commit 135d293
Showing 1 changed file with 49 additions and 62 deletions.
111 changes: 49 additions & 62 deletions pydbsmgr/health.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from abc import abstractmethod
from typing import List
from typing import List, Union

import missingno as msno
import numpy as np
Expand All @@ -11,16 +11,21 @@

from pydbsmgr.main import check_dtypes, clean_transform, drop_empty_columns, intersection_cols

# Configure logging globally
LOG_FILE = "report_{time}.log"
logger.add(LOG_FILE, rotation="100 KB")


class FrameCheck:
"""Class for checking and transforming a `DataFrame`/dataframes"""

def __init__(self, _df: DataFrame | List[DataFrame], df_names: str | List[str] = None) -> None:
def __init__(
self, _df: Union[DataFrame, List[DataFrame]], df_names: Union[str, List[str]] = None
) -> None:
self.df_names = df_names

if isinstance(self.df_names, str):
logger.add(self.df_names + "_{time}.log", rotation="100 KB")
else:
logger.add("report_{time}.log", rotation="100 KB")
logger.add(f"{self.df_names}_{{time}}.log", rotation="100 KB")

if isinstance(_df, list):
self._dfs = _df
Expand Down Expand Up @@ -80,45 +85,34 @@ def generate_report(
self.df_files_info = pd.DataFrame()
self.yaml_name = yaml_name
self.database_name = database_name

if not os.path.exists(directory_name):
os.mkdir(directory_name)
warning_type = "UserWarning"
msg = "The directory {%s} was created" % directory_name
print(f"{warning_type}: {msg}")
logger.info(f"The {directory_name} directory has been created.")
logger.warning(f"The {directory_name} directory has been created.")

if concat_vertically:
self._dfs = intersection_cols(self._dfs)
self._dfs = [pd.concat(self._dfs, axis=0)]

for j, df in enumerate(self._dfs):
ax = msno.matrix(self._dfs[j])
ax.get_figure().savefig("./" + directory_name + f"/{j}_msno_report.png", dpi=300)
info = []
for col in df.columns:
try:
if col.find("unnamed") == -1:
nrows = df.isnull().sum()[col] + df[col].count()
nrows_missing = df.isnull().sum()[col]
percentage = nrows_missing / nrows
datatype = df.dtypes[col]
unique_vals = len(df[col].unique())
info.append(
[
col,
datatype,
self.df_names[j],
nrows,
nrows_missing,
percentage,
unique_vals,
]
)

except:
pass # column doesn't exist in this dataframe
info = np.array(info).reshape((-1, 7))
info = pd.DataFrame(
ax = msno.matrix(df)
ax.get_figure().savefig(f"./{directory_name}/{j}_msno_report.png", dpi=300)

info = [
[
col,
str(df.dtypes[col]),
self.df_names[j],
len(df),
df.isnull().sum()[col],
df.isnull().sum()[col] / len(df),
len(df[col].unique()),
]
for col in df.columns
if "unnamed" not in col.lower()
]

info_df = pd.DataFrame(
info,
columns=[
"column name",
Expand All @@ -130,50 +124,43 @@ def generate_report(
"unique values",
],
)
info["# missing rows (percentage)"] = info["# missing rows (percentage)"].apply(
lambda x: "{:.2%}".format(float(x))
)
info["# rows"] = info["# rows"].apply(lambda x: "{:,}".format(int(x)))
info["# missing rows"] = info["# missing rows"].apply(lambda x: "{:,}".format(int(x)))
info["unique values"] = info["unique values"].apply(lambda x: "{:,}".format(int(x)))

self._format_info_df(info_df)
logger.info(f"DataFrame '{self.df_names[j]}' has been processed")
self.df_files_info = pd.concat([self.df_files_info, info])
self.df_files_info = pd.concat([self.df_files_info, info_df])

self.df_files_info.to_html(report_name, index=False, encoding=encoding)
logger.info(f"A report has been created under the name '{report_name}'")

self.df_files_info["data type"] = [
str(_type) for _type in self.df_files_info["data type"].to_list()
]
self.df_files_info["sql name"] = [
col_name.replace(" ", "_") for col_name in self.df_files_info["column name"]
]

self._create_yaml_tree()

def _format_info_df(self, df: DataFrame) -> None:
df["# missing rows (percentage)"] = df["# missing rows (percentage)"].apply(
lambda x: f"{x:.2%}"
)
df["# rows"] = df["# rows"].apply(lambda x: f"{int(x):,}")
df["# missing rows"] = df["# missing rows"].apply(lambda x: f"{int(x):,}")
df["unique values"] = df["unique values"].apply(lambda x: f"{int(x):,}")

@abstractmethod
def _ops_dtypes(self, df, count) -> DataFrame:
def _ops_dtypes(self, df: DataFrame, count: int) -> DataFrame:
"""Processing of data types"""
df = check_dtypes(df, df.dtypes)
logger.info(f"{count+1}) The data type has been verified.")
df = df.replace(r"(nan|Nan)", np.nan, regex=True)
logger.info(f"{count+1}) The `nan` strings have been replaced by `np.nan`.")
df = df.loc[:, ~df.columns.str.contains("^unnamed")]
df = df.loc[:, ~df.columns.str.contains("^unnamed", case=False)]
logger.info(f"{count+1}) Only the named columns have been retained.")
return df

def _create_yaml_tree(self) -> None:
"""Function that creates a `yaml` configuration file for database data type validation."""
data = {}
for col_name, data_type, sql_name in zip(
self.df_files_info["column name"].to_list(),
self.df_files_info["data type"],
self.df_files_info["sql name"].to_list(),
):
data[col_name] = {"type": [data_type], "sql_name": [sql_name]}

yaml_data = yaml.dump({self.database_name: data})
data = {
col_name: {"type": [data_type], "sql_name": [col_name.replace(" ", "_")]}
for col_name, data_type in zip(
self.df_files_info["column name"].to_list(), self.df_files_info["data type"]
)
}

with open(self.yaml_name, "w") as file:
file.write(yaml_data)
yaml.dump({self.database_name: data}, file)

0 comments on commit 135d293

Please sign in to comment.