Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #744 from Netflix/feature/retry_multiple_failed_ta…
Browse files Browse the repository at this point in the history
…sks-AL

Feature/retry multiple failed tasks al
  • Loading branch information
Alex authored Sep 9, 2018
2 parents a729c71 + 5293884 commit 854bcbc
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -68,6 +69,10 @@
import static com.netflix.conductor.core.execution.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.NOT_FOUND;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.maxBy;
import static java.util.stream.Collectors.toSet;

/**
* @author Viren Workflow services provider interface
Expand Down Expand Up @@ -152,7 +157,7 @@ public String startWorkflow(String workflowName, int workflowVersion, Map<String
.filter(task -> task.getType().equals(WorkflowTask.Type.SIMPLE.name()))
.map(WorkflowTask::getName)
.filter(task -> metadataDAO.getTaskDef(task) == null)
.collect(Collectors.toSet());
.collect(toSet());

if (!missingTaskDefs.isEmpty()) {
logger.error("Cannot find the task definitions for the following tasks used in workflow: {}", missingTaskDefs);
Expand Down Expand Up @@ -244,6 +249,12 @@ public void rewind(String workflowId) {
decide(workflowId);
}

/**
* Gets the last instance of each failed task and reschedule each
* Gets all cancelled tasks and schedule all of them except JOIN (join should change status to INPROGRESS)
* Switch workflow back to RUNNING status and aall decider.
* @param workflowId
*/
public void retry(String workflowId) {
Workflow workflow = executionDAO.getWorkflow(workflowId, true);
if (!workflow.getStatus().isTerminal()) {
Expand All @@ -253,50 +264,27 @@ public void retry(String workflowId) {
throw new ApplicationException(CONFLICT, "Workflow has not started yet");
}

// First get the failed task and the cancelled task
Task failedTask = null;
List<Task> cancelledTasks = new ArrayList<>();
for (Task t : workflow.getTasks()) {
if (t.getStatus().equals(FAILED)) {
failedTask = t;
} else if (t.getStatus().equals(CANCELED)) {
cancelledTasks.add(t);
}
}
List<Task> failedTasks = getFailedTasksToRetry(workflow);

List<Task> cancelledTasks = workflow.getTasks().stream()
.filter(x->CANCELED.equals(x.getStatus())).collect(Collectors.toList());

if (failedTask != null && failedTask.getStatus().isSuccessful()) {
if (failedTasks.isEmpty()) {
throw new ApplicationException(CONFLICT,
"The last task has not failed! I can only retry the last failed task. Use restart if you want to attempt entire workflow execution again.");
"There are no failed tasks! Use restart if you want to attempt entire workflow execution again.");
}

List<Task> rescheduledTasks = new ArrayList<>();
// Now reschedule the failed task
Task taskToBeRetried = failedTask.copy();
taskToBeRetried.setTaskId(IDGenerator.generate());
taskToBeRetried.setRetriedTaskId(failedTask.getTaskId());
taskToBeRetried.setStatus(SCHEDULED);
taskToBeRetried.setRetryCount(failedTask.getRetryCount() + 1);
rescheduledTasks.add(taskToBeRetried);

// update the failed task in the DAO
failedTask.setRetried(true);
executionDAO.updateTask(failedTask);
failedTasks.forEach(failedTask -> {
rescheduledTasks.add(taskToBeRescheduled(failedTask));
});

// Reschedule the cancelled task but if the join is cancelled set that to in progress
cancelledTasks.forEach(task -> {
if (task.getTaskType().equalsIgnoreCase(WorkflowTask.Type.JOIN.toString())) {
task.setStatus(IN_PROGRESS);
executionDAO.updateTask(task);
cancelledTasks.forEach(cancelledTask -> {
if (cancelledTask.getTaskType().equalsIgnoreCase(WorkflowTask.Type.JOIN.toString())) {
cancelledTask.setStatus(IN_PROGRESS);
executionDAO.updateTask(cancelledTask);
} else {
Task taskToBeRescheduled = task.copy();
taskToBeRescheduled.setTaskId(IDGenerator.generate());
taskToBeRescheduled.setRetriedTaskId(task.getTaskId());
taskToBeRescheduled.setStatus(SCHEDULED);
taskToBeRescheduled.setRetryCount(task.getRetryCount() + 1);
rescheduledTasks.add(taskToBeRescheduled);
// since the canceled task is being retried, update this
task.setRetried(true);
executionDAO.updateTask(task);
rescheduledTasks.add(taskToBeRescheduled(cancelledTask));
}
});

Expand All @@ -309,13 +297,46 @@ public void retry(String workflowId) {
decide(workflowId);
}

/**
* Get all failed and cancelled tasks.
* for failed tasks - get one for each task reference name(latest failed using seq id)
* @param workflow
* @return list of latest failed tasks, one for each task reference reference type.
*/
@VisibleForTesting
List<Task> getFailedTasksToRetry(Workflow workflow) {
return workflow.getTasks().stream()
.filter(x -> FAILED.equals(x.getStatus()))
.collect(groupingBy(Task::getReferenceTaskName, maxBy(comparingInt(Task::getSeq))))
.values().stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
}

/**
* Reschedule a task
* @param task failed or cancelled task
* @return new instance of a task with "SCHEDULED" status
*/
private Task taskToBeRescheduled(Task task) {
Task taskToBeRetried = task.copy();
taskToBeRetried.setTaskId(IDGenerator.generate());
taskToBeRetried.setRetriedTaskId(task.getTaskId());
taskToBeRetried.setStatus(SCHEDULED);
taskToBeRetried.setRetryCount(task.getRetryCount() + 1);

// update the failed task in the DAO
task.setRetried(true);
executionDAO.updateTask(task);
return taskToBeRetried;
}

public Task getPendingTaskByWorkflow(String taskReferenceName, String workflowId) {
return executionDAO.getTasksForWorkflow(workflowId).stream()
.filter(isNonTerminalTask)
.filter(task -> task.getReferenceTaskName().equals(taskReferenceName))
.findFirst() // There can only be one task by a given reference name running at a time.
.orElse(null);
}


@VisibleForTesting
void completeWorkflow(Workflow wf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask.Type;
import com.netflix.conductor.common.run.Workflow;
Expand All @@ -44,11 +45,14 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -70,13 +74,14 @@ public class TestWorkflowExecutor {

private WorkflowExecutor workflowExecutor;
private ExecutionDAO executionDAO;
private MetadataDAO metadataDAO;
private QueueDAO queueDAO;

@Before
public void init() {
TestConfiguration config = new TestConfiguration();
MetadataDAO metadataDAO = mock(MetadataDAO.class);
executionDAO = mock(ExecutionDAO.class);
metadataDAO = mock(MetadataDAO.class);
queueDAO = mock(QueueDAO.class);
ObjectMapper objectMapper = new ObjectMapper();
ParametersUtils parametersUtils = new ParametersUtils();
Expand Down Expand Up @@ -256,4 +261,201 @@ public void testCompleteWorkflow() throws Exception {
assertEquals(1, updateTasksCalledCounter.get());
assertEquals(1, removeQueueEntryCalledCounter.get());
}

@Test
public void testGetFailedTasksToRetry() {
//setup
Task task_1_1 = new Task();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(1);
task_1_1.setStatus(Status.FAILED);
task_1_1.setTaskDefName("task_1_def");
task_1_1.setReferenceTaskName("task_1_ref_1");

Task task_1_2 = new Task();
task_1_2.setTaskId(UUID.randomUUID().toString());
task_1_2.setSeq(10);
task_1_2.setStatus(Status.FAILED);
task_1_2.setTaskDefName("task_1_def");
task_1_2.setReferenceTaskName("task_1_ref_2");

Task task_1_3_1 = new Task();
task_1_3_1.setTaskId(UUID.randomUUID().toString());
task_1_3_1.setSeq(100);
task_1_3_1.setStatus(Status.FAILED);
task_1_3_1.setTaskDefName("task_1_def");
task_1_3_1.setReferenceTaskName("task_1_ref_3");


Task task_1_3_2 = new Task();
task_1_3_2.setTaskId(UUID.randomUUID().toString());
task_1_3_2.setSeq(101);
task_1_3_2.setStatus(Status.FAILED);
task_1_3_2.setTaskDefName("task_1_def");
task_1_3_2.setReferenceTaskName("task_1_ref_3");


Task task_2_1 = new Task();
task_2_1.setTaskId(UUID.randomUUID().toString());
task_2_1.setSeq(2);
task_2_1.setStatus(Status.COMPLETED);
task_2_1.setTaskDefName("task_2_def");
task_2_1.setReferenceTaskName("task_2_ref_1");

Task task_2_2 = new Task();
task_2_2.setTaskId(UUID.randomUUID().toString());
task_2_2.setSeq(20);
task_2_2.setStatus(Status.FAILED);
task_2_2.setTaskDefName("task_2_def");
task_2_2.setReferenceTaskName("task_2_ref_2");

Task task_3_1 = new Task();
task_3_1.setTaskId(UUID.randomUUID().toString());
task_3_1.setSeq(20);
task_3_1.setStatus(Status.TIMED_OUT);
task_3_1.setTaskDefName("task_3_def");
task_3_1.setReferenceTaskName("task_3_ref_1");

Workflow workflow = new Workflow();

//2 different task definitions
workflow.setTasks(Arrays.asList(task_1_1,task_2_1));
List<Task> tasks = workflowExecutor.getFailedTasksToRetry(workflow);
assertEquals(1, tasks.size());
assertEquals(task_1_1.getTaskId(), tasks.get(0).getTaskId());

//2 tasks with the same definition but different reference numbers
workflow.setTasks(Arrays.asList(task_1_3_1,task_1_3_2));
tasks = workflowExecutor.getFailedTasksToRetry(workflow);
assertEquals(1, tasks.size());
assertEquals(task_1_3_2.getTaskId(), tasks.get(0).getTaskId());

//3 tasks with definitions and reference numbers
workflow.setTasks(Arrays.asList(task_1_1,task_1_2, task_1_3_1, task_1_3_2, task_2_1, task_2_2, task_3_1));
tasks = workflowExecutor.getFailedTasksToRetry(workflow);
assertEquals(4, tasks.size());
assertTrue(tasks.contains(task_1_1));
assertTrue(tasks.contains(task_1_2));
assertTrue(tasks.contains(task_2_2));
assertTrue(tasks.contains(task_1_3_2));
}


@Test(expected = ApplicationException.class)
public void testRetryNonTerminalWorkflow() {
Workflow workflow = new Workflow();
workflow.setWorkflowId("testRetryNonTerminalWorkflow");
workflow.setStatus(Workflow.WorkflowStatus.COMPLETED);
when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);

workflowExecutor.retry(workflow.getWorkflowId());

}

@Test(expected = ApplicationException.class)
public void testRetryWorkflowNoTasks() {
Workflow workflow = new Workflow();
workflow.setWorkflowId("ApplicationException");
workflow.setStatus(Workflow.WorkflowStatus.FAILED);
workflow.setTasks(new ArrayList());
when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);

workflowExecutor.retry(workflow.getWorkflowId());
}

@Test
public void testRetryWorkflow() {
//setup
Workflow workflow = new Workflow();
workflow.setWorkflowId("testRetryWorkflowId");
workflow.setWorkflowType("testRetryWorkflowId");
workflow.setOwnerApp("junit_testRetryWorkflowId");
workflow.setStartTime(10L);
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);
workflow.setStatus(Workflow.WorkflowStatus.FAILED);

AtomicInteger updateWorkflowCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateWorkflowCalledCounter.incrementAndGet();
return null;
}).when(executionDAO).updateWorkflow(any());

AtomicInteger updateTasksCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateTasksCalledCounter.incrementAndGet();
return null;
}).when(executionDAO).updateTasks(any());

AtomicInteger updateTasksAlledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateTasksCalledCounter.incrementAndGet();
return null;
}).when(executionDAO).updateTask(any());

AtomicInteger removeQueueEntryCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
removeQueueEntryCalledCounter.incrementAndGet();
return null;
}).when(queueDAO).remove(anyString(), anyString());

// add 2 failed task in 2 forks and 1 cancelled in the 3rd fork
Task task_1_1 = new Task();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
task_1_1.setTaskType(Type.SIMPLE.toString());
task_1_1.setStatus(Status.CANCELED);
task_1_1.setTaskDefName("task1");
task_1_1.setReferenceTaskName("task1_ref1");

Task task_1_2 = new Task();
task_1_2.setTaskId(UUID.randomUUID().toString());
task_1_2.setSeq(21);
task_1_2.setTaskType(Type.SIMPLE.toString());
task_1_2.setStatus(Status.FAILED);
task_1_2.setTaskDefName("task1");
task_1_2.setReferenceTaskName("task1_ref1");

Task task_2_1 = new Task();
task_2_1.setTaskId(UUID.randomUUID().toString());
task_2_1.setSeq(22);
task_2_1.setStatus(Status.FAILED);
task_2_1.setTaskType(Type.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setReferenceTaskName("task2_ref1");


Task task_3_1 = new Task();
task_3_1.setTaskId(UUID.randomUUID().toString());
task_3_1.setSeq(23);
task_3_1.setStatus(Status.CANCELED);
task_3_1.setTaskType(Type.SIMPLE.toString());
task_3_1.setTaskDefName("task3");
task_3_1.setReferenceTaskName("task3_ref1");

Task task_4_1 = new Task();
task_4_1.setTaskId(UUID.randomUUID().toString());
task_4_1.setSeq(122);
task_4_1.setStatus(Status.FAILED);
task_4_1.setTaskType(Type.SIMPLE.toString());
task_4_1.setTaskDefName("task1");
task_4_1.setReferenceTaskName("task4_refABC");

workflow.setTasks(Arrays.asList(task_1_1,task_1_2, task_2_1, task_3_1, task_4_1));
//end of setup

//when:
when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
WorkflowDef workflowDef = new WorkflowDef();
when(metadataDAO.get(anyString(), anyInt())).thenReturn(workflowDef);

workflowExecutor.retry(workflow.getWorkflowId());

assertEquals(Workflow.WorkflowStatus.COMPLETED, workflow.getStatus());
assertEquals(2, updateWorkflowCalledCounter.get());
assertEquals(7, updateTasksCalledCounter.get());
assertEquals(1, removeQueueEntryCalledCounter.get());


}
}

0 comments on commit 854bcbc

Please sign in to comment.