Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
feat: execute_sql magic function (#340)
Browse files Browse the repository at this point in the history
* Provide `execute_sql` function

* Parse jinja in execute_sql

* Pass runtime config into execute_sql

* execute_sql integration tests

* Test execute sql on with dbt v > 1.0.0

* Check dbt version before running execute_sql

* Pr comments

* Add hack comment [skip ci]
  • Loading branch information
mederka authored Jun 3, 2022
1 parent 043a3aa commit e7a8768
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 12 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ jobs:
working-directory: integration_tests
env:
FAL_STATS_ENABLED: false
run: behave
run: |
  # dbt 0.x does not support the `execute_sql` magic function, so for a
  # 0.* matrix entry we exclude the scenarios tagged @dbtv1; for dbt >= 1.0
  # the full suite runs.
  if [[ '${{ matrix.dbt }}' =~ ^0.*$ ]]
  then
    behave --tags=-dbtv1
  else
    behave
  fi
16 changes: 16 additions & 0 deletions integration_tests/features/execute_sql_function.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Tagged @dbtv1: CI excludes this feature on dbt 0.x matrix entries via
# `behave --tags=-dbtv1` (execute_sql requires dbt >= 1.0).
@dbtv1
Feature: `execute_sql` function
  Background: Project Setup
    Given the project 009_execute_sql_function

  Scenario: Use execute_sql function
    When the following command is invoked:
      """
      fal flow run --profiles-dir $profilesDir --project-dir $baseDir --experimental-flow
      """
    Then the following models are calculated:
      | execute_sql_model_one | execute_sql_model_two |
    And the following scripts are ran:
      | execute_sql_model_one.query_other_model.py |
    And the script execute_sql_model_one.query_other_model.py output file has the lines:
      | Model dataframe information: | RangeIndex: 1 entries, 0 to 0 |
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# dbt project config for the 009_execute_sql_function integration test.
name: "fal_test"
version: "1.0.0"
config-version: 2
profile: "fal_test"
# NOTE(review): `source-paths` and `data-paths` are the pre-1.0 key names
# (renamed to `model-paths` / `seed-paths` in dbt 1.0) — confirm every dbt
# version in the CI matrix still accepts them.
source-paths: ["models"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
# Write build artifacts into the per-run temp dir so test runs stay isolated.
target-path: "{{ env_var('temp_dir') }}/target"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import io
import os

import pandas as pd

# `execute_sql` and `context` are injected into this script's globals by fal.
# The jinja ref() is rendered before the query is executed.
df: pd.DataFrame = execute_sql('SELECT * FROM {{ ref("execute_sql_model_one")}}')

# Capture DataFrame.info() output into a string so it can be written to disk
# (the integration test asserts on specific lines of it).
buf = io.StringIO()
df.info(buf=buf, memory_usage=False)
info = buf.getvalue()

output = f"\nModel dataframe information:\n{info}"

# Write the report next to the other test artifacts in the per-run temp dir.
temp_dir = os.environ["temp_dir"]
out_path = os.path.join(
    temp_dir, context.current_model.name + ".query_other_model.txt"
)
# `with` guarantees the file handle is closed even if the write raises.
with open(out_path, "w") as out_file:
    out_file.write(output)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{{ config(materialized='table') }}
-- NOTE: the ref() below is deliberately inside a comment — dbt still parses
-- the jinja, so it registers a dependency and builds execute_sql_model_two
-- before this model, without changing the emitted SQL.
-- {{ ref("execute_sql_model_two") }}

-- One-row, one-column table; the fal script queries it and asserts
-- "RangeIndex: 1 entries" on the resulting dataframe.
WITH data AS (
    SELECT
        'some text' AS my_text
)

SELECT *
FROM data
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config(materialized='table') }}

-- Minimal one-row model with a single integer column; pulled into the DAG
-- ahead of execute_sql_model_one via a ref() placed in a comment there.
WITH data AS (
    SELECT
        cast(1 AS integer) AS my_int
)

SELECT *
FROM data
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: 2

sources:
  # Test source; database/schema come from env vars with local defaults.
  - name: results
    database: "{{ env_var('DBT_DATABASE', 'test') }}"
    schema: "{{ env_var('DBT_SCHEMA', 'dbt_fal') }}"
    tables:
      - name: some_source

models:
  - name: execute_sql_model_one
    meta:
      fal:
        # fal runs this script after the model is materialized; it is the
        # script that exercises the execute_sql magic function.
        scripts:
          after:
            - fal_scripts/query_other_model.py
  - name: execute_sql_model_two
1 change: 1 addition & 0 deletions src/fal/fal_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def exec(self, faldbt: FalDbt):
"list_sources": faldbt.list_sources,
"list_features": faldbt.list_features,
"el": faldbt.el,
"execute_sql": faldbt.execute_sql,
}

if self.model is not None:
Expand Down
36 changes: 25 additions & 11 deletions src/faldbt/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dbt.adapters.base import BaseRelation
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.parsed import ParsedModelNode
from dbt.config import RuntimeConfig

from . import parse

Expand All @@ -29,11 +30,12 @@

DBT_V1 = dbt.semver.VersionSpecifier.from_version_string("1.0.0")
DBT_VCURRENT = dbt.version.get_installed_version()
IS_DBT_V0 = not DBT_VCURRENT.compare(DBT_V1) >= 0

if DBT_VCURRENT.compare(DBT_V1) >= 0:
from dbt.contracts.sql import ResultTable, RemoteRunResult
else:
if IS_DBT_V0:
from faldbt.cp.contracts.sql import ResultTable, RemoteRunResult
else:
from dbt.contracts.sql import ResultTable, RemoteRunResult


@dataclass
Expand Down Expand Up @@ -67,20 +69,28 @@ def initialize_dbt_flags(profiles_dir: str):

# NOTE: Once we get an adapter, we must call `connection_for` or `connection_named` to use it
def _get_adapter(
project_dir: str, profiles_dir: str, profile_target: str
project_dir: str,
profiles_dir: str,
profile_target: str,
config: RuntimeConfig = None,
) -> SQLAdapter:
config = parse.get_dbt_config(
project_dir, profiles_dir, profile_target=profile_target
)
if config is None:
config = parse.get_dbt_config(
project_dir, profiles_dir, profile_target=profile_target
)
adapter: SQLAdapter = adapters_factory.get_adapter(config) # type: ignore

return adapter


def _execute_sql(
project_dir: str, profiles_dir: str, sql: str, profile_target: str = None
project_dir: str,
profiles_dir: str,
sql: str,
profile_target: str = None,
config: RuntimeConfig = None,
) -> Tuple[AdapterResponse, RemoteRunResult]:
adapter = _get_adapter(project_dir, profiles_dir, profile_target)
adapter = _get_adapter(project_dir, profiles_dir, profile_target, config)

# HACK: we need to include uniqueness (UUID4) to avoid clashes
name = "SQL:" + str(hash(sql)) + ":" + str(uuid4())
Expand Down Expand Up @@ -140,10 +150,14 @@ def _get_target_relation(


def execute_sql(
project_dir: str, profiles_dir: str, sql: str, profile_target: str = None
project_dir: str,
profiles_dir: str,
sql: str,
profile_target: str = None,
config: RuntimeConfig = None,
) -> RemoteRunResult:
_, result = _execute_sql(
project_dir, profiles_dir, sql, profile_target=profile_target
project_dir, profiles_dir, sql, profile_target=profile_target, config=config
)
return result

Expand Down
30 changes: 30 additions & 0 deletions src/faldbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,36 @@ def _lazy_setup_firestore(self):
"Could not find acceptable Default GCP Application credentials"
)

@telemetry.log_call("execute_sql")
def execute_sql(self, sql: str) -> pd.DataFrame:
    """Execute a sql query."""
    # The dbt internals this relies on only exist from dbt 1.0 onwards.
    if lib.IS_DBT_V0:
        raise NotImplementedError(
            "execute_sql only supported in dbt version >= 1.0.0"
        )

    from dbt.lib import compile_sql

    # Render any jinja in the query (ref()/source()/macros) against the
    # already-loaded manifest before handing it to the adapter.
    compiled_query = compile_sql(
        self._manifest.nativeManifest, self.project_dir, sql
    )

    # Hack: pass the parsed RuntimeConfig along because of weird behavior of
    # execute_sql when ran from GitHub Actions — without it the right profile
    # could not be found. Not reproducible locally, hence this workaround.
    run_result = lib.execute_sql(
        project_dir=self.project_dir,
        profiles_dir=self.profiles_dir,
        sql=compiled_query.compiled_sql,
        profile_target=self._profile_target,
        config=self._config,
    )

    table = run_result.table
    return pd.DataFrame.from_records(
        table.rows,
        columns=table.column_names,
        coerce_float=True,
    )


def _firestore_dict_to_document(data: Dict, key_column: str):
output = {}
Expand Down

0 comments on commit e7a8768

Please sign in to comment.