From 54a66d3d9ce9d530aeab4924f183277add8fbb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaroslav=20T=C3=B3th?= Date: Wed, 8 Jan 2025 10:38:40 +0100 Subject: [PATCH] [FM-751] Add system task offset evaluation strategy (#179) * FM-751 Add system task offset evaluation strategy - Added option to customize strategy used for computation of a postponed system task per task type: conductor.app.system-task-offset-evaluation.[task-type]=[strategy] [task-type] - type of the task, e.g. join, simple, ... [strategy] - strategy used for computation of the system task offset; currently supported options are: a. 'constant_default_offset' b. 'backoff_to_default_offset' c. 'scaled_by_queue_size' - 'constant_default_offset' - uses constant value of set 'systemTaskWorkerCallbackDuration' configuration property; by default, it is used by all but 'join' system tasks - 'backoff_to_default_offset' - scales offset based on task poll-count in exponential way (2^n) up to value of the 'systemTaskWorkerCallbackDuration' configuration property; by default, it is used by 'join' system task - 'scaled_by_queue_size' - scales offset based on task poll-count and actual queue size in exponential way (2^n) up to value of: a. 'backoff_to_default_offset', if queue size == 0 b. 'backoff_to_default_offset'*'queue_size' otherwise this strategy is not used in the default configuration - Implemented new 'scaled_by_queue_size' strategy is appropriate for relatively big queues (100-1000s tasks) that contain long-running tasks (days-weeks) with high number of poll-counts. Reasoning: - New strategy was implemented primarily to solve performance issues on join queues that contain a large number of join tasks blocked by wait/human actions in some forks for several days/weeks. - Implemented strategies can easily be extended in the future while preserving backwards compatibility. - Improved configurability of the task offset evaluation. * FM-751 Change default offset strategy of join task - from BACKOFF_TO_DEFAULT_OFFSET - to SCALED_BY_QUEUE_SIZE * FM-751 Split OffsetEvaluationStrategy and implementations - goal: cleaner goals, separated configuration and implementation aspects - we can directly inject ConductorProperties into implementations of strategies that are represented by Spring components - introduction of TaskOffsetEvaluationSelector that allows other component to load implementation of specific strategy * FM-751 Add config property for SCALED_BY_TASK_DURATION strategy * FM-751 Implement ScaledByTaskDurationOffsetEvaluation - Computes the evaluation offset for a postponed task based on the task's duration and settings that define the offset for different levels of task durations. - In this strategy offset increases by steps based on settings that define the offset for different levels of task durations. Task duration is derived from {@link TaskModel#getScheduledTime()} and current time. - This strategy is appropriate for tasks that have a wide range of durations and the offset should be scaled based on the task's duration. - The defined keys in the settings compose the duration intervals for which the offset will be set to the corresponding value: <0, d1) = 0, + * Tasks that are not listed here use {@link + * ConductorProperties#systemTaskWorkerCallbackDuration} value. + */ + private Map systemTaskOffsetEvaluation = + Map.of(TaskType.JOIN, OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET); + + /** + * The duration of the task execution mapped to the calculated offset of the postponed task + * [seconds].
+ * This setting is used only by the {@link OffsetEvaluationStrategy#SCALED_BY_TASK_DURATION} + * offset evaluation strategy.
+ * Example: If settings contain two entries (10, 30) and (20, 60), then the evaluation offsets + * for the postponed tasks in the queue will be calculated according to the following intervals: + * + *
    + *
  • <0,10) seconds: offset = 0 seconds + *
  • <10,20) seconds: offset = 30 seconds + *
  • <20,N) seconds: offset = 60 seconds + *
+ * + * By default, the offset is always set to 0 seconds. + */ + private Map taskDurationToOffsetSteps = Collections.emptyMap(); + /** * The interval (in milliseconds) at which system task queues will be polled by the system task * workers. @@ -353,6 +383,23 @@ public Duration getSystemTaskWorkerCallbackDuration() { return systemTaskWorkerCallbackDuration; } + public void setSystemTaskOffsetEvaluation( + final Map systemTaskOffsetEvaluation) { + this.systemTaskOffsetEvaluation = systemTaskOffsetEvaluation; + } + + public Map getSystemTaskOffsetEvaluation() { + return systemTaskOffsetEvaluation; + } + + public Map getTaskDurationToOffsetSteps() { + return taskDurationToOffsetSteps; + } + + public void setTaskDurationToOffsetSteps(Map taskDurationToOffsetSteps) { + this.taskDurationToOffsetSteps = taskDurationToOffsetSteps; + } + public void setSystemTaskWorkerCallbackDuration(Duration systemTaskWorkerCallbackDuration) { this.systemTaskWorkerCallbackDuration = systemTaskWorkerCallbackDuration; } diff --git a/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java new file mode 100644 index 0000000000..150fd92386 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.config; + +/** + * Strategies used for computation of the task offset. The offset is used to postpone the task + * execution in the queue. + */ +public enum OffsetEvaluationStrategy { + /** Constant offset evaluation strategy - using default offset value. */ + CONSTANT_DEFAULT_OFFSET, + /** + * Computes the evaluation offset for a postponed task based on the task's poll count and a + * default offset. In this strategy offset increases exponentially until it reaches the default + * offset. + */ + BACKOFF_TO_DEFAULT_OFFSET, + /** + * Computes the evaluation offset for a postponed task based on the queue size and the task's + * poll count. In this strategy offset increases exponentially until it reaches the (default + * offset * queue size) value. + */ + SCALED_BY_QUEUE_SIZE, + /** + * Computes the evaluation offset for a postponed task based on the task's duration. In this + * strategy offset increases by steps that are proportional to the task's duration and defined + * by the user settings. + * + * @see ConductorProperties#getTaskDurationToOffsetSteps() setting used to define the steps + */ + SCALED_BY_TASK_DURATION +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index dd7975425f..8d04e01973 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -12,12 +12,17 @@ */ package com.netflix.conductor.core.execution; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; import com.netflix.conductor.core.dal.ExecutionDAOFacade; +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.MetadataDAO; @@ -33,8 +38,9 @@ public class AsyncSystemTaskExecutor { private final QueueDAO queueDAO; private final MetadataDAO metadataDAO; private final long queueTaskMessagePostponeSecs; - private final long systemTaskCallbackTime; + private final TaskOffsetEvaluationSelector taskOffsetEvaluationSelector; private final WorkflowExecutor workflowExecutor; + private final Map systemTaskOffsetEvaluation; private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSystemTaskExecutor.class); @@ -42,16 +48,17 @@ public AsyncSystemTaskExecutor( ExecutionDAOFacade executionDAOFacade, QueueDAO queueDAO, MetadataDAO metadataDAO, + TaskOffsetEvaluationSelector taskOffsetEvaluationSelector, ConductorProperties conductorProperties, WorkflowExecutor workflowExecutor) { this.executionDAOFacade = executionDAOFacade; this.queueDAO = queueDAO; this.metadataDAO = metadataDAO; + this.taskOffsetEvaluationSelector = taskOffsetEvaluationSelector; this.workflowExecutor = workflowExecutor; - this.systemTaskCallbackTime = - conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds(); this.queueTaskMessagePostponeSecs = conductorProperties.getTaskExecutionPostponeDuration().getSeconds(); + this.systemTaskOffsetEvaluation = conductorProperties.getSystemTaskOffsetEvaluation(); } /** @@ -164,12 +171,15 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { hasTaskExecutionCompleted = true; LOGGER.debug("{} removed from queue: {}", task, queueName); } else { - task.setCallbackAfterSeconds(systemTaskCallbackTime); - systemTask - .getEvaluationOffset(task, systemTaskCallbackTime) - .ifPresentOrElse( - task::setCallbackAfterSeconds, - () -> task.setCallbackAfterSeconds(systemTaskCallbackTime)); + final var evaluationStrategy = + systemTaskOffsetEvaluation.getOrDefault( + TaskType.of(task.getTaskType()), + OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET); + final var callbackAfterSeconds = + taskOffsetEvaluationSelector + .taskOffsetEvaluation(evaluationStrategy) + .computeEvaluationOffset(task, queueDAO.getSize(queueName)); + task.setCallbackAfterSeconds(callbackAfterSeconds); queueDAO.postpone( queueName, task.getTaskId(), diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java new file mode 100644 index 0000000000..bb4b274f5f --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluation.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; +import com.netflix.conductor.model.TaskModel; + +/** + * Computes the evaluation offset for a postponed task based on the task's poll count and a default + * offset. In this strategy offset increases exponentially until it reaches the default offset.
+ * This strategy is appropriate for queues that require low latency of all tasks.
+ * Sample evaluationOffset for different pollCounts and defaultOffset (queueSize is ignored): + * + * + * + * + * + * + * + * + * + * + *
pollCountdefaultOffsetevaluationOffset
050
150
252
354
455
4108
51010
+ */ +@Component +final class BackoffToDefaultOffsetEvaluation implements TaskOffsetEvaluation { + + private final long defaultOffset; + + BackoffToDefaultOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET; + } + + @Override + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + final int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; + if (index == 0) { + return 0L; + } + return Math.min((long) Math.pow(2, index), defaultOffset); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java new file mode 100644 index 0000000000..e8863e4424 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/ConstantDefaultOffsetEvaluation.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; +import com.netflix.conductor.model.TaskModel; + +/** Dummy implementation of {@link TaskOffsetEvaluation} that always returns the default offset. */ +@Component +final class ConstantDefaultOffsetEvaluation implements TaskOffsetEvaluation { + + private final long defaultOffset; + + ConstantDefaultOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET; + } + + @Override + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + return defaultOffset; + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java new file mode 100644 index 0000000000..d83d18cd94 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluation.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; +import com.netflix.conductor.model.TaskModel; + +/** + * Computes the evaluation offset for a postponed task based on the queue size and the task's poll + * count. In this strategy offset increases exponentially until it reaches the (default offset * + * queue size) value.
+ * This strategy is appropriate for relatively big queues (100-1000s tasks) that contain + * long-running tasks (days-weeks) with high number of poll-counts.
+ * Sample evaluationOffset for different pollCounts, defaultOffset and queueSize: + * + * + * + * + * + * + * + * + * + * + *
pollCountdefaultOffsetqueueSizeevaluationOffset
0--0
1--0
2512
3514
4515
4505
4528
+ */ +@Component +final class ScaledByQueueSizeOffsetEvaluation implements TaskOffsetEvaluation { + + private final long defaultOffset; + + ScaledByQueueSizeOffsetEvaluation(final ConductorProperties conductorProperties) { + defaultOffset = conductorProperties.getSystemTaskWorkerCallbackDuration().toSeconds(); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.SCALED_BY_QUEUE_SIZE; + } + + @Override + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; + if (index == 0) { + return 0L; + } + final long scaledOffset = queueSize > 0 ? queueSize * defaultOffset : defaultOffset; + return Math.min((long) Math.pow(2, index), scaledOffset); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluation.java new file mode 100644 index 0000000000..dd9e940d7a --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluation.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; +import com.netflix.conductor.model.TaskModel; + +/** + * Computes the evaluation offset for a postponed task based on the task's duration and settings + * that define the offset for different levels of task durations.
+ * In this strategy offset increases by steps based on settings that define the offset for different + * levels of task durations. Task duration is derived from {@link TaskModel#getScheduledTime()} and + * current time.
+ * This strategy is appropriate for tasks that have a wide range of durations and the offset should + * be scaled based on the task's duration.
+ * The defined keys in the settings compose the duration intervals for which the offset will be set + * to the corresponding value: <0, d1) = 0, + * The order of the keys is not important as the map is sorted by the key before the evaluation. + */ +@Component +final class ScaledByTaskDurationOffsetEvaluation implements TaskOffsetEvaluation { + + private final Map taskDurationToOffsetSteps; + + ScaledByTaskDurationOffsetEvaluation(final ConductorProperties conductorProperties) { + taskDurationToOffsetSteps = sortByTaskDuration(conductorProperties); + } + + private static LinkedHashMap sortByTaskDuration( + final ConductorProperties conductorProperties) { + return conductorProperties.getTaskDurationToOffsetSteps().entrySet().stream() + .sorted(Entry.comparingByKey(Comparator.reverseOrder())) + .collect( + Collectors.toMap( + Entry::getKey, + Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new)); + } + + @Override + public OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.SCALED_BY_TASK_DURATION; + } + + @Override + public long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + if (taskDurationToOffsetSteps.isEmpty()) { + return 0L; + } + final long taskDuration = + (System.currentTimeMillis() - taskModel.getScheduledTime()) / 1000; + return taskDurationToOffsetSteps.entrySet().stream() + .filter(entry -> taskDuration >= entry.getKey()) + .map(Entry::getValue) + .findFirst() + .orElse(0L); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java new file mode 100644 index 0000000000..5b2baadfa7 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluation.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; +import com.netflix.conductor.model.TaskModel; + +/** Service used for computation of the evaluation offset for the postponed task. */ +public interface TaskOffsetEvaluation { + /** + * Get the type of the offset evaluation strategy. + * + * @return @{@link OffsetEvaluationStrategy} + */ + OffsetEvaluationStrategy type(); + + /** + * Compute the evaluation offset for the postponed task. + * + * @param taskModel details about the postponed task + * @param queueSize the actual size of the queue before the task is postponed + * @return the computed evaluation offset [seconds] + */ + long computeEvaluationOffset(TaskModel taskModel, int queueSize); +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java new file mode 100644 index 0000000000..c2ef916840 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/offset/TaskOffsetEvaluationSelector.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.OffsetEvaluationStrategy; + +@Component +public final class TaskOffsetEvaluationSelector { + + private final Map evaluations; + + @Autowired + public TaskOffsetEvaluationSelector(final List evaluations) { + this.evaluations = + evaluations.stream() + .collect(Collectors.toMap(TaskOffsetEvaluation::type, Function.identity())); + } + + /** + * Get the implementation of the offset evaluation for the given strategy. + * + * @param strategy the strategy to get the implementation for + * @return {@link TaskOffsetEvaluation} + * @throws IllegalStateException if no implementation is found for the given strategy + */ + @NonNull + public TaskOffsetEvaluation taskOffsetEvaluation(final OffsetEvaluationStrategy strategy) { + final var taskOffsetEvaluation = evaluations.get(strategy); + if (taskOffsetEvaluation == null) { + throw new IllegalStateException( + "No TaskOffsetEvaluation found for strategy: " + strategy); + } + return taskOffsetEvaluation; + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 0398b70c18..46f9235d1e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -13,7 +13,6 @@ package com.netflix.conductor.core.execution.tasks; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.springframework.stereotype.Component; @@ -82,15 +81,6 @@ public boolean execute( return false; } - @Override - public Optional getEvaluationOffset(TaskModel taskModel, long defaultOffset) { - int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; - if (index == 0) { - return Optional.of(0L); - } - return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset)); - } - public boolean isAsync() { return true; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java index b531d0001b..3b6bb0d347 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java @@ -65,10 +65,6 @@ public boolean execute( */ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {} - public Optional getEvaluationOffset(TaskModel taskModel, long defaultOffset) { - return Optional.empty(); - } - /** * @return True if the task is supposed to be started asynchronously using internal queues. */ diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy index d960eb2b59..e8539570c3 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy @@ -16,7 +16,10 @@ import java.time.Duration import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.core.config.ConductorProperties +import com.netflix.conductor.core.config.OffsetEvaluationStrategy import com.netflix.conductor.core.dal.ExecutionDAOFacade +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluation +import com.netflix.conductor.core.execution.offset.TaskOffsetEvaluationSelector import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask import com.netflix.conductor.core.operation.StartWorkflowOperation @@ -61,7 +64,25 @@ class AsyncSystemTaskExecutorTest extends Specification { properties.taskExecutionPostponeDuration = Duration.ofSeconds(1) properties.systemTaskWorkerCallbackDuration = Duration.ofSeconds(1) - executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, properties, workflowExecutor) + def offsetEvaluationStrategySelector = createTaskOffsetEvaluationSelector( + properties.systemTaskWorkerCallbackDuration.toSeconds()) + executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, + offsetEvaluationStrategySelector, properties, workflowExecutor) + } + + private static TaskOffsetEvaluationSelector createTaskOffsetEvaluationSelector(final long offset) { + def offsetEvaluation = new TaskOffsetEvaluation() { + @Override + OffsetEvaluationStrategy type() { + return OffsetEvaluationStrategy.CONSTANT_DEFAULT_OFFSET + } + + @Override + long computeEvaluationOffset(final TaskModel taskModel, final int queueSize) { + return offset + } + } + return new TaskOffsetEvaluationSelector(List.of(offsetEvaluation)) } // this is not strictly a unit test, but its essential to test AsyncSystemTaskExecutor with SubWorkflow @@ -238,8 +259,6 @@ class AsyncSystemTaskExecutorTest extends Specification { taskDefName: "taskDefName", workflowPriority: 10) WorkflowModel workflow = new WorkflowModel(workflowId: workflowId, status: WorkflowModel.Status.RUNNING) String queueName = QueueUtils.getQueueName(task) - workflowSystemTask.getEvaluationOffset(task, 1) >> Optional.empty(); - when: executor.execute(workflowSystemTask, taskId) diff --git a/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java b/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java new file mode 100644 index 0000000000..ae540fceb8 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/offset/BackoffToDefaultOffsetEvaluationTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.time.Duration; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.model.TaskModel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class BackoffToDefaultOffsetEvaluationTest { + + @Mock private TaskModel taskModel; + @Mock private ConductorProperties conductorProperties; + + @ParameterizedTest + @CsvSource({"0, 5, 0", "1, 5, 0", "2, 5, 2", "3, 5, 4", "4, 5, 5", "4, 10, 8", "5, 10, 10"}) + void testComputeEvaluationOffset( + final int pollCount, final long defaultOffset, final long expectedOffset) { + when(conductorProperties.getSystemTaskWorkerCallbackDuration()) + .thenReturn(Duration.ofSeconds(defaultOffset)); + final var offsetEvaluation = new BackoffToDefaultOffsetEvaluation(conductorProperties); + when(taskModel.getPollCount()).thenReturn(pollCount); + final var result = offsetEvaluation.computeEvaluationOffset(taskModel, 10); + assertEquals(expectedOffset, result); + } +} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java new file mode 100644 index 0000000000..04cf97250e --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByQueueSizeOffsetEvaluationTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.time.Duration; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.model.TaskModel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ScaledByQueueSizeOffsetEvaluationTest { + + @Mock private TaskModel taskModel; + @Mock private ConductorProperties conductorProperties; + + @ParameterizedTest + @CsvSource({ + "0, 5, 1, 0", + "1, 5, 1, 0", + "2, 5, 1, 2", + "3, 5, 1, 4", + "4, 5, 1, 5", + "4, 5, 0, 5", + "4, 5, 2, 8" + }) + void testComputeEvaluationOffset( + final int pollCount, + final long defaultOffset, + final int queueSize, + final long expectedOffset) { + when(conductorProperties.getSystemTaskWorkerCallbackDuration()) + .thenReturn(Duration.ofSeconds(defaultOffset)); + final var offsetEvaluation = new ScaledByQueueSizeOffsetEvaluation(conductorProperties); + when(taskModel.getPollCount()).thenReturn(pollCount); + final var result = offsetEvaluation.computeEvaluationOffset(taskModel, queueSize); + assertEquals(expectedOffset, result); + } +} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluationTest.java b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluationTest.java new file mode 100644 index 0000000000..5b22e1047b --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/offset/ScaledByTaskDurationOffsetEvaluationTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.offset; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Stream; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.model.TaskModel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ScaledByTaskDurationOffsetEvaluationTest { + + @Mock private TaskModel taskModel; + @Mock private ConductorProperties conductorProperties; + + private static Stream testOffsetsProvider() { + return Stream.of( + Arguments.of(Map.of(10L, 30L, 20L, 60L, 30L, 120L), 1L, 0L), + Arguments.of(Map.of(10L, 30L, 20L, 60L, 30L, 120L), 11L, 30L), + Arguments.of(Map.of(10L, 30L, 20L, 60L, 30L, 120L), 100L, 120L), + Arguments.of(Collections.emptyMap(), 20L, 0L), + Arguments.of(Map.of(30L, 120L, 20L, 60L, 10L, 0L, 100L, 1200L), 35L, 120L)); + } + + @ParameterizedTest + @MethodSource("testOffsetsProvider") + void testComputeEvaluationOffset( + final Map offsets, final long taskDuration, final long expectedOffset) { + final long scheduledTime = System.currentTimeMillis() - (taskDuration * 1000); + when(conductorProperties.getTaskDurationToOffsetSteps()).thenReturn(offsets); + if (!offsets.isEmpty()) { + when(taskModel.getScheduledTime()).thenReturn(scheduledTime); + } + + final var offsetEvaluation = new ScaledByTaskDurationOffsetEvaluation(conductorProperties); + final var result = offsetEvaluation.computeEvaluationOffset(taskModel, 50); + assertEquals(expectedOffset, result); + } +} diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 9a57b3b4c6..0ba93477ea 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -130,6 +130,7 @@ conductor.external-payload-storage.postgres.password=postgres #### Performance / timer tweaks #### conductor.app.systemTaskWorkerCallbackDuration=10 +conductor.app.system-task-offset-evaluation.join=backoff_to_default_offset conductor.app.workflowOffsetTimeout=10 #### Performance / timer tweaks end ####