diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
index 5b905a2853..d23c4f5a25 100644
--- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
+++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
@@ -42,6 +42,7 @@
 import com.netflix.conductor.core.events.queue.Message;
 import com.netflix.conductor.core.exception.ApplicationException;
 import com.netflix.conductor.core.exception.ApplicationException.Code;
+import com.netflix.conductor.core.exception.TerminateWorkflowException;
 import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
 import com.netflix.conductor.dao.*;
 import com.netflix.conductor.metrics.Monitors;
@@ -513,6 +514,9 @@ public void updateTask(TaskModel taskModel) {
             if (!properties.isAsyncIndexingEnabled()) {
                 indexDAO.indexTask(new TaskSummary(taskModel.toTask()));
             }
+        } catch (TerminateWorkflowException e) {
+            // re-throw it so we can terminate the workflow
+            throw e;
         } catch (Exception e) {
             String errorMsg =
                     String.format(
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
index eab4961c72..0159c82646 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
@@ -60,7 +60,10 @@ public boolean execute(
             if (hasFailures) {
                 failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
             }
-            task.addOutput(joinOnRef, forkedTask.getOutputData());
+            // Only add to task output if it's not empty
+            if (!forkedTask.getOutputData().isEmpty()) {
+                task.addOutput(joinOnRef, forkedTask.getOutputData());
+            }
             if (!taskStatus.isTerminal()) {
                 allDone = false;
             }
diff --git a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java
index 80a266acc8..5e19ba956d 100644
--- a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java
+++ b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java
@@ -16,6 +16,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.Before;
@@ -29,10 +30,13 @@
 import com.netflix.conductor.common.metadata.events.EventExecution;
 import com.netflix.conductor.common.run.SearchResult;
 import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
 import com.netflix.conductor.core.config.ConductorProperties;
+import com.netflix.conductor.core.exception.TerminateWorkflowException;
 import com.netflix.conductor.core.execution.TestDeciderService;
 import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
 import com.netflix.conductor.dao.*;
+import com.netflix.conductor.model.TaskModel;
 import com.netflix.conductor.model.WorkflowModel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,4 +168,19 @@ public void testAddEventExecution() {
         assertTrue(added);
         verify(indexDAO, times(1)).asyncAddEventExecution(any());
     }
+
+    @Test(expected = TerminateWorkflowException.class)
+    public void testUpdateTaskThrowsTerminateWorkflowException() {
+        TaskModel task = new TaskModel();
+        task.setScheduledTime(1L);
+        task.setSeq(1);
+        task.setTaskId(UUID.randomUUID().toString());
+        task.setTaskDefName("task1");
+
+        doThrow(new TerminateWorkflowException("failed"))
+                .when(externalPayloadStorageUtils)
+                .verifyAndUpload(task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT);
+
+        executionDAOFacade.updateTask(task);
+    }
 }
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
index 782996a488..46ec3342ef 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
@@ -673,4 +673,93 @@
         cleanup: "roll back the change made to integration_task_2 definition"
         metadataService.updateTaskDef(persistedTask2Definition)
     }
+
+    def "Test dynamic fork join empty output"() {
+        when: " a dynamic fork join workflow is started"
+        def workflowInstanceId = workflowExecutor.startWorkflow(DYNAMIC_FORK_JOIN_WF, 1,
+                'dynamic_fork_join_workflow', [:],
+                null, null, null)
+
+        then: "verify that the workflow has been successfully started and the first task is in scheduled state"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 1
+            tasks[0].taskType == 'integration_task_1'
+            tasks[0].status == Task.Status.SCHEDULED
+        }
+
+        when: " the first task is 'integration_task_1' output has a list of dynamic tasks"
+        WorkflowTask workflowTask2 = new WorkflowTask()
+        workflowTask2.name = 'integration_task_2'
+        workflowTask2.taskReferenceName = 'xdt1'
+
+        WorkflowTask workflowTask3 = new WorkflowTask()
+        workflowTask3.name = 'integration_task_3'
+        workflowTask3.taskReferenceName = 'xdt2'
+
+        def dynamicTasksInput = ['xdt1': ['k1': 'v1'], 'xdt2': ['k2': 'v2']]
+
+        and: "The 'integration_task_1' is polled and completed"
+        def pollAndCompleteTask1Try = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.worker',
+                ['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': dynamicTasksInput])
+
+        then: "verify that the task was completed"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask1Try)
+
+        and: "verify that workflow has progressed further ahead and new dynamic tasks have been scheduled"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 5
+            tasks[0].taskType == 'integration_task_1'
+            tasks[0].status == Task.Status.COMPLETED
+            tasks[1].taskType == 'FORK'
+            tasks[1].status == Task.Status.COMPLETED
+            tasks[2].taskType == 'integration_task_2'
+            tasks[2].status == Task.Status.SCHEDULED
+            tasks[3].taskType == 'integration_task_3'
+            tasks[3].status == Task.Status.SCHEDULED
+            tasks[4].taskType == 'JOIN'
+            tasks[4].status == Task.Status.IN_PROGRESS
+            tasks[4].referenceTaskName == 'dynamicfanouttask_join'
+        }
+
+        when: "Poll and complete 'integration_task_2' and 'integration_task_3'"
+        def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')
+        def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker')
+
+        then: "verify that the tasks were polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])
+
+        and: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 6
+            tasks[2].taskType == 'integration_task_2'
+            tasks[2].status == Task.Status.COMPLETED
+            tasks[3].taskType == 'integration_task_3'
+            tasks[3].status == Task.Status.COMPLETED
+            tasks[4].taskType == 'JOIN'
+            tasks[4].inputData['joinOn'] == ['xdt1', 'xdt2']
+            tasks[4].status == Task.Status.COMPLETED
+            tasks[4].referenceTaskName == 'dynamicfanouttask_join'
+            tasks[4].outputData.isEmpty()
+            tasks[5].taskType == 'integration_task_4'
+            tasks[5].status == Task.Status.SCHEDULED
+        }
+
+        when: "Poll and complete 'integration_task_4'"
+        def pollAndCompleteTask4Try = workflowTestUtil.pollAndCompleteTask('integration_task_4', 'task4.worker')
+
+        then: "verify that the tasks were polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask4Try)
+
+        and: "verify that the workflow is complete"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.COMPLETED
+            tasks.size() == 6
+            tasks[5].taskType == 'integration_task_4'
+            tasks[5].status == Task.Status.COMPLETED
+        }
+    }
 }