Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Optimizing reading from datastore during WorkflowSweeper#sweep (#3816)
Browse files Browse the repository at this point in the history
* Optimizing reading from datastore during WorkflowSweeper#sweep
* Added javadoc to decideWithLock method
---------

Co-authored-by: Boyan Georgiev <[email protected]>
  • Loading branch information
wildMythicWest and Boyan Georgiev authored Nov 13, 2023
1 parent b3a1196 commit 9669d46
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,32 @@ public WorkflowModel decide(String workflowId) {
}
}

/**
* This method overloads the {@link #decide(String)}. It will acquire a lock and evaluate the
* state of the workflow.
*
* @param workflow the workflow to evaluate the state for
* @return the workflow
*/
public WorkflowModel decideWithLock(WorkflowModel workflow) {
if (workflow == null) {
return null;
}
StopWatch watch = new StopWatch();
watch.start();
if (!executionLockService.acquireLock(workflow.getWorkflowId())) {
return null;
}
try {
return decide(workflow);

} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
watch.stop();
Monitors.recordWorkflowDecisionTime(watch.getTime());
}
}

/**
* @param workflow the workflow to evaluate the state for
* @return true if the workflow has completed (success or failed), false otherwise. Note: This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public void verifyAndRepairWorkflowTasks(String workflowId) {
() ->
new NotFoundException(
"Could not find workflow: " + workflowId));
verifyAndRepairWorkflowTasks(workflow);
}

/** Verify and repair tasks in a workflow. */
public void verifyAndRepairWorkflowTasks(WorkflowModel workflow) {
workflow.getTasks().forEach(this::verifyAndRepairTask);
// repair the parent workflow if needed
verifyAndRepairWorkflow(workflow.getParentWorkflowId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.WorkflowContext;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
Expand All @@ -48,6 +49,7 @@ public class WorkflowSweeper {
private final WorkflowExecutor workflowExecutor;
private final WorkflowRepairService workflowRepairService;
private final QueueDAO queueDAO;
private final ExecutionDAOFacade executionDAOFacade;

private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName();

Expand All @@ -56,10 +58,12 @@ public WorkflowSweeper(
WorkflowExecutor workflowExecutor,
Optional<WorkflowRepairService> workflowRepairService,
ConductorProperties properties,
QueueDAO queueDAO) {
QueueDAO queueDAO,
ExecutionDAOFacade executionDAOFacade) {
this.properties = properties;
this.queueDAO = queueDAO;
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.workflowRepairService = workflowRepairService.orElse(null);
LOGGER.info("WorkflowSweeper initialized.");
}
Expand All @@ -77,12 +81,14 @@ public void sweep(String workflowId) {
WorkflowContext.set(workflowContext);
LOGGER.debug("Running sweeper for workflow {}", workflowId);

workflow = executionDAOFacade.getWorkflowModel(workflowId, true);

if (workflowRepairService != null) {
// Verify and repair tasks in the workflow.
workflowRepairService.verifyAndRepairWorkflowTasks(workflowId);
workflowRepairService.verifyAndRepairWorkflowTasks(workflow);
}

workflow = workflowExecutor.decide(workflowId);
workflow = workflowExecutor.decideWithLock(workflow);
if (workflow != null && workflow.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.model.TaskModel;
Expand All @@ -42,6 +43,7 @@ public class TestWorkflowSweeper {
private WorkflowExecutor workflowExecutor;
private WorkflowRepairService workflowRepairService;
private QueueDAO queueDAO;
private ExecutionDAOFacade executionDAOFacade;
private WorkflowSweeper workflowSweeper;

private int defaultPostPoneOffSetSeconds = 1800;
Expand All @@ -52,9 +54,14 @@ public void setUp() {
workflowExecutor = mock(WorkflowExecutor.class);
queueDAO = mock(QueueDAO.class);
workflowRepairService = mock(WorkflowRepairService.class);
executionDAOFacade = mock(ExecutionDAOFacade.class);
workflowSweeper =
new WorkflowSweeper(
workflowExecutor, Optional.of(workflowRepairService), properties, queueDAO);
workflowExecutor,
Optional.of(workflowRepairService),
properties,
queueDAO,
executionDAOFacade);
}

@Test
Expand Down

0 comments on commit 9669d46

Please sign in to comment.