diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 8d982855..35f1f092 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -35,7 +35,29 @@ {% endmacro %} {% macro get_snapshot_table_column_names() %} - {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }} +{% endmacro %} + +{# Check the hard_deletes config enum, and the legacy invalidate_hard_deletes + config flag in order to determine which behavior should be used for deleted + records in the current snapshot. The default is to ignore them. #} +{% macro get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') %} + {% set hard_deletes = config.get('hard_deletes') %} + + {% if invalidate_hard_deletes is not none and hard_deletes is not none %} + {% do exceptions.raise_compiler_error("You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot.") %} + {% endif %} + + {% if invalidate_hard_deletes or hard_deletes == 'invalidate' %} + {{ return('invalidate') }} + {% elif hard_deletes == 'new_record' %} + {{ return('new_record') }} + {% elif hard_deletes is none or hard_deletes == 'ignore' %} + {{ return('ignore') }} + {% else %} + {% do exceptions.raise_compiler_error("Invalid setting for property hard_deletes.") %} + {% endif %} {% endmacro %} {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} @@ -82,7 +104,7 @@ from snapshot_query ), - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' %} deletes_source_data as ( @@ -125,7 +147,7 @@ ) ) - {%- if strategy.invalidate_hard_deletes -%} + {%- if strategy.hard_deletes == 'invalidate' %} , deletes as ( @@ -147,7 +169,7 @@ select * from insertions union all select * from updates - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' %} union all select * from deletes {%- endif %} @@ -167,6 +189,10 @@ {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }} + {% if strategy.hard_deletes == 'new_record' -%} + , + {{ strategy.dbt_is_deleted }} as {{ columns.dbt_is_deleted }} + {%- endif -%} from ( {{ sql }} ) sbq diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 8c086182..de8cf653 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -54,7 +54,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set primary_key = config.get('unique_key') %} {% set updated_at = config.get('updated_at') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* @@ -77,7 +78,8 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} @@ -140,7 +142,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set check_cols_config = config.get('check_cols') %} {% set primary_key = config.get('unique_key') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} @@ -173,6 +176,7 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %}