Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #194 from Netflix/dev
Browse files Browse the repository at this point in the history
Bug fix for dynamic task retries
  • Loading branch information
v1r3n authored May 11, 2017
2 parents 34354be + 190533f commit b883407
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 13 deletions.
1 change: 0 additions & 1 deletion buildViaTravis.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/bin/bash
# This script will build the project.

if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
echo -e "Build Pull Request #$TRAVIS_PULL_REQUEST => Branch [$TRAVIS_BRANCH]"
./gradlew build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li
}
if (workflowTask != null && workflowTask.isOptional()) {
task.setStatus(Status.COMPLETED_WITH_ERRORS);
//outcome.tasksToBeUpdated.add(task);
} else {
Task rt = retry(taskDef, workflowTask, task, workflow);
tasksToBeScheduled.put(rt.getReferenceTaskName(), rt);
Expand Down Expand Up @@ -321,10 +320,11 @@ private Task retry(TaskDef taskDef, WorkflowTask workflowTask, Task task, Workfl
rescheduled.setRetriedTaskId(task.getTaskId());
rescheduled.setStatus(Status.SCHEDULED);
rescheduled.setPollCount(0);

rescheduled.setInputData(new HashMap<>());
rescheduled.getInputData().putAll(task.getInputData());
if(workflowTask != null && workflow.getSchemaVersion() > 1) {
Map<String, Object> taskInput = pu.getTaskInputV2(workflowTask.getInputParameters(), workflow, rescheduled.getTaskId(), taskDef);
rescheduled.setInputData(taskInput);
rescheduled.getInputData().putAll(taskInput);
}
//for the schema version 1, we do not have to recompute the inputs
return rescheduled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
Expand All @@ -56,13 +58,23 @@ public class TestDeciderOutcomes {

private DeciderService ds;

private ObjectMapper om = new ObjectMapper();
private static ObjectMapper om = new ObjectMapper();

static {
om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
om.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
om.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
om.setSerializationInclusion(Include.NON_NULL);
om.setSerializationInclusion(Include.NON_EMPTY);
}


@Before
public void init() throws Exception {

MetadataDAO metadata = mock(MetadataDAO.class);
TaskDef td = new TaskDef();
td.setRetryCount(1);
when(metadata.getTaskDef(any())).thenReturn(td);
this.ds = new DeciderService(metadata, om);
}
Expand Down Expand Up @@ -107,35 +119,102 @@ public void testRetries() {
task.setType("USER_TASK");
task.setTaskReferenceName("t0");
task.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
task.getInputParameters().put("requestId", "${workflow.input.requestId}");

def.getTasks().add(task);
def.setSchemaVersion(2);


Workflow workflow = new Workflow();
workflow.getInput().put("requestId", 123);
workflow.setStartTime(System.currentTimeMillis());
DeciderOutcome outcome = ds.decide(workflow, def);
assertNotNull(outcome);

System.out.println(outcome.tasksToBeScheduled);
assertEquals(1, outcome.tasksToBeScheduled.size());
assertEquals(task.getTaskReferenceName(), outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
System.out.println(outcome.tasksToBeScheduled.get(0).getInputData());

String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));
assertEquals(123, outcome.tasksToBeScheduled.get(0).getInputData().get("requestId"));

outcome.tasksToBeScheduled.get(0).setStatus(Status.FAILED);
workflow.getTasks().addAll(outcome.tasksToBeScheduled);

outcome = ds.decide(workflow, def);
assertNotNull(outcome);
System.out.println(outcome.tasksToBeScheduled);
System.out.println(outcome.tasksToBeUpdated);

assertEquals(1, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
assertNotSame(task1Id, outcome.tasksToBeScheduled.get(0).getTaskId());
assertEquals(outcome.tasksToBeScheduled.get(0).getTaskId(), outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getRetriedTaskId());
assertEquals(123, outcome.tasksToBeScheduled.get(0).getInputData().get("requestId"));


WorkflowTask fork = new WorkflowTask();
fork.setName("fork0");
fork.setWorkflowTaskType(Type.FORK_JOIN_DYNAMIC);
fork.setTaskReferenceName("fork0");
fork.setDynamicForkTasksInputParamName("forkedInputs");
fork.setDynamicForkTasksParam("forks");
fork.getInputParameters().put("forks", "${workflow.input.forks}");
fork.getInputParameters().put("forkedInputs", "${workflow.input.forkedInputs}");

WorkflowTask join = new WorkflowTask();
join.setName("join0");
join.setType("JOIN");
join.setTaskReferenceName("join0");

def.getTasks().clear();
def.getTasks().add(fork);
def.getTasks().add(join);

List<WorkflowTask> forks = new LinkedList<>();
Map<String, Map<String, Object>> forkedInputs = new HashMap<>();

for(int i = 0; i < 1; i++) {
WorkflowTask wft = new WorkflowTask();
wft.setName("f" + i);
wft.setTaskReferenceName("f" + i);
wft.setWorkflowTaskType(Type.SIMPLE);
wft.getInputParameters().put("requestId", "${workflow.input.requestId}");
wft.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
forks.add(wft);
Map<String, Object> input = new HashMap<>();
input.put("k", "v");
input.put("k1", 1);
forkedInputs.put(wft.getTaskReferenceName(), input);
}
workflow = new Workflow();
workflow.getInput().put("requestId", 123);
workflow.setStartTime(System.currentTimeMillis());

workflow.getInput().put("forks", forks);
workflow.getInput().put("forkedInputs", forkedInputs);

outcome = ds.decide(workflow, def);
assertNotNull(outcome);
assertEquals(3, outcome.tasksToBeScheduled.size());
assertEquals(0, outcome.tasksToBeUpdated.size());

assertEquals("v", outcome.tasksToBeScheduled.get(1).getInputData().get("k"));
assertEquals(1, outcome.tasksToBeScheduled.get(1).getInputData().get("k1"));
assertEquals(outcome.tasksToBeScheduled.get(1).getTaskId(), outcome.tasksToBeScheduled.get(1).getInputData().get("taskId"));
System.out.println(outcome.tasksToBeScheduled.get(1).getInputData());
task1Id = outcome.tasksToBeScheduled.get(1).getTaskId();

outcome.tasksToBeScheduled.get(1).setStatus(Status.FAILED);
workflow.getTasks().addAll(outcome.tasksToBeScheduled);

outcome = ds.decide(workflow, def);
assertEquals("v", outcome.tasksToBeScheduled.get(1).getInputData().get("k"));
assertEquals(1, outcome.tasksToBeScheduled.get(1).getInputData().get("k1"));
assertEquals(outcome.tasksToBeScheduled.get(1).getTaskId(), outcome.tasksToBeScheduled.get(1).getInputData().get("taskId"));
assertNotSame(task1Id, outcome.tasksToBeScheduled.get(1).getTaskId());
assertEquals(task1Id, outcome.tasksToBeScheduled.get(1).getRetriedTaskId());
System.out.println(outcome.tasksToBeScheduled.get(1).getInputData());

}

@Test
Expand Down
8 changes: 7 additions & 1 deletion ui/src/api/wfe.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ router.get('/id/:workflowId', async (req, res, next) => {
const subworkflows = {};
result.tasks.forEach(task=>{
if(task.taskType == 'SUB_WORKFLOW'){
subs.push({name: task.inputData.subWorkflowName, version: task.inputData.subWorkflowVersion, referenceTaskName: task.referenceTaskName, subWorkflowId: task.outputData.subWorkflowId});
let subWorkflowId = task.outputData.subWorkflowId;
if(subWorkflowId == null) {
subWorkflowId = task.inputData.subWorkflowId;
}
if(subWorkflowId != null) {
subs.push({name: task.inputData.subWorkflowName, version: task.inputData.subWorkflowVersion, referenceTaskName: task.referenceTaskName, subWorkflowId: subWorkflowId});
}
}
});
let submeta = {};
Expand Down
4 changes: 2 additions & 2 deletions ui/src/components/common/Grapher.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ class Grapher extends Component {
let subg = {n : n, vx: vx, layout: layout};

d3.select("#propsdiv").style("left", (window.outerWidth-600) + 'px');
div.style.left = (window.outerWidth-600) + "px";
p.setState({selectedTask: data.task, showSubGraph:true, showSideBar: false, subGraph: subg, subGraphId: innerGraph[v].id});
div.style.left = (window.outerWidth-1200) + "px";
p.setState({selectedTask: data.task, showSubGraph:true, showSideBar: true, subGraph: subg, subGraphId: innerGraph[v].id});
p.setState({showSubGraph: true});

} else if(vertices[v].tooltip != null){
Expand Down

0 comments on commit b883407

Please sign in to comment.