
Commit a99e939

Bugfix/jointask output (#3055)
* re-throw TerminateWorkflowException for task update

* Add only non-empty task output for Join
jxu-nflx authored Jun 22, 2022
1 parent 97a0393 commit a99e939
Showing 4 changed files with 116 additions and 1 deletion.
@@ -42,6 +42,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.*;
import com.netflix.conductor.metrics.Monitors;
@@ -513,6 +514,9 @@ public void updateTask(TaskModel taskModel) {
if (!properties.isAsyncIndexingEnabled()) {
indexDAO.indexTask(new TaskSummary(taskModel.toTask()));
}
} catch (TerminateWorkflowException e) {
// re-throw it so we can terminate the workflow
throw e;
} catch (Exception e) {
String errorMsg =
String.format(
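
Note on the hunk above: TerminateWorkflowException is caught and re-thrown ahead of the generic catch so that, per the commit message, the caller of updateTask can still terminate the workflow; without the dedicated catch, the broad catch (Exception e) below would wrap the signal as an ordinary update error and the workflow would keep running. A minimal Java sketch of the pattern, using stand-in types rather than Conductor's actual classes:

// Minimal sketch of the re-throw pattern; the exception class is a stand-in
// for com.netflix.conductor.core.exception.TerminateWorkflowException.
class TerminateWorkflowException extends RuntimeException {
    TerminateWorkflowException(String reason) { super(reason); }
}

class TaskUpdaterSketch {
    void updateTask(Runnable persistAndIndex) {
        try {
            persistAndIndex.run();
        } catch (TerminateWorkflowException e) {
            // Propagate unchanged so the caller can terminate the workflow.
            throw e;
        } catch (Exception e) {
            // Any other failure is wrapped as a generic task-update error.
            throw new RuntimeException("Error updating task", e);
        }
    }
}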
@@ -60,7 +60,10 @@ public boolean execute(
if (hasFailures) {
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
}
-        task.addOutput(joinOnRef, forkedTask.getOutputData());
+        // Only add to task output if it's not empty
+        if (!forkedTask.getOutputData().isEmpty()) {
+            task.addOutput(joinOnRef, forkedTask.getOutputData());
+        }
if (!taskStatus.isTerminal()) {
allDone = false;
}
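
Note on the hunk above: with the new guard, a forked task that completes with an empty output map no longer contributes an entry under its reference name to the JOIN task's output. A self-contained Java sketch of that aggregation behavior, using plain maps as stand-ins for TaskModel.getOutputData() and addOutput():

import java.util.LinkedHashMap;
import java.util.Map;

public class JoinOutputSketch {
    // Copy each forked task's output under its reference name,
    // skipping tasks whose output map is empty.
    static Map<String, Object> aggregate(Map<String, Map<String, Object>> forkedOutputs) {
        Map<String, Object> joinOutput = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> entry : forkedOutputs.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                joinOutput.put(entry.getKey(), entry.getValue());
            }
        }
        return joinOutput;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> forked = new LinkedHashMap<>();
        forked.put("xdt1", Map.of());                // completed with no output
        forked.put("xdt2", Map.of("result", "ok"));  // produced output
        // Before the fix the result would be {xdt1={}, xdt2={result=ok}};
        // with the guard it is {xdt2={result=ok}}.
        System.out.println(aggregate(forked));
    }
}

This is what the new integration test below exercises: both forked tasks complete without output, so the JOIN task's outputData is asserted to be empty instead of containing empty maps for xdt1 and xdt2.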
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.junit.Before;
@@ -29,10 +30,13 @@
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.TestDeciderService;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.*;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,4 +168,19 @@ public void testAddEventExecution() {
assertTrue(added);
verify(indexDAO, times(1)).asyncAddEventExecution(any());
}

@Test(expected = TerminateWorkflowException.class)
public void testUpdateTaskThrowsTerminateWorkflowException() {
TaskModel task = new TaskModel();
task.setScheduledTime(1L);
task.setSeq(1);
task.setTaskId(UUID.randomUUID().toString());
task.setTaskDefName("task1");

doThrow(new TerminateWorkflowException("failed"))
.when(externalPayloadStorageUtils)
.verifyAndUpload(task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT);

executionDAOFacade.updateTask(task);
}
}
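
The test above forces verifyAndUpload to throw TerminateWorkflowException and expects it to escape updateTask unchanged. How a caller might react once the signal propagates is sketched below; the class and method names are illustrative assumptions, not Conductor's actual WorkflowExecutor code:

// Hypothetical caller-side sketch; the exception class again stands in for
// com.netflix.conductor.core.exception.TerminateWorkflowException.
class TerminateWorkflowException extends RuntimeException {
    TerminateWorkflowException(String reason) { super(reason); }
}

class WorkflowDriverSketch {
    void updateTaskAndMaybeTerminate(Runnable updateTask) {
        try {
            updateTask.run();
        } catch (TerminateWorkflowException e) {
            // With the fix, the signal reaches this point instead of being
            // swallowed inside updateTask.
            terminateWorkflow(e.getMessage());
        }
    }

    void terminateWorkflow(String reason) {
        // Placeholder for marking the workflow terminated with the given reason.
        System.out.println("Terminating workflow: " + reason);
    }
}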
@@ -673,4 +673,93 @@ class DynamicForkJoinSpec extends AbstractSpecification {
cleanup: "roll back the change made to integration_task_2 definition"
metadataService.updateTaskDef(persistedTask2Definition)
}

def "Test dynamic fork join empty output"() {
when: " a dynamic fork join workflow is started"
def workflowInstanceId = workflowExecutor.startWorkflow(DYNAMIC_FORK_JOIN_WF, 1,
'dynamic_fork_join_workflow', [:],
null, null, null)

then: "verify that the workflow has been successfully started and the first task is in scheduled state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
}

when: " the first task is 'integration_task_1' output has a list of dynamic tasks"
WorkflowTask workflowTask2 = new WorkflowTask()
workflowTask2.name = 'integration_task_2'
workflowTask2.taskReferenceName = 'xdt1'

WorkflowTask workflowTask3 = new WorkflowTask()
workflowTask3.name = 'integration_task_3'
workflowTask3.taskReferenceName = 'xdt2'

def dynamicTasksInput = ['xdt1': ['k1': 'v1'], 'xdt2': ['k2': 'v2']]

and: "The 'integration_task_1' is polled and completed"
def pollAndCompleteTask1Try = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.worker',
['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': dynamicTasksInput])

then: "verify that the task was completed"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask1Try)

and: "verify that workflow has progressed further ahead and new dynamic tasks have been scheduled"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 5
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'FORK'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'integration_task_3'
tasks[3].status == Task.Status.SCHEDULED
tasks[4].taskType == 'JOIN'
tasks[4].status == Task.Status.IN_PROGRESS
tasks[4].referenceTaskName == 'dynamicfanouttask_join'
}

when: "Poll and complete 'integration_task_2' and 'integration_task_3'"
def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')
def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker')

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])

and: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 6
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'integration_task_3'
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'JOIN'
tasks[4].inputData['joinOn'] == ['xdt1', 'xdt2']
tasks[4].status == Task.Status.COMPLETED
tasks[4].referenceTaskName == 'dynamicfanouttask_join'
tasks[4].outputData.isEmpty()
tasks[5].taskType == 'integration_task_4'
tasks[5].status == Task.Status.SCHEDULED
}

when: "Poll and complete 'integration_task_4'"
def pollAndCompleteTask4Try = workflowTestUtil.pollAndCompleteTask('integration_task_4', 'task4.worker')

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask4Try)

and: "verify that the workflow is complete"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 6
tasks[5].taskType == 'integration_task_4'
tasks[5].status == Task.Status.COMPLETED
}
}
}
