Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Make the task definition optional for the DO_WHILE task (#1645)
Browse files Browse the repository at this point in the history
* make task definition optional for DO_WHILE task

* add test case for do_while task without task definition

Co-authored-by: u447 <[email protected]>
  • Loading branch information
rickfish and u447 authored Apr 29, 2020
1 parent 84799c5 commit 3cafdf9
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
int retryCount = taskMapperContext.getRetryCount();
TaskDef taskDefinition = Optional.ofNullable(taskMapperContext.getTaskDefinition())
.orElseGet(() -> Optional.ofNullable(metadataDAO.getTaskDef(taskToSchedule.getName()))
.orElseThrow(() -> {
String reason = String.format("Invalid task specified. Cannot find task by name %s in the task definitions", taskToSchedule.getName());
return new TerminateWorkflowException(reason);
}));
.orElseGet(() -> new TaskDef()));

Task loopTask = new Task();
loopTask.setTaskType(SystemTaskType.DO_WHILE.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.events.ScriptEvaluator;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -127,7 +129,13 @@ boolean markLoopTaskSuccess(Task task) {

@VisibleForTesting
boolean getEvaluatedCondition(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) throws ScriptException {
TaskDef taskDefinition = workflowExecutor.getTaskDefinition(task);
TaskDef taskDefinition = null;
try {
taskDefinition = workflowExecutor.getTaskDefinition(task);
} catch(TerminateWorkflowException e) {
// It is ok to not have a task definition for a DO_WHILE task
}

Map<String, Object> taskInput = parametersUtils.getTaskInputV2(task.getWorkflowTask().getInputParameters(), workflow, task.getTaskId(), taskDefinition);
taskInput.put(task.getReferenceTaskName(), task.getOutputData());
List<Task> loopOver = workflow.getTasks().stream().filter(t -> (task.getWorkflowTask().has(TaskUtils.removeIterationFromTaskRefName(t.getReferenceTaskName())) && !task.getReferenceTaskName().equals(t.getReferenceTaskName()))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public void testForkJoin() throws Exception {
@Test
public void testDoWhileSingleIteration() throws Exception {
try {
createDoWhileWorkflowWithIteration(1, false);
createDoWhileWorkflowWithIteration(1, false, true);
} catch (Exception e) {
}
TaskDef taskDef = new TaskDef();
Expand Down Expand Up @@ -718,7 +718,7 @@ public void testDoWhileSingleIteration() throws Exception {
@Test
public void testDoWhileTwoIteration() throws Exception {
try {
createDoWhileWorkflowWithIteration(2, false);
createDoWhileWorkflowWithIteration(2, false, true);
} catch (Exception e) {
}

Expand Down Expand Up @@ -847,7 +847,91 @@ public void testDoWhileTwoIteration() throws Exception {
@Test
public void testLoopConditionWithInputParamter() throws Exception {
try {
createDoWhileWorkflowWithIteration(2, true);
createDoWhileWorkflowWithIteration(2, true, true);
} catch (Exception e) {
}

TaskDef taskDef = new TaskDef();
taskDef.setName("http1");
taskDef.setTimeoutSeconds(2);
taskDef.setRetryCount(1);
taskDef.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef.setRetryDelaySeconds(10);
metadataService.registerTaskDef(Arrays.asList(taskDef));

TaskDef taskDef2 = new TaskDef();
taskDef2.setName("http0");
taskDef2.setTimeoutSeconds(2);
taskDef2.setRetryCount(1);
taskDef2.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef2.setRetryDelaySeconds(10);
metadataService.registerTaskDef(Arrays.asList(taskDef2));

TaskDef taskDef1 = new TaskDef();
taskDef1.setName("http2");
taskDef1.setTimeoutSeconds(2);
taskDef1.setRetryCount(1);
taskDef1.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef1.setRetryDelaySeconds(10);
metadataService.registerTaskDef(Arrays.asList(taskDef1));

Map<String, Object> input = new HashMap<>();
String workflowId = startOrLoadWorkflowExecution(DO_WHILE_WF + "_3", 1, "looptest", input, null, null);
System.out.println("testDoWhile.wfid=" + workflowId);
printTaskStatuses(workflowId, "initiated");

Workflow workflow = workflowExecutionService.getExecutionStatus(workflowId, true);
assertNotNull(workflow);
assertEquals("Found " + workflow.getTasks(), RUNNING, workflow.getStatus());

Task task = workflowExecutionService.poll("HTTP", "test");
assertNotNull(task);
assertTrue(task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration())));
assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId()));

task.setStatus(COMPLETED);
workflowExecutionService.updateTask(task);

task = workflowExecutionService.poll("FORK_JOIN", "test");
assertNull(task); // fork task is completed

task = workflowExecutionService.poll("HTTP", "test");
assertNotNull(task);
assertTrue(task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration())));
assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId()));

task.setStatus(COMPLETED);
workflowExecutionService.updateTask(task);

task = workflowExecutionService.poll("HTTP", "test");
assertNotNull(task);
assertTrue(task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration())));
assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId()));

task.setStatus(COMPLETED);
workflowExecutionService.updateTask(task);

task = workflowExecutionService.poll("JOIN", "test");
assertNull(task); // Both HTTP task completed.

workflow = workflowExecutionService.getExecutionStatus(workflowId, true);
assertNotNull(workflow);
assertEquals("Found " + workflow.getTasks(), WorkflowStatus.COMPLETED, workflow.getStatus());
}

@Test
public void testLoopConditionWithInputParamterWithDef() throws Exception {
testLoopConditionWithInputParamter(true);
}

@Test
public void testLoopConditionWithInputParamterNoDef() throws Exception {
testLoopConditionWithInputParamter(false);
}

private void testLoopConditionWithInputParamter(boolean useDef) throws Exception {
try {
createDoWhileWorkflowWithIteration(2, true, useDef);
} catch (Exception e) {
}

Expand Down Expand Up @@ -1782,7 +1866,7 @@ private void createDecisionWorkflow() {
metadataService.updateWorkflowDef(workflowDef);
}

private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputParameter) {
private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputParameter, boolean useTaskDef) {
WorkflowDef workflowDef = new WorkflowDef();
if (isInputParameter) {
workflowDef.setName(DO_WHILE_WF + "_3");
Expand All @@ -1802,14 +1886,15 @@ private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputPa
input.put("value", "${workflow.input.loop}");
loopTask.setInputParameters(input);

TaskDef taskDef = new TaskDef();
taskDef.setName("loopTask");
taskDef.setTimeoutSeconds(200);
taskDef.setRetryCount(1);
taskDef.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef.setRetryDelaySeconds(10);

metadataService.registerTaskDef(Arrays.asList(taskDef));
if(useTaskDef) {
TaskDef taskDef = new TaskDef();
taskDef.setName("loopTask");
taskDef.setTimeoutSeconds(200);
taskDef.setRetryCount(1);
taskDef.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef.setRetryDelaySeconds(10);
metadataService.registerTaskDef(Arrays.asList(taskDef));
}

Map<String, Object> inputParams1 = new HashMap<>();
inputParams1.put("p1", "workflow.input.param1");
Expand Down Expand Up @@ -1858,14 +1943,15 @@ private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputPa
workflowDef.getTasks().add(loopTask);

if (iteration == 2 && isInputParameter == false) {
TaskDef taskDef2 = new TaskDef();
taskDef2.setName("loopTask2");
taskDef2.setTimeoutSeconds(200);
taskDef2.setRetryCount(3);
taskDef2.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef2.setRetryDelaySeconds(10);

metadataService.registerTaskDef(Arrays.asList(taskDef2));
if(useTaskDef) {
TaskDef taskDef2 = new TaskDef();
taskDef2.setName("loopTask2");
taskDef2.setTimeoutSeconds(200);
taskDef2.setRetryCount(3);
taskDef2.setTimeoutPolicy(TimeoutPolicy.RETRY);
taskDef2.setRetryDelaySeconds(10);
metadataService.registerTaskDef(Arrays.asList(taskDef2));
}
WorkflowTask loopTask2 = new WorkflowTask();
loopTask2.setType(TaskType.DO_WHILE.name());
loopTask2.setTaskReferenceName("loopTask2");
Expand Down

0 comments on commit 3cafdf9

Please sign in to comment.