Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix next task not scheduled after switch completes (#3197)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx authored Aug 24, 2022
1 parent 6b8e920 commit 630b870
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ TaskModel createDynamicForkTask(
forkDynamicTask.setTaskDefName(TaskType.TASK_TYPE_FORK);
forkDynamicTask.setStartTime(System.currentTimeMillis());
forkDynamicTask.setEndTime(System.currentTimeMillis());
forkDynamicTask.setExecuted(true);
List<String> forkedTaskNames =
dynForkTasks.stream()
.map(WorkflowTask::getTaskReferenceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
forkTask.setEndTime(epochMillis);
forkTask.setInputData(taskInput);
forkTask.setStatus(TaskModel.Status.COMPLETED);
forkTask.setExecuted(true);

tasksToBeScheduled.add(forkTask);
List<List<WorkflowTask>> forkTasks = workflowTask.getForkTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
switchTask.getInputData().put("case", evalResult);
switchTask.getOutputData().put("evaluationResult", Collections.singletonList(evalResult));
switchTask.setStartTime(System.currentTimeMillis());
switchTask.setStatus(TaskModel.Status.COMPLETED);
switchTask.setExecuted(true);
switchTask.setStatus(TaskModel.Status.IN_PROGRESS);
tasksToBeScheduled.add(switchTask);

// get the list of tasks based on the evaluated expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testWorkflowWithNoTasksWithSwitch() throws Exception {
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
outcome = deciderService.decide(workflow);
assertFalse(outcome.isComplete);
assertEquals(outcome.tasksToBeUpdated.toString(), 1, outcome.tasksToBeUpdated.size());
assertEquals(outcome.tasksToBeUpdated.toString(), 3, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());
assertEquals("junit_task_3", outcome.tasksToBeScheduled.get(0).getTaskDefName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ class SwitchTaskSpec extends AbstractSpecification {
@Shared
def COND_TASK_WF = "ConditionalTaskWF"

@Shared
def SWITCH_NODEFAULT_WF = "SwitchWithNoDefaultCaseWF"

def setup() {
//initialization code for each feature
workflowTestUtil.registerWorkflows('simple_switch_task_integration_test.json',
'switch_and_fork_join_integration_test.json',
'conditional_switch_task_workflow_integration_test.json')
'conditional_switch_task_workflow_integration_test.json',
'switch_with_no_default_case_integration_test.json')
}

def "Test simple switch workflow"() {
Expand Down Expand Up @@ -351,4 +355,42 @@ class SwitchTaskSpec extends AbstractSpecification {
tasks[3].status == Task.Status.COMPLETED
}
}

def "Test switch with no default case workflow"() {
given: "Workflow input"
Map input = new HashMap<String, Object>()
input['param1'] = 'p1'
input['param2'] = 'p2'

when: "A switch workflow is started with the workflow input"
def workflowInstanceId = workflowExecutor.startWorkflow(SWITCH_NODEFAULT_WF, 1,
'switch_no_default_workflow', input,
null, null, null)

then: "verify that the workflow is in running state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 2
tasks[0].taskType == 'SWITCH'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'integration_task_2'
tasks[1].status == Task.Status.SCHEDULED
}

when: "the task 'integration_task_2' is polled and completed"
def polledAndCompletedTaskTry = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker')

then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTaskTry)

and: "verify that the 'integration_task_2' is COMPLETED and the workflow is in COMPLETED state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 2
tasks[0].taskType == 'SWITCH'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'integration_task_2'
tasks[1].status == Task.Status.COMPLETED
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"name": "SwitchWithNoDefaultCaseWF",
"description": "switch_with_no_default_case",
"version": 1,
"tasks": [
{
"name": "switchTask",
"taskReferenceName": "switchTask",
"inputParameters": {
"case": "${workflow.input.case}"
},
"type": "SWITCH",
"evaluatorType": "value-param",
"expression": "case",
"decisionCases": {
"c": [
{
"name": "integration_task_1",
"taskReferenceName": "t1",
"inputParameters": {
"p1": "${workflow.input.param1}",
"p2": "${workflow.input.param2}"
},
"type": "SIMPLE",
"decisionCases": {},
"defaultCase": [],
"forkTasks": [],
"startDelay": 0,
"joinOn": [],
"optional": false,
"defaultExclusiveJoinTask": [],
"asyncComplete": false,
"loopOver": []
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "integration_task_2",
"taskReferenceName": "t2",
"inputParameters": {
"p1": "${workflow.input.param1}",
"p2": "${workflow.input.param2}"
},
"type": "SIMPLE",
"decisionCases": {},
"defaultCase": [],
"forkTasks": [],
"startDelay": 0,
"joinOn": [],
"optional": false,
"defaultExclusiveJoinTask": [],
"asyncComplete": false,
"loopOver": []
}
],
"inputParameters": [],
"outputParameters": {},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"ownerEmail": "[email protected]"
}

0 comments on commit 630b870

Please sign in to comment.