Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Failure Join Node IN_PROGESS Does Not Change
Browse files Browse the repository at this point in the history
  • Loading branch information
ghkdwlgns612 committed Nov 2, 2023
1 parent d164531 commit ca656ff
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,15 @@ private void retry(WorkflowModel workflow) {
for (TaskModel task : workflow.getTasks()) {
switch (task.getStatus()) {
case FAILED:
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())) {
task.setStatus(IN_PROGRESS);
addTaskToQueue(task);
// Task doesn't have to be updated yet. Will be updated along with other
// Workflow tasks downstream.
} else {
retriableMap.put(task.getReferenceTaskName(), task);
}
break;
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
retriableMap.put(task.getReferenceTaskName(), task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ public void testRetryWorkflowMultipleRetries() {
}

@Test
public void testRetryWorkflowWithJoinTask() {
public void testRetryWorkflowWithCancelledJoinTask() {
// setup
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("testRetryWorkflowId");
Expand Down Expand Up @@ -1100,6 +1100,78 @@ public void testRetryWorkflowWithJoinTask() {
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
}

@Test
public void testRetryWorkflowWithFailedJoinTask() {
// setup
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("testRetryWorkflowId");
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("testRetryWorkflowId");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRetryWorkflowId");
workflow.setCreateTime(10L);
workflow.setEndTime(100L);
//noinspection unchecked
workflow.setOutput(Collections.EMPTY_MAP);
workflow.setStatus(WorkflowModel.Status.FAILED);

TaskModel forkTask = new TaskModel();
forkTask.setTaskType(TaskType.FORK_JOIN.toString());
forkTask.setTaskId(UUID.randomUUID().toString());
forkTask.setSeq(1);
forkTask.setRetryCount(1);
forkTask.setStatus(TaskModel.Status.COMPLETED);
forkTask.setReferenceTaskName("task_fork");

TaskModel task_1_1 = new TaskModel();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
task_1_1.setRetryCount(1);
task_1_1.setTaskType(TaskType.SIMPLE.toString());
task_1_1.setStatus(TaskModel.Status.FAILED);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

TaskModel task_2_1 = new TaskModel();
task_2_1.setTaskId(UUID.randomUUID().toString());
task_2_1.setSeq(22);
task_2_1.setRetryCount(1);
task_2_1.setStatus(TaskModel.Status.CANCELED);
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");

TaskModel joinTask = new TaskModel();
joinTask.setTaskType(TaskType.JOIN.toString());
joinTask.setTaskId(UUID.randomUUID().toString());
joinTask.setSeq(25);
joinTask.setRetryCount(1);
joinTask.setStatus(TaskModel.Status.FAILED);
joinTask.setReferenceTaskName("task_join");
joinTask.getInputData()
.put(
"joinOn",
Arrays.asList(
task_1_1.getReferenceTaskName(), task_2_1.getReferenceTaskName()));

workflow.getTasks().addAll(Arrays.asList(forkTask, task_1_1, task_2_1, joinTask));
// end of setup

// when:
when(executionDAOFacade.getWorkflowModel(anyString(), anyBoolean())).thenReturn(workflow);
when(metadataDAO.getWorkflowDef(anyString(), anyInt()))
.thenReturn(Optional.of(new WorkflowDef()));

workflowExecutor.retry(workflow.getWorkflowId(), false);

assertEquals(6, workflow.getTasks().size());
assertEquals(WorkflowModel.Status.FAILED, workflow.getPreviousStatus());
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
}

@Test
public void testRetryFromLastFailedSubWorkflowTaskThenStartWithLastFailedTask() {
IDGenerator idGenerator = new IDGenerator();
Expand Down

0 comments on commit ca656ff

Please sign in to comment.