diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index aff7358342..00e49d66ec 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -108,6 +109,24 @@ public class ConductorProperties { private Map systemTaskOffsetEvaluation = Map.of(TaskType.JOIN, OffsetEvaluationStrategy.BACKOFF_TO_DEFAULT_OFFSET); + /** + * The duration of the task execution mapped to the calculated offset of the postponed task + * [seconds].
+ * This setting is used only by the {@link OffsetEvaluationStrategy#SCALED_BY_TASK_DURATION} + * offset evaluation strategy.
+ * Example: If map contains two entries (10, 30) and (20, 60), then the evaluation offsets for + * the postponed tasks in the queue will be calculated according to the following intervals: + * + * + * + * By default, the offset is always set to 0 seconds. + */ + private Map taskDurationToOffsetSteps = Collections.emptyMap(); + /** * The interval (in milliseconds) at which system task queues will be polled by the system task * workers. @@ -373,6 +392,14 @@ public Map getSystemTaskOffsetEvaluation() { return systemTaskOffsetEvaluation; } + public Map getTaskDurationToOffsetSteps() { + return taskDurationToOffsetSteps; + } + + public void setTaskDurationToOffsetSteps(Map taskDurationToOffsetSteps) { + this.taskDurationToOffsetSteps = taskDurationToOffsetSteps; + } + public void setSystemTaskWorkerCallbackDuration(Duration systemTaskWorkerCallbackDuration) { this.systemTaskWorkerCallbackDuration = systemTaskWorkerCallbackDuration; } diff --git a/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java index 9fbfa085be..150fd92386 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java +++ b/core/src/main/java/com/netflix/conductor/core/config/OffsetEvaluationStrategy.java @@ -30,5 +30,13 @@ public enum OffsetEvaluationStrategy { * poll count. In this strategy offset increases exponentially until it reaches the (default * offset * queue size) value. */ - SCALED_BY_QUEUE_SIZE; + SCALED_BY_QUEUE_SIZE, + /** + * Computes the evaluation offset for a postponed task based on the task's duration. In this + * strategy offset increases by steps that are proportional to the task's duration and defined + * by the user settings. + * + * @see ConductorProperties#getTaskDurationToOffsetSteps() setting used to define the steps + */ + SCALED_BY_TASK_DURATION }