Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
[Branch - 2.31] Configurable retry delay policy (#2371)
Browse files Browse the repository at this point in the history
* Custom strategy for configurable retry delay

* Addressing code review comments

* Removing redundant space

* Addressed code review comments

* Address code review feedback and added new retry policy at workflow task level

* Fix test code formatting

* Fix test code formatting

* Addressed code review comments

* Removed the proto

Co-authored-by: pgolash <[email protected]>
  • Loading branch information
prashantgolash and pgolash authored Aug 10, 2021
1 parent b751d16 commit 1e7f467
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.common.constraints;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskDef.RetryLogic;

import javax.validation.Constraint;
import javax.validation.ConstraintValidator;
import javax.validation.ConstraintValidatorContext;
import javax.validation.Payload;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;

/**
 * Class-level Bean Validation constraint for {@link TaskDef}.
 * <p>
 * A task definition is rejected when a retry delay has been set explicitly
 * ({@link TaskDef#isRetryDelaySet()}) while its retry logic is still
 * {@link RetryLogic#UNSPECIFIED}.
 */
@Documented
@Constraint(validatedBy = RetryLogicConstraint.RetryLogicValidator.class)
@Target({TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RetryLogicConstraint {

    /**
     * @return the default violation message; empty because the validator reports its own message
     */
    String message() default "";

    /**
     * @return the validation groups this constraint belongs to
     */
    Class<?>[] groups() default {};

    /**
     * @return the payload associated with this constraint
     */
    Class<? extends Payload>[] payload() default {};

    /**
     * Validator backing {@link RetryLogicConstraint}.
     */
    class RetryLogicValidator implements ConstraintValidator<RetryLogicConstraint, TaskDef> {

        @Override
        public void initialize(RetryLogicConstraint constraintAnnotation) {
            // The annotation carries no attributes that need to be captured.
        }

        /**
         * Checks that a retry delay is not combined with an unspecified retry logic.
         *
         * @param taskDef the task definition under validation; {@code null} is treated as valid
         * @param context the context used to report the custom violation
         * @return {@code false} when the retry logic is {@link RetryLogic#UNSPECIFIED} and a retry
         *         delay was set, {@code true} otherwise
         */
        @Override
        public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) {
            // Bean Validation convention: null is "valid" here, nullness is the job of @NotNull.
            if (taskDef == null) {
                return true;
            }

            boolean delayWithoutLogic =
                    taskDef.getRetryLogic() == RetryLogic.UNSPECIFIED && taskDef.isRetryDelaySet();
            if (!delayWithoutLogic) {
                return true;
            }

            // Replace the (empty) default message with a task-specific one.
            context.disableDefaultConstraintViolation();
            // The task name is caller-supplied data. It must not be interpreted as part of the
            // message template, otherwise "{...}" / "${...}" inside a name would be interpolated.
            String message = String.format("TaskDef: %s retryPolicy can't be UNSPECIFIED as retryDelay is set",
                    escapeTemplateCharacters(taskDef.getName()));
            context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
            return false;
        }

        /**
         * Backslash-escapes the characters that are special in a Bean Validation message
         * template ({@code \}, <code>{</code>, <code>}</code> and {@code $}) so the value is
         * rendered literally.
         *
         * @param raw the text to embed in a message template; {@code null} is rendered as "null"
         * @return the escaped text
         */
        private static String escapeTemplateCharacters(String raw) {
            String text = String.valueOf(raw);
            StringBuilder escaped = new StringBuilder(text.length());
            for (int i = 0; i < text.length(); i++) {
                char current = text.charAt(i);
                if (current == '\\' || current == '{' || current == '}' || current == '$') {
                    escaped.append('\\');
                }
                escaped.append(current);
            }
            return escaped.toString();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.netflix.conductor.common.constraints.OwnerEmailMandatoryConstraint;
import com.netflix.conductor.common.constraints.RetryLogicConstraint;
import com.netflix.conductor.common.constraints.TaskTimeoutConstraint;
import com.netflix.conductor.common.metadata.Auditable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -39,14 +41,15 @@
*/
@ProtoMessage
@TaskTimeoutConstraint
@RetryLogicConstraint
@Valid
public class TaskDef extends Auditable {

@ProtoEnum
public enum TimeoutPolicy {RETRY, TIME_OUT_WF, ALERT_ONLY}

@ProtoEnum
public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}
public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF, CUSTOM, UNSPECIFIED}

private static final int ONE_HOUR = 60 * 60;

Expand Down Expand Up @@ -82,6 +85,7 @@ public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}

@ProtoField(id = 9)
private int retryDelaySeconds = 60;
private boolean isRetryDelaySet = false;

@ProtoField(id = 10)
@Min(value = 1, message = "TaskDef responseTimeoutSeconds: ${validatedValue} should be minimum {value} second")
Expand Down Expand Up @@ -268,6 +272,21 @@ public int getRetryDelaySeconds() {
return retryDelaySeconds;
}

/**
 * Stores the retry delay and records that it has been set explicitly.
 *
 * @param retryDelaySeconds the retryDelaySeconds to set
 */
public void setRetryDelaySeconds(int retryDelaySeconds) {
    // Remember the explicit assignment; isRetryDelaySet() reports this flag.
    this.isRetryDelaySet = true;
    this.retryDelaySeconds = retryDelaySeconds;
}

/**
 * Reports whether {@link #setRetryDelaySeconds(int)} has been called on this definition.
 *
 * @return if retryDelaySeconds is set
 */
public boolean isRetryDelaySet() {
    return this.isRetryDelaySet;
}

/**
*
* @return the timeout for task to send response. After this timeout, the task will be re-queued
Expand All @@ -284,13 +303,6 @@ public void setResponseTimeoutSeconds(long responseTimeoutSeconds) {
this.responseTimeoutSeconds = responseTimeoutSeconds;
}

/**
 * Stores the given retry delay on this task definition.
 *
 * @param retryDelaySeconds the retryDelaySeconds to set
 */
public void setRetryDelaySeconds(int retryDelaySeconds) {
this.retryDelaySeconds = retryDelaySeconds;
}

/**
* @return the inputTemplate
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public enum Status {
@ProtoField(id = 8)
private Any outputMessage;

@ProtoField(id = 9)
private int retryDelaySeconds;

private List<TaskExecLog> logs = new CopyOnWriteArrayList<>();

private String externalOutputPayloadStoragePath;
Expand All @@ -80,6 +83,7 @@ public TaskResult(Task task) {
this.outputData = task.getOutputData();
this.externalOutputPayloadStoragePath = task.getExternalOutputPayloadStoragePath();
this.subWorkflowId = task.getSubWorkflowId();
this.retryDelaySeconds = task.getStartDelayInSeconds();
switch (task.getStatus()) {
case CANCELED:
case COMPLETED_WITH_ERRORS:
Expand Down Expand Up @@ -132,15 +136,33 @@ public long getCallbackAfterSeconds() {
}

/**
 * Returns the retry delay, in seconds, held by this task result.
 *
 * @return the retry delay for the FAILED tasks.
 */
public int getRetryDelaySeconds() {
    return this.retryDelaySeconds;
}

/**
 * Sets how long the task is held in the queue before it is handed to a polling worker.
 * <p>
 * When set to non-zero values, the task remains in the queue for the specified seconds before it is sent back to
 * the worker when polled. Useful for long running tasks, where the task is updated as IN_PROGRESS and should not
 * be polled out of the queue for a specified amount of time (delayed queue implementation).
 *
 * @param callbackAfterSeconds Amount of time in seconds the task should be held in the queue before giving it to a polling worker.
 */
public void setCallbackAfterSeconds(long callbackAfterSeconds) {
this.callbackAfterSeconds = callbackAfterSeconds;
}

/**
 * Sets the retry delay. The retry interval is decided by the following logic:
 * <p>NO retry delay if the worker sends a negative value in the TaskResult</p>
 * <p>Retry delay from the task definition if the worker sends 0 in the TaskResult</p>
 * <p>Retry delay from the workflow task if the worker sends a positive value in the TaskResult</p>
 *
 * @param retryDelaySeconds the retry delay, in seconds, to store on this task result
 */
public void setRetryDelaySeconds(int retryDelaySeconds) {
    this.retryDelaySeconds = retryDelaySeconds;
}

/**
 * @return the workerId stored on this task result
 */
public String getWorkerId() {
    return this.workerId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ public void setTasks(List<WorkflowTask> tasks) {
@ProtoField(id = 26)
private Integer retryCount;

@ProtoField(id = 27)
private TaskDef.RetryLogic retryLogic = TaskDef.RetryLogic.UNSPECIFIED;

/**
* @return the name
*/
Expand Down Expand Up @@ -529,6 +532,20 @@ public void setDefaultExclusiveJoinTask(List<String> defaultExclusiveJoinTask) {
this.defaultExclusiveJoinTask = defaultExclusiveJoinTask;
}

/**
 * Returns the retry logic configured on this workflow task.
 *
 * @return the retryLogic
 */
public TaskDef.RetryLogic getRetryLogic() {
    return this.retryLogic;
}

/**
 * Replaces the retry logic configured on this workflow task.
 *
 * @param retryLogic the retryLogic to set
 */
public void setRetryLogic(TaskDef.RetryLogic retryLogic) {
this.retryLogic = retryLogic;
}

private Collection<List<WorkflowTask>> children() {
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();
TaskType taskType = TaskType.USER_DEFINED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void setUp() {
task.setWorkerId("worker-id");
task.setOutputData(new HashMap<>());
task.setExternalOutputPayloadStoragePath("externalOutput");
task.setStartDelayInSeconds(10);
}

@Test
Expand Down Expand Up @@ -64,5 +65,6 @@ private void validateTaskResult() {
assertEquals(task.getWorkerId(), taskResult.getWorkerId());
assertEquals(task.getOutputData(), taskResult.getOutputData());
assertEquals(task.getExternalOutputPayloadStoragePath(), taskResult.getExternalOutputPayloadStoragePath());
assertEquals(task.getStartDelayInSeconds(), taskResult.getRetryDelaySeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.Test;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskDef.RetryLogic;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
Expand All @@ -44,27 +45,29 @@ public void setup(){
this.validator = factory.getValidator();
}

/**
 * Verifies that the four-argument constructor populates name, description, retry count and
 * timeout, and that the response timeout reads back as 3600.
 */
@Test
public void test() {
    final String expectedName = "test1";
    final String expectedDescription = "desc";
    final int expectedRetryCount = 10;
    final int expectedTimeout = 100;

    TaskDef def = new TaskDef(expectedName, expectedDescription, expectedRetryCount, expectedTimeout);

    assertEquals(3_600, def.getResponseTimeoutSeconds());
    assertEquals(expectedName, def.getName());
    assertEquals(expectedDescription, def.getDescription());
    assertEquals(expectedRetryCount, def.getRetryCount());
    assertEquals(expectedTimeout, def.getTimeoutSeconds());
}

@Test
public void testTaskDef() {
TaskDef taskDef = new TaskDef();
taskDef.setName("task1");
taskDef.setRetryCount(-1);
taskDef.setTimeoutSeconds(1000);
taskDef.setResponseTimeoutSeconds(1001);
/**
 * Verifies that the four-argument constructor populates name, description, retry count and
 * timeout, and that the response timeout reads back as 3600.
 */
@Test
public void test() {
    final String expectedName = "test1";
    final String expectedDescription = "desc";
    final int expectedRetryCount = 10;
    final int expectedTimeout = 100;

    TaskDef def = new TaskDef(expectedName, expectedDescription, expectedRetryCount, expectedTimeout);

    assertEquals(3_600, def.getResponseTimeoutSeconds());
    assertEquals(expectedName, def.getName());
    assertEquals(expectedDescription, def.getDescription());
    assertEquals(expectedRetryCount, def.getRetryCount());
    assertEquals(expectedTimeout, def.getTimeoutSeconds());
}

@Test
public void testTaskDef() {
TaskDef taskDef = new TaskDef();
taskDef.setName("task1");
taskDef.setRetryCount(-1);
taskDef.setTimeoutSeconds(1000);
taskDef.setResponseTimeoutSeconds(1001);
taskDef.setRetryLogic(RetryLogic.FIXED);
taskDef.setRetryDelaySeconds(10);

Set<ConstraintViolation<Object>> result = validator.validate(taskDef);
assertEquals(3, result.size());
Expand All @@ -77,6 +80,26 @@ public void testTaskDef() {
assertTrue(validationErrors.contains("ownerEmail cannot be empty"));
}

/**
 * A task definition that sets a retry delay while leaving its retry logic UNSPECIFIED must
 * produce exactly one violation, carrying the retry-logic message.
 */
@Test
public void testTaskDefRetryLogic() {
    TaskDef taskDef = new TaskDef();
    taskDef.setName("task1");
    taskDef.setRetryCount(1);
    taskDef.setTimeoutSeconds(1000);
    taskDef.setResponseTimeoutSeconds(10);
    // A well-formed owner address, so ownerEmail cannot contribute a violation of its own.
    taskDef.setOwnerEmail("owner@example.com");
    taskDef.setRetryLogic(RetryLogic.UNSPECIFIED);
    taskDef.setRetryDelaySeconds(100);

    Set<ConstraintViolation<Object>> result = validator.validate(taskDef);
    assertEquals(1, result.size());

    List<String> validationErrors = new ArrayList<>();
    for (ConstraintViolation<Object> violation : result) {
        validationErrors.add(violation.getMessage());
    }

    assertTrue(validationErrors.contains("TaskDef: task1 retryPolicy can't be UNSPECIFIED as retryDelay is set"));
}

@Test
public void testTaskDefNameAndOwnerNotSet() {
TaskDef taskDef = new TaskDef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testWorkflowDefJson() throws Exception {
" \"loopOver\" : [ ],\n" +
" \"name\" : \"test_task\",\n" +
" \"optional\" : false,\n" +
" \"retryLogic\" : \"UNSPECIFIED\",\n" +
" \"startDelay\" : 0,\n" +
" \"taskReferenceName\" : \"t1\",\n" +
" \"type\" : \"SIMPLE\"\n" +
Expand Down
Loading

0 comments on commit 1e7f467

Please sign in to comment.