diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index e9c326410b..91d39d8f51 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -336,6 +336,15 @@ private void retry(WorkflowModel workflow) { for (TaskModel task : workflow.getTasks()) { switch (task.getStatus()) { case FAILED: + if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())) { + task.setStatus(IN_PROGRESS); + addTaskToQueue(task); + // Task doesn't have to be updated yet. Will be updated along with other + // Workflow tasks downstream. + } else { + retriableMap.put(task.getReferenceTaskName(), task); + } + break; case FAILED_WITH_TERMINAL_ERROR: case TIMED_OUT: retriableMap.put(task.getReferenceTaskName(), task); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index bc3da8c609..56422ff441 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -1029,7 +1029,7 @@ public void testRetryWorkflowMultipleRetries() { } @Test - public void testRetryWorkflowWithJoinTask() { + public void testRetryWorkflowWithCancelledJoinTask() { // setup WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("testRetryWorkflowId"); @@ -1100,6 +1100,78 @@ public void testRetryWorkflowWithJoinTask() { assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus()); } + @Test + public void testRetryWorkflowWithFailedJoinTask() { + // setup + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId("testRetryWorkflowId"); + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testRetryWorkflowId"); + workflowDef.setVersion(1); + workflow.setWorkflowDefinition(workflowDef); + workflow.setOwnerApp("junit_testRetryWorkflowId"); + workflow.setCreateTime(10L); + workflow.setEndTime(100L); + //noinspection unchecked + workflow.setOutput(Collections.EMPTY_MAP); + workflow.setStatus(WorkflowModel.Status.FAILED); + + TaskModel forkTask = new TaskModel(); + forkTask.setTaskType(TaskType.FORK_JOIN.toString()); + forkTask.setTaskId(UUID.randomUUID().toString()); + forkTask.setSeq(1); + forkTask.setRetryCount(1); + forkTask.setStatus(TaskModel.Status.COMPLETED); + forkTask.setReferenceTaskName("task_fork"); + + TaskModel task_1_1 = new TaskModel(); + task_1_1.setTaskId(UUID.randomUUID().toString()); + task_1_1.setSeq(20); + task_1_1.setRetryCount(1); + task_1_1.setTaskType(TaskType.SIMPLE.toString()); + task_1_1.setStatus(TaskModel.Status.FAILED); + task_1_1.setTaskDefName("task1"); + task_1_1.setWorkflowTask(new WorkflowTask()); + task_1_1.setReferenceTaskName("task1_ref1"); + + TaskModel task_2_1 = new TaskModel(); + task_2_1.setTaskId(UUID.randomUUID().toString()); + task_2_1.setSeq(22); + task_2_1.setRetryCount(1); + task_2_1.setStatus(TaskModel.Status.CANCELED); + task_2_1.setTaskType(TaskType.SIMPLE.toString()); + task_2_1.setTaskDefName("task2"); + task_2_1.setWorkflowTask(new WorkflowTask()); + task_2_1.setReferenceTaskName("task2_ref1"); + + TaskModel joinTask = new TaskModel(); + joinTask.setTaskType(TaskType.JOIN.toString()); + joinTask.setTaskId(UUID.randomUUID().toString()); + joinTask.setSeq(25); + joinTask.setRetryCount(1); + joinTask.setStatus(TaskModel.Status.FAILED); + joinTask.setReferenceTaskName("task_join"); + joinTask.getInputData() + .put( + "joinOn", + Arrays.asList( + task_1_1.getReferenceTaskName(), task_2_1.getReferenceTaskName())); + + workflow.getTasks().addAll(Arrays.asList(forkTask, task_1_1, task_2_1, joinTask)); + // end of setup + + // when: + when(executionDAOFacade.getWorkflowModel(anyString(), anyBoolean())).thenReturn(workflow); + when(metadataDAO.getWorkflowDef(anyString(), anyInt())) + .thenReturn(Optional.of(new WorkflowDef())); + + workflowExecutor.retry(workflow.getWorkflowId(), false); + + assertEquals(6, workflow.getTasks().size()); + assertEquals(WorkflowModel.Status.FAILED, workflow.getPreviousStatus()); + assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus()); + } + @Test public void testRetryFromLastFailedSubWorkflowTaskThenStartWithLastFailedTask() { IDGenerator idGenerator = new IDGenerator();