StorageController class improvement
jzsmoreno committed Dec 6, 2024
1 parent 5435000 commit 3ba6816
Showing 2 changed files with 114 additions and 174 deletions.
example.ipynb (15 changes: 4 additions & 11 deletions)

@@ -303,26 +303,19 @@
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture\n",
-    "controller.upload_parquet(\"/\", [df], [database_name])\n",
+    "controller.upload_parquet(\"/\", [df], [database_name], compression=False)\n",
-    "BlobList = controller.get_BlobList(\"/\")\n",
-    "controller._print_BlobPrefix()\n",
-    "\n",
-    "del BlobList[0]\n",
-    "\n",
-    "controller.set_BlobPrefix(BlobList)\n",
-    "controller._print_BlobPrefix()\n",
+    "blob_list = controller.get_blob_list(\"/\")\n",
    "\n",
-    "dfs, names = controller.get_parquet(\"/\", \"\\w+.parquet\", True)\n",
+    "dfs, names = controller.get_parquet(\"/\", \"\\w+.parquet\")\n",
    "\n",
    "logbook.create(logbook_data)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
@@ -336,7 +329,7 @@
  "name": "python",
  "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
-   "version": "3.10.9"
+   "version": "3.11.9"
 }
},
"nbformat": 4,
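For orientation, here is roughly what the updated cell now does end to end. This is a minimal sketch, not notebook output: the connection string and container name are placeholders, `df`/`database_name` stand in for values defined in earlier cells, and the import path is assumed from the file location shown below (pydbsmgr/utils/azure_sdk.py).

import pandas as pd

from pydbsmgr.utils.azure_sdk import StorageController

# Placeholders: substitute a real connection string and container name.
controller = StorageController("<connection-string>", "<container>")
df = pd.DataFrame({"id": [1, 2]})
database_name = "demo"

# Upload without compression, list the blob names, then read the parquet files back.
controller.upload_parquet("/", [df], [database_name], compression=False)
blob_list = controller.get_blob_list("/")  # replaces get_BlobList/set_BlobPrefix/_print_BlobPrefix
dfs, names = controller.get_parquet("/", r"\w+.parquet")  # the manual_mode=True argument is gone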
pydbsmgr/utils/azure_sdk.py (273 changes: 110 additions & 163 deletions)
@@ -1,12 +1,10 @@
"""Define azure storage utilities"""

import os
import re
from io import BytesIO, StringIO
from typing import List, Tuple

import pyarrow.parquet as pq
-from azure.storage.blob import BlobPrefix, BlobServiceClient
+from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
from pandas import read_csv, read_excel
from pandas.core.frame import DataFrame
@@ -21,7 +19,7 @@ def get_connection_string() -> str:


class StorageController(ControllerFeatures):
-    """Retrive blobs from a container/directory"""
+    """Retrieve blobs from a container/directory"""

    def __init__(self, connection_string: str, container_name: str):
        """Create blob storage client and container client"""
@@ -34,123 +32,65 @@ def __init__(self, connection_string: str, container_name: str):
        self._container_client = self._blob_service_client.get_container_client(self.container_name)
        super().__init__(self._container_client)

-    def get_BlobList(self, directory_name: str) -> List[str]:
-        _BlobPrefix = self._get_BlobPrefix(directory_name)
-        _BlobList = []
-        for _blob in _BlobPrefix:
-            _BlobList.append(_blob["name"])
-        _BlobList.sort()
-        return _BlobList
+    def get_blob_list(self, directory_name: str) -> List[str]:
+        blob_prefixes = self._get_blob_prefix(directory_name)
+        return sorted(blob["name"] for blob in blob_prefixes)
+
+    def _get_blob_prefix(self, directory_name: str):
+        return list(
+            self._container_client.walk_blobs(name_starts_with=directory_name + "/", delimiter="/")
+        )

    def get_parquet(
-        self, directory_name: str, regex: str, manual_mode: bool = False, engine: str = "pyarrow"
+        self, directory_name: str, regex: str, manual_mode: bool = False
    ) -> Tuple[List[DataFrame], List[str]]:
        """Perform reading of `.parquet` and `.parquet.gzip` files in container-directory"""
-        dataframes = list()
-        dataframe_names = list()
-        if manual_mode:
-            file_list = self.file_list
-        else:
-            file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-
-        for file in file_list:
-            if not re.search(regex, file["name"], re.IGNORECASE):
-                print(f"Ignoring {file.name}, does not match {regex}")
-                continue
-            blob_client = self._blob_service_client.get_blob_client(
-                container=self.container_name, blob=file["name"]
-            )
-            blob_data = blob_client.download_blob().readall()
-            print("File name : ", file["name"].split("/")[-1])
-
-            if file["name"].endswith(".parquet"):
-                df_name = str(file["name"]).replace(".parquet", "").split("/")[-1]
-                dataframe_names.append(df_name)
-                bytes_io = BytesIO(blob_data)
-                parquet_file = pq.ParquetFile(bytes_io)
-                df = parquet_file.read().to_pandas()
-                dataframes.append(df)
-            elif file["name"].endswith(".gzip"):
-                gzip_file = BytesIO(blob_data)
-                df_name = str(file["name"]).replace(".parquet.gzip", ".gzip").split("/")[-1]
-                dataframe_names.append(df_name)
-                parquet_file = pq.ParquetFile(gzip_file)
-                df = parquet_file.read().to_pandas()
-                dataframes.append(df)
-
-        return dataframes, dataframe_names
+        file_list = (
+            self.file_list
+            if manual_mode
+            else self._container_client.walk_blobs(
+                name_starts_with=directory_name + "/", delimiter="/"
+            )
+        )
+        return self._read_files(file_list, regex, "parquet")

    def upload_parquet(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        blob_names: List[str],
        format_type: str = "parquet",
-        engine: str = "auto",
        compression: bool = True,
        overwrite: bool = True,
    ) -> None:
        """Perform upload of `.parquet` and `.parquet.gzip` files in container-directory"""
        for df, blob_name in zip(dfs, blob_names):
-            blob_path_name = directory_name + "/" + blob_name
-            if not compression:
-                parquet_data = df.to_parquet(index=False, engine=engine)
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type, data=parquet_data, overwrite=overwrite
-                )
-            elif compression:
-                parquet_data = df.to_parquet(index=False, engine=engine, compression="gzip")
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type + ".gzip",
-                    data=parquet_data,
-                    overwrite=overwrite,
-                )
-            else:
-                raise ValueError(f"{format_type} not supported")
+            blob_path_name = f"{directory_name}/{blob_name}"
+            parquet_data = df.to_parquet(
+                index=False, engine="pyarrow", compression="gzip" if compression else None
+            )
+            self._container_client.upload_blob(
+                name=(
+                    f"{blob_path_name}.{format_type}.gz"
+                    if compression
+                    else f"{blob_path_name}.{format_type}"
+                ),
+                data=parquet_data,
+                overwrite=overwrite,
+            )

    def get_excel_csv(
        self, directory_name: str, regex: str, manual_mode: bool = False
    ) -> Tuple[List[DataFrame], List[str]]:
        """Perform reading of `.xlsx` and `.csv` files in container-directory"""
-        dataframes = list()
-        dataframe_names = list()
-        if manual_mode:
-            file_list = self.file_list
-        else:
-            file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-
-        for file in file_list:
-            if not re.search(regex, file["name"], re.IGNORECASE):
-                print(f"Ignoring {file.name}, does not match {regex}")
-                continue
-            blob_client = self._blob_service_client.get_blob_client(
-                container=self.container_name, blob=file["name"]
-            )
-            blob_data = blob_client.download_blob().readall()
-
-            print("File name : ", file["name"].split("/")[-1])
-
-            if file["name"].endswith(".csv"):
-                # blob_data_str = StringIO(str(blob_data, encoding))
-                try:
-                    blob_data_str = blob_data.decode("utf-8")
-                except UnicodeDecodeError:
-                    blob_data_str = blob_data.decode("latin-1")
-                df_name = str(file["name"]).replace(".csv", "").split("/")[-1]
-                dataframe_names.append(df_name)
-                df = read_csv(StringIO(blob_data_str), index_col=None, low_memory=False)
-                dataframes.append(df)
-            elif file["name"].endswith(".xlsx"):
-                xls_buffer = BytesIO(blob_data)
-                all_sheets = read_excel(xls_buffer, sheet_name=None, index_col=None)
-                for sheet_name, df in all_sheets.items():
-                    df_name = (
-                        str(file["name"]).replace(".xlsx", "").split("/")[-1] + "-" + sheet_name
-                    )
-                    dataframe_names.append(df_name)
-                    dataframes.append(df.reset_index(drop=True))
-
-        return dataframes, dataframe_names
+        file_list = (
+            self.file_list
+            if manual_mode
+            else self._container_client.walk_blobs(
+                name_starts_with=directory_name + "/", delimiter="/"
+            )
+        )
+        return self._read_files(file_list, regex, "excel_csv")

    def upload_excel_csv(
        self,
@@ -163,88 +103,95 @@ def upload_excel_csv(
    ) -> None:
        """Perform upload of `.xlsx` and `.csv` files in container-directory"""
        for df, blob_name in zip(dfs, blob_names):
-            blob_path_name = directory_name + "/" + blob_name
+            blob_path_name = f"{directory_name}/{blob_name}"
            if format_type == "csv":
                csv_data = df.to_csv(index=False, encoding=encoding)
                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type, data=csv_data, overwrite=overwrite
+                    name=f"{blob_path_name}.csv", data=csv_data, overwrite=overwrite
                )
            elif format_type == "xlsx":
                xlsx_data = BytesIO()
-                df.to_excel(xlsx_data, index=False)
-                self._container_client.upload_blob(
-                    name=blob_path_name + "." + format_type,
-                    data=xlsx_data.getvalue(),
-                    overwrite=overwrite,
-                )
+                with xlsx_data:
+                    df.to_excel(xlsx_data, index=False)
+                    self._container_client.upload_blob(
+                        name=f"{blob_path_name}.xlsx",
+                        data=xlsx_data.getvalue(),
+                        overwrite=overwrite,
+                    )
            else:
-                raise ValueError(f"{format_type} not supported")
+                raise ValueError(f"Unsupported format: {format_type}")

+    def _read_files(self, file_list, regex, file_type):
+        """Read files based on the given type and regex filter."""
+        dataframes = []
+        dataframe_names = []
+
+        for file in file_list:
+            if not re.search(regex, file.name, re.IGNORECASE):
+                print(f"Ignoring {file.name}, does not match {regex}")
+                continue
+
+            blob_data = self._download_blob(file.name)
+
+            if file_type == "parquet":
+                df_name = file.name.rsplit(".", 2)[0].rsplit("/", 1)[-1]
+                dataframe_names.append(df_name)
+                with BytesIO(blob_data) as bytes_io:
+                    df = pq.read_table(bytes_io).to_pandas()
+                    dataframes.append(df)
+
+            elif file_type == "excel_csv":
+                filename, extension = os.path.splitext(file.name.rsplit("/", 1)[1])
+                if extension == ".csv":
+                    try:
+                        blob_str = blob_data.decode("utf-8")
+                    except UnicodeDecodeError:
+                        blob_str = blob_data.decode("latin-1")
+                    dataframe_names.append(filename)
+                    with StringIO(blob_str) as csv_file:
+                        df = read_csv(csv_file, index_col=None, low_memory=False)
+                        dataframes.append(df)
+                elif extension == ".xlsx":
+                    with BytesIO(blob_data) as xlsx_buffer:
+                        all_sheets = read_excel(xlsx_buffer, sheet_name=None, index_col=None)
+                        for sheet_name, df in all_sheets.items():
+                            dataframe_names.append(f"{filename}-{sheet_name}")
+                            dataframes.append(df.reset_index(drop=True))
+
+        return dataframes, dataframe_names
+
+    def _download_blob(self, blob_name):
+        """Download a blob from Azure Storage."""
+        blob_client = self._blob_service_client.get_blob_client(
+            container=self.container_name, blob=blob_name
+        )
+        return blob_client.download_blob().readall()
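A detail worth noting in the new _read_files: the parquet branch derives each frame name by stripping up to two extensions and taking the last path component, so data.parquet and data.parquet.gzip both map to data (the old get_parquet kept a .gzip suffix for compressed files). A stand-alone check of that expression, using hypothetical blob names:

# Name derivation used by the parquet branch of _read_files.
for name in ["dir/data.parquet", "dir/data.parquet.gzip"]:
    print(name.rsplit(".", 2)[0].rsplit("/", 1)[-1])  # prints "data" both times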

    def show_all_blobs(self) -> None:
        """Show directories from a container"""
        print(f"Container Name: {self.container_name}")
        for blob in self._container_client.list_blobs():
-            if len(blob["name"].split("/")) > 1:
-                print("\tBlob name : {}".format(blob["name"]))
+            if len(blob.name.split("/")) > 1:
+                print(f"\tBlob name : {blob.name}")

    def get_all_blob(self, filter_criteria: str = None) -> List[str]:
        """Get all blob names from a container"""
-        blob_names = []
-        for blob in self._container_client.list_blobs():
-            if len(blob["name"].split("/")) > 1:
-                blob_names.append(blob["name"])
-        if filter_criteria != None:
-            blob_names = self._list_filter(blob_names, filter_criteria)
-        return blob_names
+        blob_names = [
+            blob.name
+            for blob in self._container_client.list_blobs()
+            if len(blob.name.split("/")) > 1
+        ]
+        return self._list_filter(blob_names, filter_criteria) if filter_criteria else blob_names

-    def show_blobs(self, directory_name) -> None:
+    def show_blobs(self, directory_name: str) -> None:
        """Show blobs from a directory"""
        print(f"Container Name: {self.container_name}")
        print(f"\tDirectory Name: {directory_name}")
-        file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-        for file in file_list:
-            print("\t\tBlob name: {}".format(file["name"].split("/")[1]))
-
-    def _get_BlobPrefix(self, directory_name: str) -> BlobPrefix:
-        self.file_list = self._container_client.walk_blobs(directory_name + "/", delimiter="/")
-        return self.file_list
-
-    def set_BlobPrefix(self, file_list: list) -> None:
-        self.file_list = self._list_to_BlobPrefix(file_list)
+        for file in self._container_client.walk_blobs(
+            name_starts_with=directory_name + "/", delimiter="/"
+        ):
+            print(f"\t\tBlob name: {file.name.rsplit('/', 1)[-1]}")

    def _list_filter(self, elements: list, character: str) -> List[str]:
-        """Function that filters a list from a criteria
-        Args:
-        ----------
-        elements (`list`): list of values to be filtered
-        character (`str`): filter criteria
-        Returns:
-        ----------
-        List[`str`]: list of filtered elements
-        """
-        filter_elements = []
-        for element in elements:
-            if element.find(character) != -1:
-                filter_elements.append(element)
-        return filter_elements
-
-    def _print_BlobItem(self) -> None:
-        for file in self.file_list:
-            print("File name : ", file["name"].split("/")[-1])
-
-    def _print_BlobPrefix(self) -> None:
-        for file in self.file_list:
-            print("File name : ", file["name"])
-
-    def _list_to_BlobPrefix(self, my_list: list) -> BlobPrefix:
-        """Converts a list of iterable directory items into a BlobPrefix class"""
-        blob_prefixes = []
-
-        for item in my_list:
-            prefix = "/".join(item.split("/")[:-1])
-            blob_prefix = BlobPrefix(prefix)
-            blob_prefix.name = item
-            blob_prefixes.append(blob_prefix)
-        return blob_prefixes
+        """Filter a list based on a criteria."""
+        return [element for element in elements if character in element]
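One behavioral change that is easy to miss: the rewritten upload_parquet appends .gz to compressed blob names, where the old code appended .gzip. A stand-alone check of the new naming rule; the helper below merely restates the conditional from the diff and is not part of the module:

# Restatement of upload_parquet's blob-naming conditional, for illustration only.
def blob_path(directory_name: str, blob_name: str,
              format_type: str = "parquet", compression: bool = True) -> str:
    base = f"{directory_name}/{blob_name}"
    return f"{base}.{format_type}.gz" if compression else f"{base}.{format_type}"

assert blob_path("reports", "demo") == "reports/demo.parquet.gz"
assert blob_path("reports", "demo", compression=False) == "reports/demo.parquet"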
