Skip to content

Commit

Permalink
FM-751 Split OffsetEvaluationStrategy and implementations
Browse files Browse the repository at this point in the history
- goal: cleaner goals, separated configuration and implementation
  aspects
- we can directly inject ConductorProperties into implementations
  of strategies that are represented by Spring components
- introduction of TaskOffsetEvaluationSelector that allows
  other component to load implementation of specific strategy
  • Loading branch information
jaro0149 committed Nov 13, 2024
1 parent 4f4478c commit 9f92dd7
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.springframework.util.unit.DataUnit;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy;

@ConfigurationProperties("conductor.app")
public class ConductorProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,25 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;
package com.netflix.conductor.core.config;

/**
* Strategies used for computation of the task offset. The offset is used to postpone the task
* execution in the queue.
*/
public enum OffsetEvaluationStrategy {
/**
* Constant offset evaluation strategy - using default offset value.
*
* @see ConstantDefaultOffsetEvaluation
*/
CONSTANT_DEFAULT_OFFSET(new ConstantDefaultOffsetEvaluation()),
/** Constant offset evaluation strategy - using default offset value. */
CONSTANT_DEFAULT_OFFSET,
/**
* Computes the evaluation offset for a postponed task based on the task's poll count and a
* default offset. In this strategy offset increases exponentially until it reaches the default
* offset.
*
* @see BackoffToDefaultOffsetEvaluation
*/
BACKOFF_TO_DEFAULT_OFFSET(new BackoffToDefaultOffsetEvaluation()),
BACKOFF_TO_DEFAULT_OFFSET,
/**
* Computes the evaluation offset for a postponed task based on the queue size and the task's
* poll count. In this strategy offset increases exponentially until it reaches the (default
* offset * queue size) value.
*
* @see ScaledByQueueSizeOffsetEvaluation
*/
SCALED_BY_QUEUE_SIZE(new ScaledByQueueSizeOffsetEvaluation());

private final TaskOffsetEvaluation taskOffsetEvaluation;

OffsetEvaluationStrategy(final TaskOffsetEvaluation taskOffsetEvaluation) {
this.taskOffsetEvaluation = taskOffsetEvaluation;
}

/**
* Get the task offset evaluation strategy.
*
* @return {@link TaskOffsetEvaluation}
*/
public TaskOffsetEvaluation getTaskOffsetEvaluation() {
return taskOffsetEvaluation;
}
SCALED_BY_QUEUE_SIZE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.config.OffsetEvaluationStrategy;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy;
import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
Expand All @@ -37,7 +38,7 @@ public class AsyncSystemTaskExecutor {
private final QueueDAO queueDAO;
private final MetadataDAO metadataDAO;
private final long queueTaskMessagePostponeSecs;
private final long systemTaskCallbackTime;
private final TaskOffsetEvaluationSelector taskOffsetEvaluationSelector;
private final WorkflowExecutor workflowExecutor;
private final Map<TaskType, OffsetEvaluationStrategy> systemTaskOffsetEvaluation;

Expand All @@ -47,14 +48,14 @@ public AsyncSystemTaskExecutor(
ExecutionDAOFacade executionDAOFacade,
QueueDAO queueDAO,
MetadataDAO metadataDAO,
TaskOffsetEvaluationSelector taskOffsetEvaluationSelector,
ConductorProperties conductorProperties,
WorkflowExecutor workflowExecutor) {
this.executionDAOFacade = executionDAOFacade;
this.queueDAO = queueDAO;
this.metadataDAO = metadataDAO;
this.taskOffsetEvaluationSelector = taskOffsetEvaluationSelector;
this.workflowExecutor = workflowExecutor;
this.systemTaskCallbackTime =
conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds();
this.queueTaskMessagePostponeSecs =
conductorProperties.getTaskExecutionPostponeDuration().getSeconds();
this.systemTaskOffsetEvaluation = conductorProperties.getSystemTaskOffsetEvaluation();
Expand Down Expand Up @@ -170,14 +171,14 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
hasTaskExecutionCompleted = true;
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
final long callbackAfterSeconds =
systemTaskOffsetEvaluation
.getOrDefault(
TaskType.of(task.getTaskType()),
OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET)
.getTaskOffsetEvaluation()
.computeEvaluationOffset(
task, systemTaskCallbackTime, queueDAO.getSize(queueName));
final var evaluationStrategy =
systemTaskOffsetEvaluation.getOrDefault(
TaskType.of(task.getTaskType()),
OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET);
final var callbackAfterSeconds =
taskOffsetEvaluationSelector
.taskOffsetEvaluation(evaluationStrategy)
.computeEvaluationOffset(task, queueDAO.getSize(queueName));
task.setCallbackAfterSeconds(callbackAfterSeconds);
queueDAO.postpone(
queueName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
*/
package com.netflix.conductor.core.execution.offset;

import org.springframework.stereotype.Component;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.config.OffsetEvaluationStrategy;
import com.netflix.conductor.model.TaskModel;

/**
Expand All @@ -31,11 +35,22 @@
* <tr><td>5</td><td>10</td><td>10</td></tr>
* </table>
*/
@Component
final class BackoffToDefaultOffsetEvaluation implements TaskOffsetEvaluation {

private final long defaultOffset;

BackoffToDefaultOffsetEvaluation(final ConductorProperties conductorProperties) {
defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds();
}

@Override
public OffsetEvaluationStrategy type() {
return OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET;
}

@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) {
final int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,29 @@
*/
package com.netflix.conductor.core.execution.offset;

import org.springframework.stereotype.Component;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.config.OffsetEvaluationStrategy;
import com.netflix.conductor.model.TaskModel;

/** Dummy implementation of {@link TaskOffsetEvaluation} that always returns the default offset. */
@Component
final class ConstantDefaultOffsetEvaluation implements TaskOffsetEvaluation {

private final long defaultOffset;

ConstantDefaultOffsetEvaluation(final ConductorProperties conductorProperties) {
defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds();
}

@Override
public OffsetEvaluationStrategy type() {
return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET;
}

@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) {
return defaultOffset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
*/
package com.netflix.conductor.core.execution.offset;

import org.springframework.stereotype.Component;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.config.OffsetEvaluationStrategy;
import com.netflix.conductor.model.TaskModel;

/**
Expand All @@ -33,11 +37,22 @@
* <tr><td>4</td><td>5</td><td>2</td><td>8</td></tr>
* </table>
*/
@Component
final class ScaledByQueueSizeOffsetEvaluation implements TaskOffsetEvaluation {

private final long defaultOffset;

ScaledByQueueSizeOffsetEvaluation(final ConductorProperties conductorProperties) {
defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds();
}

@Override
public OffsetEvaluationStrategy type() {
return OffsetEvaluationStrategy.SCALED_BY_QUEUE_SIZE;
}

@Override
public long computeEvaluationOffset(
final TaskModel taskModel, final long defaultOffset, final int queueSize) {
public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@
*/
package com.netflix.conductor.core.execution.offset;

import com.netflix.conductor.core.config.OffsetEvaluationStrategy;
import com.netflix.conductor.model.TaskModel;

/** Service used for computation of the evaluation offset for the postponed task. */
public sealed interface TaskOffsetEvaluation
permits BackoffToDefaultOffsetEvaluation,
ConstantDefaultOffsetEvaluation,
ScaledByQueueSizeOffsetEvaluation {
public interface TaskOffsetEvaluation {
/**
* Get the type of the offset evaluation strategy.
*
* @return @{@link OffsetEvaluationStrategy}
*/
OffsetEvaluationStrategy type();

/**
* Compute the evaluation offset for the postponed task.
*
* @param taskModel details about the postponed task
* @param defaultOffset the default offset provided by the configuration properties [seconds]
* @param queueSize the actual size of the queue before the task is postponed
* @return the computed evaluation offset [seconds]
*/
long computeEvaluationOffset(TaskModel taskModel, long defaultOffset, int queueSize);
long computeEvaluationOffset(TaskModel taskModel, int queueSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.offset;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import com.netflix.conductor.core.config.OffsetEvaluationStrategy;

@Component
public final class TaskOffsetEvaluationSelector {

private final Map<OffsetEvaluationStrategy, TaskOffsetEvaluation> evaluations;

@Autowired
public TaskOffsetEvaluationSelector(final List<TaskOffsetEvaluation> evaluations) {
this.evaluations =
evaluations.stream()
.collect(Collectors.toMap(TaskOffsetEvaluation::type, Function.identity()));
}

/**
* Get the implementation of the offset evaluation for the given strategy.
*
* @param strategy the strategy to get the implementation for
* @return {@link TaskOffsetEvaluation}
* @throws IllegalStateException if no implementation is found for the given strategy
*/
@NonNull
public TaskOffsetEvaluation taskOffsetEvaluation(final OffsetEvaluationStrategy strategy) {
final var taskOffsetEvaluation = evaluations.get(strategy);
if (taskOffsetEvaluation == null) {
throw new IllegalStateException(
"No TaskOffsetEvaluation found for strategy: " + strategy);
}
return taskOffsetEvaluation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import java.time.Duration

import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.core.config.ConductorProperties
import com.netflix.conductor.core.config.OffsetEvaluationStrategy
import com.netflix.conductor.core.dal.ExecutionDAOFacade
import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluation
import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector
import com.netflix.conductor.core.execution.tasks.SubWorkflow
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask
import com.netflix.conductor.core.operation.StartWorkflowOperation
Expand Down Expand Up @@ -61,7 +64,25 @@ class AsyncSystemTaskExecutorTest extends Specification {
properties.taskExecutionPostponeDuration = Duration.ofSeconds(1)
properties.systemTaskWorkerCallbackDuration = Duration.ofSeconds(1)

executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, properties, workflowExecutor)
def offsetEvaluationStrategySelector = createTaskOffsetEvaluationSelector(
properties.systemTaskWorkerCallbackDuration.toSeconds())
executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO,
offsetEvaluationStrategySelector, properties, workflowExecutor)
}

private static TaskOffsetEvaluationSelector createTaskOffsetEvaluationSelector(final long offset) {
def offsetEvaluation = new TaskOffsetEvaluation() {
@Override
OffsetEvaluationStrategy type() {
return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET
}

@Override
long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) {
return offset
}
}
return new TaskOffsetEvaluationSelector(List.of(offsetEvaluation))
}

// this is not strictly a unit test, but its essential to test AsyncSystemTaskExecutor with SubWorkflow
Expand Down Expand Up @@ -238,8 +259,6 @@ class AsyncSystemTaskExecutorTest extends Specification {
taskDefName: "taskDefName", workflowPriority: 10)
WorkflowModel workflow = new WorkflowModel(workflowId: workflowId, status: WorkflowModel.Status.RUNNING)
String queueName = QueueUtils.getQueueName(task)
workflowSystemTask.getEvaluationOffset(task, 1) >> Optional.empty();


when:
executor.execute(workflowSystemTask, taskId)
Expand Down
Loading

0 comments on commit 9f92dd7

Please sign in to comment.