# dbt docs integration within Dremio wikis and tags
def create_wiki(self, object_id: str, text: str):
    """Create the wiki of the catalog object ``object_id``.

    POSTs ``{"text": text}`` to the URL produced by
    ``UrlBuilder.wikis_management_url`` and returns whatever ``_post`` returns.
    """
    endpoint = UrlBuilder.wikis_management_url(self._parameters, object_id)
    auth = self._parameters.authentication
    payload = {"text": text}
    return _post(
        endpoint,
        auth.get_headers(),
        json=payload,
        ssl_verify=auth.verify_ssl,
    )


def retrieve_wiki(self, object_id: str):
    """GET the wiki stored for the catalog object ``object_id``."""
    endpoint = UrlBuilder.wikis_management_url(self._parameters, object_id)
    auth = self._parameters.authentication
    return _get(
        endpoint,
        auth.get_headers(),
        ssl_verify=auth.verify_ssl,
    )


def update_wiki(self, object_id: str, text: str, version: int):
    """Replace the wiki text of ``object_id``.

    POSTs ``{"text": text, "version": version}``.
    NOTE(review): ``version`` is presumably the value last returned by
    ``retrieve_wiki`` -- confirm against the Dremio wiki API.
    """
    endpoint = UrlBuilder.wikis_management_url(self._parameters, object_id)
    auth = self._parameters.authentication
    payload = {"text": text, "version": version}
    return _post(
        endpoint,
        auth.get_headers(),
        json=payload,
        ssl_verify=auth.verify_ssl,
    )


def delete_wiki(self, object_id: str, version: int):
    """Clear the wiki of ``object_id`` by posting an empty text for ``version``."""
    # Deleting is an update whose text is the empty string.
    return self.update_wiki(object_id, "", version)


def create_tags(self, dataset_id: str, tags: list[str]):
    """Create the tags of ``dataset_id`` by POSTing ``{"tags": tags}``."""
    endpoint = UrlBuilder.tags_management_url(self._parameters, dataset_id)
    auth = self._parameters.authentication
    payload = {"tags": tags}
    return _post(
        endpoint,
        auth.get_headers(),
        json=payload,
        ssl_verify=auth.verify_ssl,
    )


def retrieve_tags(self, dataset_id: str):
    """GET the tags stored for ``dataset_id``."""
    endpoint = UrlBuilder.tags_management_url(self._parameters, dataset_id)
    auth = self._parameters.authentication
    return _get(
        endpoint,
        auth.get_headers(),
        ssl_verify=auth.verify_ssl,
    )


def update_tags(self, dataset_id: str, tags: list[str], version: str):
    """Replace the tags of ``dataset_id``.

    POSTs ``{"tags": tags, "version": version}``.
    NOTE(review): ``version`` is presumably the value last returned by
    ``retrieve_tags`` -- confirm against the Dremio tag API.
    """
    endpoint = UrlBuilder.tags_management_url(self._parameters, dataset_id)
    auth = self._parameters.authentication
    payload = {"tags": tags, "version": version}
    return _post(
        endpoint,
        auth.get_headers(),
        json=payload,
        ssl_verify=auth.verify_ssl,
    )


def delete_tags(self, dataset_id: str, version: str):
    """Clear the tags of ``dataset_id`` by posting an empty list for ``version``."""
    # Deleting is an update whose tag list is empty.
    return self.update_tags(dataset_id, [], version)
# dbt docs integration within Dremio wikis and tags
@classmethod
def wikis_management_url(cls, parameters: Parameters, object_id: str) -> str:
    """Return the catalog URL for ``object_id`` followed by the wiki endpoint suffix."""
    catalog_base = cls.catalog_url(parameters)
    wiki_suffix = f"/{object_id}{UrlBuilder.DREMIO_WIKIS_ENDPOINT}"
    return catalog_base + wiki_suffix


@classmethod
def tags_management_url(cls, parameters: Parameters, dataset_id: str) -> str:
    """Return the catalog URL for ``dataset_id`` followed by the tag endpoint suffix."""
    catalog_base = cls.catalog_url(parameters)
    tag_suffix = f"/{dataset_id}{UrlBuilder.DREMIO_TAGS_ENDPOINT}"
    return catalog_base + tag_suffix
# dbt docs integration with Dremio wikis and tags
def _find_relation_catalog_entry(self, relation):
    """Look up the Dremio catalog entry backing ``relation``.

    Opens the thread connection, builds the catalog path from the relation's
    database, schema and identifier, and asks the REST client for that item.

    Returns:
        ``(rest_client, path, catalog_info)`` when the item exists, or
        ``None`` when the REST client raises ``DremioNotFoundException``.
    """
    thread_connection = self.get_thread_connection()
    connection = self.open(thread_connection)
    rest_client = connection.handle.get_client()

    path = self._create_path_list(relation.database, relation.schema)
    path.append(relation.identifier)
    try:
        catalog_info = rest_client.get_catalog_item(
            catalog_id=None,
            catalog_path=path,
        )
    except DremioNotFoundException:
        logger.debug("Catalog not found. Returning")
        return None
    return rest_client, path, catalog_info


def process_wikis(self, relation, text: str):
    """Synchronise the Dremio wiki of ``relation`` with ``text``.

    Creates the wiki when the stored one carries no version, deletes it when
    ``text`` is empty, updates it when the stored text differs, and does
    nothing when the stored text already equals ``text`` or when the relation
    is not found in the catalog.

    Args:
        relation: relation providing ``database``, ``schema`` and ``identifier``.
        text: desired wiki content; ``None`` is treated as the empty string.
    """
    logger.debug("Integrating wikis")
    # A missing description must not reach the API as JSON null.
    if text is None:
        text = ""

    entry = self._find_relation_catalog_entry(relation)
    if entry is None:
        return
    rest_client, path, catalog_info = entry

    object_id = catalog_info.get("id")
    stored_wiki = rest_client.retrieve_wiki(object_id)
    wiki_content = stored_wiki.get("text")
    wiki_version = stored_wiki.get("version", None)

    # No version in the response is taken to mean no wiki exists yet.
    if wiki_version is None:
        logger.debug(f"Creating wiki for {'.'.join(path)}")
        result = rest_client.create_wiki(object_id, text)
        logger.debug(result)
        return

    if wiki_content == text:
        return  # already up to date, avoid bumping the version

    if not text:  # text is empty, delete wiki
        logger.debug(f"Deleting wiki for {'.'.join(path)}")
        result = rest_client.delete_wiki(object_id, wiki_version)
        logger.debug(result)
        return

    logger.debug(f"Updating wiki for {'.'.join(path)}")
    result = rest_client.update_wiki(object_id, text, wiki_version)
    logger.debug(result)


def process_tags(self, relation, tags: list[str]):
    """Synchronise the Dremio tags of ``relation`` with ``tags``.

    Creates the tags when the stored ones carry no version, deletes them when
    ``tags`` is empty, updates them when the stored list differs, and does
    nothing when the stored list already equals ``tags`` or when the relation
    is not found in the catalog.

    Args:
        relation: relation providing ``database``, ``schema`` and ``identifier``.
        tags: desired tags; ``None`` is treated as no tags and a bare string
            as a single tag.
    """
    logger.debug("Integrating tags")
    # Normalise so the payload is always a JSON array.
    if tags is None:
        tags = []
    elif isinstance(tags, str):
        tags = [tags]

    entry = self._find_relation_catalog_entry(relation)
    if entry is None:
        return
    rest_client, path, catalog_info = entry

    object_id = catalog_info.get("id")
    stored_tags = rest_client.retrieve_tags(object_id)
    tags_list = stored_tags.get("tags")
    tags_version = stored_tags.get("version", None)

    # No version in the response is taken to mean no tags exist yet.
    if tags_version is None:
        logger.debug(f"Creating tags for {'.'.join(path)}")
        result = rest_client.create_tags(object_id, tags)
        logger.debug(result)
        return

    if tags_list == tags:
        return  # already up to date, avoid bumping the version

    if not tags:  # tags is empty, delete tags
        logger.debug(f"Deleting tags for {'.'.join(path)}")
        result = rest_client.delete_tags(object_id, tags_version)
        logger.debug(result)
        return

    logger.debug(f"Updating tags for {'.'.join(path)}")
    result = rest_client.update_tags(object_id, tags, tags_version)
    logger.debug(result)
-%} + {% if for_relation and config.persist_relation_docs() %} + {% do adapter.process_wikis(relation, model.description) %} + {% do adapter.process_tags(relation, model.tags) %} + {% endif %} +{% endmacro %} diff --git a/dbt/include/dremio/macros/materializations/view/view.sql b/dbt/include/dremio/macros/materializations/view/view.sql index c3b9285c..7435d125 100644 --- a/dbt/include/dremio/macros/materializations/view/view.sql +++ b/dbt/include/dremio/macros/materializations/view/view.sql @@ -41,6 +41,8 @@ limitations under the License.*/ {{ enable_default_reflection() }} + {% do persist_docs(target_relation, model) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {{ run_hooks(post_hooks) }} diff --git a/tests/functional/adapter/dremio_specific/test_persist_docs.py b/tests/functional/adapter/dremio_specific/test_persist_docs.py new file mode 100644 index 00000000..4c7bcacf --- /dev/null +++ b/tests/functional/adapter/dremio_specific/test_persist_docs.py @@ -0,0 +1,363 @@ +# Copyright (C) 2022 Dremio Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +from dbt.tests.adapter.persist_docs.test_persist_docs import BasePersistDocs +from dbt.tests.adapter.persist_docs.fixtures import ( + _DOCS__MY_FUN_DOCS, + _MODELS__TABLE, + _MODELS__VIEW +) +from dbt.tests.util import run_dbt, write_file + +from dbt.adapters.dremio.api.parameters import ParametersBuilder + +from build.lib.dbt.adapters.dremio.api.rest.client import DremioRestClient + +from tests.utils.util import BUCKET + +# Excluded seed +_PROPERTIES__SCHEMA_YML = """ +version: 2 +models: + - name: table_model + description: | + Table model description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + - name: name + description: | + Some stuff here and then a call to + {{ doc('my_fun_doc')}} + config: + tags: ["test_tag1", "test_tag2", "test_tag3"] + - name: view_model + description: | + View model description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + config: + tags: "test_tag" +""" + +_PROPERTIES__UPDATING_VIEW_SCHEMA_YML = """ +version: 2 +models: + - name: table_model + description: | 
+ Table model description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + - name: name + description: | + Some stuff here and then a call to + {{ doc('my_fun_doc')}} + config: + tags: ["test_tag1", "test_tag2", "test_tag3"] + - name: view_model + description: "Updated view description!" + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + config: + tags: ["test_tag","new_tag"] +""" + +_PROPERTIES__DELETING_VIEW_SCHEMA_YML = """ +version: 2 +models: + - name: table_model + description: | + Table model description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + 80% of statistics are made up on the spot + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + - name: name + description: | + Some stuff here and then a call to + {{ doc('my_fun_doc')}} + config: + tags: ["test_tag1", "test_tag2", "test_tag3"] + - name: view_model + columns: + - name: id + description: | 
class TestPersistDocs(BasePersistDocs):
    """Functional tests for persisting dbt descriptions and tags as Dremio wikis and tags.

    The view tests build on one another (create, unchanged, update, delete)
    and share the class-scoped project.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        """Enable relation-level ``persist_docs`` for the test project."""
        persist_docs = {"relation": True}
        return {"models": {"test": {"+persist_docs": persist_docs}}}

    # This ensures the schema works with our datalake
    @pytest.fixture(scope="class")
    def unique_schema(self, request, prefix) -> str:
        """Build ``<BUCKET>.<prefix>_<module name>`` as the test schema."""
        # We only want the last part of the dotted module name
        module_name = request.module.__name__.split(".")[-1]
        return f"{BUCKET}.{prefix}_{module_name}"

    # Override this fixture to set root_path=schema
    @pytest.fixture(scope="class")
    def dbt_profile_data(self, unique_schema, dbt_profile_target, profiles_config_update):
        """Return the dbt profile with schema and root_path set to ``unique_schema``."""
        target = dbt_profile_target
        target["schema"] = unique_schema
        target["root_path"] = f"{unique_schema}"

        profile = {
            "test": {
                "outputs": {"default": target},
                "target": "default",
            },
        }
        if profiles_config_update:
            profile.update(profiles_config_update)
        return profile

    # Overriding this fixture and setting autouse to be False so we are able to perform
    # run_dbt accordingly in each of the following tests
    @pytest.fixture(scope="class", autouse=False)
    def setUp(self, project):
        """Intentionally empty replacement of the base fixture."""

    @pytest.fixture(scope="class")
    def client(self, adapter):
        """Build a ``DremioRestClient`` from the adapter's profile credentials."""
        parameters = ParametersBuilder.build(adapter.connections.profile.credentials)
        return DremioRestClient(parameters.get_parameters())

    # Removing unnecessary models and adding schema
    @pytest.fixture(scope="class")
    def models(self):
        """Provide the table model, the view model and their schema file."""
        model_files = {}
        model_files["table_model.sql"] = _MODELS__TABLE
        model_files["view_model.sql"] = _MODELS__VIEW
        model_files["schema.yml"] = _PROPERTIES__SCHEMA_YML
        return model_files

    # Removing schema from properties
    @pytest.fixture(scope="class")
    def properties(self):
        """Provide only the docs block file."""
        return {"my_fun_docs.md": _DOCS__MY_FUN_DOCS}

    def _create_path_list(self, database, schema):
        """Return ``[database, *schema parts]``, skipping the schema when it is 'no_schema'."""
        if schema == 'no_schema':
            return [database]
        return [database, *schema.split(".")]

    def _get_relation_id(self, project, client, identifier):
        """Start the client and return the catalog id of ``identifier``."""
        client.start()

        # Replacing dbt_test to dbt_test_source if it is table
        database = "dbt_test_source" if identifier == "table_model" else project.database
        catalog_path = self._create_path_list(database, project.test_schema)
        catalog_path.append(identifier)

        catalog_item = client.get_catalog_item(
            catalog_id=None,
            catalog_path=catalog_path,
        )
        return catalog_item.get("id")

    # Overriding the original test, to be ignored because it was testing
    # the original persist_docs behavior, which does not apply anymore
    def test_has_comments_pglike(self, project):
        """Disabled: the inherited check does not apply to this adapter."""

    def test_table_model_create_wikis_and_tags(self, project, client):
        run_dbt(["run", "--select", "table_model"])
        relation_id = self._get_relation_id(project, client, "table_model")
        stored_wiki = client.retrieve_wiki(relation_id)
        stored_tags = client.retrieve_tags(relation_id)
        self._assert_table_wikis_and_tags(stored_wiki, stored_tags)

    def test_view_model_create_wikis_and_tags(self, project, client):
        # Create + Get
        run_dbt(["run", "--select", "view_model"])
        relation_id = self._get_relation_id(project, client, "view_model")
        stored_wiki = client.retrieve_wiki(relation_id)
        stored_tags = client.retrieve_tags(relation_id)
        self._assert_view_wikis_and_tags(stored_wiki, stored_tags)

    def test_view_model_wikis_and_tags_remain_when_no_changes(self, project, client):
        # No changes in wikis / tags , version should be the same
        run_dbt(["run", "--select", "view_model"])
        relation_id = self._get_relation_id(project, client, "view_model")
        stored_wiki = client.retrieve_wiki(relation_id)
        stored_tags = client.retrieve_tags(relation_id)
        self._assert_view_wikis_and_tags(stored_wiki, stored_tags)

    def test_view_model_update_wikis_and_tags(self, project, client):
        # Previous tags
        relation_id = self._get_relation_id(project, client, "view_model")
        previous_tags = client.retrieve_tags(relation_id)
        # Update
        write_file(_PROPERTIES__UPDATING_VIEW_SCHEMA_YML, project.project_root, "models", "schema.yml")
        run_dbt(["run", "--select", "view_model"])
        relation_id = self._get_relation_id(project, client, "view_model")
        updated_wiki = client.retrieve_wiki(relation_id)
        updated_tags = client.retrieve_tags(relation_id)
        self._assert_view_wikis_and_tags_update(updated_wiki, updated_tags, previous_tags["version"])

    def test_view_model_delete_wikis_and_tags(self, project, client):
        # Previous tags
        relation_id = self._get_relation_id(project, client, "view_model")
        previous_tags = client.retrieve_tags(relation_id)
        # Delete
        write_file(_PROPERTIES__DELETING_VIEW_SCHEMA_YML, project.project_root, "models", "schema.yml")
        run_dbt(["run", "--select", "view_model"])
        relation_id = self._get_relation_id(project, client, "view_model")
        deleted_wiki = client.retrieve_wiki(relation_id)
        deleted_tags = client.retrieve_tags(relation_id)
        self._assert_view_wikis_and_tags_delete(deleted_wiki, deleted_tags, previous_tags["version"])

    def _assert_table_wikis_and_tags(self, wiki, tags):
        """Check the freshly created table wiki (version 0) and its three tags."""
        expected_wiki = """Table model description "with double quotes"
and with 'single quotes' as welll as other;
'''abc123'''
reserved -- characters
80% of statistics are made up on the spot
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
"""
        expected_tags = ["test_tag1", "test_tag2", "test_tag3"]
        assert wiki.get("text") == expected_wiki
        assert wiki.get("version") == 0
        assert tags.get("tags") == expected_tags

    def _assert_view_wikis_and_tags(self, wiki, tags):
        """Check the freshly created view wiki (version 0) and its single tag."""
        expected_wiki = """View model description "with double quotes"
and with 'single quotes' as welll as other;
'''abc123'''
reserved -- characters
80% of statistics are made up on the spot
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
"""
        expected_tags = ["test_tag"]
        assert wiki.get("text") == expected_wiki
        assert wiki.get("version") == 0
        assert tags.get("tags") == expected_tags

    def _assert_view_wikis_and_tags_update(self, wiki, tags, previous_tag_version):
        """Check the updated view wiki (version 1) and that the tag version changed."""
        expected_wiki = "Updated view description!"
        expected_tags = ["test_tag","new_tag"]
        assert wiki.get("text") == expected_wiki
        assert wiki.get("version") == 1
        assert tags.get("tags") == expected_tags
        assert tags.get("version") != previous_tag_version

    def _assert_view_wikis_and_tags_delete(self, wiki, tags, previous_tag_version):
        """Check the emptied view wiki (version 2) and that the tag version changed."""
        assert wiki.get("text") == ""
        assert wiki.get("version") == 2
        assert tags.get("tags") == []
        assert tags.get("version") != previous_tag_version