diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java index 18843debd7..fc4abefa0e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java @@ -12,9 +12,14 @@ */ package com.netflix.conductor.core.execution.mapper; +import java.text.ParseException; +import java.time.Duration; +import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -27,6 +32,11 @@ import com.netflix.conductor.model.WorkflowModel; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT; +import static com.netflix.conductor.core.execution.tasks.Wait.DURATION_INPUT; +import static com.netflix.conductor.core.execution.tasks.Wait.UNTIL_INPUT; +import static com.netflix.conductor.core.utils.DateTimeUtils.parseDate; +import static com.netflix.conductor.core.utils.DateTimeUtils.parseDuration; +import static com.netflix.conductor.model.TaskModel.Status.FAILED_WITH_TERMINAL_ERROR; /** * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link @@ -69,6 +79,52 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { waitTask.setInputData(waitTaskInput); waitTask.setStartTime(System.currentTimeMillis()); waitTask.setStatus(TaskModel.Status.IN_PROGRESS); + setCallbackAfter(waitTask); return List.of(waitTask); } + + void setCallbackAfter(TaskModel task) { + String duration = + Optional.ofNullable(task.getInputData().get(DURATION_INPUT)).orElse("").toString(); + String until = + Optional.ofNullable(task.getInputData().get(UNTIL_INPUT)).orElse("").toString(); + + if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) { + task.setReasonForIncompletion( + "Both 'duration' and 'until' specified. Please provide only one input"); + task.setStatus(FAILED_WITH_TERMINAL_ERROR); + return; + } + + if (StringUtils.isNotBlank(duration)) { + + Duration timeDuration = parseDuration(duration); + long waitTimeout = System.currentTimeMillis() + (timeDuration.getSeconds() * 1000); + task.setWaitTimeout(waitTimeout); + long seconds = timeDuration.getSeconds(); + task.setCallbackAfterSeconds(seconds); + + } else if (StringUtils.isNotBlank(until)) { + try { + + Date expiryDate = parseDate(until); + long timeInMS = expiryDate.getTime(); + long now = System.currentTimeMillis(); + long seconds = ((timeInMS - now) / 1000) + 1; + if (seconds < 0) { + seconds = 0; + } + task.setCallbackAfterSeconds(seconds); + task.setWaitTimeout(timeInMS); + + } catch (ParseException parseException) { + task.setReasonForIncompletion( + "Invalid/Unsupported Wait Until format. Provided: " + until); + task.setStatus(FAILED_WITH_TERMINAL_ERROR); + } + } else { + // If there is no time duration specified then the WAIT task should wait forever + task.setCallbackAfterSeconds(Integer.MAX_VALUE); + } + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Wait.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Wait.java index 47db207273..903355aa6f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Wait.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Wait.java @@ -12,12 +12,6 @@ */ package com.netflix.conductor.core.execution.tasks; -import java.text.ParseException; -import java.time.Duration; -import java.util.Date; -import java.util.Optional; - -import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import com.netflix.conductor.core.execution.WorkflowExecutor; @@ -25,8 +19,6 @@ import com.netflix.conductor.model.WorkflowModel; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT; -import static com.netflix.conductor.core.utils.DateTimeUtils.parseDate; -import static com.netflix.conductor.core.utils.DateTimeUtils.parseDuration; import static com.netflix.conductor.model.TaskModel.Status.*; @Component(TASK_TYPE_WAIT) @@ -39,46 +31,6 @@ public Wait() { super(TASK_TYPE_WAIT); } - @Override - public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { - - String duration = - Optional.ofNullable(task.getInputData().get(DURATION_INPUT)).orElse("").toString(); - String until = - Optional.ofNullable(task.getInputData().get(UNTIL_INPUT)).orElse("").toString(); - - if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) { - task.setReasonForIncompletion( - "Both 'duration' and 'until' specified. Please provide only one input"); - task.setStatus(FAILED_WITH_TERMINAL_ERROR); - return; - } - - if (StringUtils.isNotBlank(duration)) { - - Duration timeDuration = parseDuration(duration); - long waitTimeout = System.currentTimeMillis() + (timeDuration.getSeconds() * 1000); - task.setWaitTimeout(waitTimeout); - - long seconds = timeDuration.getSeconds(); - task.setCallbackAfterSeconds(seconds); - } else if (StringUtils.isNotBlank(until)) { - try { - Date expiryDate = parseDate(until); - long timeInMS = expiryDate.getTime(); - long now = System.currentTimeMillis(); - long seconds = (timeInMS - now) / 1000; - task.setWaitTimeout(timeInMS); - - } catch (ParseException parseException) { - task.setReasonForIncompletion( - "Invalid/Unsupported Wait Until format. Provided: " + until); - task.setStatus(FAILED_WITH_TERMINAL_ERROR); - } - } - task.setStatus(IN_PROGRESS); - } - @Override public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { task.setStatus(TaskModel.Status.CANCELED); @@ -98,4 +50,8 @@ public boolean execute( return false; } + + public boolean isAsync() { + return true; + } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 133db22ac7..bc3da8c609 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -316,8 +316,9 @@ public void testScheduleTask() { doAnswer(answer).when(queueDAO).push(any(), any(), anyInt(), anyLong()); boolean stateChanged = workflowExecutor.scheduleTask(workflow, tasks); - assertEquals(2, startedTaskCount.get()); - assertEquals(1, queuedTaskCount.get()); + // Wait task is no async to it will be queued. + assertEquals(1, startedTaskCount.get()); + assertEquals(2, queuedTaskCount.get()); assertTrue(stateChanged); assertFalse(httpTask.isStarted()); assertTrue(http2Task.isStarted()); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapperTest.java index 3b9071cef8..a1f2662dba 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapperTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapperTest.java @@ -12,8 +12,11 @@ */ package com.netflix.conductor.core.execution.mapper; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.junit.Test; @@ -21,6 +24,7 @@ import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.model.TaskModel; @@ -29,6 +33,9 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; public class WaitTaskMapperTest { @@ -65,4 +72,145 @@ public void getMappedTasks() { assertEquals(1, mappedTasks.size()); assertEquals(TASK_TYPE_WAIT, mappedTasks.get(0).getTaskType()); } + + @Test + public void testWaitForever() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("Wait_task"); + workflowTask.setType(TaskType.WAIT.name()); + String taskId = new IDGenerator().generate(); + + ParametersUtils parametersUtils = mock(ParametersUtils.class); + WorkflowModel workflow = new WorkflowModel(); + WorkflowDef workflowDef = new WorkflowDef(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withTaskId(taskId) + .build(); + + WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils); + // When + List mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext); + assertEquals(1, mappedTasks.size()); + assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS); + assertTrue(mappedTasks.get(0).getOutputData().isEmpty()); + } + + @Test + public void testWaitUntil() { + + String dateFormat = "yyyy-MM-dd HH:mm"; + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat); + LocalDateTime now = LocalDateTime.now(); + String formatted = formatter.format(now); + System.out.println(formatted); + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("Wait_task"); + workflowTask.setType(TaskType.WAIT.name()); + String taskId = new IDGenerator().generate(); + Map input = Map.of(Wait.UNTIL_INPUT, formatted); + workflowTask.setInputParameters(input); + + ParametersUtils parametersUtils = mock(ParametersUtils.class); + doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any()); + + WorkflowModel workflow = new WorkflowModel(); + WorkflowDef workflowDef = new WorkflowDef(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(Map.of(Wait.UNTIL_INPUT, formatted)) + .withRetryCount(0) + .withTaskId(taskId) + .build(); + + WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils); + // When + List mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext); + assertEquals(1, mappedTasks.size()); + assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS); + assertEquals(mappedTasks.get(0).getCallbackAfterSeconds(), 0L); + } + + @Test + public void testWaitDuration() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("Wait_task"); + workflowTask.setType(TaskType.WAIT.name()); + String taskId = new IDGenerator().generate(); + Map input = Map.of(Wait.DURATION_INPUT, "1s"); + workflowTask.setInputParameters(input); + + ParametersUtils parametersUtils = mock(ParametersUtils.class); + doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any()); + WorkflowModel workflow = new WorkflowModel(); + WorkflowDef workflowDef = new WorkflowDef(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(Map.of(Wait.DURATION_INPUT, "1s")) + .withRetryCount(0) + .withTaskId(taskId) + .build(); + + WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils); + // When + List mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext); + assertEquals(1, mappedTasks.size()); + assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS); + assertTrue(mappedTasks.get(0).getCallbackAfterSeconds() <= 1L); + } + + @Test + public void testInvalidWaitConfig() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("Wait_task"); + workflowTask.setType(TaskType.WAIT.name()); + String taskId = new IDGenerator().generate(); + Map input = + Map.of(Wait.DURATION_INPUT, "1s", Wait.UNTIL_INPUT, "2022-12-12"); + workflowTask.setInputParameters(input); + + ParametersUtils parametersUtils = mock(ParametersUtils.class); + doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any()); + WorkflowModel workflow = new WorkflowModel(); + WorkflowDef workflowDef = new WorkflowDef(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput( + Map.of(Wait.DURATION_INPUT, "1s", Wait.UNTIL_INPUT, "2022-12-12")) + .withRetryCount(0) + .withTaskId(taskId) + .build(); + + WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils); + // When + List mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext); + assertEquals(1, mappedTasks.size()); + assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.FAILED_WITH_TERMINAL_ERROR); + } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestWait.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestWait.java deleted file mode 100644 index e419d07951..0000000000 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestWait.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2022 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package com.netflix.conductor.core.execution.tasks; - -import java.text.ParseException; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Date; - -import org.apache.commons.lang3.time.DateUtils; -import org.junit.Test; - -import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; - -import static org.junit.Assert.*; - -public class TestWait { - - private final Wait wait = new Wait(); - - @Test - public void testWaitForever() { - - TaskModel task = new TaskModel(); - task.setStatus(TaskModel.Status.SCHEDULED); - WorkflowModel model = new WorkflowModel(); - - wait.start(model, task, null); - assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus()); - assertTrue(task.getOutputData().isEmpty()); - } - - @Test - public void testWaitUntil() throws ParseException { - String dateFormat = "yyyy-MM-dd HH:mm"; - - WorkflowModel model = new WorkflowModel(); - - TaskModel task = new TaskModel(); - task.setStatus(TaskModel.Status.SCHEDULED); - - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat); - LocalDateTime now = LocalDateTime.now(); - String formatted = formatter.format(now); - System.out.println(formatted); - - task.getInputData().put(Wait.UNTIL_INPUT, formatted); - Date parsed = DateUtils.parseDate(formatted, dateFormat); - - wait.start(model, task, null); - assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus()); - assertEquals(parsed.getTime(), task.getWaitTimeout()); - - // Execute runs when checking if the task has completed - boolean updated = wait.execute(model, task, null); - assertTrue(updated); - assertEquals(TaskModel.Status.COMPLETED, task.getStatus()); - } - - @Test - public void testWaitDuration() throws ParseException { - WorkflowModel model = new WorkflowModel(); - - TaskModel task = new TaskModel(); - task.setStatus(TaskModel.Status.SCHEDULED); - - task.getInputData().put(Wait.DURATION_INPUT, "1s"); - wait.start(model, task, null); - long now = System.currentTimeMillis(); - - assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus()); - assertEquals(now + 1000, task.getWaitTimeout()); - - try { - Thread.sleep(2_000); - } catch (InterruptedException e) { - } - - // Execute runs when checking if the task has completed - boolean updated = wait.execute(model, task, null); - assertTrue(updated); - assertEquals(TaskModel.Status.COMPLETED, task.getStatus()); - } - - @Test - public void testInvalidWaitConfig() throws ParseException { - WorkflowModel model = new WorkflowModel(); - - TaskModel task = new TaskModel(); - task.setStatus(TaskModel.Status.SCHEDULED); - - task.getInputData().put(Wait.DURATION_INPUT, "1s"); - task.getInputData().put(Wait.UNTIL_INPUT, "2022-12-12"); - wait.start(model, task, null); - assertEquals(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, task.getStatus()); - assertTrue(!task.getReasonForIncompletion().isEmpty()); - } -} diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy index 593ae6866e..0fbf0ec0b3 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy @@ -730,7 +730,7 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { tasks[0].taskType == 'WAIT' tasks[0].status == Task.Status.TIMED_OUT tasks[1].taskType == 'WAIT' - tasks[1].status == Task.Status.IN_PROGRESS + tasks[1].status == Task.Status.SCHEDULED } when: "The wait task is completed"