Skip to content

Commit

Permalink
Merge pull request #13 from japerry911/python/japerry911-refactor
Browse files Browse the repository at this point in the history
Python Refactor / Improvements / Add Documentation
  • Loading branch information
anna-geller authored Feb 1, 2025
2 parents c17f4b9 + fe96f94 commit f146dd3
Show file tree
Hide file tree
Showing 16 changed files with 818 additions and 455 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ jobs:
with:
python-version: "3.10"

- name: Copy Readme
working-directory: python
run: |
curl -s https://raw.githubusercontent.com/kestra-io/kestra/master/README.md > README.md
- name: Install pypa/build
working-directory: python
run: python -m pip install build --user
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ python/src/kestra.egg-info
python/dist
python/build
python/gradle.properties
python/README.md
python/**/__pycache__
python/venv
python/.venv

### Javascript
javascript/gradle.properties
Expand Down
14 changes: 14 additions & 0 deletions python/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[flake8]
ignore =
# Enforced by Black: line break before binary operator (W503)
W503,
# Enforced by Black: line break after binary operator (W504)
W504
exclude =
.venv/*
dbt/dbt_packages/*
max-line-length = 88
max-complexity = 22
per-file-ignores =
# ignore long lines in tests
tests/*: E501
164 changes: 164 additions & 0 deletions python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Kestra Python Client

This Python client provides functionality to interact with the Kestra server for sending metrics, outputs, and logs, as well as executing/polling flows.

## Installation

```bash
pip install kestra
```

## Kestra Class

The `Kestra` class is responsible for sending metrics, outputs, and logs to the Kestra server.

### Methods

- **_send(map_: dict)**: Sends a message to the Kestra server.
- **format(map_: dict) -> str**: Formats a message to be sent to the Kestra server.
- **_metrics(name: str, type_: str, value: int, tags: dict | None = None)**: Sends a metric to the Kestra server.
- **outputs(map_: dict)**: Sends outputs to the Kestra server.
- **counter(name: str, value: int, tags: dict | None = None)**: Sends a counter to the Kestra server.
- **timer(name: str, duration: int | Callable, tags: dict | None = None)**: Sends a timer to the Kestra server.
- **logger() -> Logger**: Retrieves the logger for the Kestra server.

## Flow Class

The `Flow` class is used to execute a Kestra flow and optionally wait for its completion. It can also be used to get the status of an execution and the logs of an execution.

### Initialization

```python
flow = Flow(
wait_for_completion=True, # default is True
poll_interval=1, # seconds. default is 1
labels_from_inputs=False, # default is False
tenant=None # default is None
)
```

You can also set the hostname and authentication credentials using environment variables:

```bash
export KESTRA_HOSTNAME=http://localhost:8080
export KESTRA_USER=admin
export KESTRA_PASSWORD=admin
export KESTRA_API_TOKEN=my_api_token
```

It is worth noting that the KESTRA_API_TOKEN or KESTRA_USER and KESTRA_PASSWORD need to be used, you do not need all at once. The possible Authentication patterns are:

1. KESTRA_API_TOKEN
2. KESTRA_USER and KESTRA_PASSWORD
3. No Authentication (not recommended for production environments)

### Methods

- **_make_request(method: str, url: str, \*\*kwargs) -> requests.Response**: Makes a request to the Kestra server with optional authentication and retries.
- **check_status(execution_id: str) -> requests.Response**: Checks the status of an execution.
- **get_logs(execution_id: str) -> requests.Response**: Retrieves the logs of an execution.
- **execute(namespace: str, flow: str, inputs: dict = None) -> namedtuple**: Executes a Kestra flow and optionally waits for its completion. The namedtuple returned is a namedtuple with the following properties:
- **status**: The status of the execution.
- **log**: The log of the execution.
- **error**: The error of the execution.

### Usage Examples

1. **Trigger a flow and wait for its completion:**

```python
from kestra import Flow
flow = Flow()
flow.execute('mynamespace', 'myflow', {'param': 'value'})
```

2. **Set labels from inputs:**

```python
from kestra import Flow
flow = Flow(labels_from_inputs=True)
flow.execute('mynamespace', 'myflow', {'param': 'value'})
```

3. **Pass a text file to an input of type FILE named 'myfile':**

```python
from kestra import Flow
flow = Flow()
with open('example.txt', 'rb') as fh:
flow.execute('mynamespace', 'myflow', {'files': ('myfile', fh, 'text/plain')})
```

4. **Fire and forget:**

```python
from kestra import Flow
flow = Flow(wait_for_completion=False)
flow.execute('mynamespace', 'myflow', {'param': 'value'})
```

5. **Overwrite the username and password:**

```python
from kestra import Flow
flow = Flow()
flow.user = 'admin'
flow.password = 'admin'
flow.execute('mynamespace', 'myflow')
```

6. **Set the hostname, username, and password using environment variables:**

```python
from kestra import Flow
import os

os.environ["KESTRA_HOSTNAME"] = "http://localhost:8080"
os.environ["KESTRA_USER"] = "admin"
os.environ["KESTRA_PASSWORD"] = "admin"
flow = Flow()
flow.execute('mynamespace', 'myflow', {'param': 'value'})
```

## Error Handling

The client includes retry logic with exponential backoff for certain HTTP status codes, and raises a `FailedExponentialBackoff` exception if the request fails after multiple retries.

## Kestra Class

### Logging

The `Kestra` class provides a logger that formats logs in JSON format, making it easier to integrate with log management systems.

```python
from kestra import Kestra

Kestra.logger().info("Hello, world!")
```

### Outputs

The `Kestra` class provides a method to send key-value-based outputs to
the Kestra server. If you want to output large objects, write them to a
file and specify them within the `outputFiles` property of the Python
script task.

```python
Kestra.outputs({"my_output": "my_value"})
```

### Counters

The `Kestra` class provides a method to send counter metrics to the Kestra server.

```python
Kestra.counter("my_counter", 1)
```

### Timers

The `Kestra` class provides a method to send timer metrics to the Kestra server.

```python
Kestra.timer("my_timer", 1)
```
19 changes: 11 additions & 8 deletions python/setup.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
from setuptools import setup
from pathlib import Path
import re
import os
from pathlib import Path

from setuptools import setup

version = os.getenv('VERSION', '0.0.0')
version = os.getenv("VERSION", "0.0.0")

setup(
name="kestra",
version=version
.replace("v", ""),
version=version.replace("v", ""),
package_dir={"": "src"},
include_package_data=True,
install_requires=["requests"],
extras_require={
'test': ["pytest", "requests_mock"],
"test": ["pytest", "requests_mock", "pytest-mock"],
"dev": ["isort", "black", "flake8"],
},
python_requires=">=3",
description="Kestra is an infinitely scalable orchestration and scheduling platform, creating, running, scheduling, and monitoring millions of complex pipelines.",
description=(
"Kestra is an infinitely scalable orchestration and scheduling platform, "
"creating, running, scheduling, and monitoring millions of complex pipelines."
),
long_description=(Path(__file__).parent / "README.md").read_text(),
long_description_content_type="text/markdown",
url="https://kestra.io",
Expand Down
6 changes: 6 additions & 0 deletions python/src/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class FailedExponentialBackoff(Exception):
"""
Exception raised when the exponential backoff fails.
"""

pass
Loading

0 comments on commit f146dd3

Please sign in to comment.