Skip to content

Commit

Permalink
fix(): Fix multiple source databases in schema config
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Nuzzo-Mueller committed Oct 13, 2023
1 parent 2d2db05 commit 5416e7d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 33 deletions.
2 changes: 1 addition & 1 deletion dbt_schema_builder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Automate management of PII redacted schemas for dbt projects.
"""

__version__ = '0.4.11'
__version__ = '0.4.13'
6 changes: 3 additions & 3 deletions dbt_schema_builder/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
self.app_path = app_path
self.design_file_path = design_file_path
self.current_raw_sources = current_raw_sources
self.current_downstream_sources = current_downstream_sources
self.current_downstream_sources = current_downstream_sources or {}
self.safe_downstream_source_name = app
self.pii_downstream_source_name = "{}_PII".format(app)
if no_pii and pii_only:
Expand Down Expand Up @@ -122,7 +122,7 @@ def check_downstream_sources_for_dupes(self):

return dupes

def add_source_to_new_schema(self, current_raw_source, relation, source_database, raw_schema):
def add_source_to_new_schema(self, current_raw_source, relation, raw_schema):
"""
Add our table to the appropriate raw schema entry in our "sources" list
in the new schema.
Expand All @@ -132,7 +132,7 @@ def add_source_to_new_schema(self, current_raw_source, relation, source_database
source_index = index
break

self.new_schema["sources"][source_index]["database"] = source_database
self.new_schema["sources"][source_index]["database"] = raw_schema.database

if current_raw_source:
self.new_schema["sources"][source_index]["tables"].append(
Expand Down
23 changes: 10 additions & 13 deletions dbt_schema_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import string
from pathlib import Path

import dbt.utils
import yaml
Expand Down Expand Up @@ -393,17 +394,13 @@ def build_app_path(self, app_destination_database, app_destination_schema):
"""
Create a path to the directory into which schema files will be built
"""
db_path = os.path.join(self.source_path, app_destination_database)
db_path = Path(os.path.join(self.source_path, app_destination_database))
db_path.mkdir(parents=True, exist_ok=True)

if not os.path.isdir(db_path):
os.mkdir(db_path)
app_path = Path(os.path.join(db_path, app_destination_schema))
app_path.mkdir(parents=True, exist_ok=True)

app_path = os.path.join(db_path, app_destination_schema)

if not os.path.isdir(app_path):
os.mkdir(app_path)

return app_path
return str(app_path)

@staticmethod
def get_current_raw_schema_attrs(design_file_path):
Expand All @@ -426,8 +423,8 @@ def get_current_downstream_sources_attrs(downstream_sources_dir_path, downstream
Make sure the path exists for holding downstream sources files, and check if there's an existing file that we
need to preserve.
"""
if not os.path.isdir(downstream_sources_dir_path):
os.mkdir(downstream_sources_dir_path)
downstream_sources_dir = Path(downstream_sources_dir_path)
downstream_sources_dir.mkdir(parents=True, exist_ok=True)

if os.path.exists(downstream_sources_file_path):
with open(downstream_sources_file_path, "r") as f:
Expand Down Expand Up @@ -511,7 +508,7 @@ def build_app(self, app_name, app_config, no_pii=False, pii_only=False):
app_source_database = raw_schema_name.split('.')[0]
app_source_schema = raw_schema_name.split('.')[1]
raw_schema = Schema.from_config(
app_source_schema, raw_schema_config
app_source_database, app_source_schema, raw_schema_config
)
raw_schema_relations = self.get_relations(app_source_database, app_source_schema)
for source_relation_name, meta_data in raw_schema_relations[app_source_schema].items():
Expand Down Expand Up @@ -553,7 +550,7 @@ def build_app(self, app_name, app_config, no_pii=False, pii_only=False):
current_downstream_sources,
prefix=raw_schema.prefix
)
app_object.add_source_to_new_schema(current_raw_source, relation, app_source_database, raw_schema)
app_object.add_source_to_new_schema(current_raw_source, relation, raw_schema)
app_object.add_table_to_downstream_sources(
relation,
current_safe_source,
Expand Down
6 changes: 4 additions & 2 deletions dbt_schema_builder/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ class Schema:
Class to represent a raw Schema used to back an application schema
"""

def __init__(self, schema_name, exclusion_list, inclusion_list, soft_delete_column_name,
def __init__(self, database, schema_name, exclusion_list, inclusion_list, soft_delete_column_name,
soft_delete_sql_predicate, relations=None, prefix=None):
self.database = database
self.schema_name = schema_name
self.exclusion_list = exclusion_list
self.inclusion_list = inclusion_list
Expand Down Expand Up @@ -42,7 +43,7 @@ def validate(self):
)

@classmethod
def from_config(cls, schema_name, config):
def from_config(cls, source_database, schema_name, config):
"""
Construct a Schema object from a config dictionary. This is encapsulated
into its own function to help declutter the `run` function in
Expand All @@ -68,6 +69,7 @@ def from_config(cls, schema_name, config):
prefix = config['PREFIX']

schema = Schema(
source_database,
schema_name,
exclusion_list,
inclusion_list,
Expand Down
26 changes: 13 additions & 13 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@


def test_add_source_to_new_schema():
schema_1 = Schema('LMS_TEST_RAW', [], [], None, None)
schema_2 = Schema('LMS_RAW', [], [], None, None)
schema_3 = Schema('LMS_STITCH_RAW', [], [], None, None)
schema_1 = Schema('PROD', 'LMS_TEST_RAW', [], [], None, None)
schema_2 = Schema('PROD', 'LMS_RAW', [], [], None, None)
schema_3 = Schema('PROD', 'LMS_STITCH_RAW', [], [], None, None)
raw_schemas = [schema_1, schema_2, schema_3]
app = App(
raw_schemas,
Expand All @@ -33,7 +33,7 @@ def test_add_source_to_new_schema():
[],
[]
)
app.add_source_to_new_schema(current_raw_source, relation, 'PROD', schema_2)
app.add_source_to_new_schema(current_raw_source, relation, schema_2)

current_raw_source = {"name": "THAT_TABLE", "description": "some special description"}
relation = Relation(
Expand All @@ -46,7 +46,7 @@ def test_add_source_to_new_schema():
[],
[]
)
app.add_source_to_new_schema(current_raw_source, relation, 'PROD', schema_2)
app.add_source_to_new_schema(current_raw_source, relation, schema_2)

expected_schema = {
"version": 2,
Expand All @@ -73,7 +73,7 @@ def test_add_source_to_new_schema():

def test_update_trifecta_models():
raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_add_table_to_downstream_sources(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -241,7 +241,7 @@ def test_add_table_to_downstream_sources_no_pii(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -339,7 +339,7 @@ def test_prefix(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None, prefix="TEST_PREFIX")
Schema('PROD', 'LMS_RAW', [], [], None, None, prefix="TEST_PREFIX")
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -435,7 +435,7 @@ def test_prefix_when_already_applied(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None, prefix="TEST_PREFIX")
Schema('PROD', 'LMS_RAW', [], [], None, None, prefix="TEST_PREFIX")
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -531,7 +531,7 @@ def test_dupe_detection(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -608,7 +608,7 @@ def test_add_table_to_downstream_sources_pii_only(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down Expand Up @@ -692,7 +692,7 @@ def test_add_table_to_downstream_sources_pii_merge(tmpdir):
manual_model_file.write('data')

raw_schemas = [
Schema('LMS_RAW', [], [], None, None)
Schema('PROD', 'LMS_RAW', [], [], None, None)
]
app = App(
raw_schemas,
Expand Down
43 changes: 42 additions & 1 deletion tests/test_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@
Tests for various things in builder.py
"""

import os
from tempfile import mkdtemp
from unittest.mock import MagicMock, patch

import pytest
import yaml

from dbt_schema_builder.builder import SchemaBuilder
from dbt_schema_builder.builder import GetCatalogTask, SchemaBuilder
from dbt_schema_builder.schema import InvalidConfigurationException


Expand Down Expand Up @@ -168,3 +173,39 @@ def test_bad_regex_unmanaged_tables_file():
'an invalid regular expression'
)
assert err_msg in str(excinfo.value)


@patch.object(SchemaBuilder, 'get_redactions', lambda x: {})
@patch.object(SchemaBuilder, 'get_snowflake_keywords', lambda x: {})
@patch.object(SchemaBuilder, 'get_banned_columns', lambda x: {})
@patch.object(SchemaBuilder, 'get_unmanaged_tables', lambda x: {})
@patch.object(SchemaBuilder, 'get_downstream_sources_allow_list', lambda x: {})
def test_build_app():
    """
    End-to-end check that build_app handles multiple source databases:
    the generated raw-source YAML must carry one entry per configured
    database/schema pair, and the downstream-sources YAML must contain
    both the safe and the PII source.
    """
    app_name = 'DB_1.APP'
    # Two raw schemas living in two different source databases.
    app_config = {
        app_name: {
            'DB_2.RAW_SCHEMA_1': {},
            'DB_3.RAW_SCHEMA_2': {},
        }
    }

    scratch_dir = mkdtemp()
    catalog_task = MagicMock(GetCatalogTask)
    catalog_task.run.return_value = [
        {"TABLE_NAME": "TABLE_A", "COLUMN_NAME": "COLUMN_A"},
        {"TABLE_NAME": "TABLE_B", "COLUMN_NAME": "COLUMN_D"},
    ]

    with patch.object(SchemaBuilder, 'build_app_path', lambda x, y, z: scratch_dir), \
            patch.object(SchemaBuilder, 'get_app_schema_configs', lambda x: app_config):
        builder = SchemaBuilder(scratch_dir, scratch_dir, scratch_dir, catalog_task)
        builder.build_app(app_name, app_config[app_name])

        # Downstream sources: one safe entry and one PII entry for the app.
        downstream_path = os.path.join(
            scratch_dir, 'models/automatically_generated_sources/APP.yml'
        )
        with open(downstream_path) as fp:
            downstream_sources = yaml.safe_load(fp)
        assert downstream_sources['sources'][0]['name'] == 'APP'
        assert downstream_sources['sources'][1]['name'] == 'APP_PII'

        # Raw sources: each schema keeps its own source database.
        with open(os.path.join(scratch_dir, 'APP.yml')) as fp:
            raw_source = yaml.safe_load(fp)
        assert raw_source['sources'][0]['database'] == 'DB_2'
        assert raw_source['sources'][0]['name'] == 'RAW_SCHEMA_1'
        assert raw_source['sources'][1]['database'] == 'DB_3'
        assert raw_source['sources'][1]['name'] == 'RAW_SCHEMA_2'
6 changes: 6 additions & 0 deletions tests/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_raw_schema_filter_with_exclusion_list():
exclusion_list = ['NOT_THIS_TABLE']

raw_schema = Schema(
'PROD',
'LMS_RAW',
exclusion_list,
[],
Expand Down Expand Up @@ -93,6 +94,7 @@ def test_raw_schema_filter_with_inclusion_list():
]
inclusion_list = ['ONLY_THIS_TABLE']
raw_schema = Schema(
'PROD',
'LMS_RAW',
[],
inclusion_list,
Expand All @@ -107,6 +109,7 @@ def test_raw_schema_filter_with_inclusion_list():

def test_raw_schema_sql_clause_success():
raw_schema = Schema(
'PROD',
'LMS_RAW',
[],
[],
Expand All @@ -121,6 +124,7 @@ def test_raw_schema_sql_clause_success():
def test_raw_schema_sql_predicate_null():
with pytest.raises(InvalidConfigurationException) as excinfo:
Schema(
'PROD',
'LMS_RAW',
[],
[],
Expand All @@ -135,6 +139,7 @@ def test_raw_schema_sql_predicate_null():
def test_raw_schema_sql_predicate_empty():
with pytest.raises(InvalidConfigurationException) as excinfo:
Schema(
'PROD',
'LMS_RAW',
[],
[],
Expand All @@ -149,6 +154,7 @@ def test_raw_schema_sql_predicate_empty():
def test_raw_schema_sql_clause_non_string():
with pytest.raises(InvalidConfigurationException) as excinfo:
Schema(
'PROD',
'LMS_RAW',
[],
[],
Expand Down

0 comments on commit 5416e7d

Please sign in to comment.