diff --git a/pydbsmgr/lightest.py b/pydbsmgr/lightest.py
index 52b2377..93dee5d 100644
--- a/pydbsmgr/lightest.py
+++ b/pydbsmgr/lightest.py
@@ -93,7 +93,10 @@ def clean_frame(
         """
         table = self.df.copy()
         cols = table.columns
-        table_sample = table.sample(frac=sample_frac, replace=False)
+        if sample_frac != 1.0:
+            table_sample = table.sample(frac=sample_frac, replace=False)
+        else:
+            table_sample = table.copy()
         errors = kwargs.get("errors", "ignore")

         for column_index, datatype in enumerate(table.dtypes):
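A note on the guard above: `DataFrame.sample(frac=1.0)` still returns all rows in shuffled order, so short-circuiting to `copy()` both preserves row order and skips needless work. A minimal sketch of the difference (illustrative, not part of the patch):

    import pandas as pd

    df = pd.DataFrame({"a": [1, 2, 3]})
    shuffled = df.sample(frac=1.0)  # all rows, order randomized
    stable = df.copy()              # all rows, original order preserved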
+ print(f"{warning_type}: {msg}") + else: + msg = f"Reading the log file: {self.file_name}" + print(msg) + with open(self.full_path, "r") as file: + return file.read() def writer(self, *chars: str) -> None: - self.FILE = open(self.FILE_PATH + self.FILE_NAME, "a") - date = str(datetime.now().strftime("%d/%m/%Y %H:%M:%S ")) - print(date, end="") - for item in chars: - print(item, end="") - print("\n") - print(date, end="", file=self.FILE) - for item in chars: - print(item, end="", file=self.FILE) - print("\n", file=self.FILE) - self.FILE.close() + date = datetime.now().strftime("%d/%m/%Y %H:%M:%S ") + output_line = date + " ".join(chars) + "\n" + + print(output_line, end="") + with open(self.full_path, "a") as file: + file.write(output_line) class EventLogBook: """Allows you to create and write new lines in a logbook.""" - def __init__(self, FILE_NAME: str, FILE_PATH: str) -> None: - self.FILE_NAME = FILE_NAME + ".csv" - self.FILE_PATH = FILE_PATH - if os.path.isfile(self.FILE_PATH + self.FILE_NAME): + def __init__(self, file_name: str, file_path: str) -> None: + self.file_name = f"{file_name}.csv" + self.file_path = file_path + self._check_file_existence() + + @property + def full_path(self) -> str: + return os.path.join(self.file_path, self.file_name) + + def _check_file_existence(self): + if os.path.isfile(self.full_path): warning_type = "UserWarning" - msg = ( - "The logbook file {%s} already exists, the changes will be added." % self.FILE_NAME - ) + msg = f"The logbook file {self.file_name} already exists, the changes will be added." print(f"{warning_type}: {msg}") - def create(self, df: DataFrame, encoding: str = "latin1") -> None: + def create(self, df: pd.DataFrame, encoding: str = "latin1") -> None: self._encoding = encoding - df.to_csv(self.FILE_PATH + self.FILE_NAME, index=False, encoding=self._encoding) + df.to_csv(self.full_path, index=False, encoding=self._encoding) def audit(self, encoding: str = "latin1"): - self._encoding = encoding - if os.path.isfile(self.FILE_PATH + self.FILE_NAME): - msg = ( - "The log file {%s} already exists, do you want to delete it? (True/False): " - % self.FILE_NAME - ) - delete = input(msg) - if delete == "True": - os.remove(self.FILE_PATH + self.FILE_NAME) - warning_type = "UserWarning" - msg = "The log file {%s} has been deleted successfully" % self.FILE_NAME - print(f"{warning_type}: {msg}") - else: - msg = "Reading the log file: {%s}" % self.FILE_NAME - print(f"{msg}") - return pd.read_csv(self.FILE_PATH + self.FILE_NAME, encoding=self._encoding) - else: + if not os.path.isfile(self.full_path): warning_type = "UserWarning" - msg = "The log file {%s} does not exist" % self.FILE_NAME + msg = f"The logbook file {self.file_name} does not exist." print(f"{warning_type}: {msg}") + return - def update(self, rows: List[str]) -> None: + delete = ( + input(f"Do you want to delete the logbook file {self.file_name}? (y/N): ") + .strip() + .lower() + ) + if delete == "y": + os.remove(self.full_path) + warning_type = "UserWarning" + msg = f"The logbook file {self.file_name} has been deleted successfully." 
+ print(f"{warning_type}: {msg}") + else: + return pd.read_csv(self.full_path, encoding=self._encoding) + + def update(self, rows: List[List[str]]) -> None: try: - df = pd.read_csv(self.FILE_PATH + self.FILE_NAME, encoding=self._encoding) - to_add = rows - df_to_add = pd.DataFrame(np.array(to_add).reshape((1, -1)), columns=df.columns) - df = pd.concat([df, df_to_add]) - df.to_csv(self.FILE_PATH + self.FILE_NAME, index=False, encoding=self._encoding) - except: + df = pd.read_csv(self.full_path, encoding=self._encoding) + new_df = pd.DataFrame(rows, columns=df.columns) + df = pd.concat([df, new_df], ignore_index=True) + df.to_csv(self.full_path, index=False, encoding=self._encoding) + + except FileNotFoundError: warning_type = "UserWarning" msg = ( - "The logbook {%s} has not been created, so it is going to be created" - % self.FILE_NAME + f"The logbook {self.file_name} has not been created, so it is going to be created." ) print(f"{warning_type}: {msg}") - to_add = rows - df = pd.DataFrame(np.array(to_add).reshape((1, -1)), columns=df.columns) - df.to_csv(to_add=(rows), index=False, encoding=self._encoding) + new_df = pd.DataFrame(rows) + new_df.to_csv(self.full_path, index=False, encoding=self._encoding) + + +# Usage example +if __name__ == "__main__": + logger = EventLogger("example_log", "./") + logger.writer("This is a test log entry.") + + logbook = EventLogBook("example_logbook", "./") + logbook.create(pd.DataFrame({"Column1": [1, 2], "Column2": ["A", "B"]})) + logbook.update([[3, "C"]]) diff --git a/pydbsmgr/utils/tools.py b/pydbsmgr/utils/tools.py index d5b1f79..d0751f3 100644 --- a/pydbsmgr/utils/tools.py +++ b/pydbsmgr/utils/tools.py @@ -13,7 +13,6 @@ import pyarrow as pa import pyarrow.parquet as pq import yaml -from numpy import datetime64 from pandas.core.frame import DataFrame from pandas.errors import IntCastingNaNError from pyarrow import Table @@ -26,27 +25,26 @@ def disableprints(func: Callable) -> Callable: """Decorator to temporarily suppress print statements in a function""" def wrapper(*args, **kwargs): - sys.stdout = open(os.devnull, "w") - result = func(*args, **kwargs) - sys.stdout = sys.__stdout__ + with open(os.devnull, "w") as devnull: + old_stdout = sys.stdout + sys.stdout = devnull + result = func(*args, **kwargs) + sys.stdout = old_stdout return result - # Preserve the function's docstring if it has one if func.__doc__ is not None: wrapper.__doc__ = func.__doc__ - return wrapper - else: - return func + return wrapper -def most_repeated_item(items: list, two_most_common: bool = False) -> Tuple[str, str | None]: +def most_repeated_item(items: List[str], two_most_common: bool = False) -> Tuple[str, str | None]: """Returns a `Tuple` with the most common elements of a `list`. Parameters ---------- - items : `list` - the `list` containing the items to be evaluated. - two_most_common : `bool`, `optional` + items : `List[str]` + The list containing the items to be evaluated. + two_most_common : `bool`, optional If `False`, returns only one element. Defaults to `False`. Returns @@ -54,37 +52,30 @@ def most_repeated_item(items: list, two_most_common: bool = False) -> Tuple[str, Tuple[`str`, `str` | `None`] The two most common elements. 
""" - # Use Counter to count occurrences of each item in the list counter = Counter(items) - - # Find the two most common items and its count most_common = counter.most_common(2) if two_most_common: - if len(most_common) == 2: - item1, _ = most_common[0] - item2, _ = most_common[1] - return item1, item2 - else: - item, _ = most_common[0] - return item, None + return tuple(item for item, _ in most_common) + (None,) * (2 - len(most_common)) else: - item, _ = most_common[0] - return item, None + return most_common[0], None def generate_secure_password(pass_len: int = 24) -> str: """ Generate a secure password with the length specified """ - config = load_config("./pydbsmgr/utils/config.ini") - config = parse_config(config) - password = "" + config = parse_config(load_config("./pydbsmgr/utils/config.ini")) char_matrix = config["security"]["char_matrix"] - for _ in range(pass_len): - password = password + random.choice(char_matrix) + return "".join(random.choice(char_matrix) for _ in range(pass_len)) + - return password +def coerce_datetime(x: str) -> np.datetime64: + try: + x = x.replace("-", "") + return pd.to_datetime(x, format="%Y%m%d") + except ValueError: + return np.datetime64("NaT") class ColumnsCheck: @@ -93,10 +84,10 @@ class ColumnsCheck: def __init__(self, df: DataFrame): self.df = df - def get_frame(self, **kwargs) -> DataFrame: - return self._process_columns(**kwargs) + def get_frame(self, surrounding: bool = True) -> DataFrame: + return self._process_columns(surrounding) - def _process_columns(self, surrounding: bool = True) -> DataFrame: + def _process_columns(self, surrounding: bool) -> DataFrame: df = self.df.copy() df.columns = ( df.columns.str.lower() @@ -104,7 +95,7 @@ def _process_columns(self, surrounding: bool = True) -> DataFrame: .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True) .str.replace("_+", "_", regex=True) .str.strip() - .str.strip("_") + .str.rstrip("_") ) if surrounding: df.columns = [f"[{col}]" for col in df.columns] @@ -112,8 +103,8 @@ def _process_columns(self, surrounding: bool = True) -> DataFrame: class ControllerFeatures: - def __init__(self, _container_client): - self._container_client = _container_client + def __init__(self, container_client): + self.container_client = container_client def write_pyarrow( self, @@ -121,25 +112,9 @@ def write_pyarrow( pytables: List[Table], names: List[str], overwrite: bool = True, - ) -> None: - """Write pyarrow table as `parquet` format""" - format_type = "parquet" - files_not_loaded = [] - for table, blob_name in zip(pytables, names): - if table != None: - buf = pa.BufferOutputStream() - pq.write_table(table, buf) - parquet_data = buf.getvalue().to_pybytes() - blob_path_name = directory_name + "/" + blob_name - self._container_client.upload_blob( - name=blob_path_name + "." 


 class ColumnsCheck:
@@ -93,10 +84,10 @@ class ColumnsCheck:
     def __init__(self, df: DataFrame):
         self.df = df

-    def get_frame(self, **kwargs) -> DataFrame:
-        return self._process_columns(**kwargs)
+    def get_frame(self, surrounding: bool = True) -> DataFrame:
+        return self._process_columns(surrounding)

-    def _process_columns(self, surrounding: bool = True) -> DataFrame:
+    def _process_columns(self, surrounding: bool) -> DataFrame:
         df = self.df.copy()
         df.columns = (
             df.columns.str.lower()
@@ -104,7 +95,7 @@ def _process_columns(self, surrounding: bool = True) -> DataFrame:
             .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
             .str.replace("_+", "_", regex=True)
             .str.strip()
-            .str.strip("_")
+            .str.rstrip("_")
         )
         if surrounding:
             df.columns = [f"[{col}]" for col in df.columns]
@@ -112,8 +103,8 @@


 class ControllerFeatures:
-    def __init__(self, _container_client):
-        self._container_client = _container_client
+    def __init__(self, container_client):
+        self.container_client = container_client

     def write_pyarrow(
         self,
@@ -121,25 +112,9 @@ def write_pyarrow(
         directory_name: str,
         pytables: List[Table],
         names: List[str],
         overwrite: bool = True,
-    ) -> None:
-        """Write pyarrow table as `parquet` format"""
-        format_type = "parquet"
-        files_not_loaded = []
-        for table, blob_name in zip(pytables, names):
-            if table != None:
-                buf = pa.BufferOutputStream()
-                pq.write_table(table, buf)
-                parquet_data = buf.getvalue().to_pybytes()
-                blob_path_name = directory_name + "/" + blob_name
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type,
-                    data=parquet_data,
-                    overwrite=overwrite,
-                )
-            else:
-                files_not_loaded.append(blob_name)
-        if len(files_not_loaded) > 0:
-            return files_not_loaded
+    ) -> List[str] | None:
+        """Write PyArrow tables as Parquet format."""
+        return self._write_tables(directory_name, pytables, names, "parquet", overwrite)

     def write_parquet(
         self,
@@ -148,83 +123,72 @@ def write_parquet(
         dfs: List[DataFrame],
         names: List[str],
         overwrite: bool = True,
         upload: bool = True,
-    ) -> None:
-        """Write dataframes as `parquet` format by converting them first into `bytes`."""
-        files = []
-        format_type = "parquet"
-        files_not_loaded = []
-        for data, blob_name in zip(dfs, names):
-            if data != None:
-                table = pa.Table.from_pandas(data)
+    ) -> List[str] | List[bytes] | None:
+        """Write DataFrames as Parquet format by converting them to bytes first."""
+        # Preserve None entries so _write_tables can report them as not loaded.
+        pytables = [pa.Table.from_pandas(df) if df is not None else None for df in dfs]
+        return self._write_tables(directory_name, pytables, names, "parquet", overwrite, upload)
+
+    def _write_tables(
+        self,
+        directory_name: str,
+        tables: List[Table],
+        names: List[str],
+        format_type: str,
+        overwrite: bool = True,
+        upload: bool = True,
+    ) -> List[str] | List[bytes] | None:
+        files_not_loaded, collected_files = [], []
+        for table, blob_name in zip(tables, names):
+            if table is not None:
                 buf = pa.BufferOutputStream()
                 pq.write_table(table, buf)
                 parquet_data = buf.getvalue().to_pybytes()
-                blob_path_name = directory_name + "/" + blob_name
+                # Blob paths always use "/"; os.path.join would emit "\" on Windows.
+                blob_path_name = f"{directory_name}/{blob_name}.{format_type}"
                 if upload:
-                    self._container_client.upload_blob(
-                        name=blob_path_name + "." + format_type,
+                    self.container_client.upload_blob(
+                        name=blob_path_name,
                         data=parquet_data,
                         overwrite=overwrite,
                     )
                 else:
-                    files.append(parquet_data)
+                    collected_files.append(parquet_data)
             else:
                 files_not_loaded.append(blob_name)
-        if len(files_not_loaded) > 0:
-            return files_not_loaded
-        if not upload:
-            return files
+        return files_not_loaded or (collected_files if not upload else None)


 def column_coincidence(df1: DataFrame, df2: DataFrame) -> float:
-    """Return the percentage of coincident columns between two pandas dataframes."""
-    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
-        raise ValueError("Both inputs should be pandas DataFrames")
+    """Return the percentage of coincident columns between two pandas DataFrames."""
+    set_columns1 = set(df1.columns)
+    set_columns2 = set(df2.columns)

-    column_names1 = set(df1.columns)
-    column_names2 = set(df2.columns)
+    common_columns = set_columns1.intersection(set_columns2)
+    total_columns = set_columns1.union(set_columns2)

-    common_columns = column_names1.intersection(column_names2)
-    total_columns = column_names1.union(column_names2)
-
-    coincidence_percentage = len(common_columns) / len(total_columns)
-    return coincidence_percentage
+    return len(common_columns) / len(total_columns)
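The metric above is Jaccard similarity over column names; for example:

    df1 = pd.DataFrame(columns=["a", "b", "c"])
    df2 = pd.DataFrame(columns=["b", "c", "d"])
    column_coincidence(df1, df2)  # 2 shared / 4 total = 0.5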


 def merge_by_coincidence(df1: DataFrame, df2: DataFrame, tol: float = 0.9) -> DataFrame:
-    """Merge two pandas dataframes by finding the most similar columns based on their names."""
+    """Merge two pandas DataFrames by finding the most similar columns based on their names."""
     percentage = column_coincidence(df1, df2)
-    total_columns = set(df1.columns).union(set(df2.columns))
-    num_col1 = len(df1.columns)
-    num_col2 = len(df2.columns)
-    if num_col1 < num_col2:
-        min_cols = set(df1.columns)
-        min_cols = list(min_cols.intersection(set(df2.columns)))
-    else:
-        min_cols = set(df2.columns)
-        min_cols = list(min_cols.intersection(set(df1.columns)))
-
-    df2 = (df2[min_cols]).copy()
-    df1 = (df1[min_cols]).copy()
-    diff = total_columns.difference(set(min_cols))
-
-    df = pd.concat([df1, df2], ignore_index=True)
-    if percentage > tol:
-        print("The following columns were lost : ", diff)
-    else:
+    if percentage < tol:
         print(
-            f"The following columns were missed with a match percentage of {percentage*100:.2f}% : ",
-            diff,
+            f"The following columns were missed with a match percentage of {percentage*100:.2f}%: "
+            f"{set(df1.columns).symmetric_difference(set(df2.columns))}"
         )
-    return df
+
+    # Index with a list: pandas rejects set indexers.
+    common_cols = list(set(df1.columns).intersection(set(df2.columns)))
+    df_combined = pd.concat([df1[common_cols], df2[common_cols]], ignore_index=True)
+
+    return df_combined


 def terminate_process(file_path: str) -> None:
     """Terminate the process holding the file."""
     for proc in psutil.process_iter(["pid", "open_files"]):
         try:
-            if any(file_path in file_info.path for file_info in proc.open_files()):
+            if any(file_info.path == file_path for file_info in proc.info["open_files"] or []):
                 print(f"Terminating process {proc.pid} holding the file.")
                 proc.terminate()
         except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
@@ -233,17 +197,17 @@ def terminate_process(file_path: str) -> None:


 def erase_files(format: str = "log") -> None:
     """Erase all files with the given format."""
-    for filename in glob.glob("*." + format):
+    for filename in glob.glob(f"*.{format}"):
         try:
             os.remove(filename)
-        except:
+        except OSError:
             terminate_process(filename)
             os.remove(filename)


 def get_extraction_date(
     filename: str | List[str], REGEX_PATTERN: str = r"\d{4}-\d{2}-\d{2}"
-) -> str:
+) -> str | List[str]:
     """Allows to extract the date of extraction according to the directory within the storage account.

     Parameters
     ----------
@@ -255,25 +219,17 @@ def get_extraction_date(
     Returns
     -------
-    `str`
+    `str` | `List[str]`
         the date that was extracted if found in the file path.
     """

-    def sub_extraction_date(filename: str, REGEX_PATTERN: str) -> str:
+    def sub_extraction_date(filename: str) -> str:
         extraction_date = re.findall(REGEX_PATTERN, filename)
-        if len(extraction_date) > 0:
-            _date = extraction_date[0]
-        else:
-            _date = ""
-        return _date
+        return extraction_date[0] if extraction_date else ""

-    if type(filename) == str:
-        return sub_extraction_date(filename, REGEX_PATTERN)
-    elif isinstance(filename, list):
-        dates = []
-        for name in filename:
-            dates.append(sub_extraction_date(name, REGEX_PATTERN))
-        return dates
+    if isinstance(filename, list):
+        return [sub_extraction_date(name) for name in filename]
+    return sub_extraction_date(filename)
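Two quick behavior sketches for the helpers above (values illustrative):

    df1 = pd.DataFrame({"a": [1], "b": [2]})
    df2 = pd.DataFrame({"b": [3], "c": [4]})
    merged = merge_by_coincidence(df1, df2)  # warns (match 1/3 = 33.33%), keeps only "b"

    get_extraction_date("container/2024-05-01/file.parquet")    # -> "2024-05-01"
    get_extraction_date(["a/2023-01-31/x.csv", "b/notes.txt"])  # -> ["2023-01-31", ""]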
""" - def sub_extraction_date(filename: str, REGEX_PATTERN: str) -> str: + def sub_extraction_date(filename: str) -> str: extraction_date = re.findall(REGEX_PATTERN, filename) - if len(extraction_date) > 0: - _date = extraction_date[0] - else: - _date = "" - return _date + return extraction_date[0] if extraction_date else "" - if type(filename) == str: - return sub_extraction_date(filename, REGEX_PATTERN) - elif isinstance(filename, list): - dates = [] - for name in filename: - dates.append(sub_extraction_date(name, REGEX_PATTERN)) - return dates + if isinstance(filename, list): + return [sub_extraction_date(name) for name in filename] + return sub_extraction_date(filename) class ColumnsDtypes: @@ -295,86 +251,45 @@ def correct( def get_frame(self) -> DataFrame: return self.df - def _check_int_float(self, drop_values: bool = False, drop_rows: bool = False) -> None: + def _check_int_float(self, drop_values: bool, drop_rows: bool) -> None: """Check and correct the data types of columns in a `DataFrame`.""" - def check_float(x): - if isinstance(x, str): - try: - return float(x) - except: - return np.nan - else: - return x - - def check_int(x): - if isinstance(x, str): - try: - return int(x) - except: - return "" - else: - return x - - df_ = (self.df).copy() - if drop_values: - if len(df_) < 1e5: - for col in df_.columns: - value = str(df_[col].iloc[0]) - val_dtype = None - if is_number_regex(value): - if type(check_float(value)).__name__ == "float": - with concurrent.futures.ThreadPoolExecutor() as executor: - df_[col] = list(executor.map(check_float, df_[col])) - df_[col] = df_[col].astype("float64") - val_dtype = "float64" - print("Checking {%s} for column {%s}" % (val_dtype, col)) - - if type(check_int(value)).__name__ == "int" and val_dtype == None: - with concurrent.futures.ThreadPoolExecutor() as executor: - df_[col] = list(executor.map(check_int, df_[col])) - if drop_rows: - df_ = df_.loc[df_[col].notnull()] - try: - df_[col] = df_[col].astype("int64") - val_dtype = "int64" - print("Checking {%s} for column {%s}" % (val_dtype, col)) - except IntCastingNaNError as e: - df_[col] = df_[col].astype("object") - val_dtype = "object" - print("Checking {%s} for column {%s}" % (val_dtype, col)) - - print(f"Successfully transformed the '{col}' column into {val_dtype}.") - self.df = df_ + def check_value(value): + try: + if float(value).is_integer(): + return int(value) + return float(value) + except ValueError: + return np.nan + + if len(self.df) < 1e5 or not drop_values: + for col in self.df.columns: + value = str(self.df[col].iloc[0]) + if is_number_regex(value): + with concurrent.futures.ThreadPoolExecutor() as executor: + self.df[col] = list(executor.map(check_value, self.df[col])) + try: + self.df[col] = pd.to_numeric(self.df[col], errors="coerce") + print(f"Successfully transformed the '{col}' column into numeric.") + except ValueError: + self.df[col] = self.df[col].astype("object") + print(f"Failed to transform the '{col}' column, setting to object.") + + if drop_rows: + self.df.dropna(inplace=True) def _check_datetime(self, sample_frac: float) -> None: """Check and convert date-time string columns to `datetime` objects.""" - df_ = self.df - cols = df_.columns - df_sample = df_.sample(frac=sample_frac) - for column_index, datatype in enumerate(df_.dtypes): - col = cols[column_index] - if datatype == "object": - datetype_column = (df_sample[col].apply(check_if_contains_dates)).isin([True]).any() - if datetype_column: + df_sample = self.df.sample(frac=sample_frac) + for col in 

     def _check_datetime(self, sample_frac: float) -> None:
         """Check and convert date-time string columns to `datetime` objects."""
-        df_ = self.df
-        cols = df_.columns
-        df_sample = df_.sample(frac=sample_frac)
-        for column_index, datatype in enumerate(df_.dtypes):
-            col = cols[column_index]
-            if datatype == "object":
-                datetype_column = (df_sample[col].apply(check_if_contains_dates)).isin([True]).any()
-                if datetype_column:
+        df_sample = self.df.sample(frac=sample_frac)
+        for col in self.df.columns:
+            if pd.api.types.is_string_dtype(df_sample[col]):
+                if (df_sample[col].apply(check_if_contains_dates)).any():
                     try:
                         with concurrent.futures.ThreadPoolExecutor() as executor:
-                            df_[col] = list(
-                                executor.map(lambda date: date.replace("-", ""), df_[col])
-                            )
-                            df_[col] = pd.to_datetime(df_[col], format="%Y%m%d").dt.normalize()
+                            self.df[col] = list(executor.map(coerce_datetime, self.df[col]))
                         print(f"Successfully transformed the '{col}' column into datetime64[ns].")
-                    except:
-                        with concurrent.futures.ThreadPoolExecutor() as executor:
-                            df_[col] = list(executor.map(coerce_datetime, df_[col]))
-                        df_[col] = pd.to_datetime(df_[col], format="%Y%m%d", errors="coerce")
-                        print(f"Successfully transformed the '{col}' column into datetime64[ns].")
-            elif datatype == "datetime64[us]" or datatype == "datetime64[ns]":
-                df_[col] = df_[col].astype("datetime64[ns]")
-                df_[col] = df_[col].dt.normalize()
-                print(f"Successfully transformed the '{col}' column into datetime64[ns].")
-
-        self.df = df_
+                    except ValueError:
+                        print(f"Failed to transform the '{col}' column into datetime.")


 def create_directory(data, parent_path=""):
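The public correct(...) entry point is not shown in this hunk; assuming it simply chains the two private checks, usage would look roughly like this (signature hypothetical):

    corrector = ColumnsDtypes(df)
    corrector.correct()  # hypothetical: runs _check_int_float and _check_datetime
    df_typed = corrector.get_frame()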