Update develop #10

Merged: 7 commits, Nov 7, 2024
3 changes: 3 additions & 0 deletions aws-batch-terraform-git.yaml
@@ -1,12 +1,14 @@
id: aws-batch-terraform-git
namespace: company.team

inputs:
  - id: bucket
    type: STRING
    defaults: kestra-us
  - id: region
    type: STRING
    defaults: us-east-1

tasks:
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
@@ -56,6 +58,7 @@ tasks:
outputs[key] = value.strip('"')

Kestra.outputs(outputs)

  - id: parallel_ecs_fargate_tasks
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
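For context on the collapsed script fragment above: it collects `terraform output` key/value pairs into a dictionary and publishes them via `Kestra.outputs`. A minimal sketch of what such parsing logic typically looks like (the file name and line format here are assumptions, not part of this diff):

```python
# Hypothetical sketch: parse `terraform output` lines of the form
# key = "value" into a dict and expose them as Kestra outputs.
from kestra import Kestra

outputs = {}
with open("tf_output.txt") as f:  # assumed capture of `terraform output`
    for line in f:
        if "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        outputs[key] = value.strip('"')

Kestra.outputs(outputs)
```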
5 changes: 5 additions & 0 deletions ingest-to-datalake-event-driven.yaml
@@ -1,10 +1,12 @@
id: ingest-to-datalake-event-driven
namespace: company.team

variables:
  source_prefix: inbox
  destination_prefix: archive
  database: default
  bucket: kestraio

tasks:
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
@@ -50,6 +52,7 @@ tasks:
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: |
      OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

triggers:
  - id: wait_for_new_s3_objects
    type: io.kestra.plugin.aws.s3.Trigger
@@ -82,6 +85,8 @@ extend:
  tags:
    - AWS
    - Iceberg
    - S3
    - Trigger
  ee: false
  demo: false
  meta_description: "This workflow ingests data to an S3 data lake using a Python script."
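The ingestion script itself is not shown in this diff. As a rough illustration of the kind of event that drives the flow (the file and key names are assumptions based on the flow's variables), dropping an object into the watched `inbox` prefix could look like:

```python
# Hypothetical sketch: upload a file into the watched `inbox` prefix,
# which the flow's S3 trigger then picks up and archives.
import boto3

s3 = boto3.client("s3")
s3.upload_file("fruits.csv", "kestraio", "inbox/fruits.csv")
```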
2 changes: 2 additions & 0 deletions python-generate-outputs-simple.yaml
@@ -23,7 +23,9 @@ extend:
    This flow generates outputs in a simple Python script, and then logs them.

    The flow has two tasks:

    1. Generate outputs using a Python script (a minimal sketch follows below)

    2. Log the outputs generated in the prior task
  tags:
    - Python
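A minimal sketch of the output-generating script described above, using the `kestra` Python package (the output names and values are illustrative, not from this PR):

```python
# Illustrative only: emit key/value outputs from a Python task.
from kestra import Kestra

Kestra.outputs({"order_count": 100, "status": "ok"})
```

A downstream Log task can then reference these values, e.g. via `{{ outputs.<task_id>.vars.order_count }}` (task id assumed).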
5 changes: 3 additions & 2 deletions python-generate-outputs.yaml
@@ -1,7 +1,9 @@
id: python-generate-outputs
namespace: company.team

variables:
  file: orders.csv

tasks:
  - id: analyze_orders
    type: io.kestra.plugin.scripts.python.Script
@@ -93,8 +95,7 @@ tasks:

print(f"The script execution took: {processing_time} seconds")
extend:
-  title: Run a Python script and generate outputs, metrics and downloadable file
-    artifact specified with a variable
+  title: Run a Python script and generate outputs, metrics and files specified with a variable
  description: >-
    This flow generates a CSV file with 100 random orders and then calculates
    the sum and average of the "total" column. It then reports the results as
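The full script body is collapsed in this diff. For orientation, a condensed sketch of what the description outlines (column names and value ranges are assumptions):

```python
# Condensed sketch: generate 100 random orders, then report the sum and
# average of the "total" column as Kestra outputs.
import csv
import random

from kestra import Kestra

with open("orders.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["order_id", "total"])
    for order_id in range(1, 101):
        writer.writerow([order_id, round(random.uniform(5, 100), 2)])

with open("orders.csv", newline="") as f:
    totals = [float(row["total"]) for row in csv.DictReader(f)]

Kestra.outputs({"sum": sum(totals), "average": sum(totals) / len(totals)})
```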
62 changes: 62 additions & 0 deletions s3-trigger-python.yaml
@@ -0,0 +1,62 @@
id: s3-trigger-python
namespace: company.team

variables:
  bucket: s3-bucket
  region: eu-west-2

tasks:
  - id: process_data
    type: io.kestra.plugin.scripts.python.Commands
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/kestrapy:latest
    namespaceFiles:
      enabled: true
    inputFiles:
      input.csv: "{{ read(trigger.objects[0].uri) }}"
    outputFiles:
      - data.csv
    commands:
      - python process_data.py

triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT1S"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "{{ vars.region }}"
    bucket: "{{ vars.bucket }}"
    filter: FILES
    action: MOVE
    moveTo:
      key: archive/
    maxKeys: 1
extend:
  title: "Detect New Files in S3 and process them in Python"
  description: >-
    This flow will be triggered whenever a new file is detected in the
    specified S3 bucket. It will download the file into internal storage and
    move the S3 object to an `archive` folder (i.e. an S3 object prefix named
    `archive`).

    The Python code will read the file as an `inputFile` called `input.csv`
    and process it to generate a new file called `data.csv`.

    It's recommended to set the `accessKeyId` and `secretKeyId` properties as
    secrets. This flow assumes AWS credentials stored as the secrets
    `AWS_ACCESS_KEY_ID` and `AWS_SECRET_KEY_ID`.
  tags:
    - S3
    - Trigger
    - Python
  ee: false
  demo: false
  meta_description: This flow triggers on new file detection in a specified S3
    bucket. It downloads files to internal storage and moves objects to an
    'archive' folder. The file is processed by a Python script.
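The `process_data.py` namespace file is not included in this PR. A minimal sketch of what it could look like under the flow's contract (read `input.csv`, write `data.csv`; the pass-through transformation is a placeholder):

```python
# Hypothetical process_data.py: read the triggering object (injected as
# input.csv) and write the processed result to data.csv.
import csv

with open("input.csv", newline="") as src, open("data.csv", "w", newline="") as dst:
    reader = csv.reader(src)
    writer = csv.writer(dst)
    for row in reader:
        writer.writerow(row)  # placeholder: real processing goes here
```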
71 changes: 71 additions & 0 deletions shell-execute-code.yaml
@@ -0,0 +1,71 @@
id: shell-execute-code
namespace: company.team

inputs:
  - id: dataset_url
    type: STRING
    defaults: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

tasks:
  - id: download_dataset
    type: io.kestra.plugin.core.http.Download
    uri: "{{ inputs.dataset_url }}"

  - id: c_code
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: gcc:latest
    commands:
      - gcc example.c
      - ./a.out
    inputFiles:
      orders.csv: "{{ outputs.download_dataset.uri }}"
      example.c: |
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>

        int main() {
            FILE *file = fopen("orders.csv", "r");
            if (!file) {
                printf("Error opening file!\n");
                return 1;
            }

            char line[1024];
            double total_revenue = 0.0;

            fgets(line, 1024, file);  /* skip the header row */
            while (fgets(line, 1024, file)) {
                char *token = strtok(line, ",");
                int i = 0;
                double total = 0.0;

                while (token) {
                    if (i == 6) {  /* the "total" column */
                        total = atof(token);
                        total_revenue += total;
                    }
                    token = strtok(NULL, ",");
                    i++;
                }
            }

            fclose(file);
            printf("Total Revenue: $%.2f\n", total_revenue);

            return 0;
        }
extend:
  title: Run C code inside of a Shell environment
  description: This flow uses Shell Commands to run C code with an inputFile
    generated dynamically by an upstream task.
  tags:
    - CLI
    - Inputs
    - Software Engineering
    - Outputs
  ee: false
  demo: true
  meta_description: This flow executes Shell Commands to run C code.
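As a quick cross-check of the C program's logic, a Python equivalent reading the same column (index 6, matching the C code) could look like:

```python
# Python cross-check for the C revenue calculation above.
import csv

total_revenue = 0.0
with open("orders.csv", newline="") as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row, as the C code does
    for row in reader:
        total_revenue += float(row[6])  # same column index as the C code

print(f"Total Revenue: ${total_revenue:.2f}")
```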
2 changes: 1 addition & 1 deletion shell-scripts.yaml
@@ -30,7 +30,7 @@ tasks:
- cut -d ',' -f 2 file.csv | head -n 6
extend:
-  title: Run Shell Scripts and Shell commands in a working directory using a
-    PROCESS runner
+  title: Run Shell Scripts and Shell commands in a working directory using a
+    Process Task Runner
  description: This flow sequentially executes Shell Scripts and Shell Commands in
    the same working directory using a local `Process` Task Runner.
  tags:
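For readers less familiar with shell pipelines, the `cut -d ',' -f 2 file.csv | head -n 6` command shown in this diff prints the second comma-separated field of the first six lines of `file.csv`. A rough Python equivalent:

```python
# Rough equivalent of: cut -d ',' -f 2 file.csv | head -n 6
with open("file.csv") as f:
    for _, line in zip(range(6), f):
        print(line.rstrip("\n").split(",")[1])
```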