Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1994 from Netflix/ensure_cancel_task_terminate_workflow
Browse files Browse the repository at this point in the history

ensure cancel tasks when workflow is terminated
  • Loading branch information
apanicker-nflx authored Dec 3, 2020
2 parents e422baf + 38c56f5 commit 954c9a8
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -732,30 +732,11 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
try {
// Remove from the task queue if they were there
tasks.forEach(task -> queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));
// Remove from the sweep queue
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.warn("Error removing the message(s) from queue during Workflow termination", e);
LOGGER.warn("Error removing task(s) from queue during workflow termination : {}", workflowId, e);
}

// Update non-terminal tasks' status to CANCELED
for (Task task : tasks) {
if (!task.getStatus().isTerminal()) {
// Cancel the ones which are not completed yet....
task.setStatus(CANCELED);
if (isSystemTask.test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
try {
workflowSystemTask.cancel(workflow, task, this);
} catch (Exception e) {
throw new ApplicationException(Code.INTERNAL_ERROR,
String.format("Error canceling system task: %s/%s", workflowSystemTask.getName(),
task.getTaskId()), e);
}
}
executionDAOFacade.updateTask(task);
}
}
List<String> erroredTasks = cancelNonTerminalTasks(workflow);

if (workflow.getParentWorkflowId() != null) {
updateParentWorkflowTask(workflow);
Expand Down Expand Up @@ -799,6 +780,11 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowTerminated(workflow);
}

if (!erroredTasks.isEmpty()) {
throw new ApplicationException(Code.INTERNAL_ERROR, String.format("Error canceling system tasks: %s",
String.join(",", erroredTasks)));
}
} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -1000,6 +986,9 @@ public boolean decide(String workflowId) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);

if (workflow.getStatus().isTerminal()) {
if (!workflow.getStatus().isSuccessful()) {
cancelNonTerminalTasks(workflow);
}
return true;
}

Expand Down Expand Up @@ -1096,7 +1085,7 @@ public boolean decide(String workflowId) {
*/
private void skipTasksAffectedByTerminateTask(Workflow workflow) {
if(!workflow.getStatus().isTerminal()) {
List<Task> tasksToBeUpdated = new ArrayList<Task>();
List<Task> tasksToBeUpdated = new ArrayList<>();
for(Task workflowTask : workflow.getTasks()) {
if(!workflowTask.getStatus().isTerminal()) {
workflowTask.setStatus(SKIPPED);
Expand All @@ -1118,6 +1107,37 @@ private void skipTasksAffectedByTerminateTask(Workflow workflow) {
}
}

@VisibleForTesting
List<String> cancelNonTerminalTasks(Workflow workflow) {
    // Reference names of system tasks whose cancel hook threw; reported to the caller.
    List<String> failedTaskRefs = new ArrayList<>();
    for (Task task : workflow.getTasks()) {
        if (task.getStatus().isTerminal()) {
            // Already finished — nothing to cancel.
            continue;
        }
        // Mark the in-flight task CANCELED; it is persisted below regardless of
        // whether the system-task cancel hook succeeds.
        task.setStatus(CANCELED);
        if (isSystemTask.test(task)) {
            WorkflowSystemTask systemTask = WorkflowSystemTask.get(task.getTaskType());
            try {
                systemTask.cancel(workflow, task, this);
            } catch (Exception e) {
                // Record the failure and keep going so the remaining tasks still get canceled.
                failedTaskRefs.add(task.getReferenceTaskName());
                LOGGER.error("Error canceling system task:{}/{} in workflow: {}",
                    systemTask.getName(), task.getTaskId(), workflow.getWorkflowId(), e);
            }
        }
        executionDAOFacade.updateTask(task);
    }
    // Only remove the workflow from the decider (sweep) queue when every cancellation
    // succeeded; presumably this keeps errored workflows visible to the sweeper — confirm.
    if (failedTaskRefs.isEmpty()) {
        try {
            queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
        } catch (Exception e) {
            LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
        }
    }
    return failedTaskRefs;
}

@VisibleForTesting
List<Task> dedupAndAddTasks(Workflow workflow, List<Task> tasks) {
List<String> tasksInWorkflow = workflow.getTasks().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper;
import com.netflix.conductor.core.execution.mapper.WaitTaskMapper;
import com.netflix.conductor.core.execution.tasks.Lambda;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
Expand Down Expand Up @@ -128,6 +130,9 @@ public void init() {
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("LAMBDA", new LambdaTaskMapper(parametersUtils, metadataDAO));

new SubWorkflow(new JsonMapperProvider().get());
new Lambda();

DeciderService deciderService = new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, taskMappers, config);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService,
Expand Down Expand Up @@ -1492,6 +1497,38 @@ public void testScheduleNextIteration() {
verify(executionDAOFacade).getTaskPollDataByDomain("TEST", "domain1");
}

@Test
public void testCancelNonTerminalTasks() {
    Workflow workflow = generateSampleWorkflow();

    // An in-flight SUB_WORKFLOW system task — expected to be canceled.
    Task runningSubWorkflow = new Task();
    runningSubWorkflow.setTaskId(UUID.randomUUID().toString());
    runningSubWorkflow.setTaskType(TaskType.SUB_WORKFLOW.name());
    runningSubWorkflow.setStatus(Status.IN_PROGRESS);

    // A scheduled LAMBDA system task — also expected to be canceled.
    Task scheduledLambda = new Task();
    scheduledLambda.setTaskId(UUID.randomUUID().toString());
    scheduledLambda.setTaskType(TaskType.LAMBDA.name());
    scheduledLambda.setStatus(Status.SCHEDULED);

    // A SIMPLE task already in a terminal state — must be left untouched.
    Task completedSimple = new Task();
    completedSimple.setTaskId(UUID.randomUUID().toString());
    completedSimple.setTaskType(TaskType.SIMPLE.name());
    completedSimple.setStatus(Status.COMPLETED);

    workflow.getTasks().addAll(Arrays.asList(runningSubWorkflow, scheduledLambda, completedSimple));

    List<String> erroredTasks = workflowExecutor.cancelNonTerminalTasks(workflow);

    // No cancel hook failed, so nothing is reported back.
    assertTrue(erroredTasks.isEmpty());

    // Exactly the two non-terminal tasks were persisted, each with status CANCELED.
    ArgumentCaptor<Task> captor = ArgumentCaptor.forClass(Task.class);
    verify(executionDAOFacade, times(2)).updateTask(captor.capture());
    List<Task> updatedTasks = captor.getAllValues();
    assertEquals(2, updatedTasks.size());
    assertEquals(TaskType.SUB_WORKFLOW.name(), updatedTasks.get(0).getTaskType());
    assertEquals(Status.CANCELED, updatedTasks.get(0).getStatus());
    assertEquals(TaskType.LAMBDA.name(), updatedTasks.get(1).getTaskType());
    assertEquals(Status.CANCELED, updatedTasks.get(1).getStatus());
}

private Workflow generateSampleWorkflow() {
//setup
Workflow workflow = new Workflow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class QueueResiliencySpec extends Specification {
workflowResource.terminate(workflowInstanceId, "Terminated from a test")

then: "Verify that terminate is successful without any exceptions"
1 * queueDAO.remove(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue remove failed from Spy") }
2 * queueDAO.remove(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue remove failed from Spy") }
0 * queueDAO._
with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.TERMINATED
Expand Down

0 comments on commit 954c9a8

Please sign in to comment.