Skip to content

Commit

Permalink
Add TaskHandler (includes YAML configuration)
Browse files Browse the repository at this point in the history
  • Loading branch information
markus-kunze committed Nov 13, 2024
1 parent 6ff1051 commit 5f338a9
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 105 deletions.
23 changes: 23 additions & 0 deletions etc/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Worker configuration (loaded by SubscriptionManager at startup).
#
# worker.topics maps each Flowable external-task topic to the Python module
# and handler class that services it; the manager imports the module and
# instantiates the class dynamically.
#
# worker.handlers carries optional per-handler overrides, keyed by the
# handler class name:
#   subscription_config — overrides the TaskHandler subscription defaults
#                         (lock_duration, number_of_retries, ...)
#   handler_config      — free-form settings passed through to the handler
# NOTE(review): the 'flowable:' key appears empty here — confirm whether
# client connection settings are meant to live under it.
flowable:

worker:
  topics:
    sentinel_discover_data:
      module: worker.sentinel.tasks
      handler: SentinelDiscoverHandler
    sentinel_download_data:
      module: worker.sentinel.tasks
      handler: SentinelDownloadHandler

  handlers:
    SentinelDiscoverHandler:
      subscription_config:
        number_of_retries: 10

    SentinelDownloadHandler:
      subscription_config:
        number_of_retries: 20
      handler_config:
        download_timeout: 10
        base_dir: /data/sentinel

30 changes: 30 additions & 0 deletions src/worker/common/manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
import yaml
import importlib
from pathlib import Path
from datetime import datetime
from worker.common.log_utils import configure_logging
from worker.common.client import flowableClient
Expand All @@ -13,6 +16,33 @@ class SubscriptionManager:
def __init__(self):
    """Create the manager and register every handler declared in config.

    Keeps a reference to the shared Flowable client, starts with an empty
    subscription registry, then populates it from etc/config.yaml.
    """
    self.subscriptions = {}
    self.client = flowableClient
    self._subscribe_handlers_from_config()


def _subscribe_handlers_from_config(self):
    """Instantiate and subscribe every task handler declared in etc/config.yaml.

    Reads the ``worker`` section of the config file, resolves each topic's
    handler class dynamically (``module``/``handler`` keys), constructs it
    with the per-handler settings, and subscribes its ``execute`` callback
    together with the handler's subscription settings.

    Raises:
        FileNotFoundError: if etc/config.yaml does not exist.
        KeyError: if the config lacks the ``worker`` or ``topics`` sections,
            or a topic entry lacks ``module``/``handler``.
    """
    # config.yaml lives in etc/ at the repository root, four levels above
    # this module (src/worker/common/manager.py).
    config_path = Path(__file__).parent.parent.parent.parent / "etc/config.yaml"
    with open(config_path) as f:
        config_worker = yaml.safe_load(f)["worker"]

    # The handlers section is optional: a handler without overrides falls
    # back entirely on TaskHandler's built-in subscription defaults.
    handlers_config = config_worker.get("handlers", {})

    for topic, topic_config in config_worker["topics"].items():
        # Resolve the handler class from its dotted module path.
        module = importlib.import_module(topic_config["module"])
        handler_class = getattr(module, topic_config["handler"])
        handler = handler_class(handlers_config)
        self.subscribe(
            topic=topic,
            settings={
                "callback_handler": handler.execute,
                **handler.subscription_config,
            },
        )

def subscriptions_info(self):
subscriptions = {}
Expand Down
21 changes: 21 additions & 0 deletions src/worker/common/task_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from worker.common.types import ExternalJob, JobResultBuilder, JobResult

class TaskHandler:
    """Base class for Flowable external-task handlers configured via YAML.

    On construction, looks up its own section in ``handlers_config`` (keyed
    by the concrete subclass name) and derives:

    * ``subscription_config`` — built-in subscription defaults merged with
      any ``subscription_config`` overrides from the YAML section.
    * ``handler_config`` — the free-form ``handler_config`` mapping from the
      YAML section (empty dict when absent).
    """

    def __init__(self, handlers_config: dict):
        self.log_context = {}
        # Per-handler YAML section, selected by the concrete subclass name.
        self.config_all = handlers_config.get(type(self).__name__, {})

        defaults = {
            "lock_duration": "PT1M",
            "number_of_retries": 5,
            "scope_type": None,
            "wait_period_seconds": 1,
            "number_of_tasks": 1,
        }
        overrides = self.config_all.get("subscription_config", {})
        # Overrides win over defaults key-by-key.
        self.subscription_config = {**defaults, **overrides}
        self.handler_config = self.config_all.get("handler_config", {})

    def execute(self, job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
        """Run the task for one external job. Subclasses must override."""
        raise NotImplementedError
7 changes: 0 additions & 7 deletions src/worker/sentinel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from worker.common.config import Config
from worker.common.manager import SubscriptionManager
from worker.common.log_utils import configure_logging
from worker.sentinel.tasks import tasks_config

manager = SubscriptionManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
configure_logging()
Expand All @@ -19,13 +17,8 @@ async def lifespan(app: FastAPI):
# end all subs before fastapi server shutdown
manager.unsubscribe_all()


app = FastAPI(lifespan=lifespan)

for topic in tasks_config:
manager.subscribe(topic=topic, settings=tasks_config[topic])


@app.get("/subscriptions")
def get_subscriptions():
return {"subscriptions": manager.subscriptions_info()}
Expand Down
200 changes: 102 additions & 98 deletions src/worker/sentinel/tasks.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,62 @@
import os
import datetime
import time
import json
from dateutil.parser import parse
from worker.common.log_utils import configure_logging, log_with_context
from worker.common.types import ExternalJob, JobResultBuilder, JobResult
from worker.common.client import flowableClient
from registration_library.providers import esa_cdse as cdse
from worker.common.task_handler import TaskHandler

configure_logging()


def sentinel_discover_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """
    Searches for new data since last workflow execution
    Variables needed:
        start_time
        end_time
        order_id
    Variables set:
        scenes: List of scenes found
    """

    log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
    log_with_context("Discovering new sentinel data ...", log_context)

    # Workflow variables
    start_time = job.get_variable("start_time")
    end_time = job.get_variable("end_time")
    order_id = job.get_variable("order_id")  # NOTE(review): fetched but never used below — confirm intent
    if order_id is None:
        order_id = job.process_instance_id

    # When no explicit window is given, derive a one-hour window ending at
    # the start of the hour of the workflow start time ("now" as fallback).
    if start_time is None and end_time is None:
        history = flowableClient.get_process_instance_history(job.process_instance_id)
        if "startTime" in history:
            current_time = parse(history["startTime"])  # 2024-03-17T01:02:22.487+0000
            log_with_context("use startTime from workflow: %s" % current_time, log_context)
        else:
            # NOTE(review): naive local time here vs. a possibly tz-aware
            # parsed timestamp above — confirm both are meant to be treated
            # the same by the "...Z" format below.
            current_time = datetime.datetime.now()
            log_with_context("use datetime.now() as startTime: %s" % current_time, log_context)
        # Truncate to the full hour (constructor drops minutes/seconds/tzinfo).
        end_time = datetime.datetime(current_time.year, current_time.month, current_time.day, current_time.hour)
        start_time = end_time - datetime.timedelta(hours=1)
        start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    log_with_context(f"Search interval: {start_time} - {end_time}", log_context)

    # discovering scenes
    # Stub: a single hard-coded scene stands in for a real catalogue search.
    scenes = []
    scene1 = {"scene": {"name": "scene1"}}
    # scene2 = {"scene": {"name": "scene2"}}
    scenes.append(scene1)
    # scenes.append(scene2)

    # build result
    return result.success().variable_json(name="scenes", value=scenes)
class SentinelDiscoverHandler(TaskHandler):
    def execute(self, job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
        """
        Searches for new data since last workflow execution
        Variables needed:
            start_time
            end_time
            order_id
        Variables set:
            scenes: List of scenes found
        """
        log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
        log_with_context("Discovering new sentinel data ...", log_context)

        # Workflow variables
        start_time = job.get_variable("start_time")
        end_time = job.get_variable("end_time")
        # NOTE(review): order_id is resolved but not used below.
        order_id = job.get_variable("order_id")
        if order_id is None:
            order_id = job.process_instance_id

        # No explicit window given: use the hour preceding the workflow start
        # (falling back to "now" when the history has no start time).
        if start_time is None and end_time is None:
            history = flowableClient.get_process_instance_history(job.process_instance_id)
            if "startTime" in history:
                reference = parse(history["startTime"])  # 2024-03-17T01:02:22.487+0000
                log_with_context("use startTime from workflow: %s" % reference, log_context)
            else:
                reference = datetime.datetime.now()
                log_with_context("use datetime.now() as startTime: %s" % reference, log_context)
            window_end = datetime.datetime(reference.year, reference.month, reference.day, reference.hour)
            window_start = window_end - datetime.timedelta(hours=1)
            start_time = window_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            end_time = window_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        log_with_context(f"Search interval: {start_time} - {end_time}", log_context)

        # Discovering scenes
        try:
            scenes = cdse.search_scenes_ingestion(date_from=start_time, date_to=end_time, filters=None)
        except Exception as e:
            log_with_context(f"Error searching scenes: {e}", log_context)
            return result.error(f"Error searching scenes: {e}")

        return result.success().variable_json(name="scenes", value=scenes)


def sentinel_download_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
Expand Down Expand Up @@ -114,53 +117,54 @@ def sentinel_register_metadata(job: ExternalJob, result: JobResultBuilder, confi
return result.success()


tasks_config = {
"sentinel_discover_data": {
"callback_handler": sentinel_discover_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_download_data": {
"callback_handler": sentinel_download_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_unzip": {
"callback_handler": sentinel_unzip,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_check_integrity": {
"callback_handler": sentinel_check_integrity,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_extract_metadata": {
"callback_handler": sentinel_extract_metadata,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_register_metadata": {
"callback_handler": sentinel_register_metadata,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
}
# tasks_config = {
# "sentinel_discover_data": {
# "callback_handler": sentinel_discover_data,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },
# "sentinel_download_data": {
# "callback_handler": sentinel_download_data,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },

# "sentinel_unzip": {
# "callback_handler": sentinel_unzip,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },
# "sentinel_check_integrity": {
# "callback_handler": sentinel_check_integrity,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },
# "sentinel_extract_metadata": {
# "callback_handler": sentinel_extract_metadata,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },
# "sentinel_register_metadata": {
# "callback_handler": sentinel_register_metadata,
# "lock_duration": "PT1M",
# "number_of_retries": 5,
# "scope_type": None,
# "wait_period_seconds": 1,
# "number_of_tasks": 1,
# },
# }

0 comments on commit 5f338a9

Please sign in to comment.