diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/common/Schema.java b/common/src/main/java/com/netflix/conductor/common/metadata/common/Schema.java new file mode 100644 index 0000000000..878015a3a0 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/common/Schema.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.common; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; + +/** Schema for {@link WorkflowDef} and {@link TaskDef} */ +public class Schema { + + private String name; + private Integer version; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index 0486f1dd07..84020e5776 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -30,6 +30,7 @@ import com.netflix.conductor.common.constraints.OwnerEmailValidConstraint; import com.netflix.conductor.common.constraints.TaskTimeoutConstraint; import com.netflix.conductor.common.metadata.Auditable; +import com.netflix.conductor.common.metadata.common.Schema; @ProtoMessage @TaskTimeoutConstraint @@ -128,6 +129,9 @@ public enum RetryLogic { @ProtoField(id = 21) private String baseType; + private Schema inputSchema; + private Schema outputSchema; + public TaskDef() {} public TaskDef(String name) { @@ -437,6 +441,22 @@ public void setBaseType(String baseType) { this.baseType = baseType; } + public Schema getInputSchema() { + return inputSchema; + } + + public void setInputSchema(Schema inputSchema) { + this.inputSchema = inputSchema; + } + + public Schema getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(Schema outputSchema) { + this.outputSchema = outputSchema; + } + @Override public String toString() { return name; @@ -468,7 +488,9 @@ && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) - && Objects.equals(getBaseType(), taskDef.getBaseType()); + && Objects.equals(getBaseType(), taskDef.getBaseType()) + && Objects.equals(getInputSchema(), taskDef.getInputSchema()) + && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); } @Override @@ -492,6 +514,8 @@ public int hashCode() { getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail(), - getBaseType()); + getBaseType(), + getInputSchema(), + getOutputSchema()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index 1cd9a8fe9d..15d8cb813b 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -28,6 +28,7 @@ import com.netflix.conductor.common.constraints.OwnerEmailValidConstraint; import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint; import com.netflix.conductor.common.metadata.Auditable; +import com.netflix.conductor.common.metadata.common.Schema; import com.netflix.conductor.common.metadata.tasks.TaskType; @ProtoMessage @@ -101,6 +102,10 @@ public enum TimeoutPolicy { private RateLimitConfig rateLimitConfig; + private Schema inputSchema; + + private Schema outputSchema; + /** * @return the name */ @@ -337,6 +342,22 @@ public void setRateLimitConfig(RateLimitConfig rateLimitConfig) { this.rateLimitConfig = rateLimitConfig; } + public Schema getInputSchema() { + return inputSchema; + } + + public void setInputSchema(Schema inputSchema) { + this.inputSchema = inputSchema; + } + + public Schema getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(Schema outputSchema) { + this.outputSchema = outputSchema; + } + public boolean containsType(String taskType) { return collectTasks().stream().anyMatch(t -> t.getType().equals(taskType)); } @@ -409,7 +430,9 @@ && getSchemaVersion() == that.getSchemaVersion() && Objects.equals(getOutputParameters(), that.getOutputParameters()) && Objects.equals(getFailureWorkflow(), that.getFailureWorkflow()) && Objects.equals(getOwnerEmail(), that.getOwnerEmail()) - && Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds()); + && Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds()) + && Objects.equals(getInputSchema(), that.getInputSchema()) + && Objects.equals(getOutputSchema(), that.getOutputSchema()); } @Override @@ -424,7 +447,9 @@ public int hashCode() { getFailureWorkflow(), getSchemaVersion(), getOwnerEmail(), - getTimeoutSeconds()); + getTimeoutSeconds(), + getInputSchema(), + getOutputSchema()); } @Override diff --git a/core/src/main/java/com/netflix/conductor/model/TaskModel.java b/core/src/main/java/com/netflix/conductor/model/TaskModel.java index 95a359ae6f..0c837ff991 100644 --- a/core/src/main/java/com/netflix/conductor/model/TaskModel.java +++ b/core/src/main/java/com/netflix/conductor/model/TaskModel.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; +import com.netflix.conductor.common.metadata.common.Schema; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.StateChangeEvent; @@ -158,6 +159,7 @@ public boolean isRetriable() { /** Id of the parent task when *this* task is an event associated with the task */ private String parentTaskId; + private Schema outputSchema; private @Valid Map> onStateChange; @JsonIgnore private Map inputPayload = new HashMap<>(); @@ -695,4 +697,12 @@ public String getParentTaskId() { public void setParentTaskId(String parentTaskId) { this.parentTaskId = parentTaskId; } + + public Schema getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(Schema outputSchema) { + this.outputSchema = outputSchema; + } } diff --git a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java index 4f42f2e0af..f223d2cfd1 100644 --- a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java +++ b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java @@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; +import com.netflix.conductor.common.metadata.common.Schema; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.utils.Utils; @@ -106,6 +107,7 @@ public boolean isSuccessful() { private String rateLimitKey; private boolean rateLimited; private Map systemMetadata = new HashMap<>(); + private Schema outputSchema; private List history = new LinkedList<>(); @@ -485,6 +487,14 @@ public void setSystemMetadata(Map systemMetadata) { this.systemMetadata = systemMetadata; } + public Schema getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(Schema outputSchema) { + this.outputSchema = outputSchema; + } + public void externalizeInput(String path) { this.inputPayload = this.input; this.input = new HashMap<>();