Skip to content

Commit

Permalink
feature/chunks (#16)
Browse files Browse the repository at this point in the history
* Update `fast_upload.py`

Added a function to upload large DataFrames to SQL in chunks

* Update `fast_upload.py`

* Update `fast_upload.py`

* Refactoring class

---------

Co-authored-by: jzsmoreno <[email protected]>
  • Loading branch information
jafetcc02 and jzsmoreno authored Dec 22, 2023
1 parent 020f3c4 commit c50f362
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 10 deletions.
105 changes: 101 additions & 4 deletions pydbsmgr/fast_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,103 @@ def _infer_schema_query(self, datatype: str) -> str:
raise ValueError("Data type could not be inferred!")


class UploadToSQL(DataFrameToSQL):
    """Efficiently import/update a SQL table from a `DataFrame` by uploading it in chunks.

    Relies on `DataFrameToSQL` for the actual table creation (`import_table`)
    and row insertion (`upload_table`).
    """

    def __init__(self, connection_string: str) -> None:
        """Establish the connection to the database using the `DataFrameToSQL` class."""
        super().__init__(connection_string)

    def execute(
        self,
        df: DataFrame,
        table_name: str,
        chunk_size: int,
        method: str = "override",  # "override" or "append"
        char_length: int = 512,
        override_length: bool = True,
        close_cursor: bool = True,
        auto_resolve: bool = True,
        frac: float = 0.01,
    ) -> None:
        """Upload `df` to `table_name` in chunks.

        Parameters
        ----------
        df : DataFrame
            Data to upload. Must have more rows than `chunk_size`.
        table_name : str
            Target SQL table name.
        chunk_size : int
            Number of chunks to split `df` into (when `auto_resolve` does not
            override the strategy for very large frames).
        method : str
            "override" drops/recreates the table; "append" inserts into an
            existing table.
        char_length : int
            Character-column length forwarded to `import_table`.
        override_length : bool
            Forwarded to `import_table`.
        close_cursor : bool
            Forwarded to `import_table`.
        auto_resolve : bool
            If True and the frame has >= 500,000 rows, chunk by a fraction
            `frac` of the rows instead of a fixed number of chunks.
        frac : float
            Fraction of rows per chunk used by the `auto_resolve` path.

        Raises
        ------
        ValueError
            If `chunk_size` >= `len(df)`, if `method` is invalid, or if
            "append" targets a table that does not exist.
        """
        # Explicit exception instead of `assert`: asserts are stripped under `python -O`,
        # which would silently disable this validation.
        if len(df) <= chunk_size:
            raise ValueError(
                "'chunk_size' cannot be greater than the length of the 'DataFrame', change the 'chunk_size'"
            )

        df_chunks = self._split_chunks(df, chunk_size, auto_resolve, frac)

        if method == "override":
            if self._check_table_exists(table_name):
                print("Table exists, executing OVERRIDE...")
                self._drop_table(table_name)
            else:
                print("Table does not exist, proceeding with CREATE TABLE.")

            # First chunk creates the table; the remaining chunks are plain inserts.
            # (Bug fix: previously the remaining chunks were only uploaded when the
            # table did not exist beforehand, so overriding an existing table
            # silently kept just the first chunk.)
            self.import_table(
                df_chunks[0], table_name, True, char_length, override_length, close_cursor
            )
            for chunk in df_chunks[1:]:
                self.upload_table(chunk, table_name, True)
        elif method == "append":
            if not self._check_table_exists(table_name):
                raise ValueError("Method 'append' requires an existing table.")
            for chunk in df_chunks:
                self.upload_table(chunk, table_name, False)
        else:
            raise ValueError(
                'Invalid value for argument "method". Choose from ["override","append"].'
            )

    @staticmethod
    def _split_chunks(df: DataFrame, chunk_size: int, auto_resolve: bool, frac: float) -> list:
        """Split `df` row-wise into a list of chunks.

        Very large frames (>= 500k rows, with `auto_resolve`) are sliced by a
        fraction of the rows; otherwise the frame is split into `chunk_size`
        roughly equal parts.
        """
        if auto_resolve and len(df) >= 0.5e6:
            rows_per_chunk = int(df.shape[0] * frac)
            return [df[i : i + rows_per_chunk] for i in range(0, df.shape[0], rows_per_chunk)]
        return np.array_split(df, chunk_size)

    def _drop_table(self, table_name: str) -> None:
        """Drop `table_name` if it exists. Failures are reported, not raised."""
        query = f"DROP TABLE IF EXISTS {table_name}"
        _con = pyodbc.connect(self._connection_string, autocommit=True)
        try:
            cursor = _con.cursor()
            try:
                cursor.execute(query)
            except Exception as e:
                print(f"Failed to drop table '{table_name}'. Error message:\n{str(e)}")
            finally:
                cursor.close()
        finally:
            # Always release the connection, even if the DROP failed.
            _con.close()

    def _check_table_exists(self, table_name: str) -> bool:
        """Return True if `table_name` is present in INFORMATION_SCHEMA.TABLES."""
        # Parameterized query: avoids SQL injection through the table name.
        query = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME=?"
        result = self._execute_query(query, (table_name,))
        return bool(result["data"][0][0])

    def _execute_query(self, query: str, params: tuple = ()) -> dict:
        """Run `query` and return {"columns", "data", "count"}.

        The cursor and connection are always closed, even when the query
        fails (the original implementation leaked both on error because it
        raised before closing).

        Raises
        ------
        ValueError
            Wrapping any driver-level error from executing the query.
        """
        _con = pyodbc.connect(self._connection_string, autocommit=True)
        try:
            cursor = _con.cursor()
            try:
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
                results = {"columns": [desc[0] for desc in cursor.description]}
                results["data"] = cursor.fetchall()
                results["count"] = [len(results["data"])]
            except Exception as e:
                print("Error executing SQL Query")
                raise ValueError(f"Query Error: {str(e)}")
            finally:
                cursor.close()
        finally:
            _con.close()
        return results


########################################################################################

if __name__ == "__main__":
Expand All @@ -168,12 +265,12 @@ def _infer_schema_query(self, datatype: str) -> str:
data = {"Name": ["John", "Alice", "Bob"], "Age": [25, 30, 35]}
df = pd.DataFrame(data)
table_name = "test_table"

upload_from_df = DataFrameToSQL(connection_string)
upload_from_df.import_table(df, table_name)
upload_from_df = UploadToSQL(connection_string)
upload_from_df.execute(df, table_name, 2)

# Update the table
data = {"Name": ["Alexis", "Ivan", "Cordero"], "Age": [27, 27, 28]}
df = pd.DataFrame(data)

upload_from_df.upload_table(df, table_name)
upload_from_df.execute(df, table_name, 2, "append")
8 changes: 4 additions & 4 deletions pydbsmgr/utils/sql_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, connection_string: str) -> None:

def insert_data(
self,
df: DataFrame,
df: DataFrame | str,
table_name: str,
overwrite: bool = True,
char_length: int = 512,
Expand Down Expand Up @@ -59,10 +59,10 @@ def insert_data(
self.file_type = "xlsx"
except:
raise ValueError("Unable to parse Excel")
if type(df) == DataFrame:
elif type(df) == DataFrame:
df_to_load = df
# txt = "{:,}".format(len(df))
print(f"Will be loaded {len(df_to_load)} rows.")
df_length = "{:,}".format(len(df_to_load))
print(f"Will be loaded {df_length} rows.")
if overwrite:
self._create(df_to_load, table_name, overwrite, char_length, override_length)
self._append_to_table(df_to_load.iloc[2:, :], table_name)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pydbsmgr",
version="0.7.8",
version="0.7.9",
author="J. A. Moreno-Guerra",
author_email="[email protected]",
description="Testing installation of Package",
Expand Down Expand Up @@ -37,5 +37,5 @@
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires=">=3.6",
python_requires=">=3.10",
)

0 comments on commit c50f362

Please sign in to comment.