Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hard_deletes config and new_record Option for Snapshots #317

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,29 @@
{% endmacro %}

{% macro get_snapshot_table_column_names() %}
{{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }}
{{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }}
{% endmacro %}

{# Check the hard_deletes config enum, and the legacy invalidate_hard_deletes
config flag in order to determine which behavior should be used for deleted
records in the current snapshot. The default is to ignore them. #}
{% macro get_hard_delete_behavior() %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') %}
{% set hard_deletes = config.get('hard_deletes') %}

{% if invalidate_hard_deletes is not none and hard_deletes is not none %}
{% do exceptions.raise_compiler_error("You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot.") %}
{% endif %}

{% if invalidate_hard_deletes or hard_deletes == 'invalidate' %}
{{ return('invalidate') }}
{% elif hard_deletes == 'new_record' %}
{{ return('new_record') }}
{% elif hard_deletes is none or hard_deletes == 'ignore' %}
{{ return('ignore') }}
{% else %}
{% do exceptions.raise_compiler_error("Invalid setting for property hard_deletes.") %}
{% endif %}
{% endmacro %}

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
Expand Down Expand Up @@ -82,7 +104,7 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' %}

deletes_source_data as (

Expand Down Expand Up @@ -125,7 +147,7 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
{%- if strategy.hard_deletes == 'invalidate' %}
,

deletes as (
Expand All @@ -147,7 +169,7 @@
select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' %}
union all
select * from deletes
{%- endif %}
Expand All @@ -167,6 +189,10 @@
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }}
{% if strategy.hard_deletes == 'new_record' -%}
,
{{ strategy.dbt_is_deleted }} as {{ columns.dbt_is_deleted }}
{%- endif -%}
from (
{{ sql }}
) sbq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set primary_key = config.get('unique_key') %}
{% set updated_at = config.get('updated_at') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = get_hard_delete_behavior() %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{#/*
Expand All @@ -77,7 +78,8 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -140,7 +142,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set check_cols_config = config.get('check_cols') %}
{% set primary_key = config.get('unique_key') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = get_hard_delete_behavior() %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set updated_at = config.get('updated_at') or snapshot_get_time() %}

{% set column_added = false %}
Expand Down Expand Up @@ -173,6 +176,7 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}
Loading