Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Failure Join Node IN_PROGESS Does Not Change #3833

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ private void retry(WorkflowModel workflow) {
for (TaskModel task : workflow.getTasks()) {
switch (task.getStatus()) {
case FAILED:
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())) {
task.setStatus(IN_PROGRESS);
break;
}
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
retriableMap.put(task.getReferenceTaskName(), task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ public void testRetryWorkflowMultipleRetries() {
}

@Test
public void testRetryWorkflowWithJoinTask() {
public void testRetryWorkflowWithCancelledJoinTask() {
// setup
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("testRetryWorkflowId");
Expand Down Expand Up @@ -1100,6 +1100,78 @@ public void testRetryWorkflowWithJoinTask() {
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
}

@Test
public void testRetryWorkflowWithFailedJoinTask() {
// setup
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("testRetryWorkflowId");
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("testRetryWorkflowId");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRetryWorkflowId");
workflow.setCreateTime(10L);
workflow.setEndTime(100L);
//noinspection unchecked
workflow.setOutput(Collections.EMPTY_MAP);
workflow.setStatus(WorkflowModel.Status.FAILED);

TaskModel forkTask = new TaskModel();
forkTask.setTaskType(TaskType.FORK_JOIN.toString());
forkTask.setTaskId(UUID.randomUUID().toString());
forkTask.setSeq(1);
forkTask.setRetryCount(1);
forkTask.setStatus(TaskModel.Status.COMPLETED);
forkTask.setReferenceTaskName("task_fork");

TaskModel task_1_1 = new TaskModel();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
task_1_1.setRetryCount(1);
task_1_1.setTaskType(TaskType.SIMPLE.toString());
task_1_1.setStatus(TaskModel.Status.FAILED);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

TaskModel task_2_1 = new TaskModel();
task_2_1.setTaskId(UUID.randomUUID().toString());
task_2_1.setSeq(22);
task_2_1.setRetryCount(1);
task_2_1.setStatus(TaskModel.Status.CANCELED);
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");

TaskModel joinTask = new TaskModel();
joinTask.setTaskType(TaskType.JOIN.toString());
joinTask.setTaskId(UUID.randomUUID().toString());
joinTask.setSeq(25);
joinTask.setRetryCount(1);
joinTask.setStatus(TaskModel.Status.FAILED);
joinTask.setReferenceTaskName("task_join");
joinTask.getInputData()
.put(
"joinOn",
Arrays.asList(
task_1_1.getReferenceTaskName(), task_2_1.getReferenceTaskName()));

workflow.getTasks().addAll(Arrays.asList(forkTask, task_1_1, task_2_1, joinTask));
// end of setup

// when:
when(executionDAOFacade.getWorkflowModel(anyString(), anyBoolean())).thenReturn(workflow);
when(metadataDAO.getWorkflowDef(anyString(), anyInt()))
.thenReturn(Optional.of(new WorkflowDef()));

workflowExecutor.retry(workflow.getWorkflowId(), false);

assertEquals(6, workflow.getTasks().size());
assertEquals(WorkflowModel.Status.FAILED, workflow.getPreviousStatus());
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
}

@Test
public void testRetryFromLastFailedSubWorkflowTaskThenStartWithLastFailedTask() {
IDGenerator idGenerator = new IDGenerator();
Expand Down