Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Support for task resiliency on persistence features, and tests to ver…
Browse files Browse the repository at this point in the history
…ify the same using AOP based failure injection. (#1830)

Added the configuration to control WorkflowRepairService from the sweeper service.

Replaced AOP based QueueDAO failure tests with Spock Spy's
  • Loading branch information
kishorebanala authored Sep 18, 2020
1 parent 749f922 commit 2494a33
Show file tree
Hide file tree
Showing 21 changed files with 543 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public interface Configuration {

String EVENT_QUEUE_POLL_SCHEDULER_THREAD_COUNT_PROPERTY_NAME = "workflow.event.queue.scheduler.poll.thread.count";

String WORKFLOW_REPAIR_SERVICE_ENABLED = "workflow.repairservice.enabled";
boolean WORKFLOW_REPAIR_SERVICE_ENABLED_DEFAULT_VALUE = false;

//TODO add constants for input/output external payload related properties.

default DB getDB() {
Expand Down Expand Up @@ -376,6 +379,17 @@ default int getEventSchedulerPollThreadCount()
return getIntProperty(EVENT_QUEUE_POLL_SCHEDULER_THREAD_COUNT_PROPERTY_NAME, Runtime.getRuntime().availableProcessors());
}

/**
* Configuration to enable {@link com.netflix.conductor.core.execution.WorkflowRepairService}, that tries to keep
* ExecutionDAO and QueueDAO in sync, based on the task or workflow state.
*
* This is disabled by default; To enable, the Queueing layer must implement QueueDAO.containsMessage method.
* @return
*/
default boolean isWorkflowRepairServiceEnabled() {
return getBooleanProperty(WORKFLOW_REPAIR_SERVICE_ENABLED, WORKFLOW_REPAIR_SERVICE_ENABLED_DEFAULT_VALUE);
}

/**
* @param name Name of the property
* @param defaultValue Default value when not specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,8 @@ private long getTaskDuration(long s, Task task) {
@VisibleForTesting
boolean scheduleTask(Workflow workflow, List<Task> tasks) {
List<Task> createdTasks;
List<Task> tasksToBeQueued;
boolean startedSystemTasks = false;

try {
if (tasks == null || tasks.isEmpty()) {
Expand All @@ -1408,12 +1410,10 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
.filter(isSystemTask)
.collect(Collectors.toList());

List<Task> tasksToBeQueued = createdTasks.stream()
tasksToBeQueued = createdTasks.stream()
.filter(isSystemTask.negate())
.collect(Collectors.toList());

boolean startedSystemTasks = false;

// Traverse through all the system tasks, start the sync tasks, in case of async queue the tasks
for (Task task : systemTasks) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
Expand All @@ -1438,9 +1438,6 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
tasksToBeQueued.add(task);
}
}

addTaskToQueue(tasksToBeQueued);
return startedSystemTasks;
} catch (Exception e) {
List<String> taskIds = tasks.stream()
.map(Task::getTaskId)
Expand All @@ -1453,6 +1450,19 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
// rollbackTasks(workflow.getWorkflowId(), createdTasks);
throw new TerminateWorkflowException(errorMsg);
}

// On addTaskToQueue failures, ignore the exceptions and let WorkflowRepairService take care of republishing the messages to the queue.
try {
addTaskToQueue(tasksToBeQueued);
} catch (Exception e) {
List<String> taskIds = tasksToBeQueued.stream()
.map(Task::getTaskId)
.collect(Collectors.toList());
String errorMsg = String.format("Error pushing tasks to the queue: %s, for workflow: %s", taskIds, workflow.getWorkflowId());
LOGGER.warn(errorMsg, e);
Monitors.error(className, "scheduleTask");
}
return startedSystemTasks;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/**
* A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the
* task or workflow state.
*
* This service expects that the underlying Queueing layer implements QueueDAO.containsMessage method. This can be controlled
* with {@link com.netflix.conductor.core.config.Configuration#isWorkflowRepairServiceEnabled()} property.
*/
public class WorkflowRepairService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);

private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final Configuration configuration;

private final Predicate<Task> isSystemTask = task -> WorkflowSystemTask.is(task.getTaskType());

@Inject
public WorkflowRepairService(
ExecutionDAO executionDAO,
QueueDAO queueDAO,
Configuration configuration
) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.configuration = configuration;
}

/**
* Verify and repair if the workflowId exists in deciderQueue, and then if each scheduled task has relevant message
* in the queue.
* @param workflowId
* @param includeTasks
* @return
*/
public boolean verifyAndRepairWorkflow(String workflowId, boolean includeTasks) {
Workflow workflow = executionDAO.getWorkflow(workflowId, includeTasks);
AtomicBoolean repaired = new AtomicBoolean(false);
repaired.set(verifyAndRepairDeciderQueue(workflow));
if (includeTasks) {
workflow.getTasks().forEach(task -> {
repaired.set(verifyAndRepairTask(task));
});
}
return repaired.get();
}

/**
* Verify and repair tasks in a workflow
* @param workflowId
*/
public void verifyAndRepairWorkflowTasks(String workflowId) {
Workflow workflow = executionDAO.getWorkflow(workflowId, true);
workflow.getTasks().forEach(task -> verifyAndRepairTask(task));
}

/**
* Verify and fix if Workflow decider queue contains this workflowId.
* @param workflow
* @return
*/
private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
if (!workflow.getStatus().isTerminal()) {
String queueName = WorkflowExecutor.DECIDER_QUEUE;
if (!queueDAO.containsMessage(queueName, workflow.getWorkflowId())) {
queueDAO.push(queueName, workflow.getWorkflowId(), configuration.getSweepFrequency());
Monitors.recordQueueMessageRepushFromRepairService(queueName);
return true;
}
}
return false;
}

/**
* Verify if ExecutionDAO and QueueDAO agree for the provided task.
* @param task
* @return
*/
@VisibleForTesting
protected boolean verifyAndRepairTask(Task task) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (task.getStatus() == Task.Status.SCHEDULED) {
if (isSystemTask.test(task) && !workflowSystemTask.isAsync()) {
return false;
}
// Ensure QueueDAO contains this taskId
String taskQueueName = QueueUtils.getQueueName(task);
if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) {
queueDAO.push(taskQueueName, task.getTaskId(), task.getCallbackAfterSeconds());
Monitors.recordQueueMessageRepushFromRepairService(task.getTaskDefName());
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ public class WorkflowSweeper {
private static final String className = WorkflowSweeper.class.getSimpleName();

@Inject
public WorkflowSweeper(WorkflowExecutor workflowExecutor, Configuration config, QueueDAO queueDAO) {
public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService, Configuration config, QueueDAO queueDAO) {
this.config = config;
this.queueDAO = queueDAO;
this.executorThreadPoolSize = config.getIntProperty("workflow.sweeper.thread.count", 5);
if(this.executorThreadPoolSize > 0) {
this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize);
init(workflowExecutor);
init(workflowExecutor, workflowRepairService);
logger.info("Workflow Sweeper Initialized");
} else {
logger.warn("Workflow sweeper is DISABLED");
}

}

public void init(WorkflowExecutor workflowExecutor) {
public void init(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) {
ScheduledExecutorService deciderPool = Executors.newScheduledThreadPool(1);
deciderPool.scheduleWithFixedDelay(() -> {
try {
Expand All @@ -83,15 +83,15 @@ public void init(WorkflowExecutor workflowExecutor) {
int retrievedWorkflows = (workflowIds != null) ? workflowIds.size() : 0;
logger.debug("Sweeper retrieved {} workflows from the decider queue.", retrievedWorkflows);

sweep(workflowIds, workflowExecutor);
sweep(workflowIds, workflowExecutor, workflowRepairService);
} catch (Exception e) {
Monitors.error(className, "sweep");
logger.error("Error when sweeping workflow", e);
}
}, 500, 500, TimeUnit.MILLISECONDS);
}

public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) throws Exception {
public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) throws Exception {

List<Future<?>> futures = new LinkedList<>();
for (String workflowId : workflowIds) {
Expand All @@ -103,6 +103,12 @@ public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) t
if(logger.isDebugEnabled()) {
logger.debug("Running sweeper for workflow {}", workflowId);
}

if (config.isWorkflowRepairServiceEnabled()) {
// Verify and repair tasks in the workflow.
workflowRepairService.verifyAndRepairWorkflowTasks(workflowId);
}

boolean done = workflowExecutor.decide(workflowId);
if(!done) {
queueDAO.setUnackTimeout(WorkflowExecutor.DECIDER_QUEUE, workflowId, config.getSweepFrequency() * 1000);
Expand All @@ -117,6 +123,7 @@ public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) t
}

} catch (Exception e) {
queueDAO.setUnackTimeout(WorkflowExecutor.DECIDER_QUEUE, workflowId, config.getSweepFrequency() * 1000);
Monitors.error(className, "sweep");
logger.error("Error running sweep for " + workflowId, e);
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,14 @@ default boolean postpone(String queueName, String messageId, int priority, long
push(queueName, messageId, priority, postponeDurationInSeconds);
return true;
}

/**
* Check if the message with given messageId exists in the Queue.
* @param queueName
* @param messageId
* @return
*/
default boolean containsMessage(String queueName, String messageId) {
throw new UnsupportedOperationException("Please ensure your provided Queue implementation overrides and implements this method.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,8 @@ public static void recordSystemTaskWorkerPollingLimited(String queueName) {
public static void recordEventQueuePollSize(String queueType, int val) {
gauge(Monitors.classQualifier, "event_queue_poll", val, "queueType", queueType);
}

public static void recordQueueMessageRepushFromRepairService(String queueName) {
counter(classQualifier, "queue_message_repushed", "queueName", queueName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ public interface AdminService {
*/
List<Task> getListOfPendingTask(@NotEmpty(message = "TaskType cannot be null or empty.") String taskType,
Integer start, Integer count);

/**
* Verify that the Workflow is consistent, and run repairs as needed.
* @param workflowId
* @return
*/
boolean verifyAndRepairWorkflowConsistency(@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.WorkflowRepairService;
import com.netflix.conductor.dao.QueueDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.validation.constraints.NotEmpty;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
Expand All @@ -46,15 +48,21 @@ public class AdminServiceImpl implements AdminService {

private final QueueDAO queueDAO;

private final WorkflowRepairService workflowRepairService;

private String version;

private String buildDate;

@Inject
public AdminServiceImpl(Configuration config, ExecutionService executionService, QueueDAO queueDAO) {
public AdminServiceImpl(Configuration config,
ExecutionService executionService,
QueueDAO queueDAO,
WorkflowRepairService workflowRepairService) {
this.config = config;
this.executionService = executionService;
this.queueDAO = queueDAO;
this.workflowRepairService = workflowRepairService;
this.version = "UNKNOWN";
this.buildDate = "UNKNOWN";

Expand Down Expand Up @@ -97,6 +105,11 @@ public List<Task> getListOfPendingTask(String taskType, Integer start, Integer c
return tasks.subList(start, total);
}

@Override
public boolean verifyAndRepairWorkflowConsistency(@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId) {
return workflowRepairService.verifyAndRepairWorkflow(workflowId, true);
}

/**
* Queue up all the running workflows for sweep.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,32 @@ public void testScheduleTaskFailure() {
workflowExecutor.scheduleTask(workflow, tasks);
}

/**
* Simulate Queue push failures and assert that scheduleTask doesn't throw an exception.
*/
@Test
public void testQueueFailuresDuringScheduleTask() {
Workflow workflow = new Workflow();
workflow.setWorkflowId("wid_01");

List<Task> tasks = new LinkedList<>();

Task task1 = new Task();
task1.setTaskType(TaskType.TASK_TYPE_SIMPLE);
task1.setTaskDefName("task_1");
task1.setReferenceTaskName("task_1");
task1.setWorkflowInstanceId(workflow.getWorkflowId());
task1.setTaskId("tid_01");
task1.setStatus(Status.SCHEDULED);
task1.setRetryCount(0);

tasks.add(task1);

when(executionDAOFacade.createTasks(tasks)).thenReturn(tasks);
doThrow(new RuntimeException()).when(queueDAO).push(anyString(), anyString(), anyInt(), anyLong());
assertFalse(workflowExecutor.scheduleTask(workflow, tasks));
}

@Test
@SuppressWarnings("unchecked")
public void testCompleteWorkflow() {
Expand Down
Loading

0 comments on commit 2494a33

Please sign in to comment.