diff --git a/example.ipynb b/example.ipynb
index 3ef1ef7..1863ab5 100644
--- a/example.ipynb
+++ b/example.ipynb
@@ -303,18 +303,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "%%capture\n",
     "controller.upload_parquet(\"/\", [df], [database_name])\n",
     "controller.upload_parquet(\"/\", [df], [database_name], compression=False)\n",
-    "BlobList = controller.get_BlobList(\"/\")\n",
-    "controller._print_BlobPrefix()\n",
-    "\n",
-    "del BlobList[0]\n",
-    "\n",
-    "controller.set_BlobPrefix(BlobList)\n",
-    "controller._print_BlobPrefix()\n",
+    "blob_list = controller.get_blob_list(\"/\")\n",
     "\n",
-    "dfs, names = controller.get_parquet(\"/\", \"\\w+.parquet\", True)\n",
+    "dfs, names = controller.get_parquet(\"/\", \"\\w+.parquet\")\n",
     "\n",
     "logbook.create(logbook_data)"
    ]
@@ -322,7 +315,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "Python 3",
    "language": "python",
    "name": "python3"
   },
@@ -336,7 +329,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.10.9"
+   "version": "3.11.9"
   }
  },
 "nbformat": 4,
diff --git a/pydbsmgr/utils/azure_sdk.py b/pydbsmgr/utils/azure_sdk.py
index 1ead030..fd0e279 100644
--- a/pydbsmgr/utils/azure_sdk.py
+++ b/pydbsmgr/utils/azure_sdk.py
@@ -1,12 +1,10 @@
-"""Define azure storage utilities"""
-
 import os
 import re
 from io import BytesIO, StringIO
 from typing import List, Tuple
 
 import pyarrow.parquet as pq
-from azure.storage.blob import BlobPrefix, BlobServiceClient
+from azure.storage.blob import BlobServiceClient
 from dotenv import load_dotenv
 from pandas import read_csv, read_excel
 from pandas.core.frame import DataFrame
@@ -21,7 +19,7 @@ def get_connection_string() -> str:
 
 
 class StorageController(ControllerFeatures):
-    """Retrive blobs from a container/directory"""
+    """Retrieve blobs from a container/directory"""
 
     def __init__(self, connection_string: str, container_name: str):
         """Create blob storage client and container client"""
@@ -34,51 +32,27 @@ def __init__(self, connection_string: str, container_name: str):
         self._container_client = self._blob_service_client.get_container_client(self.container_name)
         super().__init__(self._container_client)
 
-    def get_BlobList(self, directory_name: str) -> List[str]:
-        _BlobPrefix = self._get_BlobPrefix(directory_name)
-        _BlobList = []
-        for _blob in _BlobPrefix:
-            _BlobList.append(_blob["name"])
-        _BlobList.sort()
-        return _BlobList
+    def get_blob_list(self, directory_name: str) -> List[str]:
+        blob_prefixes = self._get_blob_prefix(directory_name)
+        return sorted(blob["name"] for blob in blob_prefixes)
+
+    def _get_blob_prefix(self, directory_name: str):
+        return list(
+            self._container_client.walk_blobs(name_starts_with=directory_name + "/", delimiter="/")
+        )
 
     def get_parquet(
-        self, directory_name: str, regex: str, manual_mode: bool = False, engine: str = "pyarrow"
+        self, directory_name: str, regex: str, manual_mode: bool = False
     ) -> Tuple[List[DataFrame], List[str]]:
         """Perform reading of `.parquet` and `.parquet.gzip` files in container-directory"""
-        dataframes = list()
-        dataframe_names = list()
-        if manual_mode:
-            file_list = self.file_list
-        else:
-            file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-
-        for file in file_list:
-            if not re.search(regex, file["name"], re.IGNORECASE):
-                print(f"Ignoring {file.name}, does not match {regex}")
-                continue
-            blob_client = self._blob_service_client.get_blob_client(
-                container=self.container_name, blob=file["name"]
+        file_list = (
+            self.file_list
+            if manual_mode
+            else self._container_client.walk_blobs(
+                name_starts_with=directory_name + "/", delimiter="/"
             )
-            blob_data = blob_client.download_blob().readall()
-            print("File name : ", file["name"].split("/")[-1])
-
-            if file["name"].endswith(".parquet"):
-                df_name = str(file["name"]).replace(".parquet", "").split("/")[-1]
-                dataframe_names.append(df_name)
-                bytes_io = BytesIO(blob_data)
-                parquet_file = pq.ParquetFile(bytes_io)
-                df = parquet_file.read().to_pandas()
-                dataframes.append(df)
-            elif file["name"].endswith(".gzip"):
-                gzip_file = BytesIO(blob_data)
-                df_name = str(file["name"]).replace(".parquet.gzip", ".gzip").split("/")[-1]
-                dataframe_names.append(df_name)
-                parquet_file = pq.ParquetFile(gzip_file)
-                df = parquet_file.read().to_pandas()
-                dataframes.append(df)
-
-        return dataframes, dataframe_names
+        )
+        return self._read_files(file_list, regex, "parquet")
 
     def upload_parquet(
         self,
@@ -86,71 +60,37 @@
         dfs: List[DataFrame],
         blob_names: List[str],
         format_type: str = "parquet",
-        engine: str = "auto",
         compression: bool = True,
         overwrite: bool = True,
     ) -> None:
         """Perform upload of `.parquet` and `.parquet.gzip` files in container-directory"""
         for df, blob_name in zip(dfs, blob_names):
-            blob_path_name = directory_name + "/" + blob_name
-            if not compression:
-                parquet_data = df.to_parquet(index=False, engine=engine)
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type, data=parquet_data, overwrite=overwrite
-                )
-            elif compression:
-                parquet_data = df.to_parquet(index=False, engine=engine, compression="gzip")
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type + ".gzip",
-                    data=parquet_data,
-                    overwrite=overwrite,
-                )
-            else:
-                raise ValueError(f"{format_type} not supported")
+            blob_path_name = f"{directory_name}/{blob_name}"
+            parquet_data = df.to_parquet(
+                index=False, engine="pyarrow", compression="gzip" if compression else None
+            )
+            self._container_client.upload_blob(
+                name=(
+                    f"{blob_path_name}.{format_type}.gz"
+                    if compression
+                    else f"{blob_path_name}.{format_type}"
+                ),
+                data=parquet_data,
+                overwrite=overwrite,
+            )
 
     def get_excel_csv(
         self, directory_name: str, regex: str, manual_mode: bool = False
     ) -> Tuple[List[DataFrame], List[str]]:
         """Perform reading of `.xlsx` and `.csv` files in container-directory"""
-        dataframes = list()
-        dataframe_names = list()
-        if manual_mode:
-            file_list = self.file_list
-        else:
-            file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-
-        for file in file_list:
-            if not re.search(regex, file["name"], re.IGNORECASE):
-                print(f"Ignoring {file.name}, does not match {regex}")
-                continue
-            blob_client = self._blob_service_client.get_blob_client(
-                container=self.container_name, blob=file["name"]
+        file_list = (
+            self.file_list
+            if manual_mode
+            else self._container_client.walk_blobs(
+                name_starts_with=directory_name + "/", delimiter="/"
            )
-            blob_data = blob_client.download_blob().readall()
-
-            print("File name : ", file["name"].split("/")[-1])
-
-            if file["name"].endswith(".csv"):
-                # blob_data_str = StringIO(str(blob_data, encoding))
-                try:
-                    blob_data_str = blob_data.decode("utf-8")
-                except UnicodeDecodeError:
-                    blob_data_str = blob_data.decode("latin-1")
-                df_name = str(file["name"]).replace(".csv", "").split("/")[-1]
-                dataframe_names.append(df_name)
-                df = read_csv(StringIO(blob_data_str), index_col=None, low_memory=False)
-                dataframes.append(df)
-            elif file["name"].endswith(".xlsx"):
-                xls_buffer = BytesIO(blob_data)
-                all_sheets = read_excel(xls_buffer, sheet_name=None, index_col=None)
-                for sheet_name, df in all_sheets.items():
-                    df_name = (
-                        str(file["name"]).replace(".xlsx", "").split("/")[-1] + "-" + sheet_name
-                    )
-                    dataframe_names.append(df_name)
-                    dataframes.append(df.reset_index(drop=True))
-
-        return dataframes, dataframe_names
+        )
+        return self._read_files(file_list, regex, "excel_csv")
 
     def upload_excel_csv(
         self,
@@ -163,88 +103,95 @@
     ) -> None:
         """Perform upload of `.xlsx` and `.csv` files in container-directory"""
         for df, blob_name in zip(dfs, blob_names):
-            blob_path_name = directory_name + "/" + blob_name
+            blob_path_name = f"{directory_name}/{blob_name}"
             if format_type == "csv":
                 csv_data = df.to_csv(index=False, encoding=encoding)
                 self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type, data=csv_data, overwrite=overwrite
+                    name=f"{blob_path_name}.csv", data=csv_data, overwrite=overwrite
                 )
             elif format_type == "xlsx":
                 xlsx_data = BytesIO()
-                df.to_excel(xlsx_data, index=False)
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type,
-                    data=xlsx_data.getvalue(),
-                    overwrite=overwrite,
-                )
+                with xlsx_data:
+                    df.to_excel(xlsx_data, index=False)
+                    self._container_client.upload_blob(
+                        name=f"{blob_path_name}.xlsx",
+                        data=xlsx_data.getvalue(),
+                        overwrite=overwrite,
+                    )
             else:
-                raise ValueError(f"{format_type} not supported")
+                raise ValueError(f"Unsupported format: {format_type}")
+
+    def _read_files(self, file_list, regex, file_type):
+        """Read files based on the given type and regex filter."""
+        dataframes = []
+        dataframe_names = []
+
+        for file in file_list:
+            if not re.search(regex, file.name, re.IGNORECASE):
+                print(f"Ignoring {file.name}, does not match {regex}")
+                continue
+
+            blob_data = self._download_blob(file.name)
+
+            if file_type == "parquet":
+                df_name = file.name.rsplit(".", 2)[0].rsplit("/", 1)[-1]
+                dataframe_names.append(df_name)
+                with BytesIO(blob_data) as bytes_io:
+                    df = pq.read_table(bytes_io).to_pandas()
+                    dataframes.append(df)
+
+            elif file_type == "excel_csv":
+                filename, extension = os.path.splitext(file.name.rsplit("/", 1)[1])
+                if extension == ".csv":
+                    try:
+                        blob_str = blob_data.decode("utf-8")
+                    except UnicodeDecodeError:
+                        blob_str = blob_data.decode("latin-1")
+                    dataframe_names.append(filename)
+                    with StringIO(blob_str) as csv_file:
+                        df = read_csv(csv_file, index_col=None, low_memory=False)
+                        dataframes.append(df)
+                elif extension == ".xlsx":
+                    with BytesIO(blob_data) as xlsx_buffer:
+                        all_sheets = read_excel(xlsx_buffer, sheet_name=None, index_col=None)
+                        for sheet_name, df in all_sheets.items():
+                            dataframe_names.append(f"{filename}-{sheet_name}")
+                            dataframes.append(df.reset_index(drop=True))
+
+        return dataframes, dataframe_names
+
+    def _download_blob(self, blob_name):
+        """Download a blob from Azure Storage."""
+        blob_client = self._blob_service_client.get_blob_client(
+            container=self.container_name, blob=blob_name
+        )
+        return blob_client.download_blob().readall()
 
     def show_all_blobs(self) -> None:
         """Show directories from a container"""
         print(f"Container Name: {self.container_name}")
         for blob in self._container_client.list_blobs():
-            if len(blob["name"].split("/")) > 1:
-                print("\tBlob name : {}".format(blob["name"]))
+            if len(blob.name.split("/")) > 1:
+                print(f"\tBlob name : {blob.name}")
 
     def get_all_blob(self, filter_criteria: str = None) -> List[str]:
         """Get all blob names from a container"""
-        blob_names = []
-        for blob in self._container_client.list_blobs():
-            if len(blob["name"].split("/")) > 1:
-                blob_names.append(blob["name"])
-        if filter_criteria != None:
-            blob_names = self._list_filter(blob_names, filter_criteria)
-        return blob_names
-
-    def show_blobs(self, directory_name) -> None:
+        blob_names = [
+            blob.name
+            for blob in self._container_client.list_blobs()
+            if len(blob.name.split("/")) > 1
+        ]
+        return self._list_filter(blob_names, filter_criteria) if filter_criteria else blob_names
+
+    def show_blobs(self, directory_name: str) -> None:
         """Show blobs from a directory"""
         print(f"Container Name: {self.container_name}")
         print(f"\tDirectory Name: {directory_name}")
-        file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-        for file in file_list:
-            print("\t\tBlob name: {}".format(file["name"].split("/")[1]))
-
-    def _get_BlobPrefix(self, directory_name: str) -> BlobPrefix:
-        self.file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-        return self.file_list
-
-    def set_BlobPrefix(self, file_list: list) -> None:
-        self.file_list = self._list_to_BlobPrefix(file_list)
+        for file in self._container_client.walk_blobs(
+            name_starts_with=directory_name + "/", delimiter="/"
+        ):
+            print(f"\t\tBlob name: {file.name.rsplit('/', 1)[-1]}")
 
     def _list_filter(self, elements: list, character: str) -> List[str]:
-        """Function that filters a list from a criteria
-
-        Args:
-        ----------
-        elements (`list`): list of values to be filtered
-        character (`str`): filter criteria
-
-        Returns:
-        ----------
-        List[`str`]: list of filtered elements
-        """
-        filter_elements = []
-        for element in elements:
-            if element.find(character) != -1:
-                filter_elements.append(element)
-        return filter_elements
-
-    def _print_BlobItem(self) -> None:
-        for file in self.file_list:
-            print("File name : ", file["name"].split("/")[-1])
-
-    def _print_BlobPrefix(self) -> None:
-        for file in self.file_list:
-            print("File name : ", file["name"])
-
-    def _list_to_BlobPrefix(self, my_list: list) -> BlobPrefix:
-        """Converts a list of iterable directory items into a BlobPrefix class"""
-        blob_prefixes = []
-
-        for item in my_list:
-            prefix = "/".join(item.split("/")[:-1])
-            blob_prefix = BlobPrefix(prefix)
-            blob_prefix.name = item
-            blob_prefixes.append(blob_prefix)
-        return blob_prefixes
+        """Filter a list based on a criteria."""
+        return [element for element in elements if character in element]
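A minimal usage sketch of the refactored API follows. The container name
"demo-container" and blob name "demo" are placeholders, and it assumes
get_connection_string() can resolve a connection string (the module loads a
.env file via dotenv). With compression on, upload_parquet now stores
"<name>.parquet.gz" rather than the old "<name>.parquet.gzip":

    from pandas import DataFrame
    from pydbsmgr.utils.azure_sdk import StorageController, get_connection_string

    controller = StorageController(get_connection_string(), "demo-container")
    df = DataFrame({"id": [1, 2], "value": ["a", "b"]})

    # Upload once gzip-compressed (demo.parquet.gz) and once plain (demo.parquet),
    # mirroring the notebook cell above.
    controller.upload_parquet("/", [df], ["demo"])
    controller.upload_parquet("/", [df], ["demo"], compression=False)

    # List the blob names that now live under the root directory.
    print(controller.get_blob_list("/"))

    # Read every blob whose name matches the pattern back into DataFrames;
    # the regex matches both the .parquet and .parquet.gz uploads.
    dfs, names = controller.get_parquet("/", r"\w+\.parquet")

The spreadsheet/CSV path mirrors this. get_excel_csv's signature is shown in
the diff; the parameter order of upload_excel_csv (directory, dataframes, blob
names) is assumed here to match upload_parquet, since its full signature is
elided above. Each .xlsx sheet is read back as its own DataFrame named
"<file>-<sheet>":

    controller.upload_excel_csv("/", [df], ["demo"])                      # demo.csv
    controller.upload_excel_csv("/", [df], ["demo"], format_type="xlsx")  # demo.xlsx
    dfs, names = controller.get_excel_csv("/", r"\w+")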