Skip to content

Commit

Permalink
[feature] add dlt to duckdb sanity check flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben8t committed Jan 23, 2025
1 parent 0ec721f commit 85b21e5
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions sanitychecks/flows/dlt_duckdb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
id: dlt_duckdb
namespace: qa

tasks:

- id: write
type: io.kestra.plugin.core.storage.Write
content: |
import dlt
import requests
pipeline = dlt.pipeline(
pipeline_name='product_pipeline',
destination='duckdb',
dataset_name='product_data'
)
data = []
for product in list(range(1, 50)):
response = requests.get(f'https://dummyjson.com/products/{product}')
response.raise_for_status()
data.append(response.json())
# Extract, normalize, and load the data
pipeline.run(data, table_name='product')
extension: .py

- id: upload
type: io.kestra.plugin.core.namespace.UploadFiles
filesMap:
ingest.py: "{{ outputs.write.uri }}"
namespace: "{{ flow.namespace }}"

- id: product_api_to_duckdb
type: io.kestra.plugin.scripts.python.Commands
containerImage: python:3.10.16-slim
namespaceFiles:
enabled: true
beforeCommands:
- pip install dlt[duckdb]
warningOnStdErr: false
commands:
- python ingest.py
- ls -altr
outputFiles:
- product_pipeline.duckdb

- id: duckdb
type: io.kestra.plugin.jdbc.duckdb.Query
inputFiles:
data.duckdb: "{{ outputs.product_api_to_duckdb.outputFiles['product_pipeline.duckdb']}}"
sql: |
ATTACH '{{workingDir}}/data.duckdb' AS data;
SELECT category, SUM(price) AS sum_price FROM data.product_data.product GROUP BY 1;
fetchType: STORE

- id: assert
type: io.kestra.plugin.core.execution.Assert
conditions:
- "{{ fromIon(read(outputs.duckdb['uri'])).category == 'home-decoration' }}"
- "{{ fromIon(read(outputs.duckdb['uri'])).sum_price == 194.95000000000002 }}"

0 comments on commit 85b21e5

Please sign in to comment.