diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index e6db4b37df..4627d5edca 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -130,6 +130,10 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li
 			TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName());
 			if(taskDef != null) {
 				checkForTimeout(taskDef, task);
+				// If the task has not been updated within "responseTimeout" seconds, requeue it.
+				if(checkForResponseTimeout(taskDef, task)){
+					outcome.tasksToBeRequeued.add(task);
+				}
 			}
 
 			if (!task.getStatus().isSuccessful()) {
@@ -374,6 +378,30 @@ void checkForTimeout(TaskDef taskType, Task task) {
 		return;
 	}
 
+	@VisibleForTesting
+	boolean checkForResponseTimeout(TaskDef taskType, Task task) {
+
+		if(taskType == null){
+			logger.warn("missing task type " + task.getTaskDefName() + ", workflowId=" + task.getWorkflowInstanceId());
+			return false;
+		}
+		if (task.getStatus().isTerminal() || taskType.getTimeoutSeconds() <= 0 ||
+				!task.getStatus().equals(Status.IN_PROGRESS) || taskType.getResponseTimeoutSeconds() == 0) {
+			return false;
+		}
+
+		long responseTimeout = 1000 * taskType.getResponseTimeoutSeconds();
+		long now = System.currentTimeMillis();
+		long noResponseTime = now - task.getUpdateTime();
+
+		if (noResponseTime < responseTimeout) {
+			return false;
+		}
+		Monitors.recordTaskResponseTimeout(task.getTaskDefName());
+
+		return true;
+	}
+
 	private List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow, WorkflowTask taskToSchedule, int retryCount) {
 		return getTasksToBeScheduled(def, workflow, taskToSchedule, retryCount, null);
 	}
@@ -672,6 +700,8 @@ public static class DeciderOutcome {
 
 		List<Task> tasksToBeScheduled = new LinkedList<>();
 
 		List<Task> tasksToBeUpdated = new LinkedList<>();
+
+		List<Task> tasksToBeRequeued = new LinkedList<>();
 
 		boolean isComplete;
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
index 94e4356087..3a57d5f1bc 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
@@ -511,8 +511,12 @@ public boolean decide(String workflowId) throws Exception {
 			List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
 			setTaskDomains(tasksToBeScheduled, workflow);
 			List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
+			List<Task> tasksToBeRequeued = outcome.tasksToBeRequeued;
 			boolean stateChanged = false;
 
+			if(!tasksToBeRequeued.isEmpty()){
+				addTaskToQueue(tasksToBeRequeued);
+			}
 			workflow.getTasks().addAll(tasksToBeScheduled);
 			for(Task task : tasksToBeScheduled) {
 				if (SystemTaskType.is(task.getTaskType()) && !task.getStatus().isTerminal()) {
diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
index 5d0f78698c..418aa29da8 100644
--- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
+++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
@@ -184,6 +184,10 @@ public static void recordTaskTimeout(String taskType) {
 		counter(classQualifier, "task_timeout", "taskType", taskType);
 	}
 
+	public static void recordTaskResponseTimeout(String taskType) {
+		counter(classQualifier, "task_response_timeout", "taskType", taskType);
"taskType", taskType); + } + public static void recordWorkflowTermination(String workflowType, WorkflowStatus status, String ownerApp) { counter(classQualifier, "workflow_failure", "workflowName", workflowType, "status", status.name(), "ownerApp", ""+ownerApp); } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java index e10bb05ec3..a5e245596f 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java @@ -1148,6 +1148,8 @@ public void testSimpleWorkflow() throws Exception { assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker")); assertEquals(wfid, task.getWorkflowInstanceId()); + provider.decide(wfid); + String task1Op = "task1.Done"; List tasks = ess.getTasks(task.getTaskType(), null, 1); assertNotNull(tasks); @@ -1197,6 +1199,67 @@ public void testSimpleWorkflow() throws Exception { } + @Test + public void testSimpleWorkflowWithResponseTimeout() throws Exception { + + createWFWithResponseTimeout(); + + String correlationId = "unit_test_1"; + Map input = new HashMap(); + String inputParam1 = "p1 value"; + input.put("param1", inputParam1); + input.put("param2", "p2 value"); + String wfid = provider.startWorkflow("RTOWF", 1, correlationId , input); + System.out.println("testSimpleWorkflowWithResponseTimeout.wfid=" + wfid); + assertNotNull(wfid); + + Workflow es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(WorkflowStatus.RUNNING, es.getStatus()); + assertEquals(1, es.getTasks().size()); //The very first task is the one that should be scheduled. 
+
+		// Polling for the first task should return the task_rt task that was scheduled above
+		Task task = ess.poll("task_rt", "task1.junit.worker");
+		assertNotNull(task);
+		assertEquals("task_rt", task.getTaskType());
+		assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
+		assertEquals(wfid, task.getWorkflowInstanceId());
+
+		// As task_rt is now out of the queue, the next poll should not return it
+		Task nullTask = ess.poll("task_rt", "task1.junit.worker");
+		assertNull(nullTask);
+
+		// The responseTimeout is set to 15 seconds, so sleep past it and re-run the decider
+		Thread.sleep(15000);
+		provider.decide(wfid);
+
+		// Polling now should return the same task again, because it should have been put back in the queue
+		Task taskAgain = ess.poll("task_rt", "task1.junit.worker");
+		assertNotNull(taskAgain);
+		assertEquals(task.getTaskId(), taskAgain.getTaskId());
+
+		String task1Op = "task1.Done";
+		task.getOutputData().put("op", task1Op);
+		task.setStatus(Status.COMPLETED);
+		ess.updateTask(task);
+
+		task = ess.poll("junit_task_2", "task2.junit.worker");
+		assertNotNull(task);
+		assertEquals("junit_task_2", task.getTaskType());
+		assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));
+
+		task.setStatus(Status.COMPLETED);
+		task.setReasonForIncompletion("unit test failure");
+		ess.updateTask(task);
+
+		es = ess.getExecutionStatus(wfid, true);
+		assertNotNull(es);
+		assertEquals(WorkflowStatus.COMPLETED, es.getStatus());
+	}
+
 	@Test
 	public void testWorkflowRerunWithSubWorkflows() throws Exception {
 		// Execute a workflow
@@ -3475,7 +3538,50 @@ private void createWorkflowDefForDomain(){
 		} catch (Exception e) {}
 	}
-
+	private void createWFWithResponseTimeout() throws Exception{
+		TaskDef task = new TaskDef();
+		task.setName("task_rt");
+		task.setTimeoutSeconds(120);
+		task.setRetryCount(RETRY_COUNT);
+		task.setResponseTimeoutSeconds(15);
+		ms.registerTaskDef(Arrays.asList(task));
+
+		WorkflowDef def = new WorkflowDef();
+		def.setName("RTOWF");
+		def.setDescription(def.getName());
+		def.setVersion(1);
+		def.setInputParameters(Arrays.asList("param1", "param2"));
+		Map<String, Object> outputParameters = new HashMap<>();
+		outputParameters.put("o1", "${workflow.input.param1}");
+		outputParameters.put("o2", "${t2.output.uuid}");
+		outputParameters.put("o3", "${t1.output.op}");
+		def.setOutputParameters(outputParameters);
+		def.setFailureWorkflow("$workflow.input.failureWfName");
+		def.setSchemaVersion(2);
+		LinkedList<WorkflowTask> wftasks = new LinkedList<>();
+
+		WorkflowTask wft1 = new WorkflowTask();
+		wft1.setName("task_rt");
+		Map<String, Object> ip1 = new HashMap<>();
+		ip1.put("p1", "${workflow.input.param1}");
+		ip1.put("p2", "${workflow.input.param2}");
+		wft1.setInputParameters(ip1);
+		wft1.setTaskReferenceName("task_rt_t1");
+
+		WorkflowTask wft2 = new WorkflowTask();
+		wft2.setName("junit_task_2");
+		Map<String, Object> ip2 = new HashMap<>();
+		ip2.put("tp1", "${workflow.input.param1}");
+		ip2.put("tp2", "${t1.output.op}");
+		wft2.setInputParameters(ip2);
+		wft2.setTaskReferenceName("t2");
+
+		wftasks.add(wft1);
+		wftasks.add(wft2);
+		def.setTasks(wftasks);
+
+		ms.updateWorkflowDef(def);
+	}
 	private String runWorkflowWithSubworkflow() throws Exception{
 		clearWorkflows();
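Not part of the patch itself: the standalone sketch below restates the requeue condition from checkForResponseTimeout(TaskDef, Task) against plain values, so the behaviour can be checked without wiring up Conductor's TaskDef and Task objects. Every name in it (ResponseTimeoutSketch, shouldRequeue, updateTimeMillis, and so on) is illustrative only and does not exist in the Conductor codebase.

public class ResponseTimeoutSketch {

	// Mirrors the guard clauses and elapsed-time comparison in checkForResponseTimeout:
	// only a non-terminal, IN_PROGRESS task with a positive timeout and a non-zero
	// responseTimeout is eligible, and it is requeued once no update has arrived for
	// responseTimeoutSeconds or longer.
	static boolean shouldRequeue(boolean terminal, boolean inProgress,
								 long timeoutSeconds, long responseTimeoutSeconds,
								 long updateTimeMillis, long nowMillis) {
		if (terminal || timeoutSeconds <= 0 || !inProgress || responseTimeoutSeconds == 0) {
			return false;
		}
		long responseTimeoutMillis = 1000 * responseTimeoutSeconds;
		long noResponseTime = nowMillis - updateTimeMillis;
		return noResponseTime >= responseTimeoutMillis;
	}

	public static void main(String[] args) {
		long now = System.currentTimeMillis();
		// Task last updated 20s ago with a 15s response timeout -> requeue it
		System.out.println(shouldRequeue(false, true, 120, 15, now - 20_000, now)); // true
		// Task last updated 5s ago -> still within the response timeout, leave it alone
		System.out.println(shouldRequeue(false, true, 120, 15, now - 5_000, now));  // false
	}
}

Under these assumptions, a task that has sat IN_PROGRESS past its responseTimeoutSeconds is reported back through DeciderOutcome.tasksToBeRequeued, which WorkflowExecutor.decide then hands to addTaskToQueue, matching what testSimpleWorkflowWithResponseTimeout observes after the 15-second sleep.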