diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 336db6c828..aff7358342 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -25,7 +25,6 @@ import org.springframework.util.unit.DataUnit; import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy; @ConfigurationProperties("conductor.app") public class ConductorProperties { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/OffsetEvaluationStrategy.java b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java similarity index 57% rename from core/src/main/java/com/netflix/conductor/core/execution/offset/OffsetEvaluationStrategy.java rename to core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java index 775a1c6ec9..9fbfa085be 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/offset/OffsetEvaluationStrategy.java +++ b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java @@ -10,48 +10,25 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.core.execution.offset; +package com.netflix.conductor.core.config; /** * Strategies used for computation of the task offset. The offset is used to postpone the task * execution in the queue. */ public enum OffsetEvaluationStrategy { - /** - * Constant offset evaluation strategy - using default offset value. - * - * @see ConstantDefaultOffsetEvaluation - */ - CONSTANT_DEFAULT_OFFSET(new ConstantDefaultOffsetEvaluation()), + /** Constant offset evaluation strategy - using default offset value. */ + CONSTANT_DEFAULT_OFFSET, /** * Computes the evaluation offset for a postponed task based on the task's poll count and a * default offset. In this strategy offset increases exponentially until it reaches the default * offset. - * - * @see BackoffToDefaultOffsetEvaluation */ - BACKOFF_TO_DEFAULT_OFFSET(new BackoffToDefaultOffsetEvaluation()), + BACKOFF_TO_DEFAULT_OFFSET, /** * Computes the evaluation offset for a postponed task based on the queue size and the task's * poll count. In this strategy offset increases exponentially until it reaches the (default * offset * queue size) value. - * - * @see ScaledByQueueSizeOffsetEvaluation - */ - SCALED_BY_QUEUE_SIZE(new ScaledByQueueSizeOffsetEvaluation()); - - private final TaskOffsetEvaluation taskOffsetEvaluation; - - OffsetEvaluationStrategy(final TaskOffsetEvaluation taskOffsetEvaluation) { - this.taskOffsetEvaluation = taskOffsetEvaluation; - } - - /** - * Get the task offset evaluation strategy. - * - * @return {@link TaskOffsetEvaluation} */ - public TaskOffsetEvaluation getTaskOffsetEvaluation() { - return taskOffsetEvaluation; - } + SCALED_BY_QUEUE_SIZE; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index 980478fdd6..8d04e01973 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -20,8 +20,9 @@ import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.core.dal.ExecutionDAOFacade; -import com.netflix.conductor.core.execution.offset.OffsetEvaluationStrategy; +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.MetadataDAO; @@ -37,7 +38,7 @@ public class AsyncSystemTaskExecutor { private final QueueDAO queueDAO; private final MetadataDAO metadataDAO; private final long queueTaskMessagePostponeSecs; - private final long systemTaskCallbackTime; + private final TaskOffsetEvaluationSelector taskOffsetEvaluationSelector; private final WorkflowExecutor workflowExecutor; private final Map systemTaskOffsetEvaluation; @@ -47,14 +48,14 @@ public AsyncSystemTaskExecutor( ExecutionDAOFacade executionDAOFacade, QueueDAO queueDAO, MetadataDAO metadataDAO, + TaskOffsetEvaluationSelector taskOffsetEvaluationSelector, ConductorProperties conductorProperties, WorkflowExecutor workflowExecutor) { this.executionDAOFacade = executionDAOFacade; this.queueDAO = queueDAO; this.metadataDAO = metadataDAO; + this.taskOffsetEvaluationSelector = taskOffsetEvaluationSelector; this.workflowExecutor = workflowExecutor; - this.systemTaskCallbackTime = - conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds(); this.queueTaskMessagePostponeSecs = conductorProperties.getTaskExecutionPostponeDuration().getSeconds(); this.systemTaskOffsetEvaluation = conductorProperties.getSystemTaskOffsetEvaluation(); @@ -170,14 +171,14 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { hasTaskExecutionCompleted = true; LOGGER.debug("{} removed from queue: {}", task, queueName); } else { - final long callbackAfterSeconds = - systemTaskOffsetEvaluation - .getOrDefault( - TaskType.of(task.getTaskType()), - OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET) - .getTaskOffsetEvaluation() - .computeEvaluationOffset( - task, systemTaskCallbackTime, queueDAO.getSize(queueName)); + final var evaluationStrategy = + systemTaskOffsetEvaluation.getOrDefault( + TaskType.of(task.getTaskType()), + OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET); + final var callbackAfterSeconds = + taskOffsetEvaluationSelector + .taskOffsetEvaluation(evaluationStrategy) + .computeEvaluationOffset(task, queueDAO.getSize(queueName)); task.setCallbackAfterSeconds(callbackAfterSeconds); queueDAO.postpone( queueName, diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java index e0a931a290..bb4b274f5f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java @@ -12,6 +12,10 @@ */ package com.netflix.conductor.core.execution.offset; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.model.TaskModel; /** @@ -31,11 +35,22 @@ * 51010 * */ +@Component final class BackoffToDefaultOffsetEvaluation implements TaskOffsetEvaluation { + private final long defaultOffset; + + BackoffToDefaultOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET; + } + @Override - public long computeEvaluationOffset( - final TaskModel taskModel, final long defaultOffset, final int queueSize) { + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { final int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; if (index == 0) { return 0L; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java index 4a087d0650..e8863e4424 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java @@ -12,13 +12,29 @@ */ package com.netflix.conductor.core.execution.offset; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.model.TaskModel; /** Dummy implementation of {@link TaskOffsetEvaluation} that always returns the default offset. */ +@Component final class ConstantDefaultOffsetEvaluation implements TaskOffsetEvaluation { + + private final long defaultOffset; + + ConstantDefaultOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET; + } + @Override - public long computeEvaluationOffset( - final TaskModel taskModel, final long defaultOffset, final int queueSize) { + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { return defaultOffset; } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java index e743c0342a..d83d18cd94 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java @@ -12,6 +12,10 @@ */ package com.netflix.conductor.core.execution.offset; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.model.TaskModel; /** @@ -33,11 +37,22 @@ * 4528 * */ +@Component final class ScaledByQueueSizeOffsetEvaluation implements TaskOffsetEvaluation { + private final long defaultOffset; + + ScaledByQueueSizeOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.SCALED_BY_QUEUE_SIZE; + } + @Override - public long computeEvaluationOffset( - final TaskModel taskModel, final long defaultOffset, final int queueSize) { + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; if (index == 0) { return 0L; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java index d1500a852e..5b2baadfa7 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java @@ -12,20 +12,24 @@ */ package com.netflix.conductor.core.execution.offset; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.model.TaskModel; /** Service used for computation of the evaluation offset for the postponed task. */ -public sealed interface TaskOffsetEvaluation - permits BackoffToDefaultOffsetEvaluation, - ConstantDefaultOffsetEvaluation, - ScaledByQueueSizeOffsetEvaluation { +public interface TaskOffsetEvaluation { + /** + * Get the type of the offset evaluation strategy. + * + * @return @{@link OffsetEvaluationStrategy} + */ + OffsetEvaluationStrategy type(); + /** * Compute the evaluation offset for the postponed task. * * @param taskModel details about the postponed task - * @param defaultOffset the default offset provided by the configuration properties [seconds] * @param queueSize the actual size of the queue before the task is postponed * @return the computed evaluation offset [seconds] */ - long computeEvaluationOffset(TaskModel taskModel, long defaultOffset, int queueSize); + long computeEvaluationOffset(TaskModel taskModel, int queueSize); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java new file mode 100644 index 0000000000..c2ef916840 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; + +@Component +public final class TaskOffsetEvaluationSelector { + + private final Map evaluations; + + @Autowired + public TaskOffsetEvaluationSelector(final List evaluations) { + this.evaluations = + evaluations.stream() + .collect(Collectors.toMap(TaskOffsetEvaluation::type, Function.identity())); + } + + /** + * Get the implementation of the offset evaluation for the given strategy. + * + * @param strategy the strategy to get the implementation for + * @return {@link TaskOffsetEvaluation} + * @throws IllegalStateException if no implementation is found for the given strategy + */ + @NonNull + public TaskOffsetEvaluation taskOffsetEvaluation(final OffsetEvaluationStrategy strategy) { + final var taskOffsetEvaluation = evaluations.get(strategy); + if (taskOffsetEvaluation == null) { + throw new IllegalStateException( + "No TaskOffsetEvaluation found for strategy: " + strategy); + } + return taskOffsetEvaluation; + } +} diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy index d960eb2b59..e8539570c3 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy @@ -16,7 +16,10 @@ import java.time.Duration import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.core.config.ConductorProperties +import com.netflix.conductor.core.config.OffsetEvaluationStrategy import com.netflix.conductor.core.dal.ExecutionDAOFacade +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluation +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask import com.netflix.conductor.core.operation.StartWorkflowOperation @@ -61,7 +64,25 @@ class AsyncSystemTaskExecutorTest extends Specification { properties.taskExecutionPostponeDuration = Duration.ofSeconds(1) properties.systemTaskWorkerCallbackDuration = Duration.ofSeconds(1) - executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, properties, workflowExecutor) + def offsetEvaluationStrategySelector = createTaskOffsetEvaluationSelector( + properties.systemTaskWorkerCallbackDuration.toSeconds()) + executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, + offsetEvaluationStrategySelector, properties, workflowExecutor) + } + + private static TaskOffsetEvaluationSelector createTaskOffsetEvaluationSelector(final long offset) { + def offsetEvaluation = new TaskOffsetEvaluation() { + @Override + OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET + } + + @Override + long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + return offset + } + } + return new TaskOffsetEvaluationSelector(List.of(offsetEvaluation)) } // this is not strictly a unit test, but its essential to test AsyncSystemTaskExecutor with SubWorkflow @@ -238,8 +259,6 @@ class AsyncSystemTaskExecutorTest extends Specification { taskDefName: "taskDefName", workflowPriority: 10) WorkflowModel workflow = new WorkflowModel(workflowId: workflowId, status: WorkflowModel.Status.RUNNING) String queueName = QueueUtils.getQueueName(task) - workflowSystemTask.getEvaluationOffset(task, 1) >> Optional.empty(); - when: executor.execute(workflowSystemTask, taskId) diff --git a/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java b/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java index 37000f7480..ae540fceb8 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java @@ -12,14 +12,15 @@ */ package com.netflix.conductor.core.execution.offset; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import java.time.Duration; + import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.model.TaskModel; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -27,26 +28,19 @@ @ExtendWith(MockitoExtension.class) class BackoffToDefaultOffsetEvaluationTest { - private static TaskOffsetEvaluation offsetEvaluation; - - @BeforeAll - static void setUp() { - offsetEvaluation = new BackoffToDefaultOffsetEvaluation(); - } - - @AfterAll - static void tearDown() { - offsetEvaluation = null; - } @Mock private TaskModel taskModel; + @Mock private ConductorProperties conductorProperties; @ParameterizedTest @CsvSource({"0, 5, 0", "1, 5, 0", "2, 5, 2", "3, 5, 4", "4, 5, 5", "4, 10, 8", "5, 10, 10"}) void testComputeEvaluationOffset( final int pollCount, final long defaultOffset, final long expectedOffset) { + when(conductorProperties.getSystemTaskWorkerCallbackDuration()) + .thenReturn(Duration.ofSeconds(defaultOffset)); + final var offsetEvaluation = new BackoffToDefaultOffsetEvaluation(conductorProperties); when(taskModel.getPollCount()).thenReturn(pollCount); - final var result = offsetEvaluation.computeEvaluationOffset(taskModel, defaultOffset, 10); + final var result = offsetEvaluation.computeEvaluationOffset(taskModel, 10); assertEquals(expectedOffset, result); } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java index 171331380a..04cf97250e 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java @@ -12,14 +12,15 @@ */ package com.netflix.conductor.core.execution.offset; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import java.time.Duration; + import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.model.TaskModel; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -28,19 +29,8 @@ @ExtendWith(MockitoExtension.class) class ScaledByQueueSizeOffsetEvaluationTest { - private static TaskOffsetEvaluation offsetEvaluation; - - @BeforeAll - static void setUp() { - offsetEvaluation = new ScaledByQueueSizeOffsetEvaluation(); - } - - @AfterAll - static void tearDown() { - offsetEvaluation = null; - } - @Mock private TaskModel taskModel; + @Mock private ConductorProperties conductorProperties; @ParameterizedTest @CsvSource({ @@ -57,9 +47,11 @@ void testComputeEvaluationOffset( final long defaultOffset, final int queueSize, final long expectedOffset) { + when(conductorProperties.getSystemTaskWorkerCallbackDuration()) + .thenReturn(Duration.ofSeconds(defaultOffset)); + final var offsetEvaluation = new ScaledByQueueSizeOffsetEvaluation(conductorProperties); when(taskModel.getPollCount()).thenReturn(pollCount); - final var result = - offsetEvaluation.computeEvaluationOffset(taskModel, defaultOffset, queueSize); + final var result = offsetEvaluation.computeEvaluationOffset(taskModel, queueSize); assertEquals(expectedOffset, result); } }