diff --git a/buildViaTravis.sh b/buildViaTravis.sh index 879e34c644..9cc169e471 100755 --- a/buildViaTravis.sh +++ b/buildViaTravis.sh @@ -1,6 +1,5 @@ #!/bin/bash # This script will build the project. - if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then echo -e "Build Pull Request #$TRAVIS_PULL_REQUEST => Branch [$TRAVIS_BRANCH]" ./gradlew build diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index f062006e96..7999d12ece 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -137,7 +137,6 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li } if (workflowTask != null && workflowTask.isOptional()) { task.setStatus(Status.COMPLETED_WITH_ERRORS); - //outcome.tasksToBeUpdated.add(task); } else { Task rt = retry(taskDef, workflowTask, task, workflow); tasksToBeScheduled.put(rt.getReferenceTaskName(), rt); @@ -321,10 +320,11 @@ private Task retry(TaskDef taskDef, WorkflowTask workflowTask, Task task, Workfl rescheduled.setRetriedTaskId(task.getTaskId()); rescheduled.setStatus(Status.SCHEDULED); rescheduled.setPollCount(0); - + rescheduled.setInputData(new HashMap<>()); + rescheduled.getInputData().putAll(task.getInputData()); if(workflowTask != null && workflow.getSchemaVersion() > 1) { Map taskInput = pu.getTaskInputV2(workflowTask.getInputParameters(), workflow, rescheduled.getTaskId(), taskDef); - rescheduled.setInputData(taskInput); + rescheduled.getInputData().putAll(taskInput); } //for the schema version 1, we do not have to recompute the inputs return rescheduled; diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index ae24c33823..00239f47e7 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -36,6 +36,8 @@ import org.junit.Before; import org.junit.Test; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.Task.Status; @@ -56,13 +58,23 @@ public class TestDeciderOutcomes { private DeciderService ds; - private ObjectMapper om = new ObjectMapper(); + private static ObjectMapper om = new ObjectMapper(); + + static { + om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + om.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); + om.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); + om.setSerializationInclusion(Include.NON_NULL); + om.setSerializationInclusion(Include.NON_EMPTY); + } + @Before public void init() throws Exception { MetadataDAO metadata = mock(MetadataDAO.class); TaskDef td = new TaskDef(); + td.setRetryCount(1); when(metadata.getTaskDef(any())).thenReturn(td); this.ds = new DeciderService(metadata, om); } @@ -107,35 +119,102 @@ public void testRetries() { task.setType("USER_TASK"); task.setTaskReferenceName("t0"); task.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); + task.getInputParameters().put("requestId", "${workflow.input.requestId}"); def.getTasks().add(task); def.setSchemaVersion(2); - Workflow workflow = new Workflow(); + workflow.getInput().put("requestId", 123); workflow.setStartTime(System.currentTimeMillis()); DeciderOutcome outcome = ds.decide(workflow, def); assertNotNull(outcome); - System.out.println(outcome.tasksToBeScheduled); assertEquals(1, outcome.tasksToBeScheduled.size()); assertEquals(task.getTaskReferenceName(), outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); - System.out.println(outcome.tasksToBeScheduled.get(0).getInputData()); + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId")); + assertEquals(123, outcome.tasksToBeScheduled.get(0).getInputData().get("requestId")); outcome.tasksToBeScheduled.get(0).setStatus(Status.FAILED); workflow.getTasks().addAll(outcome.tasksToBeScheduled); outcome = ds.decide(workflow, def); assertNotNull(outcome); - System.out.println(outcome.tasksToBeScheduled); - System.out.println(outcome.tasksToBeUpdated); assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); assertNotSame(task1Id, outcome.tasksToBeScheduled.get(0).getTaskId()); assertEquals(outcome.tasksToBeScheduled.get(0).getTaskId(), outcome.tasksToBeScheduled.get(0).getInputData().get("taskId")); + assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getRetriedTaskId()); + assertEquals(123, outcome.tasksToBeScheduled.get(0).getInputData().get("requestId")); + + + WorkflowTask fork = new WorkflowTask(); + fork.setName("fork0"); + fork.setWorkflowTaskType(Type.FORK_JOIN_DYNAMIC); + fork.setTaskReferenceName("fork0"); + fork.setDynamicForkTasksInputParamName("forkedInputs"); + fork.setDynamicForkTasksParam("forks"); + fork.getInputParameters().put("forks", "${workflow.input.forks}"); + fork.getInputParameters().put("forkedInputs", "${workflow.input.forkedInputs}"); + + WorkflowTask join = new WorkflowTask(); + join.setName("join0"); + join.setType("JOIN"); + join.setTaskReferenceName("join0"); + + def.getTasks().clear(); + def.getTasks().add(fork); + def.getTasks().add(join); + + List forks = new LinkedList<>(); + Map> forkedInputs = new HashMap<>(); + + for(int i = 0; i < 1; i++) { + WorkflowTask wft = new WorkflowTask(); + wft.setName("f" + i); + wft.setTaskReferenceName("f" + i); + wft.setWorkflowTaskType(Type.SIMPLE); + wft.getInputParameters().put("requestId", "${workflow.input.requestId}"); + wft.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); + forks.add(wft); + Map input = new HashMap<>(); + input.put("k", "v"); + input.put("k1", 1); + forkedInputs.put(wft.getTaskReferenceName(), input); + } + workflow = new Workflow(); + workflow.getInput().put("requestId", 123); + workflow.setStartTime(System.currentTimeMillis()); + + workflow.getInput().put("forks", forks); + workflow.getInput().put("forkedInputs", forkedInputs); + + outcome = ds.decide(workflow, def); + assertNotNull(outcome); + assertEquals(3, outcome.tasksToBeScheduled.size()); + assertEquals(0, outcome.tasksToBeUpdated.size()); + + assertEquals("v", outcome.tasksToBeScheduled.get(1).getInputData().get("k")); + assertEquals(1, outcome.tasksToBeScheduled.get(1).getInputData().get("k1")); + assertEquals(outcome.tasksToBeScheduled.get(1).getTaskId(), outcome.tasksToBeScheduled.get(1).getInputData().get("taskId")); + System.out.println(outcome.tasksToBeScheduled.get(1).getInputData()); + task1Id = outcome.tasksToBeScheduled.get(1).getTaskId(); + + outcome.tasksToBeScheduled.get(1).setStatus(Status.FAILED); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + + outcome = ds.decide(workflow, def); + assertEquals("v", outcome.tasksToBeScheduled.get(1).getInputData().get("k")); + assertEquals(1, outcome.tasksToBeScheduled.get(1).getInputData().get("k1")); + assertEquals(outcome.tasksToBeScheduled.get(1).getTaskId(), outcome.tasksToBeScheduled.get(1).getInputData().get("taskId")); + assertNotSame(task1Id, outcome.tasksToBeScheduled.get(1).getTaskId()); + assertEquals(task1Id, outcome.tasksToBeScheduled.get(1).getRetriedTaskId()); + System.out.println(outcome.tasksToBeScheduled.get(1).getInputData()); + } @Test diff --git a/ui/src/api/wfe.js b/ui/src/api/wfe.js index aa864eba74..e0c5d9c47a 100644 --- a/ui/src/api/wfe.js +++ b/ui/src/api/wfe.js @@ -49,7 +49,13 @@ router.get('/id/:workflowId', async (req, res, next) => { const subworkflows = {}; result.tasks.forEach(task=>{ if(task.taskType == 'SUB_WORKFLOW'){ - subs.push({name: task.inputData.subWorkflowName, version: task.inputData.subWorkflowVersion, referenceTaskName: task.referenceTaskName, subWorkflowId: task.outputData.subWorkflowId}); + let subWorkflowId = task.outputData.subWorkflowId; + if(subWorkflowId == null) { + subWorkflowId = task.inputData.subWorkflowId; + } + if(subWorkflowId != null) { + subs.push({name: task.inputData.subWorkflowName, version: task.inputData.subWorkflowVersion, referenceTaskName: task.referenceTaskName, subWorkflowId: subWorkflowId}); + } } }); let submeta = {}; diff --git a/ui/src/components/common/Grapher.js b/ui/src/components/common/Grapher.js index eb54a39a11..8aee2f82cf 100644 --- a/ui/src/components/common/Grapher.js +++ b/ui/src/components/common/Grapher.js @@ -170,8 +170,8 @@ class Grapher extends Component { let subg = {n : n, vx: vx, layout: layout}; d3.select("#propsdiv").style("left", (window.outerWidth-600) + 'px'); - div.style.left = (window.outerWidth-600) + "px"; - p.setState({selectedTask: data.task, showSubGraph:true, showSideBar: false, subGraph: subg, subGraphId: innerGraph[v].id}); + div.style.left = (window.outerWidth-1200) + "px"; + p.setState({selectedTask: data.task, showSubGraph:true, showSideBar: true, subGraph: subg, subGraphId: innerGraph[v].id}); p.setState({showSubGraph: true}); } else if(vertices[v].tooltip != null){