From 6214bd79a1cc09bad46d4415309963099dedbbd5 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 14:47:03 +0000 Subject: [PATCH 1/7] fix: formatting --- python-generate-outputs.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python-generate-outputs.yaml b/python-generate-outputs.yaml index e0c550e..701eee6 100644 --- a/python-generate-outputs.yaml +++ b/python-generate-outputs.yaml @@ -1,7 +1,9 @@ id: python-generate-outputs namespace: company.team + variables: file: orders.csv + tasks: - id: analyze_orders type: io.kestra.plugin.scripts.python.Script @@ -93,8 +95,7 @@ tasks: print(f"The script execution took: {processing_time} seconds") extend: - title: Run a Python script and generate outputs, metrics and downloadable file - artifact specified with a variable + title: Run a Python script and generate outputs, metrics and files specified with a variable description: >- This flow generates a CSV file with 100 random orders and then calculates the sum and average of the "total" column. 
It then reports the results as From 612bc86b54cc3140822467b7067c5db34ec8912b Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 14:52:29 +0000 Subject: [PATCH 2/7] fix: formatting --- aws-batch-terraform-git.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aws-batch-terraform-git.yaml b/aws-batch-terraform-git.yaml index 63fd4be..3b5813c 100644 --- a/aws-batch-terraform-git.yaml +++ b/aws-batch-terraform-git.yaml @@ -1,5 +1,6 @@ id: aws-batch-terraform-git namespace: company.team + inputs: - id: bucket type: STRING @@ -7,6 +8,7 @@ inputs: - id: region type: STRING defaults: us-east-1 + tasks: - id: wdir type: io.kestra.plugin.core.flow.WorkingDirectory @@ -56,6 +58,7 @@ tasks: outputs[key] = value.strip('"') Kestra.outputs(outputs) + - id: parallel_ecs_fargate_tasks type: io.kestra.plugin.core.flow.Parallel tasks: From f06f9c3601150e4abc2f6d25bda3a4193152a346 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 16:01:12 +0000 Subject: [PATCH 3/7] fix: formatting --- python-generate-outputs-simple.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python-generate-outputs-simple.yaml b/python-generate-outputs-simple.yaml index f370bbc..cab083c 100644 --- a/python-generate-outputs-simple.yaml +++ b/python-generate-outputs-simple.yaml @@ -23,7 +23,9 @@ extend: This flow generates outputs in a simple Python script, and then logs them. The flow has two tasks: + 1. Generate outputs using Python script + 2. 
Log the outputs generated in the prior task tags: - Python From d016ecaf49aa32f95a4b5a422bd39406450d5356 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 16:31:58 +0000 Subject: [PATCH 4/7] fix: tags --- ingest-to-datalake-event-driven.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ingest-to-datalake-event-driven.yaml b/ingest-to-datalake-event-driven.yaml index f92ef24..6da51ae 100644 --- a/ingest-to-datalake-event-driven.yaml +++ b/ingest-to-datalake-event-driven.yaml @@ -1,10 +1,12 @@ id: ingest-to-datalake-event-driven namespace: company.team + variables: source_prefix: inbox destination_prefix: archive database: default bucket: kestraio + tasks: - id: wdir type: io.kestra.plugin.core.flow.WorkingDirectory @@ -50,6 +52,7 @@ tasks: outputLocation: s3://{{ vars.bucket }}/query_results/ query: | OPTIMIZE fruits REWRITE DATA USING BIN_PACK; + triggers: - id: wait_for_new_s3_objects type: io.kestra.plugin.aws.s3.Trigger @@ -82,6 +85,8 @@ extend: tags: - AWS - Iceberg + - S3 + - Trigger ee: false demo: false meta_description: "This workflow ingests data to an S3 data lake using a Python script. 
" From 5004a269d2b21b31d2d067598901ba07d3ea71a4 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 17:19:31 +0000 Subject: [PATCH 5/7] feat: shell script example --- shell-execute-code.yaml | 71 +++++++++++++++++++++++++++++++++++++++++ shell-scripts.yaml | 2 +- 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 shell-execute-code.yaml diff --git a/shell-execute-code.yaml b/shell-execute-code.yaml new file mode 100644 index 0000000..582615e --- /dev/null +++ b/shell-execute-code.yaml @@ -0,0 +1,71 @@ +id: shell-execute-code +namespace: company.team + +inputs: + - id: dataset_url + type: STRING + defaults: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv + +tasks: + - id: download_dataset + type: io.kestra.plugin.core.http.Download + uri: "{{ inputs.dataset_url }}" + + - id: c_code + type: io.kestra.plugin.scripts.shell.Commands + taskRunner: + type: io.kestra.plugin.scripts.runner.docker.Docker + containerImage: gcc:latest + commands: + - gcc example.c + - ./a.out + inputFiles: + orders.csv: "{{ outputs.download_dataset.uri }}" + example.c: | + #include + #include + #include + + int main() { + FILE *file = fopen("orders.csv", "r"); + if (!file) { + printf("Error opening file!\n"); + return 1; + } + + char line[1024]; + double total_revenue = 0.0; + + fgets(line, 1024, file); + while (fgets(line, 1024, file)) { + char *token = strtok(line, ","); + int i = 0; + double total = 0.0; + + while (token) { + if (i == 6) { + total = atof(token); + total_revenue += total; + } + token = strtok(NULL, ","); + i++; + } + } + + fclose(file); + printf("Total Revenue: $%.2f\n", total_revenue); + + return 0; + } +extend: + title: Run C code inside of a Shell environment + description: This flow uses a Shell Command to run C code with an inputFile generated dynamically from a task upstream. 
+ tags: + - CLI + - Inputs + - Software Engineering + - Outputs + ee: false + demo: true + meta_description: This flow executes Shell + Commands to run C code. diff --git a/shell-scripts.yaml b/shell-scripts.yaml index 085cf8b..b0ad121 100644 --- a/shell-scripts.yaml +++ b/shell-scripts.yaml @@ -30,7 +30,7 @@ tasks: - cut -d ',' -f 2 file.csv | head -n 6 extend: title: Run Shell Scripts and Shell commands in a working directory using a - PROCESS runner + Process Task Runner description: This flow sequentially executes Shell Scripts and Shell Commands in the same working directory using a local `Process` Task Runner. tags: From b281606e2c052200e55a9a7936569d4a1f365443 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 17:37:25 +0000 Subject: [PATCH 6/7] feat: s3 trigger for python --- s3-trigger-python.yaml | 61 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 s3-trigger-python.yaml diff --git a/s3-trigger-python.yaml b/s3-trigger-python.yaml new file mode 100644 index 0000000..493e47d --- /dev/null +++ b/s3-trigger-python.yaml @@ -0,0 +1,61 @@ +id: s3-trigger-python +namespace: company.team + +variables: + bucket: s3-bucket + region: eu-west-2 + +tasks: + - id: process_data + type: io.kestra.plugin.scripts.python.Commands + taskRunner: + type: io.kestra.plugin.scripts.runner.docker.Docker + containerImage: ghcr.io/kestra-io/kestrapy:latest + namespaceFiles: + enabled: true + inputFiles: + input.csv: "{{ read(trigger.objects[0].uri) }}" + outputFiles: + - data.csv + commands: + - python process_data.py + +triggers: + - id: watch + type: io.kestra.plugin.aws.s3.Trigger + interval: "PT1S" + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "{{ vars.region }}" + bucket: "{{ vars.bucket }}" + filter: FILES + action: MOVE + moveTo: + key: archive/ + maxKeys: 1 +extend: + title: "Detect New Files in S3 and process them in Python" + description: >- 
This flow will be triggered whenever a new file is + detected in the specified S3 bucket. It will download the file into the + internal storage and move the S3 object to an `archive` folder (i.e. S3 + object prefix with the name `archive`). + + The Python code will read the file as an `inputFile` called `input.csv` + and process it to generate a new file called `data.csv`. + + + It's recommended to set the `accessKeyId` and `secretKeyId` properties as + secrets. + + + This flow assumes AWS credentials stored as secrets `AWS_ACCESS_KEY_ID`, + `AWS_SECRET_KEY_ID`. + tags: + - S3 + - Trigger + ee: false + demo: false + meta_description: This flow triggers on new file detection in a specified S3 + bucket. It downloads files to internal storage and moves objects to an + 'archive' folder. The file is processed by a Python script. From 2b1aedfb7a94b2613ec486650262fe949e16748b Mon Sep 17 00:00:00 2001 From: Will Russell Date: Wed, 6 Nov 2024 17:38:32 +0000 Subject: [PATCH 7/7] fix: add tag --- s3-trigger-python.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/s3-trigger-python.yaml b/s3-trigger-python.yaml index 493e47d..a099e99 100644 --- a/s3-trigger-python.yaml +++ b/s3-trigger-python.yaml @@ -54,6 +54,7 @@ extend: tags: - S3 - Trigger + - Python ee: false demo: false meta_description: This flow triggers on new file detection in a specified S3