diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py index c6f782fac4..6bae62ea11 100644 --- a/client/python/conductor/ConductorWorker.py +++ b/client/python/conductor/ConductorWorker.py @@ -18,9 +18,20 @@ from conductor.conductor import WFClientMgr from threading import Thread import socket +from enum import Enum hostname = socket.gethostname() +class TaskStatus(Enum): + IN_PROGRESS = 'IN_PROGRESS' + FAILED = 'FAILED' + FAILED_WITH_TERMINAL_ERROR = 'FAILED_WITH_TERMINAL_ERROR' + COMPLETED = 'COMPLETED' + + def __str__(self): + return str(self.value) + + class ConductorWorker: """ @@ -66,6 +77,35 @@ def __init__(self, server_url, thread_count, polling_interval, worker_id=None): self.polling_interval = polling_interval self.worker_id = worker_id or hostname + @staticmethod + def task_result(status: TaskStatus, output=None, logs=None, reasonForIncompletion=None): + """ + Get task result + Parameters + ---------- + status: TaskStatus + The status of the task + Ex: TaskStatus.COMPLETED + output: dict + results of task processing + logs: list + log list + reasonForIncompletion: str, optional + the reason for not completing the task if any + """ + if logs is None: + logs = [] + if output is None: + output = {} + ret = { + 'status': status.__str__(), + 'output': output, + 'logs': logs + } + if reasonForIncompletion: + ret['reasonForIncompletion'] = reasonForIncompletion + return ret + def execute(self, task, exec_function): try: resp = exec_function(task) diff --git a/client/python/kitchensink_workers.py b/client/python/kitchensink_workers.py index 2fcafdf992..31e95072e8 100644 --- a/client/python/kitchensink_workers.py +++ b/client/python/kitchensink_workers.py @@ -1,8 +1,12 @@ from __future__ import print_function -from conductor.ConductorWorker import ConductorWorker +from conductor.ConductorWorker import ConductorWorker,TaskStatus def execute(task): - return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}, 'logs': ['one','two']} + return ConductorWorker.task_result( + status=TaskStatus.COMPLETED, + output= {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}, + logs=['one','two'] + ) def execute4(task): forkTasks = [{"name": "task_1", "taskReferenceName": "task_1_1", "type": "SIMPLE"},{"name": "sub_workflow_4", "taskReferenceName": "wf_dyn", "type": "SUB_WORKFLOW", "subWorkflowParam": {"name": "sub_flow_1"}}];