From 85b21e5859e382b43649496d6bce652fabe968f1 Mon Sep 17 00:00:00 2001
From: Benoit
Date: Thu, 23 Jan 2025 17:02:12 +0100
Subject: [PATCH] [feature] add dlt to duckdb sanity check flow

---
 sanitychecks/flows/dlt_duckdb.yaml | 70 ++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 sanitychecks/flows/dlt_duckdb.yaml

diff --git a/sanitychecks/flows/dlt_duckdb.yaml b/sanitychecks/flows/dlt_duckdb.yaml
new file mode 100644
index 0000000..079d217
--- /dev/null
+++ b/sanitychecks/flows/dlt_duckdb.yaml
@@ -0,0 +1,70 @@
+id: dlt_duckdb
+namespace: qa
+
+# Sanity check: ingest products from the dummyjson API into a DuckDB file with
+# dlt, then query that file and assert on one aggregated category.
+tasks:
+
+  # Store the ingestion script so the next task can publish it as a namespace file.
+  - id: write
+    type: io.kestra.plugin.core.storage.Write
+    content: |
+      import dlt
+      import requests
+
+      pipeline = dlt.pipeline(
+          pipeline_name='product_pipeline',
+          destination='duckdb',
+          dataset_name='product_data'
+      )
+      data = []
+      for product in range(1, 50):
+          # A timeout makes an unresponsive API fail the run instead of hanging it
+          response = requests.get(f'https://dummyjson.com/products/{product}', timeout=30)
+          response.raise_for_status()
+          data.append(response.json())
+      # Extract, normalize, and load the data
+      pipeline.run(data, table_name='product')
+    extension: .py
+
+  # Publish the script as ingest.py in this flow's namespace.
+  - id: upload
+    type: io.kestra.plugin.core.namespace.UploadFiles
+    filesMap:
+      ingest.py: "{{ outputs.write.uri }}"
+    namespace: "{{ flow.namespace }}"
+
+  # Run the script in a Python container and keep the DuckDB file it produces.
+  - id: product_api_to_duckdb
+    type: io.kestra.plugin.scripts.python.Commands
+    containerImage: python:3.10.16-slim
+    namespaceFiles:
+      enabled: true
+    beforeCommands:
+      - pip install dlt[duckdb]
+    warningOnStdErr: false
+    commands:
+      - python ingest.py
+      - ls -altr
+    outputFiles:
+      - product_pipeline.duckdb
+
+  # Attach the produced database and aggregate prices. The WHERE clause pins the
+  # result to the single category asserted below, because the row order of a
+  # GROUP BY without ORDER BY is not guaranteed.
+  - id: duckdb
+    type: io.kestra.plugin.jdbc.duckdb.Query
+    inputFiles:
+      data.duckdb: "{{ outputs.product_api_to_duckdb.outputFiles['product_pipeline.duckdb']}}"
+    sql: |
+      ATTACH '{{workingDir}}/data.duckdb' AS data;
+      SELECT category, SUM(price) AS sum_price FROM data.product_data.product WHERE category = 'home-decoration' GROUP BY 1;
+    fetchType: STORE
+
+  # Check the stored query result.
+  - id: assert
+    type: io.kestra.plugin.core.execution.Assert
+    conditions:
+      - "{{ fromIon(read(outputs.duckdb['uri'])).category == 'home-decoration' }}"
+      - "{{ fromIon(read(outputs.duckdb['uri'])).sum_price == 194.95000000000002 }}"
+