From 82a6e3ae37774e493d85787aa68b8b4ca7124299 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Wed, 2 Feb 2022 14:18:45 -0800 Subject: [PATCH] use domain models in default queues --- .../conductor/client/http/TaskClient.java | 2 +- .../config/AMQPEventQueueConfiguration.java | 7 +-- .../config/SQSEventQueueConfiguration.java | 4 +- .../sqs/DefaultEventQueueProcessorTest.java | 58 ++++++++++++------- .../conductor/core/dal/ModelMapper.java | 4 ++ .../core/events/DefaultEventQueueManager.java | 13 ++--- .../queue/DefaultEventQueueProcessor.java | 36 ++++++------ .../core/execution/DeciderService.java | 3 +- .../core/execution/tasks/SetVariable.java | 2 +- .../utils/ExternalPayloadStorageUtils.java | 4 +- .../netflix/conductor/model/TaskModel.java | 5 -- .../conductor/core/dal/ModelMapperSpec.groovy | 17 ++++++ .../rest/controllers/QueueAdminResource.java | 4 +- .../integration/SetVariableTaskSpec.groovy | 2 +- 14 files changed, 92 insertions(+), 69 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 88700d2b83..54aa0f47e4 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -256,7 +256,7 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType * 1024L) { taskResult.setReasonForIncompletion( String.format( - "The TaskResult payload size: %d is greater than the permissible %d MB", + "The TaskResult payload size: %d is greater than the permissible %d bytes", taskResultSize, payloadSizeThreshold)); taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR); taskResult.setOutputData(null); diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java index d5420a61aa..9691b3c6eb 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2022 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -21,12 +21,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.contribs.queue.amqp.AMQPObservableQueue.Builder; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.model.TaskModel.Status; @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(AMQPEventQueueProperties.class) @@ -68,7 +67,7 @@ public Map getQueues( } final boolean useExchange = properties.isUseExchange(); - Status[] statuses = new Task.Status[] {Status.COMPLETED, Status.FAILED}; + Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED}; Map queues = new HashMap<>(); for (Status status : statuses) { String queuePrefix = diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java index 412bbb8cb9..001250211d 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2022 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -22,11 +22,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.model.TaskModel.Status; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.sqs.AmazonSQSClient; diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java index f661e62b33..aca5dba120 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Netflix, Inc. + * Copyright 2022 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -29,13 +29,15 @@ import com.netflix.conductor.common.config.TestObjectMapperConfiguration; import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.dal.ModelMapper; import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import com.netflix.conductor.service.ExecutionService; +import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.TaskModel.Status; +import com.netflix.conductor.model.WorkflowModel; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Uninterruptibles; @@ -44,13 +46,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; @SuppressWarnings("unchecked") @ContextConfiguration(classes = {TestObjectMapperConfiguration.class}) @@ -58,20 +55,22 @@ public class DefaultEventQueueProcessorTest { private static SQSObservableQueue queue; - private static ExecutionService executionService; + private static WorkflowExecutor workflowExecutor; + private static ModelMapper modelMapper; private DefaultEventQueueProcessor defaultEventQueueProcessor; @Autowired private ObjectMapper objectMapper; private static final List messages = new LinkedList<>(); private static final List updatedTasks = new LinkedList<>(); + private static final List mappedTasks = new LinkedList<>(); @Before public void init() { Map queues = new HashMap<>(); queues.put(Status.COMPLETED, queue); defaultEventQueueProcessor = - new DefaultEventQueueProcessor(queues, executionService, objectMapper); + new DefaultEventQueueProcessor(queues, workflowExecutor, modelMapper, objectMapper); } @BeforeClass @@ -93,20 +92,20 @@ public static void setup() { when(queue.observe()).thenCallRealMethod(); when(queue.getName()).thenReturn(Status.COMPLETED.name()); - Task task0 = new Task(); + TaskModel task0 = new TaskModel(); task0.setStatus(Status.IN_PROGRESS); task0.setTaskId("t0"); task0.setReferenceTaskName("t0"); task0.setTaskType(TASK_TYPE_WAIT); - Workflow workflow0 = new Workflow(); + WorkflowModel workflow0 = new WorkflowModel(); workflow0.setWorkflowId("v_0"); workflow0.getTasks().add(task0); - Task task2 = new Task(); + TaskModel task2 = new TaskModel(); task2.setStatus(Status.IN_PROGRESS); task2.setTaskId("t2"); task2.setTaskType(TASK_TYPE_WAIT); - Workflow workflow2 = new Workflow(); + WorkflowModel workflow2 = new WorkflowModel(); workflow2.setWorkflowId("v_2"); workflow2.getTasks().add(task2); @@ -120,12 +119,27 @@ public static void setup() { .when(queue) .publish(any()); - executionService = mock(ExecutionService.class); - assertNotNull(executionService); + workflowExecutor = mock(WorkflowExecutor.class); + assertNotNull(workflowExecutor); - doReturn(workflow0).when(executionService).getExecutionStatus(eq("v_0"), anyBoolean()); + modelMapper = mock(ModelMapper.class); + when(modelMapper.mapToTaskStatus(any())).thenCallRealMethod(); + when(modelMapper.getTask(any())).thenReturn(new Task()); - doReturn(workflow2).when(executionService).getExecutionStatus(eq("v_2"), anyBoolean()); + doReturn(workflow0).when(workflowExecutor).getWorkflow(eq("v_0"), anyBoolean()); + + doReturn(workflow2).when(workflowExecutor).getWorkflow(eq("v_2"), anyBoolean()); + + doAnswer( + (Answer) + invocation -> { + Task task = new Task(); + task.setTaskId( + invocation.getArgument(0, TaskModel.class).getTaskId()); + return task; + }) + .when(modelMapper) + .getTask(any(TaskModel.class)); doAnswer( (Answer) @@ -133,7 +147,7 @@ public static void setup() { updatedTasks.add(invocation.getArgument(0, TaskResult.class)); return null; }) - .when(executionService) + .when(workflowExecutor) .updateTask(any(TaskResult.class)); } diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java index a1bd3a0bb7..f566d5ea32 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java @@ -121,6 +121,10 @@ public Task getTask(TaskModel taskModel) { return task; } + public Task.Status mapToTaskStatus(TaskModel.Status status) { + return Task.Status.valueOf(status.name()); + } + /** * Populates the workflow input data and the tasks input/output data if stored in external * payload storage. diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java index 1c83096586..2b5abac221 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java @@ -12,12 +12,7 @@ */ package com.netflix.conductor.core.events; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -29,13 +24,13 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.events.EventHandler; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.model.TaskModel.Status; /** * Manages the event queues registered in the system and sets up listeners for these. @@ -108,8 +103,8 @@ public void doStart() { (status, queue) -> { LOGGER.info( "Start listening on default queue {} for status {}", - status, - queue.getName()); + queue.getName(), + status); queue.start(); }); } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java index de3a2811b8..66af09ab4d 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java @@ -12,12 +12,7 @@ */ package com.netflix.conductor.core.events.queue; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -26,12 +21,14 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.dal.ModelMapper; import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.core.exception.ApplicationException.Code; -import com.netflix.conductor.service.ExecutionService; +import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.TaskModel.Status; +import com.netflix.conductor.model.WorkflowModel; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.type.TypeReference; @@ -55,16 +52,19 @@ public class DefaultEventQueueProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class); private final Map queues; - private final ExecutionService executionService; + private final WorkflowExecutor workflowExecutor; + private final ModelMapper modelMapper; private static final TypeReference> _mapType = new TypeReference<>() {}; private final ObjectMapper objectMapper; public DefaultEventQueueProcessor( Map queues, - ExecutionService executionService, + WorkflowExecutor workflowExecutor, + ModelMapper modelMapper, ObjectMapper objectMapper) { this.queues = queues; - this.executionService = executionService; + this.workflowExecutor = workflowExecutor; + this.modelMapper = modelMapper; this.objectMapper = objectMapper; queues.forEach(this::startMonitor); LOGGER.info( @@ -98,9 +98,9 @@ private void startMonitor(Status status, ObservableQueue queue) { queue.ack(Collections.singletonList(msg)); return; } - Workflow workflow = - executionService.getExecutionStatus(workflowId, true); - Optional taskOptional; + WorkflowModel workflow = + workflowExecutor.getWorkflow(workflowId, true); + Optional taskOptional; if (StringUtils.isNotEmpty(taskId)) { taskOptional = workflow.getTasks().stream() @@ -145,11 +145,11 @@ private void startMonitor(Status status, ObservableQueue queue) { return; } - Task task = taskOptional.get(); - task.setStatus(status); + Task task = modelMapper.getTask(taskOptional.get()); + task.setStatus(modelMapper.mapToTaskStatus(status)); task.getOutputData() .putAll(objectMapper.convertValue(payloadJSON, _mapType)); - executionService.updateTask(new TaskResult(task)); + workflowExecutor.updateTask(new TaskResult(task)); List failures = queue.ack(Collections.singletonList(msg)); if (!failures.isEmpty()) { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 66cc2c9e74..d16e9d030b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -400,8 +400,7 @@ void updateWorkflowOutput(final WorkflowModel workflow, TaskModel task) { workflow.setOutput(output); } - @VisibleForTesting - boolean checkForWorkflowCompletion(final WorkflowModel workflow) + public boolean checkForWorkflowCompletion(final WorkflowModel workflow) throws TerminateWorkflowException { List allTasks = workflow.getTasks(); if (allTasks.isEmpty()) { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java index 2bdcbfb7d0..2e84742148 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java @@ -59,7 +59,7 @@ private boolean validateVariablesSize( if (payloadSize > maxThreshold * 1024) { String errorMsg = String.format( - "The variables payload size: %dB of workflow: %s is greater than the permissible limit: %dKB", + "The variables payload size: %d of workflow: %s is greater than the permissible limit: %d bytes", payloadSize, workflowId, maxThreshold); LOGGER.error(errorMsg); task.setReasonForIncompletion(errorMsg); diff --git a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java index fb0b137353..7917b34bf9 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java @@ -126,7 +126,7 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { if (entity instanceof TaskModel) { String errorMsg = String.format( - "The payload size: %dB of task: %s in workflow: %s is greater than the permissible limit: %dKB", + "The payload size: %d of task: %s in workflow: %s is greater than the permissible limit: %d bytes", payloadSize, ((TaskModel) entity).getTaskId(), ((TaskModel) entity).getWorkflowInstanceId(), @@ -135,7 +135,7 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { } else { String errorMsg = String.format( - "The output payload size: %dB of workflow: %s is greater than the permissible limit: %dKB", + "The output payload size: %dB of workflow: %s is greater than the permissible limit: %d bytes", payloadSize, ((WorkflowModel) entity).getWorkflowId(), maxThreshold); diff --git a/core/src/main/java/com/netflix/conductor/model/TaskModel.java b/core/src/main/java/com/netflix/conductor/model/TaskModel.java index 6b3c1573ae..f526a56052 100644 --- a/core/src/main/java/com/netflix/conductor/model/TaskModel.java +++ b/core/src/main/java/com/netflix/conductor/model/TaskModel.java @@ -20,7 +20,6 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; -import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; @@ -62,10 +61,6 @@ public boolean isSuccessful() { public boolean isRetriable() { return retriable; } - - public static Task.Status getTaskStatusDTO(Status status) { - return Task.Status.valueOf(status.name()); - } } private String taskType; diff --git a/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy index 1c8d9d8b38..713970f497 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy @@ -153,4 +153,21 @@ class ModelMapperSpec extends Specification { externalInputPayloadStoragePath == '/relative/task/path' } } + + def "map task model status to task status"() { + expect: + taskStatus == modelMapper.mapToTaskStatus(taskModelStatus) + + where: + taskModelStatus || taskStatus + TaskModel.Status.IN_PROGRESS || Task.Status.IN_PROGRESS + TaskModel.Status.CANCELED || Task.Status.CANCELED + TaskModel.Status.FAILED || Task.Status.FAILED + TaskModel.Status.FAILED_WITH_TERMINAL_ERROR || Task.Status.FAILED_WITH_TERMINAL_ERROR + TaskModel.Status.COMPLETED || Task.Status.COMPLETED + TaskModel.Status.COMPLETED_WITH_ERRORS || Task.Status.COMPLETED_WITH_ERRORS + TaskModel.Status.SCHEDULED || Task.Status.SCHEDULED + TaskModel.Status.TIMED_OUT || Task.Status.TIMED_OUT + TaskModel.Status.SKIPPED || Task.Status.SKIPPED + } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java index 80133c8bc0..70eb058722 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2022 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -21,8 +21,8 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; +import com.netflix.conductor.model.TaskModel.Status; import io.swagger.v3.oas.annotations.Operation; diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SetVariableTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SetVariableTaskSpec.groovy index 4fab0de12b..5f1e25296b 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SetVariableTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SetVariableTaskSpec.groovy @@ -62,7 +62,7 @@ class SetVariableTaskSpec extends AbstractSpecification { def EXTRA_HASHMAP_SIZE = 17 def expectedErrorMessage = String.format( - "The variables payload size: %dB of workflow: %s is greater than the permissible limit: %dKB", + "The variables payload size: %d of workflow: %s is greater than the permissible limit: %d bytes", EXTRA_HASHMAP_SIZE + maxThreshold * 1024 + 1, workflowInstanceId, maxThreshold) then: "verify that the task is completed and variables were set"