refactor(utils): improve upsert (#13) #101

33 changes: 17 additions & 16 deletions bigfunctions/upsert.yaml → bigfunctions/merge.yaml
@@ -14,9 +14,9 @@ description: |-

| Param | Possible values |
|---|---|
- | `query_or_table_or_view` | Can be a fully qualified table or view `(<project-id>.)?<dataset_id>.<table_or_view_name>`. <br> Can also be a plain query in BigQuery Standard SQL. |
- | `destination_table` | Must be a fully qualified table `(<project-id>.)?<dataset_id>.<table_or_view_name>`. |
- | `insertion_mode` | Three insertion mode are available:<ul><li> `"insert_only"`: existing records in `query_or_table_or_view` and not existing in `destination_table` are inserted. Deletion and update are not possible. </li><li> `"delta"`: same as `insert_only` with the updatable records. Records existing both in `query_or_table_or_view` and in `destination_table` are updated. If `recency_field` is filled, only the most recent version from source and destination is kept. </li><li> `"full"`: same as `delta` with the deletable records. Records not existing in `query_or_table_or_view` and existing in `destination_table` are deleted. </li> </ul> |
+ | `query_or_table_or_view` | Can be a fully qualified table or view `<project-id>.<dataset_id>.<table_or_view_name>`. <br> Can also be a plain query in BigQuery Standard SQL. |
+ | `destination_table` | Must be a fully qualified table `<project-id>.<dataset_id>.<table_or_view_name>`. |
+ | `insertion_mode` | Three insertion mode are available:<ul><li> `"insert_only"`: existing records in `query_or_table_or_view` and not existing in `destination_table` are inserted. Deletion and update are not possible. </li><li> `"upsert"`: same as `insert_only` with the updatable records. Records existing both in `query_or_table_or_view` and in `destination_table` are updated. If `recency_field` is filled, only the most recent version from source and destination is kept. </li><li> `"full"`: same as `upsert` with the deletable records. Records not existing in `query_or_table_or_view` and existing in `destination_table` are deleted. </li> </ul> |
| `primary_keys` | Combination of fields identifying a record. If `primary_keys = []`, every row will be considered as a unique record. |
| `recency_field` | Orderable field (e.g. `timestamp`, `integer`, ...) to identify the relative freshness of a record version. |
arguments:
@@ -31,18 +31,18 @@ arguments:
- name: recency_field
type: string
examples:
- - description: "Merge tables in delta mode"
+ - description: "Merge tables in upsert mode"
arguments:
- - "'dataset_id.source_table_or_view'"
- - "'dataset_id.destination_table'"
- - "'delta'"
+ - "'project-id.dataset_id.source_table_or_view'"
+ - "'project-id.dataset_id.destination_table'"
+ - "'upsert'"
- "['id']"
- "'timestamp_field'"
region: ALL
- description: "Merge from query in full"
arguments:
- - "'select * from dataset_id.source_table_or_view where filter_field = true'"
- - "'dataset_id.destination_table'"
+ - "'select * from project-id.dataset_id.source_table_or_view where filter_field = true'"
+ - "'project-id.dataset_id.destination_table'"
- "'full'"
- "['id']"
- "null"
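The three `insertion_mode` values described in the parameter table line up with the clauses of a BigQuery `MERGE` statement. The block below is a minimal sketch of that correspondence, not the query this function generates (that part of the file is collapsed in the diff). The temp tables `source_rows` and `destination_rows`, the `val` column, and the strict `>` comparison on the recency field are all assumptions made for illustration.

```sql
-- Hypothetical tables for illustration only; they are not part of the PR.
create temp table destination_rows (id int64, val string, timestamp_field timestamp);
create temp table source_rows (id int64, val string, timestamp_field timestamp);

insert into destination_rows (id, val, timestamp_field) values
  (1, 'old',   timestamp '2023-01-01 00:00:00+00'),
  (2, 'stale', timestamp '2023-01-01 00:00:00+00');

insert into source_rows (id, val, timestamp_field) values
  (1, 'new',   timestamp '2023-02-01 00:00:00+00'),
  (3, 'added', timestamp '2023-02-01 00:00:00+00');

merge destination_rows as d
using source_rows as s
on d.id = s.id  -- corresponds to primary_keys = ['id']
-- 'upsert' and 'full': update rows present on both sides.
-- The recency test is an assumed reading of recency_field.
when matched and s.timestamp_field > d.timestamp_field then
  update set val = s.val, timestamp_field = s.timestamp_field
-- all three modes: insert rows that exist only in the source.
when not matched by target then
  insert (id, val, timestamp_field) values (s.id, s.val, s.timestamp_field)
-- 'full' only: delete rows that exist only in the destination.
when not matched by source then
  delete;

select * from destination_rows order by id;
```

Under these assumptions, row 1 is updated, row 3 is inserted, and row 2 is deleted. Dropping the last clause would approximate `upsert`, and dropping the `when matched` clause as well would approximate `insert_only`. The sketch does not handle several source rows sharing one key.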
@@ -52,7 +52,7 @@ code: |
declare context json;
declare table_columns array<string>;

- assert lower(insertion_mode) in ('insert_only', 'delta', 'full') AS '`insertion_mode` must be either "insert_only", "delta", or "full"';
+ assert lower(insertion_mode) in ('insert_only', 'upsert', 'full') AS '`insertion_mode` must be either "insert_only", "upsert", or "full"';

/*
Get destination table columns to define the insert and update parts of the merge query.
@@ -157,15 +157,16 @@ code: |

set context = to_json(struct(
if(
+ -- if table then create a query from its name.
regexp_contains(replace(trim(query_or_table_or_view), '`', ''), r'^(([a-zA-Z0-9\-]+)\.)?([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'),
'select * from ' || query_or_table_or_view,
query_or_table_or_view
- ) as query_or_table_or_view,
- destination_table as destination_table,
- insertion_mode as insertion_mode,
- primary_keys as primary_keys,
- recency_field as recency_field,
- table_columns as table_columns
+ ) as query_or_table_or_view,
+ destination_table as destination_table,
+ lower(insertion_mode) as insertion_mode,
+ primary_keys as primary_keys,
+ recency_field as recency_field,
+ table_columns as table_columns
));

execute immediate {BIGFUNCTIONS_DATASET}.render_string(query, to_json_string(context));
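
The `if(...)` in the last hunk decides whether `query_or_table_or_view` is a table name or a query by testing it against a regular expression. As a quick way to see how that test behaves, the sketch below applies the same expression to a few candidate inputs. The candidates are hypothetical values chosen for illustration, and `is_table_or_view` is an illustrative column name.

```sql
-- Apply the detection pattern from the diff to a few hypothetical inputs.
select
  candidate,
  regexp_contains(
    replace(trim(candidate), '`', ''),
    r'^(([a-zA-Z0-9\-]+)\.)?([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
  ) as is_table_or_view
from unnest([
  'project-id.dataset_id.source_table_or_view',
  '`project-id.dataset_id.source_table_or_view`',
  'dataset_id.source_table_or_view',
  'select * from project-id.dataset_id.source_table_or_view where filter_field = true'
]) as candidate;
```

The first three candidates should be treated as table or view names and the last as a query. Because the project group in the pattern is optional, a two-part `dataset_id.table` name still passes the check, even though the updated description documents only the three-part form.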