diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index 88700d2b83..54aa0f47e4 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -256,7 +256,7 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType
* 1024L) {
taskResult.setReasonForIncompletion(
String.format(
- "The TaskResult payload size: %d is greater than the permissible %d MB",
+ "The TaskResult payload size: %d is greater than the permissible %d bytes",
taskResultSize, payloadSizeThreshold));
taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
taskResult.setOutputData(null);
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java
index d5420a61aa..9691b3c6eb 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -21,12 +21,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.contribs.queue.amqp.AMQPObservableQueue.Builder;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
+import com.netflix.conductor.model.TaskModel.Status;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AMQPEventQueueProperties.class)
@@ -68,7 +67,7 @@ public Map getQueues(
}
final boolean useExchange = properties.isUseExchange();
- Status[] statuses = new Task.Status[] {Status.COMPLETED, Status.FAILED};
+ Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED};
Map queues = new HashMap<>();
for (Status status : statuses) {
String queuePrefix =
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
index 412bbb8cb9..001250211d 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -22,11 +22,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
+import com.netflix.conductor.model.TaskModel.Status;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClient;
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
index f661e62b33..aca5dba120 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -29,13 +29,15 @@
import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
-import com.netflix.conductor.service.ExecutionService;
+import com.netflix.conductor.core.execution.WorkflowExecutor;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.TaskModel.Status;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -44,13 +46,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
@SuppressWarnings("unchecked")
@ContextConfiguration(classes = {TestObjectMapperConfiguration.class})
@@ -58,20 +55,22 @@
public class DefaultEventQueueProcessorTest {
private static SQSObservableQueue queue;
- private static ExecutionService executionService;
+ private static WorkflowExecutor workflowExecutor;
+ private static ModelMapper modelMapper;
private DefaultEventQueueProcessor defaultEventQueueProcessor;
@Autowired private ObjectMapper objectMapper;
private static final List messages = new LinkedList<>();
private static final List updatedTasks = new LinkedList<>();
+ private static final List mappedTasks = new LinkedList<>();
@Before
public void init() {
Map queues = new HashMap<>();
queues.put(Status.COMPLETED, queue);
defaultEventQueueProcessor =
- new DefaultEventQueueProcessor(queues, executionService, objectMapper);
+ new DefaultEventQueueProcessor(queues, workflowExecutor, modelMapper, objectMapper);
}
@BeforeClass
@@ -93,20 +92,20 @@ public static void setup() {
when(queue.observe()).thenCallRealMethod();
when(queue.getName()).thenReturn(Status.COMPLETED.name());
- Task task0 = new Task();
+ TaskModel task0 = new TaskModel();
task0.setStatus(Status.IN_PROGRESS);
task0.setTaskId("t0");
task0.setReferenceTaskName("t0");
task0.setTaskType(TASK_TYPE_WAIT);
- Workflow workflow0 = new Workflow();
+ WorkflowModel workflow0 = new WorkflowModel();
workflow0.setWorkflowId("v_0");
workflow0.getTasks().add(task0);
- Task task2 = new Task();
+ TaskModel task2 = new TaskModel();
task2.setStatus(Status.IN_PROGRESS);
task2.setTaskId("t2");
task2.setTaskType(TASK_TYPE_WAIT);
- Workflow workflow2 = new Workflow();
+ WorkflowModel workflow2 = new WorkflowModel();
workflow2.setWorkflowId("v_2");
workflow2.getTasks().add(task2);
@@ -120,12 +119,27 @@ public static void setup() {
.when(queue)
.publish(any());
- executionService = mock(ExecutionService.class);
- assertNotNull(executionService);
+ workflowExecutor = mock(WorkflowExecutor.class);
+ assertNotNull(workflowExecutor);
- doReturn(workflow0).when(executionService).getExecutionStatus(eq("v_0"), anyBoolean());
+ modelMapper = mock(ModelMapper.class);
+ when(modelMapper.mapToTaskStatus(any())).thenCallRealMethod();
+ when(modelMapper.getTask(any())).thenReturn(new Task());
- doReturn(workflow2).when(executionService).getExecutionStatus(eq("v_2"), anyBoolean());
+ doReturn(workflow0).when(workflowExecutor).getWorkflow(eq("v_0"), anyBoolean());
+
+ doReturn(workflow2).when(workflowExecutor).getWorkflow(eq("v_2"), anyBoolean());
+
+ doAnswer(
+ (Answer)
+ invocation -> {
+ Task task = new Task();
+ task.setTaskId(
+ invocation.getArgument(0, TaskModel.class).getTaskId());
+ return task;
+ })
+ .when(modelMapper)
+ .getTask(any(TaskModel.class));
doAnswer(
(Answer)
@@ -133,7 +147,7 @@ public static void setup() {
updatedTasks.add(invocation.getArgument(0, TaskResult.class));
return null;
})
- .when(executionService)
+ .when(workflowExecutor)
.updateTask(any(TaskResult.class));
}
diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java
index a1bd3a0bb7..f566d5ea32 100644
--- a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java
@@ -121,6 +121,10 @@ public Task getTask(TaskModel taskModel) {
return task;
}
+ public Task.Status mapToTaskStatus(TaskModel.Status status) {
+ return Task.Status.valueOf(status.name());
+ }
+
/**
* Populates the workflow input data and the tasks input/output data if stored in external
* payload storage.
diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java
index 1c83096586..2b5abac221 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java
@@ -12,12 +12,7 @@
*/
package com.netflix.conductor.core.events;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -29,13 +24,13 @@
import org.springframework.stereotype.Component;
import com.netflix.conductor.common.metadata.events.EventHandler;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.dao.EventHandlerDAO;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.TaskModel.Status;
/**
* Manages the event queues registered in the system and sets up listeners for these.
@@ -108,8 +103,8 @@ public void doStart() {
(status, queue) -> {
LOGGER.info(
"Start listening on default queue {} for status {}",
- status,
- queue.getName());
+ queue.getName(),
+ status);
queue.start();
});
}
diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java
index de3a2811b8..66af09ab4d 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java
@@ -12,12 +12,7 @@
*/
package com.netflix.conductor.core.events.queue;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -26,12 +21,14 @@
import org.springframework.stereotype.Component;
import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
-import com.netflix.conductor.service.ExecutionService;
+import com.netflix.conductor.core.execution.WorkflowExecutor;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.TaskModel.Status;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -55,16 +52,19 @@ public class DefaultEventQueueProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class);
private final Map queues;
- private final ExecutionService executionService;
+ private final WorkflowExecutor workflowExecutor;
+ private final ModelMapper modelMapper;
private static final TypeReference