Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Support for task resiliency on persistence features, and tests to ver…
Browse files Browse the repository at this point in the history
…ify the same using AOP based failure injection.

Added the configuration to control WorkflowRepairService from the sweeper service.
  • Loading branch information
kishorebanala committed Sep 11, 2020
1 parent 289ca1c commit 114a91d
Show file tree
Hide file tree
Showing 21 changed files with 551 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.netflix.conductor.common.constraints;


import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Interceptor intended for failure injection during unit / integration testing.
*/
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
public @interface FaultInjectionInterceptor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public interface Configuration {

String EVENT_QUEUE_POLL_SCHEDULER_THREAD_COUNT_PROPERTY_NAME = "workflow.event.queue.scheduler.poll.thread.count";

String WORKFLOW_REPAIR_SERVICE_ENABLED = "workflow.repairservice.enabled";
boolean WORKFLOW_REPAIR_SERVICE_ENABLED_DEFAULT_VALUE = false;

//TODO add constants for input/output external payload related properties.

default DB getDB() {
Expand Down Expand Up @@ -376,6 +379,17 @@ default int getEventSchedulerPollThreadCount()
return getIntProperty(EVENT_QUEUE_POLL_SCHEDULER_THREAD_COUNT_PROPERTY_NAME, Runtime.getRuntime().availableProcessors());
}

/**
* Configuration to enable {@link com.netflix.conductor.core.execution.WorkflowRepairService}, that tries to keep
* ExecutionDAO and QueueDAO in sync, based on the task or workflow state.
*
* This is disabled by default; To enable, the Queueing layer must implement QueueDAO.containsMessage method.
* @return
*/
default boolean isWorkflowRepairServiceEnabled() {
return getBooleanProperty(WORKFLOW_REPAIR_SERVICE_ENABLED, WORKFLOW_REPAIR_SERVICE_ENABLED_DEFAULT_VALUE);
}

/**
* @param name Name of the property
* @param defaultValue Default value when not specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,6 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
executionDAOFacade.updateWorkflow(workflow);

List<Task> tasks = workflow.getTasks();
// Remove from the task queue if they were there
tasks.forEach(task -> queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));

// Update non-terminal tasks' status to CANCELED
for (Task task : tasks) {
Expand Down Expand Up @@ -776,7 +774,6 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
}
executionDAOFacade.updateWorkflow(workflow);
}
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
executionDAOFacade.removeFromPendingWorkflow(workflow.getWorkflowName(), workflow.getWorkflowId());

// Send to atlas
Expand All @@ -785,6 +782,11 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowTerminated(workflow);
}

//remove from the sweep queue
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
// Remove from the task queue if they were there
tasks.forEach(task -> queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));
} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -1018,7 +1020,7 @@ public boolean decide(String workflowId) {
if(TaskType.SUB_WORKFLOW.name().equals(workflowTask.getTaskType()) && StringUtils.isNotBlank(workflowTask.getSubWorkflowId())) {
Workflow subWorkflow = executionDAOFacade.getWorkflowById(workflowTask.getSubWorkflowId(), true);
if(subWorkflow != null) {
skipTasksAffectedByTerminateTask(subWorkflow);
skipTasksAffectedByTerminateTask(subWorkflow);
}
}
}
Expand Down Expand Up @@ -1072,7 +1074,7 @@ public boolean decide(String workflowId) {
}

/**
* When a TERMINATE task runs, it only affects the workflow in which it runs; it does not do anything with
* When a TERMINATE task runs, it only affects the workflow in which it runs; it does not do anything with
* in-progress tasks and subworkflows that are still running. This recursive method will ensure that all tasks within
* all subworkflows are set to SKIPPED status so they can complete.
* @param workflow a subworkflow within the hierarchy of the original workflow containing the TERMINATE task
Expand Down Expand Up @@ -1377,6 +1379,8 @@ private long getTaskDuration(long s, Task task) {
@VisibleForTesting
boolean scheduleTask(Workflow workflow, List<Task> tasks) {
List<Task> createdTasks;
List<Task> tasksToBeQueued;
boolean startedSystemTasks = false;

try {
if (tasks == null || tasks.isEmpty()) {
Expand All @@ -1402,12 +1406,10 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
.filter(isSystemTask)
.collect(Collectors.toList());

List<Task> tasksToBeQueued = createdTasks.stream()
tasksToBeQueued = createdTasks.stream()
.filter(isSystemTask.negate())
.collect(Collectors.toList());

boolean startedSystemTasks = false;

// Traverse through all the system tasks, start the sync tasks, in case of async queue the tasks
for (Task task : systemTasks) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
Expand All @@ -1432,9 +1434,6 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
tasksToBeQueued.add(task);
}
}

addTaskToQueue(tasksToBeQueued);
return startedSystemTasks;
} catch (Exception e) {
List<String> taskIds = tasks.stream()
.map(Task::getTaskId)
Expand All @@ -1447,6 +1446,19 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
// rollbackTasks(workflow.getWorkflowId(), createdTasks);
throw new TerminateWorkflowException(errorMsg);
}

// On addTaskToQueue failures, ignore the exceptions and let WorkflowRepairService take care of republishing the messages to the queue.
try {
addTaskToQueue(tasksToBeQueued);
} catch (Exception e) {
List<String> taskIds = tasksToBeQueued.stream()
.map(Task::getTaskId)
.collect(Collectors.toList());
String errorMsg = String.format("Error pushing tasks to the queue: %s, for workflow: %s", taskIds, workflow.getWorkflowId());
LOGGER.warn(errorMsg, e);
Monitors.error(className, "scheduleTask");
}
return startedSystemTasks;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/**
* A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the
* task or workflow state.
*
* This service expects that the underlying Queueing layer implements QueueDAO.containsMessage method. This can be controlled
* with Configuration.isWorkflowRepairServiceEnabled() property.
*/
public class WorkflowRepairService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);

private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final Configuration configuration;

private final Predicate<Task> isSystemTask = task -> WorkflowSystemTask.is(task.getTaskType());

@Inject
public WorkflowRepairService(
ExecutionDAO executionDAO,
QueueDAO queueDAO,
Configuration configuration
) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.configuration = configuration;
}

/**
* Verify and repair if the workflowId exists in deciderQueue, and then if each scheduled task has relevant message
* in the queue.
* @param workflowId
* @param includeTasks
* @return
*/
public boolean verifyAndRepairWorkflow(String workflowId, boolean includeTasks) {
Workflow workflow = executionDAO.getWorkflow(workflowId, includeTasks);
AtomicBoolean repaired = new AtomicBoolean(false);
repaired.set(verifyAndRepairDeciderQueue(workflow));
if (includeTasks) {
workflow.getTasks().forEach(task -> {
repaired.set(verifyAndRepairTask(task));
});
}
return repaired.get();
}

/**
* Verify and repair tasks in a workflow
* @param workflowId
*/
public void verifyAndRepairWorkflowTasks(String workflowId) {
Workflow workflow = executionDAO.getWorkflow(workflowId, true);
workflow.getTasks().forEach(task -> verifyAndRepairTask(task));
}

/**
* Verify and fix if Workflow decider queue contains this workflowId.
* @param workflow
* @return
*/
private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
if (!workflow.getStatus().isTerminal()) {
String queueName = WorkflowExecutor.DECIDER_QUEUE;
if (!queueDAO.containsMessage(queueName, workflow.getWorkflowId())) {
queueDAO.push(queueName, workflow.getWorkflowId(), configuration.getSweepFrequency());
Monitors.recordQueueMessageRepushFromRepairService(queueName);
return true;
}
}
return false;
}

/**
* Verify if ExecutionDAO and QueueDAO agree for the provided task.
* @param task
* @return
*/
@VisibleForTesting
protected boolean verifyAndRepairTask(Task task) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (task.getStatus().equals(Task.Status.SCHEDULED)) {
if (isSystemTask.test(task) && !workflowSystemTask.isAsync()) {
return false;
}
// Ensure QueueDAO contains this taskId
if (!queueDAO.containsMessage(task.getTaskDefName(), task.getTaskId())) {
queueDAO.push(task.getTaskDefName(), task.getTaskId(), task.getCallbackAfterSeconds());
Monitors.recordQueueMessageRepushFromRepairService(task.getTaskDefName());
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ public class WorkflowSweeper {
private static final String className = WorkflowSweeper.class.getSimpleName();

@Inject
public WorkflowSweeper(WorkflowExecutor workflowExecutor, Configuration config, QueueDAO queueDAO) {
public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService, Configuration config, QueueDAO queueDAO) {
this.config = config;
this.queueDAO = queueDAO;
this.executorThreadPoolSize = config.getIntProperty("workflow.sweeper.thread.count", 5);
if(this.executorThreadPoolSize > 0) {
this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize);
init(workflowExecutor);
init(workflowExecutor, workflowRepairService);
logger.info("Workflow Sweeper Initialized");
} else {
logger.warn("Workflow sweeper is DISABLED");
}

}

public void init(WorkflowExecutor workflowExecutor) {
public void init(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) {
ScheduledExecutorService deciderPool = Executors.newScheduledThreadPool(1);
deciderPool.scheduleWithFixedDelay(() -> {
try {
Expand All @@ -83,15 +83,15 @@ public void init(WorkflowExecutor workflowExecutor) {
int retrievedWorkflows = (workflowIds != null) ? workflowIds.size() : 0;
logger.debug("Sweeper retrieved {} workflows from the decider queue.", retrievedWorkflows);

sweep(workflowIds, workflowExecutor);
sweep(workflowIds, workflowExecutor, workflowRepairService);
} catch (Exception e) {
Monitors.error(className, "sweep");
logger.error("Error when sweeping workflow", e);
}
}, 500, 500, TimeUnit.MILLISECONDS);
}

public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) throws Exception {
public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) throws Exception {

List<Future<?>> futures = new LinkedList<>();
for (String workflowId : workflowIds) {
Expand All @@ -103,6 +103,12 @@ public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) t
if(logger.isDebugEnabled()) {
logger.debug("Running sweeper for workflow {}", workflowId);
}

if (config.isWorkflowRepairServiceEnabled()) {
// Verify and repair tasks in the workflow.
workflowRepairService.verifyAndRepairWorkflowTasks(workflowId);
}

boolean done = workflowExecutor.decide(workflowId);
if(!done) {
queueDAO.setUnackTimeout(WorkflowExecutor.DECIDER_QUEUE, workflowId, config.getSweepFrequency() * 1000);
Expand All @@ -117,6 +123,7 @@ public void sweep(List<String> workflowIds, WorkflowExecutor workflowExecutor) t
}

} catch (Exception e) {
queueDAO.setUnackTimeout(WorkflowExecutor.DECIDER_QUEUE, workflowId, config.getSweepFrequency() * 1000);
Monitors.error(className, "sweep");
logger.error("Error running sweep for " + workflowId, e);
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,14 @@ default boolean postpone(String queueName, String messageId, int priority, long
push(queueName, messageId, priority, postponeDurationInSeconds);
return true;
}

/**
* Check if the message with given messageId exists in the Queue.
* @param queueName
* @param messageId
* @return
*/
default boolean containsMessage(String queueName, String messageId) {
throw new UnsupportedOperationException("Please ensure your provided Queue implementation overrides and implements this method.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,8 @@ public static void recordSystemTaskWorkerPollingLimited(String queueName) {
public static void recordEventQueuePollSize(String queueType, int val) {
gauge(Monitors.classQualifier, "event_queue_poll", val, "queueType", queueType);
}

public static void recordQueueMessageRepushFromRepairService(String queueName) {
counter(classQualifier, "queue_message_repushed", "queueName", queueName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ public interface AdminService {
*/
List<Task> getListOfPendingTask(@NotEmpty(message = "TaskType cannot be null or empty.") String taskType,
Integer start, Integer count);

/**
* Verify that the Workflow is consistent, and run repairs as needed.
* @param workflowId
* @return
*/
boolean verifyAndRepairWorkflowConsistency(@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId);
}
Loading

0 comments on commit 114a91d

Please sign in to comment.