Skip to content

Commit

Permalink
Sweeper fix - add lock
Browse files Browse the repository at this point in the history
  • Loading branch information
meggarr committed Dec 22, 2023
1 parent a0ab2b3 commit 29d7a58
Showing 1 changed file with 52 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;

import io.orkes.conductor.metrics.MetricsCollector;

Expand All @@ -50,21 +51,24 @@ public class OrkesWorkflowSweepWorker {
private final QueueDAO queueDAO;
private final ConductorProperties properties;
private final WorkflowExecutor workflowExecutor;
private final ExecutionDAOFacade executionDAOFacade;
private final ExecutionDAO executionDAO;
private final MetricsCollector metricsCollector;
private final SystemTaskRegistry systemTaskRegistry;
private final ExecutionLockService executionLockService;

public OrkesWorkflowSweepWorker(
QueueDAO queueDAO,
WorkflowExecutor workflowExecutor,
ExecutionDAOFacade executionDAOFacade,
ExecutionDAO executionDAO,
MetricsCollector metricsCollector,
SystemTaskRegistry systemTaskRegistry,
ExecutionLockService executionLockService,
ConductorProperties properties) {
this.queueDAO = queueDAO;
this.executionDAOFacade = executionDAOFacade;
this.executionDAO = executionDAO;
this.metricsCollector = metricsCollector;
this.systemTaskRegistry = systemTaskRegistry;
this.executionLockService = executionLockService;
this.properties = properties;
this.workflowExecutor = workflowExecutor;
}
Expand All @@ -76,11 +80,25 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {
}

private void sweep(String workflowId) {
boolean workflowLocked = false;
try {
log.info("Running sweeper for workflow {}", workflowId);
workflowLocked = executionLockService.acquireLock(workflowId);
if (!workflowLocked) {
return;
}
log.info("Running sweeper for workflow {}, acquired lock", workflowId);
// 1. Run decide on the workflow
WorkflowModel workflow = decideAndRemove(workflowId);
WorkflowModel workflow = executionDAO.getWorkflow(workflowId, true);
if (workflow == null) {
throw new NotFoundException(
String.format("Workflow NOT found by id: %s", workflowId));
}
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
log.debug(
"Repair/decide result for workflow {} - {}",
workflowId,
workflow == null ? null : workflow.getStatus());
if (workflow == null) {
// The workflow does not exist anymore, possible if it was completed and
// archived
Expand Down Expand Up @@ -109,32 +127,43 @@ private void sweep(String workflowId) {
"Workflow {} doesn't have an open pending task, requires force evaluation",
workflow.getWorkflowId());
forceSetLastTaskAsNotExecuted(workflow);
workflow = decideAndRemove(workflowId);
log.debug(
"Force evaluation result for workflow {} - {}",
workflowId,
workflow == null ? null : workflow.getStatus());
// Decide again after setting isExecuted to false
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
log.warn(
"Removing from decider after repair is done, {}, {}",
workflowId,
(workflow == null ? null : workflow.getStatus()));
queueDAO.remove(DECIDER_QUEUE, workflowId);
return;
}
log.debug(
"Force evaluation result for workflow {} - {}",
workflowId,
workflow.getStatus());
}

// 3. If parent workflow exists, call repair on that too - meaning ensure the parent is
// in the decider queue
if (workflow.getParentWorkflowId() != null) {
ensureWorkflowExistsInDecider(workflow.getParentWorkflowId());
}

// 4. TODO: Don't do this now - Check the min timeout for all running tasks and set
// Math.min(minTime, 1 hour) for decider queue
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis());
} catch (NotFoundException e) {
log.warn("Workflow NOT found for id: {}. Removed it from decider queue", workflowId, e);
queueDAO.remove(DECIDER_QUEUE, workflowId);
log.info("Workflow NOT found for id:{}. Removed it from decider queue", workflowId, e);
return;
} catch (Exception e) {
log.error("Error running sweep for " + workflowId, e);
log.error("Error running sweep for workflow {}", workflowId, e);
} finally {
if (workflowLocked) {
executionLockService.releaseLock(workflowId);
log.debug("Sweeper released lock for workflow {}", workflowId);
}
}

// 4. TODO: Don't do this now - Check the min timeout for all running tasks and set
// Math.min(minTime, 1 hour) for decider queue
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis());
}

private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) {
Expand All @@ -145,7 +174,7 @@ private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) {
taskModel.getTaskId(),
taskModel.getWorkflowInstanceId());
taskModel.setExecuted(false);
executionDAOFacade.updateTask(taskModel);
executionDAO.updateWorkflow(workflow);
}
}

Expand All @@ -159,14 +188,13 @@ private List<TaskModel> getAllPendingTasks(WorkflowModel workflow) {
}

/** Decide with lock and remove terminal workflow from <code>DECIDER_QUEUE</code> */
private WorkflowModel decideAndRemove(String workflowId) {
WorkflowModel workflowModel = executionDAOFacade.getWorkflowModel(workflowId, true);
workflowModel = workflowExecutor.decideWithLock(workflowModel);
private WorkflowModel decideAndRemove(WorkflowModel workflow) {
WorkflowModel workflowModel = workflowExecutor.decide(workflow);
if (workflowModel == null) {
return null;
}
if (workflowModel.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
queueDAO.remove(DECIDER_QUEUE, workflowModel.getWorkflowId());
}
return workflowModel;
}
Expand Down

0 comments on commit 29d7a58

Please sign in to comment.