Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
DoWhile checks if all tasks in an iteration are terminal before movin…
Browse files Browse the repository at this point in the history
…g onto the next
  • Loading branch information
aravindanr committed Jun 29, 2022
1 parent c751761 commit 27af660
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.events.ScriptEvaluator;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

Expand All @@ -38,11 +36,8 @@ public class DoWhile extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(DoWhile.class);

private final ParametersUtils parametersUtils;

public DoWhile(ParametersUtils parametersUtils) {
public DoWhile() {
super(TASK_TYPE_DO_WHILE);
this.parametersUtils = parametersUtils;
}

@Override
Expand All @@ -54,7 +49,6 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor exec
public boolean execute(
WorkflowModel workflow, TaskModel doWhileTaskModel, WorkflowExecutor workflowExecutor) {

boolean allDone = true;
boolean hasFailures = false;
StringBuilder failureReason = new StringBuilder();
Map<String, Object> output = new HashMap<>();
Expand All @@ -78,15 +72,18 @@ public boolean execute(
}
}
Collection<TaskModel> loopOverTasks = relevantTasks.values();
LOGGER.debug(
"Workflow {} waiting for tasks {} to complete iteration {}",
workflow.getWorkflowId(),
loopOverTasks.stream()
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toList()),
doWhileTaskModel.getIteration());

// if the loopOver collection is empty, no tasks inside the loop have been scheduled.

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Workflow {} waiting for tasks {} to complete iteration {}",
workflow.getWorkflowId(),
loopOverTasks.stream()
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toList()),
doWhileTaskModel.getIteration());
}

// if the loopOverTasks collection is empty, no tasks inside the loop have been scheduled.
// so schedule it and exit the method.
if (loopOverTasks.isEmpty()) {
doWhileTaskModel.setIteration(1);
Expand All @@ -103,27 +100,33 @@ public boolean execute(
output.put(
TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()),
loopOverTask.getOutputData());
allDone = taskStatus.isTerminal();
if (!allDone || hasFailures) {
if (hasFailures) {
break;
}
}
doWhileTaskModel
.getOutputData()
.put(String.valueOf(doWhileTaskModel.getIteration()), output);

if (hasFailures) {
LOGGER.debug(
"Task {} failed in {} iteration",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return updateLoopTask(
return markTaskFailure(
doWhileTaskModel, TaskModel.Status.FAILED, failureReason.toString());
} else if (!allDone) {
}

if (!isIterationComplete(doWhileTaskModel, relevantTasks)) {
// current iteration is not complete (all tasks inside the loop are not terminal)
return false;
}

// if we are here, the iteration is complete, and we need to check if there is a next
// iteration by evaluating the loopCondition
boolean shouldContinue;
try {
shouldContinue = getEvaluatedCondition(workflow, doWhileTaskModel, workflowExecutor);
shouldContinue = evaluateCondition(workflow, doWhileTaskModel);
LOGGER.debug(
"Task {} condition evaluated to {}",
doWhileTaskModel.getTaskId(),
Expand All @@ -137,64 +140,82 @@ public boolean execute(
"Task {} took {} iterations to complete",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return markLoopTaskSuccess(doWhileTaskModel);
return markTaskSuccess(doWhileTaskModel);
}
} catch (ScriptException e) {
String message =
String.format(
"Unable to evaluate condition %s , exception %s",
"Unable to evaluate condition %s, exception %s",
doWhileTaskModel.getWorkflowTask().getLoopCondition(), e.getMessage());
LOGGER.error(message);
LOGGER.error("Marking task {} failed with error.", doWhileTaskModel.getTaskId());
return updateLoopTask(
return markTaskFailure(
doWhileTaskModel, TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, message);
}
}

/**
* Check if all tasks in the current iteration have reached terminal state.
*
* @param doWhileTaskModel The {@link TaskModel} of DO_WHILE.
* @param referenceNameToModel Map of taskReferenceName to {@link TaskModel}.
* @return true if all tasks in DO_WHILE.loopOver are in <code>referenceNameToModel</code> and
* reached terminal state.
*/
private boolean isIterationComplete(
TaskModel doWhileTaskModel, Map<String, TaskModel> referenceNameToModel) {
List<WorkflowTask> workflowTasksInsideDoWhile =
doWhileTaskModel.getWorkflowTask().getLoopOver();
int iteration = doWhileTaskModel.getIteration();
boolean allTasksTerminal = true;
for (WorkflowTask workflowTaskInsideDoWhile : workflowTasksInsideDoWhile) {
String taskReferenceName =
TaskUtils.appendIteration(
workflowTaskInsideDoWhile.getTaskReferenceName(), iteration);
if (referenceNameToModel.containsKey(taskReferenceName)) {
TaskModel taskModel = referenceNameToModel.get(taskReferenceName);
if (!taskModel.getStatus().isTerminal()) {
allTasksTerminal = false;
break;
}
} else {
allTasksTerminal = false;
break;
}
}
return allTasksTerminal;
}

boolean scheduleNextIteration(
TaskModel task, WorkflowModel workflow, WorkflowExecutor workflowExecutor) {
TaskModel doWhileTaskModel, WorkflowModel workflow, WorkflowExecutor workflowExecutor) {
LOGGER.debug(
"Scheduling loop tasks for task {} as condition {} evaluated to true",
task.getTaskId(),
task.getWorkflowTask().getLoopCondition());
workflowExecutor.scheduleNextIteration(task, workflow);
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getWorkflowTask().getLoopCondition());
workflowExecutor.scheduleNextIteration(doWhileTaskModel, workflow);
return true; // Return true even though status not changed. Iteration has to be updated in
// execution DAO.
}

boolean updateLoopTask(TaskModel task, TaskModel.Status status, String failureReason) {
task.setReasonForIncompletion(failureReason);
task.setStatus(status);
boolean markTaskFailure(TaskModel taskModel, TaskModel.Status status, String failureReason) {
LOGGER.error("Marking task {} failed with error.", taskModel.getTaskId());
taskModel.setReasonForIncompletion(failureReason);
taskModel.setStatus(status);
return true;
}

boolean markLoopTaskSuccess(TaskModel task) {
boolean markTaskSuccess(TaskModel taskModel) {
LOGGER.debug(
"task {} took {} iterations to complete",
task.getTaskId(),
task.getIteration() + 1);
task.setStatus(TaskModel.Status.COMPLETED);
"Task {} took {} iterations to complete",
taskModel.getTaskId(),
taskModel.getIteration() + 1);
taskModel.setStatus(TaskModel.Status.COMPLETED);
return true;
}

@VisibleForTesting
boolean getEvaluatedCondition(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor)
throws ScriptException {
TaskDef taskDefinition = null;
try {
taskDefinition = workflowExecutor.getTaskDefinition(task);
} catch (TerminateWorkflowException e) {
// It is ok to not have a task definition for a DO_WHILE task
}

Map<String, Object> taskInput =
parametersUtils.getTaskInputV2(
task.getWorkflowTask().getInputParameters(),
workflow,
task.getTaskId(),
taskDefinition);
taskInput.put(task.getReferenceTaskName(), task.getOutputData());
boolean evaluateCondition(WorkflowModel workflow, TaskModel task) throws ScriptException {
Map<String, Object> conditionInput = new HashMap<>(task.getInputData());
conditionInput.put(task.getReferenceTaskName(), task.getOutputData());
List<TaskModel> loopOver =
workflow.getTasks().stream()
.filter(
Expand All @@ -210,17 +231,18 @@ boolean getEvaluatedCondition(
.collect(Collectors.toList());

for (TaskModel loopOverTask : loopOver) {
taskInput.put(
conditionInput.put(
TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()),
loopOverTask.getOutputData());
}

String condition = task.getWorkflowTask().getLoopCondition();
boolean shouldContinue = false;
boolean result = false;
if (condition != null) {
LOGGER.debug("Condition: {} is being evaluated", condition);
// Evaluate the expression by using the Nashorn based script evaluator
shouldContinue = ScriptEvaluator.evalBool(condition, taskInput);
result = ScriptEvaluator.evalBool(condition, conditionInput);
}
return shouldContinue;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class DoWhileSpec extends Specification {
@Subject
DoWhile doWhile

ParametersUtils parametersUtils
WorkflowExecutor workflowExecutor
TaskModel doWhileTaskModel

Expand All @@ -42,12 +41,11 @@ class DoWhileSpec extends Specification {

def setup() {
workflowExecutor = Mock(WorkflowExecutor.class)
parametersUtils = new ParametersUtils(new ObjectMapper())

task1 = new WorkflowTask(name: 'task1', taskReferenceName: 'task1')
task2 = new WorkflowTask(name: 'task2', taskReferenceName: 'task2')

doWhile = new DoWhile(parametersUtils)
doWhile = new DoWhile()
}

def "first iteration"() {
Expand Down Expand Up @@ -75,6 +73,39 @@ class DoWhileSpec extends Specification {
1 * workflowExecutor.scheduleNextIteration(doWhileTaskModel, workflowModel)
}

def "an iteration - one task is complete and other is not scheduled"() {
given: "WorkflowModel consists of one iteration of one task inside DO_WHILE already completed"
taskModel1 = createTaskModel(task1)

and: "loop over contains two tasks"
WorkflowTask doWhileWorkflowTask = new WorkflowTask(taskReferenceName: 'doWhileTask', type: TASK_TYPE_DO_WHILE)
doWhileWorkflowTask.loopCondition = "if (\$.doWhileTask['iteration'] < 2) { true; } else { false; }"
doWhileWorkflowTask.loopOver = [task1, task2] // two tasks

doWhileTaskModel = new TaskModel(workflowTask: doWhileWorkflowTask, taskId: UUID.randomUUID().toString(),
taskType: TASK_TYPE_DO_WHILE, referenceTaskName: doWhileWorkflowTask.taskReferenceName)
doWhileTaskModel.iteration = 1
doWhileTaskModel.outputData['iteration'] = 1
doWhileTaskModel.status = TaskModel.Status.IN_PROGRESS

def workflowModel = new WorkflowModel(workflowDefinition: new WorkflowDef(name: 'test_workflow'))
// setup the WorkflowModel
workflowModel.tasks = [doWhileTaskModel, taskModel1]

// this is the expected format of iteration 1's output data
def iteration1OutputData = [:]
iteration1OutputData[task1.taskReferenceName] = taskModel1.outputData

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)

then: "verify that the return value is false, since the iteration is not complete"
!retVal

and: "verify that the next iteration is NOT scheduled"
0 * workflowExecutor.scheduleNextIteration(doWhileTaskModel, workflowModel)
}

def "next iteration - one iteration of all tasks inside DO_WHILE are complete"() {
given: "WorkflowModel consists of one iteration of tasks inside DO_WHILE already completed"
taskModel1 = createTaskModel(task1)
Expand Down Expand Up @@ -113,9 +144,6 @@ class DoWhileSpec extends Specification {

and: "verify whether the first task in the next iteration is scheduled"
1 * workflowExecutor.scheduleNextIteration(doWhileTaskModel, workflowModel)

and: "verify that WorkflowExecutor.getTaskDefinition throws TerminateWorkflowException, execute method is not impacted"
1 * workflowExecutor.getTaskDefinition(doWhileTaskModel) >> { throw new TerminateWorkflowException("") }
}

def "next iteration - a task failed in the previous iteration"() {
Expand Down
Loading

0 comments on commit 27af660

Please sign in to comment.