Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Permissive task capability #3866

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum TaskType {
DYNAMIC,
FORK_JOIN,
FORK_JOIN_DYNAMIC,
PERMISSIVE,
DECISION,
SWITCH,
JOIN,
Expand Down Expand Up @@ -70,6 +71,7 @@ public enum TaskType {
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
public static final String TASK_TYPE_FORK = "FORK";
public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE";
public static final String TASK_TYPE_NOOP = "NOOP";

private static final Set<String> BUILT_IN_TASKS = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED;
import static com.netflix.conductor.model.TaskModel.Status.*;
Expand Down Expand Up @@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
outcome.tasksToBeUpdated.add(pendingTask);
} else {
} else if (!(pendingTask.getWorkflowTask() != null
&& TaskType.PERMISSIVE
.name()
.equals(pendingTask.getWorkflowTask().getType())
&& !pendingTask.getWorkflowTask().isOptional())) {
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
}
}
Expand Down Expand Up @@ -254,6 +259,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
if (hasSuccessfulTerminateTask
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
List<TaskModel> permissiveTasksTerminalNonSuccessful =
workflow.getTasks().stream()
.filter(t -> t.getWorkflowTask() != null)
.filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType()))
.filter(t -> !t.getWorkflowTask().isOptional())
.collect(
Collectors.toMap(
TaskModel::getReferenceTaskName,
t -> t,
(t1, t2) ->
t1.getRetryCount() > t2.getRetryCount()
? t1
: t2))
.values()
.stream()
.filter(
t ->
t.getStatus().isTerminal()
&& !t.getStatus().isSuccessful())
.toList();
if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
final String errMsg =
permissiveTasksTerminalNonSuccessful.stream()
.map(
t ->
String.format(
"Task %s failed with status: %s and reason: '%s'",
t.getTaskId(),
t.getStatus(),
t.getReasonForIncompletion()))
.collect(Collectors.joining(". "));
throw new TerminateWorkflowException(errMsg);
}
outcome.isComplete = true;
}

Expand Down Expand Up @@ -437,11 +475,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
if (status == null || !status.isTerminal()) {
return false;
}
// if we reach here, the task has been completed.
// Was the task successful in completion?
if (!status.isSuccessful()) {
return false;
}
}

boolean noPendingSchedule =
Expand Down Expand Up @@ -529,7 +562,9 @@ Optional<TaskModel> retry(
if (!task.getStatus().isRetriable()
|| TaskType.isBuiltIn(task.getTaskType())
|| expectedRetryCount <= retryCount) {
if (workflowTask != null && workflowTask.isOptional()) {
if (workflowTask != null
&& (workflowTask.isOptional()
|| TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) {
return Optional.empty();
}
WorkflowModel.Status status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) {
for (TaskModel task : workflow.getTasks()) {
switch (task.getStatus()) {
case FAILED:
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
|| task.getTaskType()
.equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) {
@SuppressWarnings("unchecked")
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow);
if (joinOnFailedPermissive) {
task.setStatus(IN_PROGRESS);
addTaskToQueue(task);
break;
}
}
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
retriableMap.put(task.getReferenceTaskName(), task);
Expand Down Expand Up @@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) {

LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
}

private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowModel workflow) {
return joinOn.stream()
.map(workflow::getTaskByRefName)
.anyMatch(
t ->
TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType())
&& !t.getWorkflowTask().isOptional()
&& t.getStatus().equals(FAILED));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

/**
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
* TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}.
*/
@Component
public class PermissiveTaskMapper implements TaskMapper {

public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class);
private final ParametersUtils parametersUtils;

public PermissiveTaskMapper(ParametersUtils parametersUtils) {
this.parametersUtils = parametersUtils;
}

@Override
public String getTaskType() {
return TaskType.PERMISSIVE.name();
}

/**
* This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link
* TaskModel}
*
* @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link
* WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId
* @return a List with just one exclusive task
* @throws TerminateWorkflowException In case if the task definition does not exist
*/
@Override
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
throws TerminateWorkflowException {

LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext);

WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
int retryCount = taskMapperContext.getRetryCount();
String retriedTaskId = taskMapperContext.getRetryTaskId();

TaskDef taskDefinition =
Optional.ofNullable(workflowTask.getTaskDefinition())
.orElseThrow(
() -> {
String reason =
String.format(
"Invalid task. Task %s does not have a definition",
workflowTask.getName());
return new TerminateWorkflowException(reason);
});

Map<String, Object> input =
parametersUtils.getTaskInput(
workflowTask.getInputParameters(),
workflowModel,
taskDefinition,
taskMapperContext.getTaskId());
TaskModel permissiveTask = taskMapperContext.createTaskModel();
permissiveTask.setTaskType(workflowTask.getName());
permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay());
permissiveTask.setInputData(input);
permissiveTask.setStatus(TaskModel.Status.SCHEDULED);
permissiveTask.setRetryCount(retryCount);
permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds());
permissiveTask.setRetriedTaskId(retriedTaskId);
permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
permissiveTask.setRateLimitFrequencyInSeconds(
taskDefinition.getRateLimitFrequencyInSeconds());
return List.of(permissiveTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;

@Component(TASK_TYPE_EXCLUSIVE_JOIN)
Expand Down Expand Up @@ -65,9 +66,20 @@ public boolean execute(
}
taskStatus = exclusiveTask.getStatus();
foundExlusiveJoinOnTask = taskStatus.isTerminal();
hasFailures = !taskStatus.isSuccessful();
hasFailures =
!taskStatus.isSuccessful()
&& (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType())
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;

@Component(TASK_TYPE_JOIN)
Expand Down Expand Up @@ -57,9 +58,21 @@ public boolean execute(
break;
}
TaskModel.Status taskStatus = forkedTask.getStatus();
hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
hasFailures =
!taskStatus.isSuccessful()
&& !forkedTask.getWorkflowTask().isOptional()
&& (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType())
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}
// Only add to task output if it's not empty
if (!forkedTask.getOutputData().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,80 @@ public void testOptional() {
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

/** Similar to {@link #testOptional} */
@Test
public void testPermissive() {
WorkflowDef def = new WorkflowDef();
def.setName("test-permissive");

WorkflowTask task1 = new WorkflowTask();
task1.setName("task0");
task1.setType("PERMISSIVE");
task1.setTaskReferenceName("t0");
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
task1.setTaskDefinition(new TaskDef("task0"));

WorkflowTask task2 = new WorkflowTask();
task2.setName("task1");
task2.setType("PERMISSIVE");
task2.setTaskReferenceName("t1");
task2.setTaskDefinition(new TaskDef("task1"));

def.getTasks().add(task1);
def.getTasks().add(task2);
def.setSchemaVersion(2);

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeScheduled.size());
assertEquals(
task1.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());

for (int i = 0; i < 3; i++) {
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));

workflow.getTasks().clear();
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

outcome = deciderService.decide(workflow);

assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());

assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
assertEquals(
task1.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount());
}

String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();

workflow.getTasks().clear();
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

outcome = deciderService.decide(workflow);

assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());

assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
assertEquals(
task2.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

@Test
public void testOptionalWithDynamicFork() {
WorkflowDef def = new WorkflowDef();
Expand Down
Loading