diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..9d11c1f
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,18 @@
+# Use the official Microsoft SQL Server image as the base image
+FROM mcr.microsoft.com/mssql/server:2022-latest
+
+# Set environment variables for non-interactive installation
+ENV DEBIAN_FRONTEND=noninteractive
+
+# Set the password for the SQL Server 'sa' user
+ENV SA_PASSWORD=vSk60DcYRU
+
+# Set the ACCEPT_EULA environment variable to accept the EULA
+ENV ACCEPT_EULA=Y
+
+# Expose SQL Server port
+EXPOSE 1433
+
+# Start SQL Server
+CMD ["/opt/mssql/bin/sqlservr"]
+
diff --git a/pydbsmgr/VERSION b/pydbsmgr/VERSION
index bae256f..b5d0ec5 100644
--- a/pydbsmgr/VERSION
+++ b/pydbsmgr/VERSION
@@ -1 +1 @@
-0.9.7
\ No newline at end of file
+0.9.8
\ No newline at end of file
diff --git a/pydbsmgr/fast_upload.py b/pydbsmgr/fast_upload.py
index 35cc6c3..fe9a054 100644
--- a/pydbsmgr/fast_upload.py
+++ b/pydbsmgr/fast_upload.py
@@ -13,12 +13,9 @@
 class DataFrameToSQL(ColumnsCheck):
-    """Allows you to create a table from a dataframe"""
-
-    __slots__ = ["_connection_string", "_con", "_cur"]
+    """Creates a table from a DataFrame and uploads its data to the database"""
 
     def __init__(self, connection_string: str) -> None:
-        """Set the connection with the database"""
         self._connection_string = connection_string
         self._con = pyodbc.connect(self._connection_string, autocommit=True)
         self._cur = self._con.cursor()
@@ -30,142 +27,119 @@
     def import_table(
         self,
         df: DataFrame,
         table_name: str,
         overwrite: bool = True,
         char_length: int = 512,
         override_length: bool = True,
-        close_cursor: bool = True,
+        close_connection: bool = True,
         verbose: bool = False,
     ) -> None:
-        """Process for importing the dataframe into the database"""
-
-        """Check if the current connection is active. If it is not, create a new connection"""
+        """Imports a DataFrame into the database as a new table"""
 
-        super().__init__(df)
-        df = self.get_frame()
-        df = df.replace(" ", None)
-        df = df.replace("", None)
-        df = df.replace(np.datetime64("NaT"), None)
+        df = self._preprocess_dataframe(df)
 
         if self._con.closed:
-            self._con = pyodbc.connect(self._connection_string, autocommit=True)
-            self._cur = self._con.cursor()
+            self._reconnect()
 
         try:
-            """Create table"""
-            self._cur.execute(
-                self._create_table_query(table_name, df, char_length, override_length)
-            )
+            query = self._create_table_query(table_name, df, char_length, override_length)
+            self._cur.execute(query)
         except pyodbc.Error as e:
             if overwrite:
-                """If the table exists, it will be deleted and recreated"""
-                self._cur.execute("DROP TABLE %s" % (table_name))
-                self._cur.execute(
-                    self._create_table_query(table_name, df, char_length, override_length)
-                )
+                self._drop_and_recreate_table(table_name, query)
             else:
-                warning_type = "UserWarning"
-                msg = "It was not possible to create the table {%s}" % table_name
-                msg += "Error: {%s}" % e
-                print(f"{warning_type}: {msg}")
+                print(f"UserWarning: Could not create table {table_name}. Error: {e}")
 
-        """Insert data"""
+        query = self._insert_table_query(table_name, df)
         self._cur.fast_executemany = True
-        self._cur.executemany(
-            self._insert_table_query(table_name, df),
-            [
-                [None if (isinstance(value, float) and np.isnan(value)) else value for value in row]
-                for row in df.values.tolist()
-            ],
-        )
-        if close_cursor:
+        self._cur.executemany(query, self._prepare_data_for_insertion(df))
+
+        if close_connection:
             self._con.close()
 
         if verbose:
-            msg = "Table {%s}, successfully imported!" % table_name
-            print(f"{msg}")
+            print(f"Table {table_name} successfully imported!")
 
-    def upload_table(self, df: DataFrame, table_name: str, verbose: bool = False) -> None:
-        """Access to update data from a dataframe to a database"""
-
-        """Check if the current connection is active. If it is not, create a new connection"""
+    def upload_table(
+        self, df: DataFrame, table_name: str, close_connection: bool = True, verbose: bool = False
+    ) -> None:
+        """Updates data in an existing table from a DataFrame"""
 
-        super().__init__(df)
-        df = self.get_frame()
-        df = df.replace(" ", None)
-        df = df.replace("", None)
-        df = df.replace(np.datetime64("NaT"), None)
+        df = self._preprocess_dataframe(df)
 
         if self._con.closed:
-            self._con = pyodbc.connect(self._connection_string, autocommit=True)
-            self._cur = self._con.cursor()
+            self._reconnect()
 
         try:
-            """Insert data"""
+            query = self._insert_table_query(table_name, df)
             self._cur.fast_executemany = True
-            self._cur.executemany(self._insert_table_query(table_name, df), df.values.tolist())
-            self._con.close()
+            self._cur.executemany(query, self._prepare_data_for_insertion(df))
         except pyodbc.Error as e:
-            print(e)
-            warning_type = "UserWarning"
-            msg = "It was not possible to create the table {%s}" % table_name
-            msg += "Error: {%s}" % e
-            print(f"{warning_type}: {msg}")
+            print(f"UserWarning: Could not upload data to table {table_name}. Error: {e}")
+
+        if close_connection:
+            self._con.close()
 
         if verbose:
-            msg = "Table {%s}, successfully uploaded!" % table_name
-            print(f"{msg}")
+            print(f"Data successfully uploaded to table {table_name}!")
+
+    def _preprocess_dataframe(self, df: DataFrame) -> DataFrame:
+        super().__init__(df)
+        return self.get_frame().replace([" ", "", np.datetime64("NaT")], None)
+
+    def _reconnect(self):
+        self._con = pyodbc.connect(self._connection_string, autocommit=True)
+        self._cur = self._con.cursor()
+
+    def _drop_and_recreate_table(self, table_name: str, query: str) -> None:
+        try:
+            self._cur.execute(f"DROP TABLE {table_name}")
+            self._cur.execute(query)
+        except pyodbc.Error as e:
+            print(f"UserWarning: Could not recreate table {table_name}. Error: {e}")
 
     def _create_table_query(
         self, table_name: str, df: DataFrame, char_length: int, override_length: bool
     ) -> str:
-        """Build the query that will be used to create the table"""
-        query = "CREATE TABLE " + table_name + "("
-        for j, column in enumerate(df.columns):
-            matches = re.findall(r"([^']*)", str(df.iloc[:, j].dtype))
-            dtype = self._infer_schema_query(matches[0])
-            if dtype == "VARCHAR(MAX)":
-                element = max(list(df[column].astype(str)), key=len)
-                max_string_length = len(element)
-                if max_string_length == 0 or override_length:
-                    max_string_length = char_length
-                dtype = dtype.replace("MAX", str(max_string_length))
-            query += column + " " + dtype + ", "
-
-        query = query[:-2]
-        query += ")"
-        return query
+        columns = ", ".join(
+            f"{col} {self._infer_schema(col, df, char_length, override_length)}"
+            for col in df.columns
+        )
+        return f"CREATE TABLE {table_name} ({columns})"
 
     def _insert_table_query(self, table_name: str, df: DataFrame) -> str:
-        """Build the query to insert all rows found in the dataframe"""
-        query = "INSERT INTO %s({0}) values ({1})" % (table_name)
-        query = query.format(",".join(df.columns), ",".join("?" * len(df.columns)))
-        return query
-
-    def _infer_schema_query(self, datatype: str) -> str:
-        """Infer schema from a given datatype string"""
-        datatype = datatype.lower()
-        if datatype.find("float") != -1:
+        columns = ", ".join(df.columns)
+        placeholders = ", ".join("?" * len(df.columns))
+        return f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
+
+    def _infer_schema(
+        self, column: str, df: DataFrame, char_length: int, override_length: bool
+    ) -> str:
+        dtype = str(df[column].dtype).lower()
+        if "float" in dtype:
             return "FLOAT"
-        elif datatype.find("int") != -1:
-            if datatype.find("64") != -1:
-                return "BIGINT"
-            else:
-                return "INT"
-        elif datatype.find("datetime") != -1:
+        elif "int" in dtype:
+            return "BIGINT" if "64" in dtype else "INT"
+        elif "datetime" in dtype:
             return "DATE"
-        elif datatype.find("object") != -1:
-            return "VARCHAR(MAX)"
-        elif datatype.find("category") != -1:
-            return "VARCHAR(MAX)"
-        elif datatype.find("bool") != -1:
+        elif "object" in dtype or "category" in dtype:
+            max_length = df[column].astype(str).str.len().max()
+            length = char_length if override_length or max_length == 0 else max_length
+            return f"VARCHAR({length})"
+        elif "bool" in dtype:
             return "BIT"
-        else:
-            raise ValueError("Data type could not be inferred!")
+        raise ValueError(f"Data type of column {column} could not be inferred: {dtype}")
+
+    def _prepare_data_for_insertion(self, df: DataFrame) -> list:
+        return [
+            [None if (isinstance(value, float) and np.isnan(value)) else value for value in row]
+            for row in df.values.tolist()
+        ]
 
 
 class UploadToSQL(DataFrameToSQL):
-    """It allows you to import/update a table from a `DataFrame` in an efficient way using the `DataFrameToSQL` class."""
+    """Efficiently imports/updates a table from a `DataFrame` using the `DataFrameToSQL` class."""
 
     def __init__(self, connection_string: str) -> None:
-        """Establish the connection to the database using the `DataFrameToSQL` class."""
+        """Establishes the connection to the database."""
         super().__init__(connection_string)
+        self._verbose = True
 
     def execute(
         self,
@@ -175,23 +149,21 @@
         method: str = "override",  # or append
         char_length: int = 512,
         override_length: bool = True,
-        close_cursor: bool = True,
+        close_connection: bool = True,
         auto_resolve: bool = True,
         frac: float = 0.01,
         verbose: bool = False,
     ) -> None:
-        """Checks if the number of chunks corresponds to the `DataFrame`."""
-        assert (
-            len(df) > chunk_size
-        ), "'chunk_size' cannot be greater than the length of the 'DataFrame', change the 'chunk_size'"
-
-        """Get chunks of `DataFrame`"""
-        if auto_resolve:
-            if len(df) >= 0.5e6:
-                n = int((df).shape[0] * frac)
-                df_chunks = [(df)[i : i + n] for i in range(0, (df).shape[0], n)]
-            else:
-                df_chunks = np.array_split(df, chunk_size)
+        """Executes the import/update operation based on the specified method."""
+        if len(df) <= chunk_size:
+            raise ValueError(
+                "'chunk_size' must be smaller than the number of rows in the 'DataFrame'; choose a smaller 'chunk_size'."
+            )
+
+        # Get chunks of DataFrame
+        if auto_resolve and len(df) >= 0.5e6:
+            n = int(len(df) * frac)
+            df_chunks = [df[i : i + n] for i in range(0, len(df), n)]
         else:
             df_chunks = np.array_split(df, chunk_size)
@@ -199,81 +171,104 @@
             if self._check_table_exists(table_name):
                 print("Table exists, executing OVERRIDE...")
                 self._drop_table(table_name)
-
+                # Create table with the first chunk
                 self.import_table(
                     df=df_chunks[0],
                     table_name=table_name,
                     overwrite=True,
                     char_length=char_length,
                     override_length=override_length,
-                    close_cursor=close_cursor,
+                    close_connection=False,
                     verbose=verbose,
                 )
-
             else:
                 print("Table does not exist, proceeding with CREATE TABLE.")
-
-                """Inserting the first chunk"""
+                # Create table with the first chunk
                 self.import_table(
-                    df_chunks[0],
-                    table_name,
-                    True,
-                    char_length,
-                    override_length,
-                    close_cursor,
-                    verbose,
+                    df=df_chunks[0],
+                    table_name=table_name,
+                    overwrite=True,
+                    char_length=char_length,
+                    override_length=override_length,
+                    close_connection=False,
+                    verbose=verbose,
                 )
 
-            """Inserting the rest of the chunks"""
+            # Insert the rest of the chunks
             for i in range(1, len(df_chunks)):
-                self.upload_table(df_chunks[i], table_name, False)
+                self.upload_table(df_chunks[i], table_name, close_connection=False)
+
         elif method == "append":
             if self._check_table_exists(table_name):
                 for data in df_chunks:
-                    self.upload_table(data, table_name, False)
+                    self.upload_table(data, table_name, close_connection=False)
             else:
                 raise ValueError("Method 'append' requires an existing table.")
         else:
             raise ValueError(
-                'Invalid value for argument "method". Choose from ["override","append"].'
+                'Invalid value for argument "method". Choose from ["override", "append"].'
             )
 
+        if close_connection:
+            self._con.close()
+
     def _drop_table(self, table_name: str) -> None:
         query = f"DROP TABLE IF EXISTS {table_name}"
-        _con = pyodbc.connect(self._connection_string, autocommit=True)
-        cursor = _con.cursor()
+        if self._con.closed:
+            self._reconnect()
         try:
-            cursor.execute(query)
+            self._cur.execute(query)
+            if self.verbose:
+                print(f"Table '{table_name}' dropped successfully.")
         except Exception as e:
             print(f"Failed to drop table '{table_name}'. Error message:\n{str(e)}")
-        cursor.close()
-        _con.close()
 
     def _check_table_exists(self, table_name: str) -> bool:
+        if self._con.closed:
+            self._reconnect()
         query = f"SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME='{table_name}'"
-        result = self._execute_query(query)
-        return bool(result["data"][0][0])
+        try:
+            self._cur.execute(query)
+            result = self._cur.fetchone()
+            self._con.close()
+            return bool(result[0])
+        except Exception as e:
+            print("Error checking if table exists.")
+            raise ValueError(f"Query Error: {str(e)}")
 
     def _execute_query(self, query: str):
-        _con = pyodbc.connect(self._connection_string, autocommit=True)
-        cursor = _con.cursor()
         try:
-            cursor.execute(query)
-            results = {"columns": [desc[0] for desc in cursor.description]}
-            results["data"] = cursor.fetchall()
-            results["count"] = [len(results["data"])]
+            self._cur.execute(query)
+            results = {"columns": [desc[0] for desc in self._cur.description]}
+            results["data"] = self._cur.fetchall()
+            results["count"] = len(results["data"])
+            return results
         except Exception as e:
             print("Error executing SQL Query")
             raise ValueError(f"Query Error: {str(e)}")
-        cursor.close()
-        _con.close()
-        return results
+
+    @property
+    def verbose(self) -> bool:
+        return self._verbose
+
+    @verbose.setter
+    def verbose(self, value: bool):
+        self._verbose = value
 
 
 ########################################################################################
 if __name__ == "__main__":
     connection_string = os.getenv("conn_string")
+    # Fall back to the local SQL Server container from the Dockerfile if conn_string is unset
+    connection_string = connection_string or (
+        "Driver={ODBC Driver 18 for SQL Server};"
+        "Server=localhost,1433;"
+        "Database=master;"
+        "UID=sa;"
+        "PWD=vSk60DcYRU;"
+        "Encrypt=no;"
+        "TrustServerCertificate=yes;"
+    )
 
     # Create a DataFrame
     data = {"Name": ["John", "Alice", "Bob"], "Age": [25, 30, 35]}
     df = pd.DataFrame(data)
diff --git a/pydbsmgr/utils/tools.py b/pydbsmgr/utils/tools.py
index 3aab621..5769172 100644
--- a/pydbsmgr/utils/tools.py
+++ b/pydbsmgr/utils/tools.py
@@ -88,28 +88,26 @@ def generate_secure_password(pass_len: int = 24) -> str:
 
 
 class ColumnsCheck:
-    """Performs the relevant checks on the columns of the `DataFrame`"""
+    """Performs checks on the columns of a DataFrame"""
 
     def __init__(self, df: DataFrame):
         self.df = df
 
     def get_frame(self, **kwargs) -> DataFrame:
-        self.df = self._process_columns(**kwargs)
-        return self.df
+        return self._process_columns(**kwargs)
 
     def _process_columns(self, surrounding: bool = True) -> DataFrame:
-        df = (self.df).copy()
-        df.columns = df.columns.str.lower()
-        df.columns = df.columns.str.replace(".", "", regex=False)
-        df.columns = df.columns.str.replace(",", "", regex=False)
-        df.columns = df.columns.str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
-
-        df.columns = df.columns.str.replace("_+", "_", regex=True)
-        df.columns = df.columns.str.strip()
-        df.columns = df.columns.str.strip("_")
+        df = self.df.copy()
+        df.columns = (
+            df.columns.str.lower()
+            .str.replace("[.,]", "", regex=True)
+            .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
+            .str.replace("_+", "_", regex=True)
+            .str.strip()
+            .str.strip("_")
+        )
 
         if surrounding:
             df.columns = [f"[{col}]" for col in df.columns]
-
        return df
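
Usage sketch (not part of the diff): with the image above built and started, e.g. `docker build -t mssql-dev .` followed by `docker run -d -p 1433:1433 mssql-dev`, the new `UploadToSQL` entry point can be exercised end to end. This is a minimal sketch; the table name `employees` and the sample frame are illustrative, not taken from the repository:

    import pandas as pd

    from pydbsmgr.fast_upload import UploadToSQL

    # Connection string mirroring the Dockerfile settings above
    connection_string = (
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=localhost,1433;"
        "Database=master;"
        "UID=sa;"
        "PWD=vSk60DcYRU;"
        "Encrypt=no;"
        "TrustServerCertificate=yes;"
    )

    df = pd.DataFrame({"Name": ["John", "Alice", "Bob"], "Age": [25, 30, 35]})

    uploader = UploadToSQL(connection_string)
    # chunk_size must be smaller than len(df); "override" (re)creates the table.
    # After ColumnsCheck preprocessing the columns become [name] and [age], so the
    # generated DDL is: CREATE TABLE employees ([name] VARCHAR(512), [age] BIGINT)
    uploader.execute(df=df, table_name="employees", chunk_size=2, method="override")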
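The `ColumnsCheck` refactor in `tools.py` chains the same normalization steps it applied before. A quick offline sketch of the effect on column names (the sample names are made up):

    import pandas as pd

    from pydbsmgr.utils.tools import ColumnsCheck

    df = pd.DataFrame({"First Name": ["John"], "Age (years)": [25]})
    clean = ColumnsCheck(df).get_frame()  # surrounding=True by default
    print(list(clean.columns))  # ['[first_name]', '[age_years]']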
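And a round-trip check with plain pyodbc, assuming the upload sketch above ran against the same container (`connection_string` as defined there):

    import pyodbc

    con = pyodbc.connect(connection_string, autocommit=True)
    rows = con.cursor().execute(
        "SELECT [name], [age] FROM employees ORDER BY [age]"
    ).fetchall()
    print(rows)  # [('John', 25), ('Alice', 30), ('Bob', 35)]
    con.close()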