Skip to content

Commit

Permalink
Merge pull request #274 from orkes-io/workflow-task-input-schema
Browse files Browse the repository at this point in the history
Added inputSchema for WorkflowDef and TaskDef
  • Loading branch information
villat authored Mar 20, 2024
2 parents 3813331 + 2873669 commit 4afae94
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.common;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

/** Schema for {@link WorkflowDef} and {@link TaskDef} */
public class Schema {

private String name;
private Integer version;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getVersion() {
return version;
}

public void setVersion(Integer version) {
this.version = version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.conductor.common.constraints.OwnerEmailValidConstraint;
import com.netflix.conductor.common.constraints.TaskTimeoutConstraint;
import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.common.Schema;

@ProtoMessage
@TaskTimeoutConstraint
Expand Down Expand Up @@ -128,6 +129,9 @@ public enum RetryLogic {
@ProtoField(id = 21)
private String baseType;

private Schema inputSchema;
private Schema outputSchema;

public TaskDef() {}

public TaskDef(String name) {
Expand Down Expand Up @@ -437,6 +441,22 @@ public void setBaseType(String baseType) {
this.baseType = baseType;
}

public Schema getInputSchema() {
return inputSchema;
}

public void setInputSchema(Schema inputSchema) {
this.inputSchema = inputSchema;
}

public Schema getOutputSchema() {
return outputSchema;
}

public void setOutputSchema(Schema outputSchema) {
this.outputSchema = outputSchema;
}

@Override
public String toString() {
return name;
Expand Down Expand Up @@ -468,7 +488,9 @@ && getRetryLogic() == taskDef.getRetryLogic()
&& Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId())
&& Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace())
&& Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail())
&& Objects.equals(getBaseType(), taskDef.getBaseType());
&& Objects.equals(getBaseType(), taskDef.getBaseType())
&& Objects.equals(getInputSchema(), taskDef.getInputSchema())
&& Objects.equals(getOutputSchema(), taskDef.getOutputSchema());
}

@Override
Expand All @@ -492,6 +514,8 @@ public int hashCode() {
getIsolationGroupId(),
getExecutionNameSpace(),
getOwnerEmail(),
getBaseType());
getBaseType(),
getInputSchema(),
getOutputSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.conductor.common.constraints.OwnerEmailValidConstraint;
import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint;
import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.common.Schema;
import com.netflix.conductor.common.metadata.tasks.TaskType;

@ProtoMessage
Expand Down Expand Up @@ -101,6 +102,10 @@ public enum TimeoutPolicy {

private RateLimitConfig rateLimitConfig;

private Schema inputSchema;

private Schema outputSchema;

/**
* @return the name
*/
Expand Down Expand Up @@ -337,6 +342,22 @@ public void setRateLimitConfig(RateLimitConfig rateLimitConfig) {
this.rateLimitConfig = rateLimitConfig;
}

public Schema getInputSchema() {
return inputSchema;
}

public void setInputSchema(Schema inputSchema) {
this.inputSchema = inputSchema;
}

public Schema getOutputSchema() {
return outputSchema;
}

public void setOutputSchema(Schema outputSchema) {
this.outputSchema = outputSchema;
}

public boolean containsType(String taskType) {
return collectTasks().stream().anyMatch(t -> t.getType().equals(taskType));
}
Expand Down Expand Up @@ -409,7 +430,9 @@ && getSchemaVersion() == that.getSchemaVersion()
&& Objects.equals(getOutputParameters(), that.getOutputParameters())
&& Objects.equals(getFailureWorkflow(), that.getFailureWorkflow())
&& Objects.equals(getOwnerEmail(), that.getOwnerEmail())
&& Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds());
&& Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds())
&& Objects.equals(getInputSchema(), that.getInputSchema())
&& Objects.equals(getOutputSchema(), that.getOutputSchema());
}

@Override
Expand All @@ -424,7 +447,9 @@ public int hashCode() {
getFailureWorkflow(),
getSchemaVersion(),
getOwnerEmail(),
getTimeoutSeconds());
getTimeoutSeconds(),
getInputSchema(),
getOutputSchema());
}

@Override
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import com.netflix.conductor.common.metadata.common.Schema;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.StateChangeEvent;
Expand Down Expand Up @@ -158,6 +159,7 @@ public boolean isRetriable() {
/** Id of the parent task when *this* task is an event associated with the task */
private String parentTaskId;

private Schema outputSchema;
private @Valid Map<String, List<StateChangeEvent>> onStateChange;

@JsonIgnore private Map<String, Object> inputPayload = new HashMap<>();
Expand Down Expand Up @@ -695,4 +697,12 @@ public String getParentTaskId() {
public void setParentTaskId(String parentTaskId) {
this.parentTaskId = parentTaskId;
}

public Schema getOutputSchema() {
return outputSchema;
}

public void setOutputSchema(Schema outputSchema) {
this.outputSchema = outputSchema;
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import com.netflix.conductor.common.metadata.common.Schema;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.utils.Utils;
Expand Down Expand Up @@ -106,6 +107,7 @@ public boolean isSuccessful() {
private String rateLimitKey;
private boolean rateLimited;
private Map<String, Object> systemMetadata = new HashMap<>();
private Schema outputSchema;

private List<WorkflowModel> history = new LinkedList<>();

Expand Down Expand Up @@ -485,6 +487,14 @@ public void setSystemMetadata(Map<String, Object> systemMetadata) {
this.systemMetadata = systemMetadata;
}

public Schema getOutputSchema() {
return outputSchema;
}

public void setOutputSchema(Schema outputSchema) {
this.outputSchema = outputSchema;
}

public void externalizeInput(String path) {
this.inputPayload = this.input;
this.input = new HashMap<>();
Expand Down

0 comments on commit 4afae94

Please sign in to comment.