From 1c476fb47ef922bacb4f0b655d3afdf0f3235a97 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:18:27 -0800 Subject: [PATCH 1/7] schema changelog incremental materialization --- models/fivetran_log__schema_changelog.sql | 41 ++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/models/fivetran_log__schema_changelog.sql b/models/fivetran_log__schema_changelog.sql index f00f5745..38de9283 100644 --- a/models/fivetran_log__schema_changelog.sql +++ b/models/fivetran_log__schema_changelog.sql @@ -1,9 +1,39 @@ +{{ config( + materialized='incremental', + unique_key='unique_schema_change_key', + partition_by={ + 'field': 'created_at', + 'data_type': 'timestamp', + 'granularity': 'day' + } if target.type == 'bigquery' else none, + incremental_strategy = 'merge', + file_format = 'delta' +) }} + with schema_changes as ( select * from {{ ref('stg_fivetran_log__log') }} where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') + + {% if is_incremental() %} + + -- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs + {%- call statement('max_schema_change', fetch_result=True) -%} + select max(created_at) from {{ this }} + {%- endcall -%} + + -- load the result from the above query into a new variable + {%- set query_result = load_result('max_schema_change') -%} + + -- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value. + {%- set max_schema_change = query_result['data'][0][0] -%} + + -- compare the new batch of data to the latest sync already stored in this model + and created_at >= '{{ max_schema_change }}' + + {% endif %} ), connector as ( @@ -20,7 +50,8 @@ add_connector_info as ( connector.destination_id, connector.destination_name - from schema_changes join connector using(connector_id) + from schema_changes join + connector on schema_changes.connector_id = connector.connector_id ), final as ( @@ -41,10 +72,12 @@ final as ( case when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_extract(string='message_data', string_path='schema') }} - else null end as schema_name + else null end as schema_name, + + {{ dbt_utils.surrogate_key(['connector_id', 'destination_id', 'created_at']) }} as unique_schema_change_key + from add_connector_info ) -select * from final -order by created_at desc, connector_id \ No newline at end of file +select * from final \ No newline at end of file From 721af80f54baa7ba7a28bfff6286839f199ae630 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:27:43 -0800 Subject: [PATCH 2/7] better seed data --- integration_tests/seeds/log.csv | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integration_tests/seeds/log.csv b/integration_tests/seeds/log.csv index e5bd6d96..f93105c5 100644 --- a/integration_tests/seeds/log.csv +++ b/integration_tests/seeds/log.csv @@ -22,4 +22,8 @@ intrinsic_departed,2021-12-09 14:26:29.860,2021-12-09 20:30:53.904,intrinsic_dep intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start, -intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start, \ No newline at end of file +intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start, +intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{"schema":"instagram_business","name":"activity_add_to_fivetran_user","columns":{"activity_type_id":"INTEGER","_fivetran_synced":"TIMESTAMP","primary_attribute_value_id":"INTEGER","activity_date":"TIMESTAMP","primary_attribute_value":"STRING","id":"STRING","campaign_id":"INTEGER","lead_id":"INTEGER"}}",create_table, +intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{"type":"ADD_COLUMN","table":"lead","properties":{"columnName":"salesforce","dataType":"STRING"}}",alter_table, +intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{"schema":"fivetran_log_2"}",create_schema, +intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{"connectorId":"pg","properties":{"REMOVAL":[{"schema":"public","tables":["active_rows_estimate"]}]}}",change_schema_config, \ No newline at end of file From 809fdee33426634b4e65c91a23b445026e6ce17d Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:28:42 -0800 Subject: [PATCH 3/7] date trunc --- models/fivetran_log__schema_changelog.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/fivetran_log__schema_changelog.sql b/models/fivetran_log__schema_changelog.sql index 38de9283..34be2395 100644 --- a/models/fivetran_log__schema_changelog.sql +++ b/models/fivetran_log__schema_changelog.sql @@ -21,7 +21,7 @@ with schema_changes as ( -- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs {%- call statement('max_schema_change', fetch_result=True) -%} - select max(created_at) from {{ this }} + select date(max(created_at)) from {{ this }} {%- endcall -%} -- load the result from the above query into a new variable @@ -31,7 +31,7 @@ with schema_changes as ( {%- set max_schema_change = query_result['data'][0][0] -%} -- compare the new batch of data to the latest sync already stored in this model - and created_at >= '{{ max_schema_change }}' + and date(created_at) >= '{{ max_schema_change }}' {% endif %} ), From ff4eb39dbe67ec95fbf58862ade446a594480e76 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:34:27 -0800 Subject: [PATCH 4/7] fix seeds --- integration_tests/seeds/log.csv | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/seeds/log.csv b/integration_tests/seeds/log.csv index f93105c5..e5e4aaa3 100644 --- a/integration_tests/seeds/log.csv +++ b/integration_tests/seeds/log.csv @@ -23,7 +23,7 @@ intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_dep intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start, -intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{"schema":"instagram_business","name":"activity_add_to_fivetran_user","columns":{"activity_type_id":"INTEGER","_fivetran_synced":"TIMESTAMP","primary_attribute_value_id":"INTEGER","activity_date":"TIMESTAMP","primary_attribute_value":"STRING","id":"STRING","campaign_id":"INTEGER","lead_id":"INTEGER"}}",create_table, -intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{"type":"ADD_COLUMN","table":"lead","properties":{"columnName":"salesforce","dataType":"STRING"}}",alter_table, -intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{"schema":"fivetran_log_2"}",create_schema, -intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{"connectorId":"pg","properties":{"REMOVAL":[{"schema":"public","tables":["active_rows_estimate"]}]}}",change_schema_config, \ No newline at end of file +intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user"",""columns"":{""activity_type_id"":""INTEGER"",""_fivetran_synced"":""TIMESTAMP""}}",create_table, +intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead",""properties"":{""columnName"":""salesforce"",""dataType"":""STRING""}}",alter_table, +intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema, +intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config, \ No newline at end of file From 85730255ee55ff75a2aefd4775512d40af0e5472 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:39:15 -0800 Subject: [PATCH 5/7] seeds? --- integration_tests/dbt_project.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 1ddbdde9..2b9dde0b 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -52,6 +52,7 @@ seeds: +column_types: time_stamp: timestamp transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" + message_event: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" transformation: +column_types: created_at: timestamp From 2583b6c7a81478114f5ce900b9e2284d89624bf1 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 15:58:47 -0800 Subject: [PATCH 6/7] figured out seeds? --- integration_tests/seeds/log.csv | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/seeds/log.csv b/integration_tests/seeds/log.csv index e5e4aaa3..2c72939b 100644 --- a/integration_tests/seeds/log.csv +++ b/integration_tests/seeds/log.csv @@ -23,7 +23,7 @@ intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_dep intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start, -intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user"",""columns"":{""activity_type_id"":""INTEGER"",""_fivetran_synced"":""TIMESTAMP""}}",create_table, -intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead",""properties"":{""columnName"":""salesforce"",""dataType"":""STRING""}}",alter_table, -intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema, -intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config, \ No newline at end of file +intrinsic_departed,2021-12-11 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config, +intrinsic_departed,2021-12-10 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema, +intrinsic_departed,2021-12-19 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead"}",alter_table, +intrinsic_departed,2021-12-10 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user""}",create_table, \ No newline at end of file From 38b0ed447c71aa81832f46a1ab599b705a089934 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Mon, 14 Feb 2022 16:19:44 -0800 Subject: [PATCH 7/7] bq seeds --- integration_tests/dbt_project.yml | 12 ++++++------ integration_tests/seeds/log.csv | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 2b9dde0b..33adebdf 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -43,7 +43,7 @@ seeds: destination: +column_types: created_at: timestamp - id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" + id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" destination_membership: +column_types: activated_at: timestamp @@ -51,18 +51,18 @@ seeds: log: +column_types: time_stamp: timestamp - transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" - message_event: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" + transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" + message_event: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" transformation: +column_types: created_at: timestamp - destination_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" - id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" + destination_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" + id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" trigger_table: +quote_columns: "{{ true if target.type in ('redshift', 'postgres') else false }}" +enabled: "{{ true if target.type != 'snowflake' else false }}" +column_types: - transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}" + transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}" trigger_table_snowflake: +enabled: "{{ true if target.type == 'snowflake' else false }}" user: diff --git a/integration_tests/seeds/log.csv b/integration_tests/seeds/log.csv index 2c72939b..1e97d244 100644 --- a/integration_tests/seeds/log.csv +++ b/integration_tests/seeds/log.csv @@ -23,7 +23,7 @@ intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_dep intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start, intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start, +intrinsic_departed,2021-12-10 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user""}",create_table, +intrinsic_departed,2021-12-19 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead""}",alter_table, intrinsic_departed,2021-12-11 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config, -intrinsic_departed,2021-12-10 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema, -intrinsic_departed,2021-12-19 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead"}",alter_table, -intrinsic_departed,2021-12-10 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user""}",create_table, \ No newline at end of file +intrinsic_departed,2021-12-10 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema, \ No newline at end of file