Skip to content

Commit

Permalink
Use tmp table in static insert overwrite (#630)
Browse files Browse the repository at this point in the history
* Use tmp table in static insert overwrite

* review changes

* Update .changes/unreleased/Fixes-20230325-204352.yaml

Co-authored-by: Doug Beatty <[email protected]>

* add test for static overwrite day case

---------

Co-authored-by: Christophe Oudar <[email protected]>
Co-authored-by: Doug Beatty <[email protected]>
  • Loading branch information
3 people authored Jul 14, 2023
1 parent 1d48967 commit d3deeb8
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230325-204352.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Use tmp table in static insert overwrite to avoid computing the SQL twice
time: 2023-03-25T20:43:52.830135+01:00
custom:
  Author: Kayrnt
  Issue: 427 556
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
Expand All @@ -9,7 +9,7 @@
{% endif %}

{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}

{{ return(build_sql) }}
Expand Down Expand Up @@ -39,14 +39,14 @@
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) }}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
{% else %} {# dynamic #}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% endif %}
{% endmacro %}

{% macro bq_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}

{% set predicate -%}
Expand All @@ -57,25 +57,39 @@

{%- set source_sql -%}
(
{%- if partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
{% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%}
select
{{ partition_by.insertable_time_partitioning_field() }},
* from {{ tmp_relation }}
{% elif tmp_relation_exists -%}
select
* from {{ tmp_relation }}
{%- elif partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
{%- else -%}
{{sql}}
{{sql}}
{%- endif -%}

)
{%- endset -%}

{% if copy_partitions %}
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
{% else %}

{#-- Because we're putting the model SQL _directly_ into the MERGE statement,
{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "dynamic" case, we save the model SQL result as a temp table first, wherein the
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
-- 1. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}

{% endif %}
{% endmacro %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,50 @@
where date_time > '2020-01-01'
{% endif %}
""".lstrip()

# dbt test fixture: incremental model using the `insert_overwrite` strategy with a
# *static* partition list (the `partitions` config names the two day partitions to
# replace, rather than computing them dynamically at run time).
# First (full-refresh) run seeds four rows into the 2020-01-01 partition; the
# incremental run overwrites that partition with two rows and writes two new rows
# into 2020-01-02 — exercising the static-overwrite path added by this commit.
# NOTE(review): the triple-quoted body is Jinja+SQL executed by dbt, not Python —
# it must remain byte-identical; only these surrounding comments are editorial.
overwrite_static_day_sql = """
{% set partitions_to_replace = [
"'2020-01-01'",
"'2020-01-02'",
] %}
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime",
"granularity": "day"
},
partitions=partitions_to_replace,
on_schema_change="sync_all_columns"
)
}}
with data as (
{% if not is_incremental() %}
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time
{% else %}
-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time
{% endif %}
)
select * from data
""".lstrip()  # lstrip() drops the leading newline after the opening triple quote
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
overwrite_time_sql,
overwrite_day_with_time_ingestion_sql,
overwrite_day_with_time_partition_datetime_sql,
overwrite_static_day_sql,
)


Expand All @@ -46,6 +47,7 @@ def models(self):
"incremental_overwrite_time.sql": overwrite_time_sql,
"incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql,
"incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql,
"incremental_overwrite_static_day.sql": overwrite_static_day_sql,
}

@pytest.fixture(scope="class")
Expand All @@ -63,10 +65,10 @@ def seeds(self):
def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(self, project):
run_dbt(["seed"])
results = run_dbt()
assert len(results) == 10
assert len(results) == 11

results = run_dbt()
assert len(results) == 10
assert len(results) == 11
incremental_strategies = [
("incremental_merge_range", "merge_expected"),
("incremental_merge_time", "merge_expected"),
Expand All @@ -79,6 +81,7 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se
"incremental_overwrite_day_with_time_partition_datetime",
"incremental_overwrite_day_with_time_partition_expected",
),
("incremental_overwrite_static_day", "incremental_overwrite_day_expected"),
]
db_with_schema = f"{project.database}.{project.test_schema}"
for incremental_strategy in incremental_strategies:
Expand Down

0 comments on commit d3deeb8

Please sign in to comment.