Skip to content

Commit

Permalink
Wikis and tags feature implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
99Lys committed Jan 9, 2025
1 parent 8ad45ec commit 47f998f
Show file tree
Hide file tree
Showing 8 changed files with 553 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Changes

- Added [DremioRestClient](dbt/adapters/dremio/api/rest/client.py) to isolate all Dremio API calls inside one class
<<<<<<< HEAD
- [#256](https://github.com/dremio/dbt-dremio/pull/256) Reflections are now handled through the Rest API
- Non-admin users are now able to use reflections
- It is now possible to set a custom name for reflections
Expand All @@ -15,6 +16,9 @@
- Computations default to `SUM, COUNT` if mapped measure is numeric, `COUNT` if not
- `reflections_enabled` adapter option has been renamed to `reflections_metadata_enabled` (requires user privileges to run in dremio)
- Removing duplicated macros array_append, array_concat as Dremio already has SQL functions analogues.
=======

>>>>>>> 2a84e48 (Wikis and tags feature implementation)
## Dependency

- [#222](https://github.com/dremio/dbt-dremio/issues/222) Upgrade dbt-core to 1.8.8 and dbt-tests-adapter to 1.8.0
Expand Down
75 changes: 75 additions & 0 deletions dbt/adapters/dremio/api/rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.



import requests

from dbt.adapters.dremio.api.authentication import DremioPatAuthentication
Expand Down Expand Up @@ -132,6 +133,79 @@ def delete_catalog(self, cid):
self._parameters.authentication.get_headers(),
ssl_verify=self._parameters.authentication.verify_ssl,
)

# dbt docs integration within Dremio wikis and tags
def create_wiki(self, object_id: str, text: str):
url = UrlBuilder.wikis_management_url(self._parameters, object_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"text": text},
ssl_verify=self._parameters.authentication.verify_ssl,
)

def retrieve_wiki(self, object_id: str):
url = UrlBuilder.wikis_management_url(self._parameters, object_id)
return _get(
url,
self._parameters.authentication.get_headers(),
ssl_verify=self._parameters.authentication.verify_ssl,
)

def update_wiki(self, object_id: str, text: str, version: int):
url = UrlBuilder.wikis_management_url(self._parameters, object_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"text": text, "version": version},
ssl_verify=self._parameters.authentication.verify_ssl,
)

def delete_wiki(self, object_id: str, version: int):
url = UrlBuilder.wikis_management_url(self._parameters, object_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"text": "", "version": version},
ssl_verify=self._parameters.authentication.verify_ssl,
)


def create_tags(self, dataset_id: str, tags: list[str]):
url = UrlBuilder.tags_management_url(self._parameters, dataset_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"tags": tags},
ssl_verify=self._parameters.authentication.verify_ssl,
)

def retrieve_tags(self, dataset_id: str):
url = UrlBuilder.tags_management_url(self._parameters, dataset_id)
return _get(
url,
self._parameters.authentication.get_headers(),
ssl_verify=self._parameters.authentication.verify_ssl,
)

def update_tags(self, dataset_id: str, tags: list[str], version: str):
url = UrlBuilder.tags_management_url(self._parameters, dataset_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"tags": tags, "version": version},
ssl_verify=self._parameters.authentication.verify_ssl,
)

def delete_tags(self, dataset_id: str, version: str):
url = UrlBuilder.tags_management_url(self._parameters, dataset_id)
return _post(
url,
self._parameters.authentication.get_headers(),
json={"tags": [], "version": version},
ssl_verify=self._parameters.authentication.verify_ssl,
)


def get_reflections(self, dataset_id):
url = UrlBuilder.get_reflection_url(self._parameters, dataset_id)
Expand All @@ -158,3 +232,4 @@ def update_reflection(self, reflection_id, payload):
json=payload,
ssl_verify=self._parameters.authentication.verify_ssl,
)

13 changes: 13 additions & 0 deletions dbt/adapters/dremio/api/rest/url_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class UrlBuilder:
SOFTWARE_CATALOG_ENDPOINT = "/api/v3/catalog"
CLOUD_CATALOG_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/catalog"

DREMIO_WIKIS_ENDPOINT = "/collaboration/wiki"

DREMIO_TAGS_ENDPOINT = "/collaboration/tag"

SOFTWARE_REFLECTIONS_ENDPOINT = "/api/v3/reflection"
CLOUD_REFLECTIONS_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/reflection"

Expand Down Expand Up @@ -145,6 +149,15 @@ def catalog_item_by_path_url(cls, parameters: Parameters, path_list):
joined_path_str = "/".join(quoted_path_list).replace('"', "")
endpoint = f"/by-path/{joined_path_str}"
return url_path + endpoint

# dbt docs integration within Dremio wikis and tags
@classmethod
def wikis_management_url(cls, parameters: Parameters, object_id: str) -> str:
return cls.catalog_url(parameters) + f"/{object_id}{UrlBuilder.DREMIO_WIKIS_ENDPOINT}"

@classmethod
def tags_management_url(cls, parameters: Parameters, dataset_id: str) -> str:
return cls.catalog_url(parameters) + f"/{dataset_id}{UrlBuilder.DREMIO_TAGS_ENDPOINT}"

@classmethod
def create_reflection_url(cls, parameters: Parameters):
Expand Down
82 changes: 78 additions & 4 deletions dbt/adapters/dremio/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import agate
from typing import Tuple, Optional, List
from typing import Tuple, Optional, Union, List
from contextlib import contextmanager

from dbt.adapters.dremio.api.cursor import DremioCursor
Expand Down Expand Up @@ -48,13 +48,14 @@

logger = AdapterLogger("dremio")


class DremioConnectionManager(SQLConnectionManager):
TYPE = "dremio"
DEFAULT_CONNECTION_RETRIES = 5

retries = DEFAULT_CONNECTION_RETRIES

run = True

@contextmanager
def exception_handler(self, sql):
try:
Expand Down Expand Up @@ -132,8 +133,8 @@ def add_commit_query(self):

# Auto_begin may not be relevant with the rest_api
def add_query(
self, sql, auto_begin=True, bindings=None, abridge_sql_log=False,
fetch=False
self, sql, auto_begin=True, bindings=None, abridge_sql_log=False,
fetch=False
):
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
Expand Down Expand Up @@ -231,6 +232,79 @@ def create_catalog(self, relation):
logger.debug(f"Creating folder(s): {database}.{schema}")
self._create_folders(database, schema, rest_client)
return

# dbt docs integration with Dremio wikis and tags
def docs_integration_with_wikis(self, relation, text: str):
logger.debug("Integrating wikis")
thread_connection = self.get_thread_connection()
connection = self.open(thread_connection)
rest_client = connection.handle.get_client()
database = relation.database
schema = relation.schema

path = self._create_path_list(database,schema)
identifier = relation.identifier
path.append(identifier)
try:
catalog_info = rest_client.get_catalog_item(
catalog_id=None,
catalog_path=path,
)
except DremioNotFoundException:
logger.debug("Catalog not found. Returning")
return

object_id = catalog_info.get("id")
stored_wiki = rest_client.retrieve_wiki(object_id)
wiki_content = stored_wiki.get("text")
wiki_version = stored_wiki.get("version", None)

if wiki_version == None:
logger.debug(f"Creating wiki for {'.'.join(path)}")
logger.debug(rest_client.create_wiki(object_id, text))
elif wiki_content != text:
if text == "": # text is empty, delete wiki
logger.debug(f"Deleting wiki for {'.'.join(path)}")
logger.debug(rest_client.delete_wiki(object_id, wiki_version))
else:
logger.debug(f"Updating wiki for {'.'.join(path)}")
logger.debug(rest_client.update_wiki(object_id, text, wiki_version))

def docs_integration_with_tags(self, relation, tags: Union[str,list[str]]):
logger.debug("Integrating tags")
thread_connection = self.get_thread_connection()
connection = self.open(thread_connection)
rest_client = connection.handle.get_client()
database = relation.database
schema = relation.schema

path = self._create_path_list(database,schema)
identifier = relation.identifier
path.append(identifier)
try:
catalog_info = rest_client.get_catalog_item(
catalog_id=None,
catalog_path=path,
)
except DremioNotFoundException:
logger.debug("Catalog not found. Returning")
return

object_id = catalog_info.get("id")
stored_tags = rest_client.retrieve_tags(object_id)
tags_list = stored_tags.get("tags")
tags_version = stored_tags.get("version", None)

if tags_version == None:
logger.debug(f"Creating tags for {'.'.join(path)}")
logger.debug(rest_client.create_tags(object_id, tags))
elif tags_list != tags:
if tags == []: # tags is empty, delete tags
logger.debug(f"Deleting tags for {'.'.join(path)}")
logger.debug(rest_client.delete_tags(object_id, tags_version))
else:
logger.debug(f"Updating tags for {'.'.join(path)}")
logger.debug(rest_client.update_tags(object_id, tags, tags_version))

def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelation, display: List[str],
dimensions: List[str],
Expand Down
9 changes: 9 additions & 0 deletions dbt/adapters/dremio/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

# dbt docs integration with Dremio wikis and tags
@available
def docs_integration_with_wikis(self, relation: DremioRelation, text: str) -> None:
self.connections.docs_integration_with_wikis(relation, text)

@available
def docs_integration_with_tags(self, relation: DremioRelation, tags: list[str]) -> None:
self.connections.docs_integration_with_tags(relation, tags)

@available
def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str],
date_dimensions: List[str], measures: List[str], computations: List[str],
Expand Down
20 changes: 20 additions & 0 deletions dbt/include/dremio/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- {% macro persist_docs(relation, model, for_relation=true, for_columns=true) -%}
-- {{ return(adapter.dispatch('persist_docs', 'dbt')(relation, model, for_relation, for_columns)) }}
-- {% endmacro %}

-- {% macro default__persist_docs(relation, model, for_relation, for_columns) -%}
-- {% if for_relation and config.persist_relation_docs() and model.description %}
-- {% do run_query(alter_relation_comment(relation, model.description)) %}
-- {% endif %}

-- {% if for_columns and config.persist_column_docs() and model.columns %}
-- {% do run_query(alter_column_comment(relation, model.columns)) %}
-- {% endif %}
-- {% endmacro %}

{% macro dremio__persist_docs(relation, model, for_relation, for_columns) -%}
{% if for_relation and config.persist_relation_docs() %}
{% do adapter.docs_integration_with_wikis(relation, model.description) %}
{% do adapter.docs_integration_with_tags(relation, model.tags) %}
{% endif %}
{% endmacro %}
2 changes: 2 additions & 0 deletions dbt/include/dremio/macros/materializations/view/view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ limitations under the License.*/

{{ enable_default_reflection() }}

{% do persist_docs(target_relation, model) %}

{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{{ run_hooks(post_hooks) }}
Expand Down
Loading

0 comments on commit 47f998f

Please sign in to comment.