Skip to content

Commit

Permalink
Add Incremental Merge Strategy (#228)
Browse files Browse the repository at this point in the history
### Summary

Add implementation for merge functionality when using an incremental
materialization.

### Description

Dremio supports both merge and append now. More details on how to use
merge strategies can be found here:
https://docs.getdbt.com/docs/build/incremental-strategy#strategy-specific-configs.

### Test Results

Only ran incremental tests as the new changes don't impact other parts
of the adapter.

### Changelog

-   [x] Added a summary of what this PR accomplishes to CHANGELOG.md

### Related Issue
(#224)
  • Loading branch information
ravjotbrar authored May 14, 2024
1 parent 04900e6 commit 8a295da
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 20 deletions.
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# dbt-dremio MAIN

- [#223](https://github.com/dremio/dbt-dremio/issues/224) Implement merge strategy for incremental materializations

# dbt-dremio v1.7.0

## Changes
- [#8307](https://github.com/dbt-labs/dbt-core/discussions/8307) Allow source freshness to be evaluated from table metadata
- [#8307](https://github.com/dbt-labs/dbt-core/discussions/8307) Catalog fetch performance improvements
- [#8307](https://github.com/dbt-labs/dbt-core/discussions/8307) Migrate data_spine macros
- [#195](https://github.com/dremio/dbt-dremio/issues/195) Ensure api call to create folders does not get called when creating a table

- [#195](https://github.com/dremio/dbt-dremio/issues/195) Ensure the adapter does not try and create folders in object storage source
- [#220](https://github.com/dremio/dbt-dremio/pull/220) Optimize networking performance with Dremio server


# dbt-dremio v1.5.1

## Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ limitations under the License.*/
-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set dest_columns = dremio__get_columns_in_relation(existing_relation) %}
{% endif %}

-- Get the incremental_strategy, the macro to use for the strategy, and build the sql
{%- set incremental_strategy = config.get('incremental_strategy', validator=validation.any[basestring]) or 'append' -%}
{%- set raw_file_format = config.get('format', validator=validation.any[basestring]) or 'iceberg' -%}
{%- set file_format = dbt_dremio_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_dremio_validate_get_incremental_strategy(incremental_strategy, file_format) -%}
{%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
{%- set strategy = dbt_dremio_validate_get_incremental_strategy(incremental_strategy) -%}
{%- set raw_on_schema_change = config.get('on_schema_change', validator=validation.any[basestring]) or 'ignore' -%}
{% set build_sql = dbt_dremio_get_incremental_sql(strategy, intermediate_relation, target_relation, unique_key) %}
{% set build_sql = dbt_dremio_get_incremental_sql(strategy, intermediate_relation, target_relation, dest_columns, unique_key) %}

{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,71 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.*/

{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set src_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set intersection = intersect_columns(src_columns, dest_columns) -%}
{%- set dest_cols_csv = intersection | map(attribute='quoted') | join(', ') -%}
{% macro dremio__get_incremental_append_sql(source_relation, target_relation, dest_columns) %}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into {{ target_relation }}( {{dest_cols_csv}} )
select {{dest_cols_csv}} from {{ source_relation }}

{% endmacro %}

{% macro dbt_dremio_get_incremental_sql(strategy, source, target, unique_key) %}

{% macro dremio__get_incremental_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}

{% if unique_key %}
when matched then update set
{% for column_name in update_columns -%}
{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{% endif %}

when not matched then insert
({{ dest_cols_csv }})
values
({{ dest_cols_csv }})

{% endmacro %}

{% macro dbt_dremio_get_incremental_sql(strategy, source, target, dest_columns, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{{ dremio__get_incremental_append_sql(source, target, dest_columns) }}
{%- elif strategy == 'merge' -%}
{{dremio__get_incremental_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none)}}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.CompilationError(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ limitations under the License.*/
{% do return(raw_file_format) %}
{% endmacro %}

{% macro dbt_dremio_validate_get_incremental_strategy(raw_strategy, file_format) %}
{% macro dbt_dremio_validate_get_incremental_strategy(raw_strategy) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append'
Expected one of: 'append, merge'
{%- endset %}

{% if raw_strategy not in ['append'] %}
{% if raw_strategy not in ['append', 'merge'] %}
{% do exceptions.CompilationError(invalid_strategy_msg) %}
{% endif %}

Expand Down
74 changes: 73 additions & 1 deletion tests/functional/adapter/basic/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,54 @@
BaseIncremental,
BaseIncrementalNotSchemaChange,
)
from dbt.tests.util import run_dbt, check_relations_equal, relation_from_name
from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
BaseMergeExcludeColumns,
)
from tests.fixtures.profiles import unique_schema, dbt_profile_data
from tests.utils.util import BUCKET, SOURCE
from dbt.tests.util import run_dbt, relation_from_name, check_relations_equal
from collections import namedtuple


models__merge_exclude_columns_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
merge_exclude_columns='msg'
) }}
{% if not is_incremental() %}
-- data for first invocation of model
select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color
{% else %}
-- data for subsequent incremental update
select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color
{% endif %}
"""

ResultHolder = namedtuple(
"ResultHolder",
[
"seed_count",
"model_count",
"seed_rows",
"inc_test_model_count",
"relation",
],
)


# Need to modify test to not assert any sources for it to pass
Expand Down Expand Up @@ -65,3 +111,29 @@ def test_incremental(self, project):

class TestBaseIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange):
pass


class TestBaseMergeExcludeColumnsDremio(BaseMergeExcludeColumns):
def get_test_fields(self, project, seed, incremental_model, update_sql_file):
seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))

model_count = len(
run_dbt(["run", "--select", incremental_model, "--full-refresh"])
)

relation = incremental_model
# update seed in anticipation of incremental model update
row_count_query = "select * from {}.{}".format(
f"{SOURCE}.{BUCKET}.{project.test_schema}", seed
)

seed_rows = len(project.run_sql(row_count_query, fetch="all"))

# propagate seed state to incremental model according to unique keys
inc_test_model_count = self.update_incremental_model(
incremental_model=incremental_model
)

return ResultHolder(
seed_count, model_count, seed_rows, inc_test_model_count, relation
)

1 comment on commit 8a295da

@somativa-mauricio-macri

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a date on when this will be added to the latest version? Anxious to use it.

Please sign in to comment.