Skip to content

Commit

Permalink
Merge pull request conductor-oss#313 from shaileshpadave/addTotalTimeout
Browse files Browse the repository at this point in the history
Adding total timeout to task models
  • Loading branch information
shaileshpadave authored Nov 27, 2024
2 parents 4ae5b18 + ccbcbda commit c67b2eb
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) {
}
}

// Check if timeoutSeconds is greater than totalTimeoutSeconds
if (taskDef.getTimeoutSeconds() > 0
&& taskDef.getTotalTimeoutSeconds() > 0
&& taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) {
valid = false;
String message =
String.format(
"TaskDef: %s timeoutSeconds: %d must be less than or equal to totalTimeoutSeconds: %d",
taskDef.getName(),
taskDef.getTimeoutSeconds(),
taskDef.getTotalTimeoutSeconds());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
}

return valid;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public boolean isRetriable() {
@ProtoField(id = 42)
private boolean subworkflowChanged;

@ProtoField(id = 43)
private long firstStartTime;

// If the task is an event associated with a parent task, the id of the parent task
private String parentTaskId;

Expand Down Expand Up @@ -716,7 +719,9 @@ public boolean isLoopOverTask() {
return iteration > 0;
}

/** * @return the priority defined on workflow */
/**
* @return the priority defined on workflow
*/
public int getWorkflowPriority() {
return workflowPriority;
}
Expand Down Expand Up @@ -765,6 +770,14 @@ public void setParentTaskId(String parentTaskId) {
this.parentTaskId = parentTaskId;
}

public long getFirstStartTime() {
return firstStartTime;
}

public void setFirstStartTime(long firstStartTime) {
this.firstStartTime = firstStartTime;
}

public Task copy() {
Task copy = new Task();
copy.setCallbackAfterSeconds(callbackAfterSeconds);
Expand Down Expand Up @@ -798,6 +811,7 @@ public Task copy() {
copy.setSubWorkflowId(getSubWorkflowId());
copy.setSubworkflowChanged(subworkflowChanged);
copy.setParentTaskId(parentTaskId);
copy.setFirstStartTime(firstStartTime);
return copy;
}

Expand All @@ -819,6 +833,7 @@ public Task deepCopy() {
deepCopy.setReasonForIncompletion(reasonForIncompletion);
deepCopy.setSeq(seq);
deepCopy.setParentTaskId(parentTaskId);
deepCopy.setFirstStartTime(firstStartTime);
return deepCopy;
}

Expand Down Expand Up @@ -919,6 +934,9 @@ public String toString() {
+ ", subworkflowChanged='"
+ subworkflowChanged
+ '\''
+ ", firstStartTime='"
+ firstStartTime
+ '\''
+ '}';
}

Expand Down Expand Up @@ -973,7 +991,8 @@ && getWorkflowPriority() == task.getWorkflowPriority()
task.getExternalOutputPayloadStoragePath())
&& Objects.equals(getIsolationGroupId(), task.getIsolationGroupId())
&& Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace())
&& Objects.equals(getParentTaskId(), task.getParentTaskId());
&& Objects.equals(getParentTaskId(), task.getParentTaskId())
&& Objects.equals(getFirstStartTime(), task.getFirstStartTime());
}

@Override
Expand Down Expand Up @@ -1016,6 +1035,7 @@ public int hashCode() {
getExternalOutputPayloadStoragePath(),
getIsolationGroupId(),
getExecutionNameSpace(),
getParentTaskId());
getParentTaskId(),
getFirstStartTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public enum RetryLogic {
@ProtoField(id = 21)
private String baseType;

@ProtoField(id = 22)
@NotNull
private long totalTimeoutSeconds;

private SchemaDef inputSchema;
private SchemaDef outputSchema;
private boolean enforceSchema;
Expand Down Expand Up @@ -464,6 +468,14 @@ public void setEnforceSchema(boolean enforceSchema) {
this.enforceSchema = enforceSchema;
}

public long getTotalTimeoutSeconds() {
return totalTimeoutSeconds;
}

public void setTotalTimeoutSeconds(long totalTimeoutSeconds) {
this.totalTimeoutSeconds = totalTimeoutSeconds;
}

@Override
public String toString() {
return name;
Expand Down Expand Up @@ -497,7 +509,8 @@ && getRetryLogic() == taskDef.getRetryLogic()
&& Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail())
&& Objects.equals(getBaseType(), taskDef.getBaseType())
&& Objects.equals(getInputSchema(), taskDef.getInputSchema())
&& Objects.equals(getOutputSchema(), taskDef.getOutputSchema());
&& Objects.equals(getOutputSchema(), taskDef.getOutputSchema())
&& Objects.equals(getTotalTimeoutSeconds(), taskDef.getTotalTimeoutSeconds());
}

@Override
Expand All @@ -523,6 +536,7 @@ public int hashCode() {
getOwnerEmail(),
getBaseType(),
getInputSchema(),
getOutputSchema());
getOutputSchema(),
getTotalTimeoutSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ public void testTaskDef() {
assertTrue(validationErrors.contains("ownerEmail cannot be empty"));
}

@Test
public void testTaskDefTotalTimeOutSeconds() {
TaskDef taskDef = new TaskDef();
taskDef.setName("test-task");
taskDef.setRetryCount(1);
taskDef.setTimeoutSeconds(1000);
taskDef.setTotalTimeoutSeconds(900);
taskDef.setResponseTimeoutSeconds(1);
taskDef.setOwnerEmail("[email protected]");

Set<ConstraintViolation<Object>> result = validator.validate(taskDef);
assertEquals(1, result.size());

List<String> validationErrors = new ArrayList<>();
result.forEach(e -> validationErrors.add(e.getMessage()));

assertTrue(
validationErrors.toString(),
validationErrors.contains(
"TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 900"));
}

@Test
public void testTaskDefInvalidEmail() {
TaskDef taskDef = new TaskDef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testDeepCopyTask() {
final Task task = new Task();
// In order to avoid forgetting putting inside the copy method the newly added fields check
// the number of declared fields.
final int expectedTaskFieldsNumber = 41;
final int expectedTaskFieldsNumber = 42;
final int declaredFieldsNumber = task.getClass().getDeclaredFields().length;

assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber);
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public boolean isRetriable() {
/** Time when the task was last updated */
private long updateTime;

/** Time when first task started */
private long firstStartTime;

private int startDelayInSeconds;

private String retriedTaskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ public TaskPb.Task toProto(Task from) {
to.setSubWorkflowId( from.getSubWorkflowId() );
}
to.setSubworkflowChanged( from.isSubworkflowChanged() );
to.setFirstStartTime( from.getFirstStartTime() );
return to.build();
}

Expand Down Expand Up @@ -788,6 +789,7 @@ public Task fromProto(TaskPb.Task from) {
to.setIteration( from.getIteration() );
to.setSubWorkflowId( from.getSubWorkflowId() );
to.setSubworkflowChanged( from.getSubworkflowChanged() );
to.setFirstStartTime( from.getFirstStartTime() );
return to;
}

Expand Down Expand Up @@ -875,6 +877,7 @@ public TaskDefPb.TaskDef toProto(TaskDef from) {
if (from.getBaseType() != null) {
to.setBaseType( from.getBaseType() );
}
to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() );
return to.build();
}

Expand Down Expand Up @@ -904,6 +907,7 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) {
to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() );
to.setBackoffScaleFactor( from.getBackoffScaleFactor() );
to.setBaseType( from.getBaseType() );
to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ message Task {
int32 iteration = 40;
string sub_workflow_id = 41;
bool subworkflow_changed = 42;
int64 first_start_time = 43;
}
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/taskdef.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ message TaskDef {
int32 poll_timeout_seconds = 19;
int32 backoff_scale_factor = 20;
string base_type = 21;
int64 total_timeout_seconds = 22;
}

0 comments on commit c67b2eb

Please sign in to comment.