From 7a8e2e424e1dab5dc6fe28f2732045514ca1581f Mon Sep 17 00:00:00 2001 From: ali <117142933+muhammad-ali-e@users.noreply.github.com> Date: Thu, 3 Oct 2024 10:27:12 +0530 Subject: [PATCH] Added delta changes in datamigrations, Also added account_usage in schema (#763) --- backend/backend/settings/base.py | 2 +- .../v2/management/commands/migrate_to_v2.py | 27 +++++++---- backend/migrating/v2/query.py | 45 ++++++++++++++++--- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index f45dbf46d..4f0253509 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -297,6 +297,7 @@ def get_required_setting( "corsheaders", # For the organization model "account_v2", + "account_usage", # Django apps should go below this line "django.contrib.admin", "django.contrib.auth", @@ -320,7 +321,6 @@ def get_required_setting( ) v2_apps = ( "migrating.v2", - # "account_v2", "connector_auth_v2", "tenant_account_v2", "connector_v2", diff --git a/backend/migrating/v2/management/commands/migrate_to_v2.py b/backend/migrating/v2/management/commands/migrate_to_v2.py index ff07913a2..d87fe145e 100644 --- a/backend/migrating/v2/management/commands/migrate_to_v2.py +++ b/backend/migrating/v2/management/commands/migrate_to_v2.py @@ -87,7 +87,7 @@ def _is_migration_applied(self, migration_name): cur.execute( f""" SELECT COUNT(*) FROM {self.migration_tracking_table} - WHERE migration_name = %s;" + WHERE migration_name = %s; """, (migration_name,), ) @@ -153,7 +153,7 @@ def _prepare_row_and_migrate_relations( dest_cursor: cursor, column_names: list[str], column_transformations: dict[str, dict[str, Any]], - ) -> tuple[Any, ...]: + ) -> Optional[tuple[Any, ...]]: """Prepares and migrates the relational keys of a single row from the source database to the destination database, updating specific column values based on provided transformations. @@ -170,11 +170,15 @@ def _prepare_row_and_migrate_relations( - query (str): SQL query to fetch the corresponding new value. - params (list[str]): List of column names to use as parameters for the query. + - none_action (str, optional): Action to take if the new value + is None. Defaults to None. + If set to "DELETE", the row will be removed. This is used to migrate old column names from V1 to new column names in V2. Returns: - tuple[Any, ...]: The row with updated relational keys. + Optional[tuple[Any, ...]]: The row with updated relational keys, + or None if the row should be deleted. """ row = list(row) @@ -182,16 +186,20 @@ def _prepare_row_and_migrate_relations( if key in column_names: query = transaction["query"] params = transaction["params"] - param_values = [row[column_names.index(param)] for param in params] + none_action = transaction.get("none_action", None) + param_values = [row[column_names.index(param)] for param in params] dest_cursor.execute(query, param_values) new_key_value = dest_cursor.fetchone() + if new_key_value: row[column_names.index(key)] = new_key_value[0] + elif none_action == "DELETE": + # Row should be deleted + return None - for i, value in enumerate(row): - if isinstance(value, dict): - row[i] = json.dumps(value) + # Handle serialization for dict objects + row = [json.dumps(value) if isinstance(value, dict) else value for value in row] return tuple(row) def _migrate_rows( @@ -243,6 +251,9 @@ def _migrate_rows( converted_row = self._prepare_row_and_migrate_relations( row, dest_cursor, column_names, column_transformations ) + if converted_row is None: + logger.info(f"[{migration_name}] Skipping deleted row {row}.") + continue converted_rows.append(converted_row) start_time = time.time() @@ -433,7 +444,7 @@ def handle(self, *args, **options): # Public tables public_schema_migrations = migration_query.get_public_schema_migrations() migrator = DataMigrator( - src_db_config, dest_db_config, v2_schema, batch_size=1000 + src_db_config, dest_db_config, v2_schema, batch_size=3000 ) migrator.migrate(public_schema_migrations) diff --git a/backend/migrating/v2/query.py b/backend/migrating/v2/query.py index fc39bd740..7916fccdd 100644 --- a/backend/migrating/v2/query.py +++ b/backend/migrating/v2/query.py @@ -232,6 +232,21 @@ def get_public_schema_migrations(self) -> list[dict[str, str]]: """, "dest_table": "connector_auth", }, + { + "name": "migration_017_page_usage", + "src_query": """ + SELECT id, organization_id, file_name, file_type, run_id, + pages_processed, file_size, created_at + FROM page_usage; + """, + "dest_query": f""" + INSERT INTO "{self.v2_schema}".page_usage ( + id, organization_id, file_name, file_type, run_id, + pages_processed, file_size, created_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s); + """, + "dest_table": "page_usage", + }, ] return migrations @@ -365,15 +380,15 @@ def get_organization_migrations( { "name": f"migration_{schema}_api_key", "src_query": f""" - SELECT id, api_key, api_id, description, is_active, + SELECT id, api_key, api_id, pipeline_id, description, is_active, created_by_id, modified_by_id, created_at, modified_at FROM "{schema}".api_apikey; """, "dest_query": f""" INSERT INTO "{self.v2_schema}".api_deployment_key ( - id, api_key, api_id, description, is_active, + id, api_key, api_id, pipeline_id, description, is_active, created_by_id, modified_by_id, created_at, modified_at - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """, "dest_table": "api_deployment_key", }, @@ -410,7 +425,7 @@ def get_organization_migrations( FROM "{schema}".token_usage; """, "dest_query": f""" - INSERT INTO "{self.v2_schema}".token_usage ( + INSERT INTO "{self.v2_schema}".usage ( id, workflow_id, execution_id, adapter_instance_id, run_id, usage_type, llm_usage_reason, model_name, embedding_tokens, prompt_tokens, completion_tokens, @@ -419,7 +434,7 @@ def get_organization_migrations( ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, {organization_id}); """, - "dest_table": "token_usage", + "dest_table": "usage", }, { "name": f"migration_{schema}_workflow_execution", @@ -449,7 +464,7 @@ def get_organization_migrations( """, "dest_query": f""" INSERT INTO "{self.v2_schema}".file_history ( - id, cache_key, workflow_id, status, error, result, meta_data, + id, cache_key, workflow_id, status, error, result, metadata, created_at, modified_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); """, @@ -501,6 +516,7 @@ def get_organization_migrations( WHERE user_id = %s AND organization_id='{organization_id}'; """, "params": ["user_id"], + "none_action": "DELETE", } }, }, @@ -692,5 +708,22 @@ def get_organization_migrations( """, "dest_table": "prompt_studio_registry_shared_users", }, + { + "name": f"migration_{schema}_notification", + "src_query": f""" + SELECT id, name, url, authorization_key, authorization_header, + authorization_type, max_retries, platform, notification_type, + is_active, pipeline_id, api_id, created_at, modified_at + FROM "{schema}".notification_notification; + """, + "dest_query": f""" + INSERT INTO "{self.v2_schema}".notification ( + id, name, url, authorization_key, authorization_header, + authorization_type, max_retries, platform, notification_type, + is_active, pipeline_id, api_id, created_at, modified_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + """, + "dest_table": "notification", + }, ] return migrations