Skip to content

Commit

Permalink
Added delta changes in datamigrations, Also added account_usage in schema (#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e authored Oct 3, 2024
1 parent 146130b commit 7a8e2e4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 15 deletions.
2 changes: 1 addition & 1 deletion backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def get_required_setting(
"corsheaders",
# For the organization model
"account_v2",
"account_usage",
# Django apps should go below this line
"django.contrib.admin",
"django.contrib.auth",
Expand All @@ -320,7 +321,6 @@ def get_required_setting(
)
v2_apps = (
"migrating.v2",
# "account_v2",
"connector_auth_v2",
"tenant_account_v2",
"connector_v2",
Expand Down
27 changes: 19 additions & 8 deletions backend/migrating/v2/management/commands/migrate_to_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _is_migration_applied(self, migration_name):
cur.execute(
f"""
SELECT COUNT(*) FROM {self.migration_tracking_table}
WHERE migration_name = %s;"
WHERE migration_name = %s;
""",
(migration_name,),
)
Expand Down Expand Up @@ -153,7 +153,7 @@ def _prepare_row_and_migrate_relations(
dest_cursor: cursor,
column_names: list[str],
column_transformations: dict[str, dict[str, Any]],
) -> tuple[Any, ...]:
) -> Optional[tuple[Any, ...]]:
"""Prepares and migrates the relational keys of a single row from the
source database to the destination database, updating specific column
values based on provided transformations.
Expand All @@ -170,28 +170,36 @@ def _prepare_row_and_migrate_relations(
- query (str): SQL query to fetch the corresponding new value.
- params (list[str]): List of column names to use as
parameters for the query.
- none_action (str, optional): Action to take if the new value
is None. Defaults to None.
If set to "DELETE", the row will be removed.
This is used to migrate old column names from V1 to new column
names in V2.
Returns:
tuple[Any, ...]: The row with updated relational keys.
Optional[tuple[Any, ...]]: The row with updated relational keys,
or None if the row should be deleted.
"""
row = list(row)

for key, transaction in column_transformations.items():
if key in column_names:
query = transaction["query"]
params = transaction["params"]
param_values = [row[column_names.index(param)] for param in params]
none_action = transaction.get("none_action", None)

param_values = [row[column_names.index(param)] for param in params]
dest_cursor.execute(query, param_values)
new_key_value = dest_cursor.fetchone()

if new_key_value:
row[column_names.index(key)] = new_key_value[0]
elif none_action == "DELETE":
# Row should be deleted
return None

for i, value in enumerate(row):
if isinstance(value, dict):
row[i] = json.dumps(value)
# Handle serialization for dict objects
row = [json.dumps(value) if isinstance(value, dict) else value for value in row]
return tuple(row)

def _migrate_rows(
Expand Down Expand Up @@ -243,6 +251,9 @@ def _migrate_rows(
converted_row = self._prepare_row_and_migrate_relations(
row, dest_cursor, column_names, column_transformations
)
if converted_row is None:
logger.info(f"[{migration_name}] Skipping deleted row {row}.")
continue
converted_rows.append(converted_row)

start_time = time.time()
Expand Down Expand Up @@ -433,7 +444,7 @@ def handle(self, *args, **options):
# Public tables
public_schema_migrations = migration_query.get_public_schema_migrations()
migrator = DataMigrator(
src_db_config, dest_db_config, v2_schema, batch_size=1000
src_db_config, dest_db_config, v2_schema, batch_size=3000
)
migrator.migrate(public_schema_migrations)

Expand Down
45 changes: 39 additions & 6 deletions backend/migrating/v2/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,21 @@ def get_public_schema_migrations(self) -> list[dict[str, str]]:
""",
"dest_table": "connector_auth",
},
{
"name": "migration_017_page_usage",
"src_query": """
SELECT id, organization_id, file_name, file_type, run_id,
pages_processed, file_size, created_at
FROM page_usage;
""",
"dest_query": f"""
INSERT INTO "{self.v2_schema}".page_usage (
id, organization_id, file_name, file_type, run_id,
pages_processed, file_size, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
""",
"dest_table": "page_usage",
},
]
return migrations

Expand Down Expand Up @@ -365,15 +380,15 @@ def get_organization_migrations(
{
"name": f"migration_{schema}_api_key",
"src_query": f"""
SELECT id, api_key, api_id, description, is_active,
SELECT id, api_key, api_id, pipeline_id, description, is_active,
created_by_id, modified_by_id, created_at, modified_at
FROM "{schema}".api_apikey;
""",
"dest_query": f"""
INSERT INTO "{self.v2_schema}".api_deployment_key (
id, api_key, api_id, description, is_active,
id, api_key, api_id, pipeline_id, description, is_active,
created_by_id, modified_by_id, created_at, modified_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
""",
"dest_table": "api_deployment_key",
},
Expand Down Expand Up @@ -410,7 +425,7 @@ def get_organization_migrations(
FROM "{schema}".token_usage;
""",
"dest_query": f"""
INSERT INTO "{self.v2_schema}".token_usage (
INSERT INTO "{self.v2_schema}".usage (
id, workflow_id, execution_id, adapter_instance_id, run_id,
usage_type, llm_usage_reason, model_name,
embedding_tokens, prompt_tokens, completion_tokens,
Expand All @@ -419,7 +434,7 @@ def get_organization_migrations(
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, {organization_id});
""",
"dest_table": "token_usage",
"dest_table": "usage",
},
{
"name": f"migration_{schema}_workflow_execution",
Expand Down Expand Up @@ -449,7 +464,7 @@ def get_organization_migrations(
""",
"dest_query": f"""
INSERT INTO "{self.v2_schema}".file_history (
id, cache_key, workflow_id, status, error, result, meta_data,
id, cache_key, workflow_id, status, error, result, metadata,
created_at, modified_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
""",
Expand Down Expand Up @@ -501,6 +516,7 @@ def get_organization_migrations(
WHERE user_id = %s AND organization_id='{organization_id}';
""",
"params": ["user_id"],
"none_action": "DELETE",
}
},
},
Expand Down Expand Up @@ -692,5 +708,22 @@ def get_organization_migrations(
""",
"dest_table": "prompt_studio_registry_shared_users",
},
{
"name": f"migration_{schema}_notification",
"src_query": f"""
SELECT id, name, url, authorization_key, authorization_header,
authorization_type, max_retries, platform, notification_type,
is_active, pipeline_id, api_id, created_at, modified_at
FROM "{schema}".notification_notification;
""",
"dest_query": f"""
INSERT INTO "{self.v2_schema}".notification (
id, name, url, authorization_key, authorization_header,
authorization_type, max_retries, platform, notification_type,
is_active, pipeline_id, api_id, created_at, modified_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
""",
"dest_table": "notification",
},
]
return migrations

0 comments on commit 7a8e2e4

Please sign in to comment.