Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev' into 2.31
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Apr 14, 2021
2 parents 16a5192 + 4479c57 commit 1006a2f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
40 changes: 40 additions & 0 deletions client/python/conductor/ConductorWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@
from conductor.conductor import WFClientMgr
from threading import Thread
import socket
from enum import Enum

hostname = socket.gethostname()

class TaskStatus(Enum):
IN_PROGRESS = 'IN_PROGRESS'
FAILED = 'FAILED'
FAILED_WITH_TERMINAL_ERROR = 'FAILED_WITH_TERMINAL_ERROR'
COMPLETED = 'COMPLETED'

def __str__(self):
return str(self.value)



class ConductorWorker:
"""
Expand Down Expand Up @@ -66,6 +77,35 @@ def __init__(self, server_url, thread_count, polling_interval, worker_id=None):
self.polling_interval = polling_interval
self.worker_id = worker_id or hostname

@staticmethod
def task_result(status: TaskStatus, output=None, logs=None, reasonForIncompletion=None):
"""
Get task result
Parameters
----------
status: TaskStatus
The status of the task
Ex: TaskStatus.COMPLETED
output: dict
results of task processing
logs: list
log list
reasonForIncompletion: str, optional
the reason for not completing the task if any
"""
if logs is None:
logs = []
if output is None:
output = {}
ret = {
'status': status.__str__(),
'output': output,
'logs': logs
}
if reasonForIncompletion:
ret['reasonForIncompletion'] = reasonForIncompletion
return ret

def execute(self, task, exec_function):
try:
resp = exec_function(task)
Expand Down
8 changes: 6 additions & 2 deletions client/python/kitchensink_workers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import print_function
from conductor.ConductorWorker import ConductorWorker
from conductor.ConductorWorker import ConductorWorker,TaskStatus

def execute(task):
return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}, 'logs': ['one','two']}
return ConductorWorker.task_result(
status=TaskStatus.COMPLETED,
output= {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0},
logs=['one','two']
)

def execute4(task):
forkTasks = [{"name": "task_1", "taskReferenceName": "task_1_1", "type": "SIMPLE"},{"name": "sub_workflow_4", "taskReferenceName": "wf_dyn", "type": "SUB_WORKFLOW", "subWorkflowParam": {"name": "sub_flow_1"}}];
Expand Down

0 comments on commit 1006a2f

Please sign in to comment.