Skip to content

Commit 0aa5695

Browse files
committed
Feature: Add WHILE Task
1 parent 2978701 commit 0aa5695

File tree

33 files changed

+3908
-58
lines changed

33 files changed

+3908
-58
lines changed

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java

+13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package com.netflix.conductor.common.metadata.tasks;
1414

1515
import java.util.HashSet;
16+
import java.util.Objects;
1617
import java.util.Set;
1718

1819
import com.netflix.conductor.annotations.protogen.ProtoEnum;
@@ -27,6 +28,7 @@ public enum TaskType {
2728
SWITCH,
2829
JOIN,
2930
DO_WHILE,
31+
WHILE,
3032
SUB_WORKFLOW,
3133
START_WORKFLOW,
3234
EVENT,
@@ -53,6 +55,7 @@ public enum TaskType {
5355
public static final String TASK_TYPE_DYNAMIC = "DYNAMIC";
5456
public static final String TASK_TYPE_JOIN = "JOIN";
5557
public static final String TASK_TYPE_DO_WHILE = "DO_WHILE";
58+
public static final String TASK_TYPE_WHILE = "WHILE";
5659
public static final String TASK_TYPE_FORK_JOIN_DYNAMIC = "FORK_JOIN_DYNAMIC";
5760
public static final String TASK_TYPE_EVENT = "EVENT";
5861
public static final String TASK_TYPE_WAIT = "WAIT";
@@ -81,6 +84,7 @@ public enum TaskType {
8184
BUILT_IN_TASKS.add(TASK_TYPE_JOIN);
8285
BUILT_IN_TASKS.add(TASK_TYPE_EXCLUSIVE_JOIN);
8386
BUILT_IN_TASKS.add(TASK_TYPE_DO_WHILE);
87+
BUILT_IN_TASKS.add(TASK_TYPE_WHILE);
8488
}
8589

8690
/**
@@ -104,4 +108,13 @@ public static TaskType of(String taskType) {
104108
public static boolean isBuiltIn(String taskType) {
105109
return BUILT_IN_TASKS.contains(taskType);
106110
}
111+
112+
public static boolean isLoopTask(String taskType) {
113+
return Objects.equals(TASK_TYPE_DO_WHILE, taskType)
114+
|| Objects.equals(TASK_TYPE_WHILE, taskType);
115+
}
116+
117+
public static boolean isLoopTask(TaskType taskType) {
118+
return taskType == DO_WHILE || taskType == WHILE;
119+
}
107120
}

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ public WorkflowTask getNextTask(String taskReferenceName) {
341341
WorkflowTask nextTask = task.next(taskReferenceName, null);
342342
if (nextTask != null) {
343343
return nextTask;
344-
} else if (TaskType.DO_WHILE.name().equals(task.getType())
344+
} else if (TaskType.isLoopTask(task.getType())
345345
&& !task.getTaskReferenceName().equals(taskReferenceName)
346346
&& task.has(taskReferenceName)) {
347347
// If the task is child of Loop Task and at last position, return null.

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ private Collection<List<WorkflowTask>> children() {
560560
workflowTaskLists.addAll(forkTasks);
561561
break;
562562
case DO_WHILE:
563+
case WHILE:
563564
workflowTaskLists.add(loopOver);
564565
break;
565566
default:
@@ -584,6 +585,7 @@ public WorkflowTask next(String taskReferenceName, WorkflowTask parent) {
584585

585586
switch (taskType) {
586587
case DO_WHILE:
588+
case WHILE:
587589
case DECISION:
588590
case SWITCH:
589591
for (List<WorkflowTask> workflowTasks : children()) {
@@ -605,13 +607,12 @@ public WorkflowTask next(String taskReferenceName, WorkflowTask parent) {
605607
return iterator.next();
606608
}
607609
}
608-
if (taskType == TaskType.DO_WHILE && this.has(taskReferenceName)) {
609-
// come here means this is DO_WHILE task and `taskReferenceName` is the last
610-
// task in
611-
// this DO_WHILE task, because DO_WHILE task need to be executed to decide
612-
// whether to
613-
// schedule next iteration, so we just return the DO_WHILE task, and then ignore
614-
// generating this task again in deciderService.getNextTask()
610+
if (TaskType.isLoopTask(taskType) && this.has(taskReferenceName)) {
611+
// come here means this is DO_WHILE/WHILE task and `taskReferenceName` is the
612+
// last task in this DO_WHILE/WHILE task, because DO_WHILE/WHILE task need to be
613+
// executed to decide whether to schedule next iteration, so we just return the
614+
// DO_WHILE/WHILE task, and then ignore generating this task again in
615+
// deciderService.getNextTask()
615616
return this;
616617
}
617618
break;
@@ -663,6 +664,7 @@ public boolean has(String taskReferenceName) {
663664
case DECISION:
664665
case SWITCH:
665666
case DO_WHILE:
667+
case WHILE:
666668
case FORK_JOIN:
667669
for (List<WorkflowTask> childx : children()) {
668670
for (WorkflowTask child : childx) {

core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
218218
pendingTask.setExecuted(true);
219219
List<TaskModel> nextTasks = getNextTask(workflow, pendingTask);
220220
if (pendingTask.isLoopOverTask()
221-
&& !TaskType.DO_WHILE.name().equals(pendingTask.getTaskType())
221+
&& !TaskType.isLoopTask(pendingTask.getTaskType())
222222
&& !nextTasks.isEmpty()) {
223223
nextTasks = filterNextLoopOverTasks(nextTasks, pendingTask, workflow);
224224
}
@@ -476,8 +476,8 @@ List<TaskModel> getNextTask(WorkflowModel workflow, TaskModel task) {
476476
while (isTaskSkipped(taskToSchedule, workflow)) {
477477
taskToSchedule = workflowDef.getNextTask(taskToSchedule.getTaskReferenceName());
478478
}
479-
if (taskToSchedule != null && TaskType.DO_WHILE.name().equals(taskToSchedule.getType())) {
480-
// check if already has this DO_WHILE task, ignore it if it already exists
479+
if (taskToSchedule != null && TaskType.isLoopTask(taskToSchedule.getType())) {
480+
// check if already has this DO_WHILE/WHILE task, ignore it if it already exists
481481
String nextTaskReferenceName = taskToSchedule.getTaskReferenceName();
482482
if (workflow.getTasks().stream()
483483
.anyMatch(

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ private void retry(WorkflowModel workflow) {
342342
break;
343343
case CANCELED:
344344
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
345-
|| task.getTaskType().equalsIgnoreCase(TaskType.DO_WHILE.toString())) {
345+
|| TaskType.isLoopTask(task.getTaskType())) {
346346
task.setStatus(IN_PROGRESS);
347347
addTaskToQueue(task);
348348
// Task doesn't have to be updated yet. Will be updated along with other
@@ -934,7 +934,7 @@ private void extendLease(TaskResult taskResult) {
934934
* Determines if a workflow can be lazily evaluated, if it meets any of these criteria
935935
*
936936
* <ul>
937-
* <li>The task is NOT a loop task within DO_WHILE
937+
* <li>The task is NOT a loop task within DO_WHILE or WHILE
938938
* <li>The task is one of the intermediate tasks in a branch within a FORK_JOIN
939939
* <li>The task is forked from a FORK_JOIN_DYNAMIC
940940
* </ul>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2023 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.core.execution.mapper;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.Optional;
18+
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.stereotype.Component;
23+
24+
import com.netflix.conductor.common.metadata.tasks.TaskDef;
25+
import com.netflix.conductor.common.metadata.tasks.TaskType;
26+
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
27+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
28+
import com.netflix.conductor.core.utils.ParametersUtils;
29+
import com.netflix.conductor.dao.MetadataDAO;
30+
import com.netflix.conductor.model.TaskModel;
31+
import com.netflix.conductor.model.WorkflowModel;
32+
33+
/**
34+
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
35+
* TaskType#WHILE} to a {@link TaskModel} of type {@link TaskType#WHILE}
36+
*/
37+
@Component
38+
public class WhileTaskMapper implements TaskMapper {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(WhileTaskMapper.class);
41+
42+
private final MetadataDAO metadataDAO;
43+
private final ParametersUtils parametersUtils;
44+
45+
@Autowired
46+
public WhileTaskMapper(MetadataDAO metadataDAO, ParametersUtils parametersUtils) {
47+
this.metadataDAO = metadataDAO;
48+
this.parametersUtils = parametersUtils;
49+
}
50+
51+
@Override
52+
public String getTaskType() {
53+
return TaskType.WHILE.name();
54+
}
55+
56+
/**
57+
* This method maps {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
58+
* TaskType#WHILE} to a {@link TaskModel} of type {@link TaskType#WHILE} with a status of {@link
59+
* TaskModel.Status#IN_PROGRESS}
60+
*
61+
* @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link
62+
* WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId
63+
* @return: A {@link TaskModel} of type {@link TaskType#WHILE} in a List
64+
*/
65+
@Override
66+
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
67+
LOGGER.debug("TaskMapperContext {} in WhileTaskMapper", taskMapperContext);
68+
69+
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
70+
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
71+
72+
TaskModel task = workflowModel.getTaskByRefName(workflowTask.getTaskReferenceName());
73+
if (task != null && task.getStatus().isTerminal()) {
74+
// Since loopTask is already completed no need to schedule task again.
75+
return List.of();
76+
}
77+
78+
TaskDef taskDefinition =
79+
Optional.ofNullable(taskMapperContext.getTaskDefinition())
80+
.orElseGet(
81+
() ->
82+
Optional.ofNullable(
83+
metadataDAO.getTaskDef(
84+
workflowTask.getName()))
85+
.orElseGet(TaskDef::new));
86+
87+
TaskModel whileTask = taskMapperContext.createTaskModel();
88+
whileTask.setTaskType(TaskType.TASK_TYPE_WHILE);
89+
whileTask.setStatus(TaskModel.Status.IN_PROGRESS);
90+
whileTask.setStartTime(System.currentTimeMillis());
91+
whileTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
92+
whileTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
93+
whileTask.setRetryCount(taskMapperContext.getRetryCount());
94+
95+
Map<String, Object> taskInput =
96+
parametersUtils.getTaskInputV2(
97+
workflowTask.getInputParameters(),
98+
workflowModel,
99+
whileTask.getTaskId(),
100+
taskDefinition);
101+
whileTask.setInputData(taskInput);
102+
return List.of(whileTask);
103+
}
104+
}

0 commit comments

Comments
 (0)