Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feature/chunks (#16) #17

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 101 additions & 4 deletions pydbsmgr/fast_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,103 @@ def _infer_schema_query(self, datatype: str) -> str:
raise ValueError("Data type could not be inferred!")


class UploadToSQL(DataFrameToSQL):
    """Import or append a ``DataFrame`` to a SQL table efficiently, in chunks.

    Builds on :class:`DataFrameToSQL`, splitting the frame into chunks so a
    single oversized insert does not exhaust memory or driver limits.
    """

    def __init__(self, connection_string: str) -> None:
        """Establish the connection to the database using the `DataFrameToSQL` class."""
        super().__init__(connection_string)

    def execute(
        self,
        df: DataFrame,
        table_name: str,
        chunk_size: int,
        method: str = "override",  # or "append"
        char_length: int = 512,
        override_length: bool = True,
        close_cursor: bool = True,
        auto_resolve: bool = True,
        frac: float = 0.01,
    ) -> None:
        """Upload ``df`` to ``table_name`` in chunks.

        Parameters
        ----------
        df : DataFrame
            Data to upload; must have more rows than ``chunk_size``.
        table_name : str
            Destination table name.
        chunk_size : int
            Number of chunks to split ``df`` into (``np.array_split``).
        method : str
            ``"override"`` drops/recreates the table; ``"append"`` inserts
            into an existing table.
        char_length, override_length, close_cursor
            Passed through to ``import_table`` when creating the table.
        auto_resolve : bool
            For frames with >= 500,000 rows, chunk by ``frac`` of the row
            count instead of by ``chunk_size``.
        frac : float
            Fraction of rows per chunk when ``auto_resolve`` triggers.

        Raises
        ------
        ValueError
            If ``chunk_size`` is not smaller than ``len(df)``, if ``method``
            is not one of ``"override"``/``"append"``, or if ``method`` is
            ``"append"`` and the table does not exist.
        """
        # Explicit exception instead of `assert`: asserts vanish under
        # `python -O`, and the original message contradicted the condition.
        if len(df) <= chunk_size:
            raise ValueError(
                "'chunk_size' cannot be greater than or equal to the length of the "
                "'DataFrame', change the 'chunk_size'"
            )

        # Split the DataFrame into chunks.
        if auto_resolve and len(df) >= 0.5e6:
            # Very large frame: chunk by a fraction of the row count so the
            # number of chunks scales with the data, not with `chunk_size`.
            n = int(df.shape[0] * frac)
            df_chunks = [df[i : i + n] for i in range(0, df.shape[0], n)]
        else:
            df_chunks = np.array_split(df, chunk_size)

        if method == "override":
            if self._check_table_exists(table_name):
                print("Table exists, executing OVERRIDE...")
                self._drop_table(table_name)
            else:
                print("Table does not exist, proceeding with CREATE TABLE.")

            # Create the table from the first chunk...
            self.import_table(
                df_chunks[0], table_name, True, char_length, override_length, close_cursor
            )
            # ...then insert the remaining chunks.
            # BUG FIX: previously the tail chunks were uploaded only when the
            # table did NOT already exist, silently dropping most of the data
            # on an override of an existing table.
            for chunk in df_chunks[1:]:
                self.upload_table(chunk, table_name, True)
        elif method == "append":
            if not self._check_table_exists(table_name):
                raise ValueError("Method 'append' requires an existing table.")
            for chunk in df_chunks:
                self.upload_table(chunk, table_name, False)
        else:
            raise ValueError(
                'Invalid value for argument "method". Choose from ["override","append"].'
            )

    def _drop_table(self, table_name: str) -> None:
        """Drop ``table_name`` if it exists; never raises, always closes the connection."""
        # NOTE(review): table_name is interpolated into SQL — ensure callers
        # never pass untrusted input here.
        query = f"DROP TABLE IF EXISTS {table_name}"
        _con = pyodbc.connect(self._connection_string, autocommit=True)
        cursor = _con.cursor()
        try:
            cursor.execute(query)
        except Exception as e:
            # Best-effort drop: report but keep the original non-raising behavior.
            print(f"Failed to drop table '{table_name}'. Error message:\n{str(e)}")
        finally:
            # Always release resources, even when the drop fails.
            cursor.close()
            _con.close()

    def _check_table_exists(self, table_name: str) -> bool:
        """Return ``True`` if ``table_name`` exists in INFORMATION_SCHEMA.TABLES."""
        query = f"SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME='{table_name}'"
        result = self._execute_query(query)
        return bool(result["data"][0][0])

    def _execute_query(self, query: str):
        """Run ``query`` and return ``{"columns", "data", "count"}``.

        Raises ``ValueError`` (chained from the driver error) on failure.
        """
        _con = pyodbc.connect(self._connection_string, autocommit=True)
        cursor = _con.cursor()
        try:
            cursor.execute(query)
            results = {"columns": [desc[0] for desc in cursor.description]}
            results["data"] = cursor.fetchall()
            results["count"] = [len(results["data"])]
        except Exception as e:
            print("Error executing SQL Query")
            raise ValueError(f"Query Error: {str(e)}") from e
        finally:
            # BUG FIX: the original leaked the cursor and connection whenever
            # the query raised (close calls were after the raise, never in
            # a finally block).
            cursor.close()
            _con.close()
        return results


########################################################################################

if __name__ == "__main__":
Expand All @@ -168,12 +265,12 @@ def _infer_schema_query(self, datatype: str) -> str:
data = {"Name": ["John", "Alice", "Bob"], "Age": [25, 30, 35]}
df = pd.DataFrame(data)
table_name = "test_table"

upload_from_df = DataFrameToSQL(connection_string)
upload_from_df.import_table(df, table_name)
upload_from_df = UploadToSQL(connection_string)
upload_from_df.execute(df, table_name, 2)

# Update the table
data = {"Name": ["Alexis", "Ivan", "Cordero"], "Age": [27, 27, 28]}
df = pd.DataFrame(data)

upload_from_df.upload_table(df, table_name)
upload_from_df.execute(df, table_name, 2, "append")
8 changes: 4 additions & 4 deletions pydbsmgr/utils/sql_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, connection_string: str) -> None:

def insert_data(
self,
df: DataFrame,
df: DataFrame | str,
table_name: str,
overwrite: bool = True,
char_length: int = 512,
Expand Down Expand Up @@ -59,10 +59,10 @@ def insert_data(
self.file_type = "xlsx"
except:
raise ValueError("Unable to parse Excel")
if type(df) == DataFrame:
elif type(df) == DataFrame:
df_to_load = df
# txt = "{:,}".format(len(df))
print(f"Will be loaded {len(df_to_load)} rows.")
df_length = "{:,}".format(len(df_to_load))
print(f"Will be loaded {df_length} rows.")
if overwrite:
self._create(df_to_load, table_name, overwrite, char_length, override_length)
self._append_to_table(df_to_load.iloc[2:, :], table_name)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pydbsmgr",
version="0.7.8",
version="0.7.9",
author="J. A. Moreno-Guerra",
author_email="[email protected]",
description="Testing installation of Package",
Expand Down Expand Up @@ -37,5 +37,5 @@
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires=">=3.6",
python_requires=">=3.10",
)