Skip to content

Commit

Permalink
Merge branch 'main' into microbatch-behavior-flag
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm committed Oct 21, 2024
2 parents da6e6ab + 2bf3808 commit 3600753
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changes/1.7.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## dbt-adapters 1.7.1 - October 15, 2024

### Features

- Enable setting current value of dbt_valid_to ([#320](https://github.com/dbt-labs/dbt-adapters/issues/320))
6 changes: 6 additions & 0 deletions .changes/unreleased/Breaking Changes-20241016-180629.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Drop support for Python 3.8
time: 2024-10-16T18:06:29.535761-04:00
custom:
Author: mikealfare
Issue: "332"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240422-081302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allows unique_key for snapshots to take a list
time: 2024-04-22T08:13:02.937534-04:00
custom:
Author: agpapa
Issue: "181"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241016-160412.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Always validate an incremental model's `incremental_strategy`
time: 2024-10-16T16:04:12.58581-05:00
custom:
Author: QMalcolm
Issue: "330"
2 changes: 1 addition & 1 deletion .github/actions/publish-results/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ inputs:
description: File type for file name stub (e.g. "unit-tests")
required: true
python-version:
description: Python version for the file name stub (e.g. "3.8")
description: Python version for the file name stub (e.g. "3.9")
required: true
source-file:
description: File to be uploaded
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Check out repository
Expand Down
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ repos:
- id: dbt-core-in-adapters-check

- repo: https://github.com/psf/black
rev: 24.4.0
rev: 24.8.0
hooks:
- id: black
args:
- --line-length=99
- --target-version=py38
- --target-version=py39
- --target-version=py310
- --target-version=py311
- --target-version=py312
- --force-exclude=dbt/adapters/events/adapter_types_pb2.py

- repo: https://github.com/pycqa/flake8
rev: 7.0.0
rev: 7.1.1
hooks:
- id: flake8
exclude: dbt/adapters/events/adapter_types_pb2.py|tests/functional/
Expand All @@ -41,7 +41,7 @@ repos:
- --per-file-ignores=*/__init__.py:F401,*/conftest.py:F401

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.9.0
rev: v1.11.2
hooks:
- id: mypy
exclude: dbt/adapters/events/adapter_types_pb2.py|dbt-tests-adapter/dbt/__init__.py
Expand Down
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ and is generated by [Changie](https://github.com/miniscruff/changie).

- dbt-tests-adapters: Add required begin to microbatch model config to BaseMicrobatch test ([#315](https://github.com/dbt-labs/dbt-adapters/issues/315))



## dbt-adapters 1.10.1 - September 16, 2024

## dbt-adapters 1.10.0 - September 12, 2024

## dbt-adapters 1.7.1 - October 15, 2024

### Features

- Enable setting current value of dbt_valid_to ([#320](https://github.com/dbt-labs/dbt-adapters/issues/320))



## dbt-adapters 1.7.0 - September 19, 2024

### Features
Expand Down
7 changes: 7 additions & 0 deletions dbt-tests-adapter/dbt/tests/adapter/basic/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@
{{ config(materialized="incremental") }}
"""

config_materialized_incremental_invalid_strategy = """
{{ config(materialized="incremental", incremental_strategy="bad_strategy") }}
"""

config_materialized_var = """
{{ config(materialized=var("materialized_var", "table"))}}
"""
Expand Down Expand Up @@ -217,3 +221,6 @@
ephemeral_view_sql = config_materialized_view + model_ephemeral
ephemeral_table_sql = config_materialized_table + model_ephemeral
incremental_sql = config_materialized_incremental + model_incremental
incremental_invalid_strategy_sql = (
config_materialized_incremental_invalid_strategy + model_incremental
)
39 changes: 39 additions & 0 deletions dbt-tests-adapter/dbt/tests/adapter/basic/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,45 @@ def test_incremental_not_schema_change(self, project):
assert run_result == RunStatus.Success


class BaseIncrementalBadStrategy:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"name": "incremental"}

@pytest.fixture(scope="class")
def models(self):
return {
"incremental.sql": files.incremental_invalid_strategy_sql,
"schema.yml": files.schema_base_yml,
}

@pytest.fixture(scope="class")
def seeds(self):
return {"base.csv": files.seeds_base_csv, "added.csv": files.seeds_added_csv}

@pytest.fixture(autouse=True)
def clean_up(self, project):
yield
with project.adapter.connection_named("__test"):
relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
project.adapter.drop_schema(relation)

def test_incremental_invalid_strategy(self, project):
# seed command
results = run_dbt(["seed"])
assert len(results) == 2

# try to run the incremental model, it should fail on the first attempt
results = run_dbt(["run"], expect_pass=False)
assert len(results.results) == 1
assert (
'dbt could not find an incremental strategy macro with the name "get_incremental_bad_strategy_sql"'
in results.results[0].message
)


class Testincremental(BaseIncremental):
pass

Expand Down
3 changes: 1 addition & 2 deletions dbt-tests-adapter/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name = "dbt-tests-adapter"
description = "The set of reusable tests and test fixtures used to test common functionality"
readme = "README.md"
keywords = ["dbt", "adapter", "adapters", "database", "elt", "dbt-core", "dbt Core", "dbt Cloud", "dbt Labs"]
requires-python = ">=3.8.0"
requires-python = ">=3.9.0"
authors = [
{ name = "dbt Labs", email = "[email protected]" },
]
Expand All @@ -17,7 +17,6 @@ classifiers = [
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
Expand Down
11 changes: 11 additions & 0 deletions dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Dict,
FrozenSet,
Iterator,
List,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -341,6 +342,16 @@ def create(
)
return cls.from_dict(kwargs)

@classmethod
def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]:
scd_args = []
if isinstance(primary_key, list):
scd_args.extend(primary_key)
else:
scd_args.append(primary_key)
scd_args.append(updated_at)
return scd_args

@property
def can_be_renamed(self) -> bool:
return self.type in self.renameable_relations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

{% set to_drop = [] %}

{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}

{% if existing_relation is none %}
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %}
{% elif full_refresh_mode %}
Expand All @@ -52,9 +55,7 @@
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,32 @@

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

select *, {{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where {{ columns.dbt_valid_to }} is null
where
{% if config.get('dbt_valid_to_current') %}
{# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #}
( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null)
{% else %}
{{ columns.dbt_valid_to }} is null
{% endif %}

),

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }},
{{ get_dbt_valid_to_current(strategy, columns) }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}

from snapshot_query
),

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
Expand All @@ -86,9 +86,7 @@

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
select *, {{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}
Expand All @@ -100,13 +98,11 @@
source_data.*

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})

)

),
Expand All @@ -119,7 +115,8 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where (
{{ strategy.row_changed }}
)
Expand All @@ -139,8 +136,9 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

Expand All @@ -166,7 +164,7 @@
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }}
{{ get_dbt_valid_to_current(strategy, columns) }}
from (
{{ sql }}
) sbq
Expand Down Expand Up @@ -210,3 +208,52 @@
{% endif %}
{% endif %}
{% endmacro %}


{% macro get_dbt_valid_to_current(strategy, columns) %}
{% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %}
coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}})
as {{ columns.dbt_valid_to }}
{% endmacro %}


{% macro unique_key_fields(unique_key) %}
{% if unique_key | is_list %}
{% for key in unique_key %}
{{ key }} as dbt_unique_key_{{ loop.index }}
{%- if not loop.last %} , {%- endif %}
{% endfor %}
{% else %}
{{ unique_key }} as dbt_unique_key
{% endif %}
{% endmacro %}


{% macro unique_key_join_on(unique_key, identifier, from_identifier) %}
{% if unique_key | is_list %}
{% for key in unique_key %}
{{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }}
{%- if not loop.last %} and {%- endif %}
{% endfor %}
{% else %}
{{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key
{% endif %}
{% endmacro %}


{% macro unique_key_is_null(unique_key, identifier) %}
{% if unique_key | is_list %}
{{ identifier }}.dbt_unique_key_1 is null
{% else %}
{{ identifier }}.dbt_unique_key is null
{% endif %}
{% endmacro %}


{% macro unique_key_is_not_null(unique_key, identifier) %}
{% if unique_key | is_list %}
{{ identifier }}.dbt_unique_key_1 is not null
{% else %}
{{ identifier }}.dbt_unique_key is not null
{% endif %}
{% endmacro %}
Loading

0 comments on commit 3600753

Please sign in to comment.