Merge pull request #80 from wagov/dbt-duckdb-plugin
Dbt duckdb plugin
adonm authored Jun 12, 2024
2 parents a6fe610 + b0b52e5 commit 8708a9f
Showing 21 changed files with 281 additions and 188 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -48,9 +48,9 @@ and workstations for sensitive activities).
*Note: If you create/use a Github Codespace on any of the wagov repos,
SQU_CONFIG should be configured automatically.*

Before using, config needs to be loaded into `squ.core.cache`, which can
be done automatically from json in a keyvault by setting the env var
`SQU_CONFIG` to `"keyvault/tenantid"`.
Before using, config needs to be loaded into `nbdev_squ.core.cache`,
which can be done automatically from json in a keyvault by setting the
env var `SQU_CONFIG` to `"keyvault/tenantid"`.

``` bash
export SQU_CONFIG="{{ keyvault }}/{{ tenantid }}"
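For quick orientation, a minimal sketch of the configuration flow described above; the keyvault and tenant values are placeholders, and the inline KQL is only an illustration:

``` python
# Sketch only: SQU_CONFIG must name a real keyvault/tenantid pair you can access.
import os
os.environ["SQU_CONFIG"] = "example-keyvault/example-tenant-id"  # placeholder values

from nbdev_squ import api

api.login()  # loads config from the keyvault named in SQU_CONFIG into nbdev_squ.core.cache
df = api.query_all("SecurityAlert | take 10")  # any KQL; runs across the configured workspaces
print(df.head())
```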
2 changes: 1 addition & 1 deletion build.sh
@@ -4,5 +4,5 @@ npm ci
npm run build
pip install build==1.2.1
python -m build
# To release to pypi run below after a build
# To release to pypi run below after a build. Make sure to bump version with nbdev_bump_version.
# twine upload --skip-existing dist/*
4 changes: 1 addition & 3 deletions dbt_example_project/dbt_project.yml
@@ -32,7 +32,5 @@ clean-targets: # directories to be removed by `dbt clean`
models:
dbt_example_project:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view
squ:
+materialized: table
+materialized: view
27 changes: 0 additions & 27 deletions dbt_example_project/models/example/my_first_dbt_model.sql

This file was deleted.

6 changes: 0 additions & 6 deletions dbt_example_project/models/example/my_second_dbt_model.sql

This file was deleted.

21 changes: 0 additions & 21 deletions dbt_example_project/models/example/schema.yml

This file was deleted.

7 changes: 7 additions & 0 deletions dbt_example_project/models/squ/T1547_001.kql
@@ -0,0 +1,7 @@
let selection= dynamic(['reg',' ADD', @'Software\Microsoft\Windows\CurrentVersion\Run']);
let filter_known = dynamic(['Discord.exe','Skype.exe','LiveChat.exe','Promethean Desktop.exe']);
DeviceProcessEvents
| where ActionType == "ProcessCreated"
| where ProcessCommandLine has_all (selection)
| where InitiatingProcessFileName !in (filter_known) //Known False-Positive
| where ProcessCommandLine !contains "PaperCut"
12 changes: 0 additions & 12 deletions dbt_example_project/models/squ/config.yml

This file was deleted.

1 change: 1 addition & 0 deletions dbt_example_project/models/squ/hunt.sql
@@ -0,0 +1 @@
select * from {{source('kql_source', 'T1547_001')}}
15 changes: 15 additions & 0 deletions dbt_example_project/models/squ/schema.yml
@@ -0,0 +1,15 @@
version: 2

sources:
  - name: kql_source
    config:
      plugin: squ
    meta:
      kql_path: "models/squ/{name}.kql"
    tables:
      - name: T1547_001

models:
  - name: hunt
    config:
      materialized: table
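The `{name}` placeholder in `kql_path` above is filled in from the source table name when the plugin loads it. A tiny sketch of that substitution:

``` python
# Illustration of how the kql_path template above resolves for the T1547_001 table;
# Plugin.load does the equivalent via kql_path.format(**source_config.as_dict()).
kql_path = "models/squ/{name}.kql".format(name="T1547_001")
print(kql_path)  # -> models/squ/T1547_001.kql
```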
4 changes: 0 additions & 4 deletions dbt_example_project/models/squ/squ_python_model.py

This file was deleted.

11 changes: 4 additions & 7 deletions dbt_example_project/profiles.yml
@@ -2,12 +2,9 @@ default:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1

prod:
type: duckdb
path: prod.duckdb
threads: 4
path: target/dev.duckdb
plugins:
- module: nbdev_squ.api
alias: squ

target: dev
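The `plugins:` entry tells dbt-duckdb to import the listed module and look for a `Plugin` class in it. A simplified sketch of that lookup (concept only; the real wiring lives inside `dbt.adapters.duckdb.plugins`):

``` python
# Simplified module/class lookup mirroring the profiles.yml plugins entry above
# (not the actual dbt-duckdb loader, just the idea).
import importlib

module = importlib.import_module("nbdev_squ.api")  # the `module:` value from profiles.yml
plugin_cls = module.Plugin                         # dbt-duckdb convention: a class named Plugin
print(plugin_cls.__name__, "extends", plugin_cls.__bases__[0].__name__)
```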
1 change: 0 additions & 1 deletion install.py
@@ -2,7 +2,6 @@
# This is meant to be run to setup and prep for committing a release
# use nbdev_bump_version to increment the version itself then rerun this to update README.md _docs etc
from subprocess import run
import configparser, re

run(["pip", "install", "nbdev"])
run(["pip", "install", "-e", "."]) # get current project in dev mode
2 changes: 1 addition & 1 deletion nbdev_squ/__init__.py
@@ -1 +1 @@
__version__ = "1.3.5"
__version__ = "1.3.6"
6 changes: 6 additions & 0 deletions nbdev_squ/_modidx.py
@@ -11,6 +11,12 @@
'nbdev_squ.api.Clients.jira': ('api.html#clients.jira', 'nbdev_squ/api.py'),
'nbdev_squ.api.Clients.runzero': ('api.html#clients.runzero', 'nbdev_squ/api.py'),
'nbdev_squ.api.Clients.tio': ('api.html#clients.tio', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin': ('api.html#plugin', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.configure_cursor': ('api.html#plugin.configure_cursor', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.default_materialization': ( 'api.html#plugin.default_materialization',
'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.initialize': ('api.html#plugin.initialize', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.load': ('api.html#plugin.load', 'nbdev_squ/api.py'),
'nbdev_squ.api.atlaskit_transformer': ('api.html#atlaskit_transformer', 'nbdev_squ/api.py'),
'nbdev_squ.api.chunks': ('api.html#chunks', 'nbdev_squ/api.py'),
'nbdev_squ.api.finalise_query': ('api.html#finalise_query', 'nbdev_squ/api.py'),
34 changes: 33 additions & 1 deletion nbdev_squ/api.py
@@ -3,11 +3,13 @@
# %% auto 0
__all__ = ['logger', 'clients', 'columns_of_interest', 'columns', 'Clients', 'list_workspaces_safe', 'list_workspaces',
'list_subscriptions', 'list_securityinsights_safe', 'list_securityinsights', 'chunks', 'loganalytics_query',
'query_all', 'finalise_query', 'hunt', 'atlaskit_transformer', 'security_incidents', 'security_alerts']
'query_all', 'finalise_query', 'hunt', 'atlaskit_transformer', 'security_incidents', 'security_alerts',
'Plugin']

# %% ../nbs/01_api.ipynb 3
import pandas, json, logging, time, requests, io, pkgutil, httpx_cache
from .core import *
from pathlib import Path
from diskcache import memoize_stampede
from importlib.metadata import version
from subprocess import run, CalledProcessError
@@ -17,6 +19,7 @@
from functools import cached_property
from atlassian import Jira
from tenable.io import TenableIO
from dbt.adapters.duckdb.plugins import BasePlugin, SourceConfig

# %% ../nbs/01_api.ipynb 5
logger = logging.getLogger(__name__)
@@ -268,3 +271,32 @@ def security_alerts(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta(
# Sorts by TimeGenerated (TODO)
query = "SecurityAlert | summarize arg_max(TimeGenerated, *) by SystemAlertId"
return query_all(query, timespan=(start.to_pydatetime(), timedelta))

# %% ../nbs/01_api.ipynb 28
class Plugin(BasePlugin):
    def initialize(self, config):
        login()

    def configure_cursor(self, cursor):
        pass

    def load(self, source_config: SourceConfig):
        if "kql_path" in source_config:
            kql_path = source_config["kql_path"]
            kql_path = kql_path.format(**source_config.as_dict())
            query = Path(kql_path).read_text()
            return query_all(query, timespan=pandas.Timedelta(source_config.get("timespan", "14d")))
        elif "list_workspaces" in source_config: # untested
            return list_workspaces()
        elif "client_api" in source_config: # untested
            api_result = getattr(clients, source_config["client_api"])(**json.loads(source_config.get("kwargs", "{}")))
            if isinstance(api_result, pandas.DataFrame):
                return api_result
            else:
                return pandas.DataFrame(api_result)
        else:
            raise Exception("No valid config found for squ plugin (kql_path or api required)")

    def default_materialization(self):
        return "view"
110 changes: 109 additions & 1 deletion nbs/01_api.ipynb
@@ -37,6 +37,7 @@
"#| export\n",
"import pandas, json, logging, time, requests, io, pkgutil, httpx_cache\n",
"from nbdev_squ.core import *\n",
"from pathlib import Path\n",
"from diskcache import memoize_stampede\n",
"from importlib.metadata import version\n",
"from subprocess import run, CalledProcessError\n",
@@ -45,7 +46,8 @@
"from benedict import benedict\n",
"from functools import cached_property\n",
"from atlassian import Jira\n",
"from tenable.io import TenableIO"
"from tenable.io import TenableIO\n",
"from dbt.adapters.duckdb.plugins import BasePlugin, SourceConfig"
]
},
{
@@ -499,6 +501,112 @@
"df = security_incidents(start=pandas.Timestamp(\"now\", tz=\"UTC\") - pandas.Timedelta(\"14d\"), timedelta=pandas.Timedelta(\"14d\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## dbt-duckdb plugin\n",
"\n",
"The squ plugin below makes querying KQL in DuckDB projects via the [DuckDB user-defined function (UDF)](https://duckdb.org/docs/api/python/function.html) interface much easier. It could be extended to other clients fairly easily; the only requirement is that data is returned as a dataframe. To use it, a few dbt project files need to be configured:\n",
"\n",
"### DBT ./profiles.yml\n",
"\n",
"See [DBT Connection Profiles](https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles)\n",
"\n",
"```yaml\n",
"default:\n",
"  outputs:\n",
"    dev:\n",
"      type: duckdb\n",
"      path: target/dev.duckdb\n",
"      plugins:\n",
"        - module: nbdev_squ.api\n",
"          alias: squ\n",
"\n",
"  target: dev\n",
"```\n",
"\n",
"### DBT ./models/squ/schema.yml\n",
"\n",
"See [DBT Add sources to your DAG](https://docs.getdbt.com/docs/build/sources) for how to add 'externally defined' sources; this one uses the code below, based on the [dbt-duckdb plugin](https://github.com/duckdb/dbt-duckdb?tab=readme-ov-file#writing-your-own-plugins) architecture.\n",
"\n",
"```yaml\n",
"version: 2\n",
"\n",
"sources:\n",
"  - name: kql_source\n",
"    config:\n",
"      plugin: squ\n",
"    meta:\n",
"      kql_path: \"models/squ/{name}.kql\"\n",
"    tables:\n",
"      - name: T1547_001\n",
"\n",
"models:\n",
"  - name: hunt\n",
"    config:\n",
"      materialized: table\n",
"```\n",
"\n",
"Once the source is defined, dbt CLI tools and other SQL models can refer to it; the dbt-duckdb framework makes it available as a referenceable view usable throughout the project:\n",
"\n",
"#### DBT ./models/squ/hunt.sql\n",
"\n",
"See [DBT SQL models](https://docs.getdbt.com/docs/build/sql-models) for how to write the select statement templates that DBT organises into the DAG\n",
"\n",
"```sql\n",
"select * from {{source('kql_source', 'T1547_001')}}\n",
"```\n",
"\n",
"#### DBT cli usage\n",
"\n",
"See [About the dbt run command](https://docs.getdbt.com/reference/commands/run) (can use `--empty` to validate before a full run)\n",
"\n",
"```bash\n",
"cd dbt_example_project\n",
"dbt run # will build the whole dag including any materialisations like the hunt table above\n",
"dbt show --inline \"select * from {{ source('kql_source', 'T1547_001') }}\" # will use live source\n",
"dbt show --inline \"select * from {{ ref('hunt') }}\" # will use materialized table in db built by `dbt run`\n",
"dbt docs generate # will build documentation for the project\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| exports\n",
"class Plugin(BasePlugin):\n",
"    def initialize(self, config):\n",
"        login()\n",
"\n",
"    def configure_cursor(self, cursor):\n",
"        pass\n",
"\n",
"    def load(self, source_config: SourceConfig):\n",
"        if \"kql_path\" in source_config:\n",
"            kql_path = source_config[\"kql_path\"]\n",
"            kql_path = kql_path.format(**source_config.as_dict())\n",
"            query = Path(kql_path).read_text()\n",
"            return query_all(query, timespan=pandas.Timedelta(source_config.get(\"timespan\", \"14d\")))\n",
"        elif \"list_workspaces\" in source_config: # untested\n",
"            return list_workspaces()\n",
"        elif \"client_api\" in source_config: # untested\n",
"            api_result = getattr(clients, source_config[\"client_api\"])(**json.loads(source_config.get(\"kwargs\", \"{}\")))\n",
"            if isinstance(api_result, pandas.DataFrame):\n",
"                return api_result\n",
"            else:\n",
"                return pandas.DataFrame(api_result)\n",
"        else:\n",
"            raise Exception(\"No valid config found for squ plugin (kql_path or api required)\")\n",
"\n",
"    def default_materialization(self):\n",
"        return \"view\""
]
},
{
"cell_type": "code",
"execution_count": null,
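The CLI usage documented in the notebook above can also be driven from Python. A sketch using dbt's programmatic entry point (assumes dbt-core 1.5+ and the example project directory from this PR):

``` python
# Programmatic equivalent of `cd dbt_example_project && dbt run` from the notebook docs.
from dbt.cli.main import dbtRunner

runner = dbtRunner()
result = runner.invoke([
    "run",
    "--project-dir", "dbt_example_project",
    "--profiles-dir", "dbt_example_project",  # profiles.yml lives alongside dbt_project.yml here
])
print(result.success)
```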
2 changes: 1 addition & 1 deletion nbs/index.ipynb
@@ -60,7 +60,7 @@
"source": [
"*Note: If you create/use a Github Codespace on any of the wagov repos, SQU_CONFIG should be configured automatically.*\n",
"\n",
"Before using, config needs to be loaded into `squ.core.cache`, which can be done automatically from json in a keyvault by setting the env var `SQU_CONFIG` to `\"keyvault/tenantid\"`.\n",
"Before using, config needs to be loaded into `nbdev_squ.core.cache`, which can be done automatically from json in a keyvault by setting the env var `SQU_CONFIG` to `\"keyvault/tenantid\"`.\n",
"\n",
"```bash\n",
"export SQU_CONFIG=\"{{ keyvault }}/{{ tenantid }}\"\n",