From 659baf620f8e7e5bafd00bb818921ac645099c45 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 25 Nov 2024 11:14:39 +0530 Subject: [PATCH 1/6] Code changes for introducing totalTimeOut and firstStartTime on taskDef and task --- .../constraints/TaskTimeoutConstraint.java | 13 ++++ .../conductor/common/metadata/tasks/Task.java | 77 +++++++++++++------ .../common/metadata/tasks/TaskDef.java | 19 ++++- .../conductor/common/tasks/TaskDefTest.java | 20 +++++ .../common/metadata/tasks/TaskDef.java | 14 +++- .../netflix/conductor/model/TaskModel.java | 3 + .../conductor/grpc/AbstractProtoMapper.java | 4 + grpc/src/main/proto/model/task.proto | 1 + grpc/src/main/proto/model/taskdef.proto | 1 + 9 files changed, 124 insertions(+), 28 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java index a9e6576f0..83840a5e9 100644 --- a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java +++ b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java @@ -66,6 +66,19 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) { } } + // Check if timeoutSeconds is greater than totalTimeoutSeconds + if (taskDef.getTimeoutSeconds() > 0 + && taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) { + valid = false; + String message = + String.format( + "TaskDef: %s timeoutSeconds: %d must be less than or equal to totalTimeoutSeconds: %d", + taskDef.getName(), + taskDef.getTimeoutSeconds(), + taskDef.getTotalTimeoutSeconds()); + context.buildConstraintViolationWithTemplate(message).addConstraintViolation(); + } + return valid; } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index 495ff06a9..5bc443fb0 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -97,19 +97,27 @@ public boolean isRetriable() { @ProtoField(id = 9) private String taskDefName; - /** Time when the task was scheduled */ + /** + * Time when the task was scheduled + */ @ProtoField(id = 10) private long scheduledTime; - /** Time when the task was first polled */ + /** + * Time when the task was first polled + */ @ProtoField(id = 11) private long startTime; - /** Time when the task completed executing */ + /** + * Time when the task completed executing + */ @ProtoField(id = 12) private long endTime; - /** Time when the task was last updated */ + /** + * Time when the task was last updated + */ @ProtoField(id = 13) private long updateTime; @@ -202,10 +210,14 @@ public boolean isRetriable() { @ProtoField(id = 42) private boolean subworkflowChanged; + @ProtoField(id = 43) + private long firstStartTime; + // If the task is an event associated with a parent task, the id of the parent task private String parentTaskId; - public Task() {} + public Task() { + } /** * @return Type of the task @@ -417,7 +429,7 @@ public void setRetried(boolean retried) { /** * @return True if the task has completed its lifecycle within conductor (from start to - * completion to being updated in the datastore) + * completion to being updated in the datastore) */ public boolean isExecuted() { return executed; @@ -479,7 +491,7 @@ public long getResponseTimeoutSeconds() { /** * @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the - * task will be re-queued + * task will be re-queued */ public void setResponseTimeoutSeconds(long responseTimeoutSeconds) { this.responseTimeoutSeconds = responseTimeoutSeconds; @@ -661,7 +673,7 @@ public String getExternalInputPayloadStoragePath() { /** * @param externalInputPayloadStoragePath the external storage path where the task input payload - * is stored + * is stored */ public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; @@ -676,7 +688,7 @@ public String getExternalOutputPayloadStoragePath() { /** * @param externalOutputPayloadStoragePath the external storage path where the task output - * payload is stored + * payload is stored */ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) { this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath; @@ -716,7 +728,9 @@ public boolean isLoopOverTask() { return iteration > 0; } - /** * @return the priority defined on workflow */ + /** + * @return the priority defined on workflow + */ public int getWorkflowPriority() { return workflowPriority; } @@ -744,8 +758,8 @@ public String getSubWorkflowId() { return this.getOutputData() != null && this.getOutputData().get("subWorkflowId") != null ? (String) this.getOutputData().get("subWorkflowId") : this.getInputData() != null - ? (String) this.getInputData().get("subWorkflowId") - : null; + ? (String) this.getInputData().get("subWorkflowId") + : null; } } @@ -765,6 +779,14 @@ public void setParentTaskId(String parentTaskId) { this.parentTaskId = parentTaskId; } + public long getFirstStartTime() { + return firstStartTime; + } + + public void setFirstStartTime(long firstStartTime) { + this.firstStartTime = firstStartTime; + } + public Task copy() { Task copy = new Task(); copy.setCallbackAfterSeconds(callbackAfterSeconds); @@ -798,17 +820,18 @@ public Task copy() { copy.setSubWorkflowId(getSubWorkflowId()); copy.setSubworkflowChanged(subworkflowChanged); copy.setParentTaskId(parentTaskId); + copy.setFirstStartTime(firstStartTime); return copy; } /** * @return a deep copy of the task instance To be used inside copy Workflow method to provide a - * valid deep copied object. Note: This does not copy the following fields: - * + * valid deep copied object. Note: This does not copy the following fields: + * */ public Task deepCopy() { Task deepCopy = copy(); @@ -819,6 +842,7 @@ public Task deepCopy() { deepCopy.setReasonForIncompletion(reasonForIncompletion); deepCopy.setSeq(seq); deepCopy.setParentTaskId(parentTaskId); + deepCopy.setFirstStartTime(firstStartTime); return deepCopy; } @@ -919,6 +943,9 @@ public String toString() { + ", subworkflowChanged='" + subworkflowChanged + '\'' + + ", firstStartTime='" + + firstStartTime + + '\'' + '}'; } @@ -966,14 +993,15 @@ && getWorkflowPriority() == task.getWorkflowPriority() && Objects.equals(getInputMessage(), task.getInputMessage()) && Objects.equals(getOutputMessage(), task.getOutputMessage()) && Objects.equals( - getExternalInputPayloadStoragePath(), - task.getExternalInputPayloadStoragePath()) + getExternalInputPayloadStoragePath(), + task.getExternalInputPayloadStoragePath()) && Objects.equals( - getExternalOutputPayloadStoragePath(), - task.getExternalOutputPayloadStoragePath()) + getExternalOutputPayloadStoragePath(), + task.getExternalOutputPayloadStoragePath()) && Objects.equals(getIsolationGroupId(), task.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace()) - && Objects.equals(getParentTaskId(), task.getParentTaskId()); + && Objects.equals(getParentTaskId(), task.getParentTaskId()) + && Objects.equals(getFirstStartTime(), task.getFirstStartTime()); } @Override @@ -1016,6 +1044,7 @@ public int hashCode() { getExternalOutputPayloadStoragePath(), getIsolationGroupId(), getExecutionNameSpace(), - getParentTaskId()); + getParentTaskId(), + getFirstStartTime()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index 7e4357604..da77a678e 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -30,6 +30,7 @@ import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.bind.DefaultValue; @ProtoMessage @TaskTimeoutConstraint @@ -127,6 +128,10 @@ public enum RetryLogic { @ProtoField(id = 21) private String baseType; + @ProtoField(id = 22) + @NotNull + private long totalTimeoutSeconds; + private SchemaDef inputSchema; private SchemaDef outputSchema; private boolean enforceSchema; @@ -464,6 +469,14 @@ public void setEnforceSchema(boolean enforceSchema) { this.enforceSchema = enforceSchema; } + public long getTotalTimeoutSeconds() { + return totalTimeoutSeconds; + } + + public void setTotalTimeoutSeconds(@NotNull long totalTimeoutSeconds) { + this.totalTimeoutSeconds = totalTimeoutSeconds; + } + @Override public String toString() { return name; @@ -497,7 +510,8 @@ && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) - && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); + && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()) + && Objects.equals(getTotalTimeoutSeconds(), taskDef.getTotalTimeoutSeconds()); } @Override @@ -523,6 +537,7 @@ public int hashCode() { getOwnerEmail(), getBaseType(), getInputSchema(), - getOutputSchema()); + getOutputSchema(), + getTotalTimeoutSeconds()); } } diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java index a46cf7d5c..987bcdbca 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java @@ -74,6 +74,26 @@ public void testTaskDef() { assertTrue(validationErrors.contains("ownerEmail cannot be empty")); } + @Test + public void testTaskDefTotalTimeOutSeconds() { + TaskDef taskDef = new TaskDef(); + taskDef.setName("test-task"); + taskDef.setRetryCount(1); + taskDef.setTimeoutSeconds(1000); + taskDef.setResponseTimeoutSeconds(1); + taskDef.setOwnerEmail("blah@gmail.com"); + + Set> result = validator.validate(taskDef); + assertEquals(1, result.size()); + + List validationErrors = new ArrayList<>(); + result.forEach(e -> validationErrors.add(e.getMessage())); + + assertTrue( + validationErrors.toString(), + validationErrors.contains("TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 0")); + } + @Test public void testTaskDefInvalidEmail() { TaskDef taskDef = new TaskDef(); diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index d4bfc0fc1..0bdd0f2d9 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -47,6 +47,8 @@ public enum RetryLogic { private long timeoutSeconds; + private long totalTimeoutSeconds; + private List inputKeys = new ArrayList<>(); private List outputKeys = new ArrayList<>(); @@ -172,6 +174,14 @@ public void setTimeoutSeconds(long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; } + public long getTotalTimeoutSeconds() { + return totalTimeoutSeconds; + } + + public void setTotalTimeoutSeconds(long totalTimeoutSeconds) { + this.totalTimeoutSeconds = totalTimeoutSeconds; + } + /** * @return Returns the input keys */ @@ -428,10 +438,10 @@ public boolean equals(Object o) { return false; } TaskDef taskDef = (TaskDef) o; - return getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); + return getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && getTotalTimeoutSeconds() == taskDef.getTotalTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); } public int hashCode() { - return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema()); + return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getTotalTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema()); } } diff --git a/core/src/main/java/com/netflix/conductor/model/TaskModel.java b/core/src/main/java/com/netflix/conductor/model/TaskModel.java index 122c31b5b..b169998cf 100644 --- a/core/src/main/java/com/netflix/conductor/model/TaskModel.java +++ b/core/src/main/java/com/netflix/conductor/model/TaskModel.java @@ -94,6 +94,9 @@ public boolean isRetriable() { /** Time when the task was last updated */ private long updateTime; + /** Time when first task started */ + private long firstStartTime; + private int startDelayInSeconds; private String retriedTaskId; diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 47ccc9d8b..d3d37246d 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -729,6 +729,7 @@ public TaskPb.Task toProto(Task from) { to.setSubWorkflowId( from.getSubWorkflowId() ); } to.setSubworkflowChanged( from.isSubworkflowChanged() ); + to.setFirstStartTime( from.getFirstStartTime() ); return to.build(); } @@ -788,6 +789,7 @@ public Task fromProto(TaskPb.Task from) { to.setIteration( from.getIteration() ); to.setSubWorkflowId( from.getSubWorkflowId() ); to.setSubworkflowChanged( from.getSubworkflowChanged() ); + to.setFirstStartTime( from.getFirstStartTime() ); return to; } @@ -875,6 +877,7 @@ public TaskDefPb.TaskDef toProto(TaskDef from) { if (from.getBaseType() != null) { to.setBaseType( from.getBaseType() ); } + to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() ); return to.build(); } @@ -904,6 +907,7 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) { to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() ); to.setBackoffScaleFactor( from.getBackoffScaleFactor() ); to.setBaseType( from.getBaseType() ); + to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() ); return to; } diff --git a/grpc/src/main/proto/model/task.proto b/grpc/src/main/proto/model/task.proto index 410aa0a06..0df2f9b0f 100644 --- a/grpc/src/main/proto/model/task.proto +++ b/grpc/src/main/proto/model/task.proto @@ -61,4 +61,5 @@ message Task { int32 iteration = 40; string sub_workflow_id = 41; bool subworkflow_changed = 42; + int64 first_start_time = 43; } diff --git a/grpc/src/main/proto/model/taskdef.proto b/grpc/src/main/proto/model/taskdef.proto index e531bcfec..28046426f 100644 --- a/grpc/src/main/proto/model/taskdef.proto +++ b/grpc/src/main/proto/model/taskdef.proto @@ -38,4 +38,5 @@ message TaskDef { int32 poll_timeout_seconds = 19; int32 backoff_scale_factor = 20; string base_type = 21; + int64 total_timeout_seconds = 22; } From 3f099e474848ed6445a392b799efcfeeb4c020eb Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 25 Nov 2024 11:18:15 +0530 Subject: [PATCH 2/6] formatting changes --- .../conductor/common/metadata/tasks/Task.java | 51 ++++++++----------- .../common/metadata/tasks/TaskDef.java | 1 - .../conductor/common/tasks/TaskDefTest.java | 3 +- 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index 5bc443fb0..2c948c3f9 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -97,27 +97,19 @@ public boolean isRetriable() { @ProtoField(id = 9) private String taskDefName; - /** - * Time when the task was scheduled - */ + /** Time when the task was scheduled */ @ProtoField(id = 10) private long scheduledTime; - /** - * Time when the task was first polled - */ + /** Time when the task was first polled */ @ProtoField(id = 11) private long startTime; - /** - * Time when the task completed executing - */ + /** Time when the task completed executing */ @ProtoField(id = 12) private long endTime; - /** - * Time when the task was last updated - */ + /** Time when the task was last updated */ @ProtoField(id = 13) private long updateTime; @@ -216,8 +208,7 @@ public boolean isRetriable() { // If the task is an event associated with a parent task, the id of the parent task private String parentTaskId; - public Task() { - } + public Task() {} /** * @return Type of the task @@ -429,7 +420,7 @@ public void setRetried(boolean retried) { /** * @return True if the task has completed its lifecycle within conductor (from start to - * completion to being updated in the datastore) + * completion to being updated in the datastore) */ public boolean isExecuted() { return executed; @@ -491,7 +482,7 @@ public long getResponseTimeoutSeconds() { /** * @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the - * task will be re-queued + * task will be re-queued */ public void setResponseTimeoutSeconds(long responseTimeoutSeconds) { this.responseTimeoutSeconds = responseTimeoutSeconds; @@ -673,7 +664,7 @@ public String getExternalInputPayloadStoragePath() { /** * @param externalInputPayloadStoragePath the external storage path where the task input payload - * is stored + * is stored */ public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; @@ -688,7 +679,7 @@ public String getExternalOutputPayloadStoragePath() { /** * @param externalOutputPayloadStoragePath the external storage path where the task output - * payload is stored + * payload is stored */ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) { this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath; @@ -758,8 +749,8 @@ public String getSubWorkflowId() { return this.getOutputData() != null && this.getOutputData().get("subWorkflowId") != null ? (String) this.getOutputData().get("subWorkflowId") : this.getInputData() != null - ? (String) this.getInputData().get("subWorkflowId") - : null; + ? (String) this.getInputData().get("subWorkflowId") + : null; } } @@ -826,12 +817,12 @@ public Task copy() { /** * @return a deep copy of the task instance To be used inside copy Workflow method to provide a - * valid deep copied object. Note: This does not copy the following fields: - *
    - *
  • retried - *
  • updateTime - *
  • retriedTaskId - *
+ * valid deep copied object. Note: This does not copy the following fields: + *
    + *
  • retried + *
  • updateTime + *
  • retriedTaskId + *
*/ public Task deepCopy() { Task deepCopy = copy(); @@ -993,11 +984,11 @@ && getWorkflowPriority() == task.getWorkflowPriority() && Objects.equals(getInputMessage(), task.getInputMessage()) && Objects.equals(getOutputMessage(), task.getOutputMessage()) && Objects.equals( - getExternalInputPayloadStoragePath(), - task.getExternalInputPayloadStoragePath()) + getExternalInputPayloadStoragePath(), + task.getExternalInputPayloadStoragePath()) && Objects.equals( - getExternalOutputPayloadStoragePath(), - task.getExternalOutputPayloadStoragePath()) + getExternalOutputPayloadStoragePath(), + task.getExternalOutputPayloadStoragePath()) && Objects.equals(getIsolationGroupId(), task.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace()) && Objects.equals(getParentTaskId(), task.getParentTaskId()) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index da77a678e..b5a9d071b 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -30,7 +30,6 @@ import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; -import org.springframework.boot.context.properties.bind.DefaultValue; @ProtoMessage @TaskTimeoutConstraint diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java index 987bcdbca..ab2026540 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java @@ -91,7 +91,8 @@ public void testTaskDefTotalTimeOutSeconds() { assertTrue( validationErrors.toString(), - validationErrors.contains("TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 0")); + validationErrors.contains( + "TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 0")); } @Test From 28f7cfa5ce06786a65a0ab44833acfb804cf850e Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 25 Nov 2024 14:10:06 +0530 Subject: [PATCH 3/6] modified constraints --- .../conductor/common/constraints/TaskTimeoutConstraint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java index 83840a5e9..ed7c93d86 100644 --- a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java +++ b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java @@ -67,7 +67,7 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) { } // Check if timeoutSeconds is greater than totalTimeoutSeconds - if (taskDef.getTimeoutSeconds() > 0 + if (taskDef.getTimeoutSeconds() > 0 && taskDef.getTotalTimeoutSeconds() > 0 && taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) { valid = false; String message = From f9784677e066a39dbfe65721e6cd65e8231a4a98 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 25 Nov 2024 14:14:58 +0530 Subject: [PATCH 4/6] fixed UT --- .../conductor/common/constraints/TaskTimeoutConstraint.java | 3 ++- .../java/com/netflix/conductor/common/tasks/TaskDefTest.java | 3 ++- .../test/java/com/netflix/conductor/common/tasks/TaskTest.java | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java index ed7c93d86..43a7cd8ba 100644 --- a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java +++ b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java @@ -67,7 +67,8 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) { } // Check if timeoutSeconds is greater than totalTimeoutSeconds - if (taskDef.getTimeoutSeconds() > 0 && taskDef.getTotalTimeoutSeconds() > 0 + if (taskDef.getTimeoutSeconds() > 0 + && taskDef.getTotalTimeoutSeconds() > 0 && taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) { valid = false; String message = diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java index ab2026540..b3f4b9932 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java @@ -80,6 +80,7 @@ public void testTaskDefTotalTimeOutSeconds() { taskDef.setName("test-task"); taskDef.setRetryCount(1); taskDef.setTimeoutSeconds(1000); + taskDef.setTotalTimeoutSeconds(900); taskDef.setResponseTimeoutSeconds(1); taskDef.setOwnerEmail("blah@gmail.com"); @@ -92,7 +93,7 @@ public void testTaskDefTotalTimeOutSeconds() { assertTrue( validationErrors.toString(), validationErrors.contains( - "TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 0")); + "TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 900")); } @Test diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java index 402fcfcb0..f1a71bfdb 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java @@ -98,7 +98,7 @@ public void testDeepCopyTask() { final Task task = new Task(); // In order to avoid forgetting putting inside the copy method the newly added fields check // the number of declared fields. - final int expectedTaskFieldsNumber = 41; + final int expectedTaskFieldsNumber = 42; final int declaredFieldsNumber = task.getClass().getDeclaredFields().length; assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber); From 93362c06a566757a0d55eb78a40931ec9d5fe853 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Tue, 26 Nov 2024 20:21:24 +0530 Subject: [PATCH 5/6] Removed code changes from sdk v4 --- .../conductor/common/metadata/tasks/TaskDef.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index 0bdd0f2d9..d4bfc0fc1 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -47,8 +47,6 @@ public enum RetryLogic { private long timeoutSeconds; - private long totalTimeoutSeconds; - private List inputKeys = new ArrayList<>(); private List outputKeys = new ArrayList<>(); @@ -174,14 +172,6 @@ public void setTimeoutSeconds(long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; } - public long getTotalTimeoutSeconds() { - return totalTimeoutSeconds; - } - - public void setTotalTimeoutSeconds(long totalTimeoutSeconds) { - this.totalTimeoutSeconds = totalTimeoutSeconds; - } - /** * @return Returns the input keys */ @@ -438,10 +428,10 @@ public boolean equals(Object o) { return false; } TaskDef taskDef = (TaskDef) o; - return getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && getTotalTimeoutSeconds() == taskDef.getTotalTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); + return getRetryCount() == taskDef.getRetryCount() && getTimeoutSeconds() == taskDef.getTimeoutSeconds() && getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() && getBackoffScaleFactor() == taskDef.getBackoffScaleFactor() && getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() && Objects.equals(getName(), taskDef.getName()) && Objects.equals(getDescription(), taskDef.getDescription()) && Objects.equals(getInputKeys(), taskDef.getInputKeys()) && Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) && getTimeoutPolicy() == taskDef.getTimeoutPolicy() && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) && Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); } public int hashCode() { - return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getTotalTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema()); + return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(), getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(), getBackoffScaleFactor(), getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), getBaseType(), getInputSchema(), getOutputSchema()); } } From ccbcbda99cb3206618efcd1d42dc3c606800f4c9 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Wed, 27 Nov 2024 10:31:47 +0530 Subject: [PATCH 6/6] nit changes --- .../com/netflix/conductor/common/metadata/tasks/TaskDef.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index b5a9d071b..b96c54245 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -472,7 +472,7 @@ public long getTotalTimeoutSeconds() { return totalTimeoutSeconds; } - public void setTotalTimeoutSeconds(@NotNull long totalTimeoutSeconds) { + public void setTotalTimeoutSeconds(long totalTimeoutSeconds) { this.totalTimeoutSeconds = totalTimeoutSeconds; }