Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2745 from Netflix/default_queues_domain_models
Browse files Browse the repository at this point in the history
use domain models in default queues
  • Loading branch information
apanicker-nflx authored Feb 3, 2022
2 parents a8fc843 + 82a6e3a commit 5d23971
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType
* 1024L) {
taskResult.setReasonForIncompletion(
String.format(
"The TaskResult payload size: %d is greater than the permissible %d MB",
"The TaskResult payload size: %d is greater than the permissible %d bytes",
taskResultSize, payloadSizeThreshold));
taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
taskResult.setOutputData(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -21,12 +21,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.contribs.queue.amqp.AMQPObservableQueue.Builder;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.model.TaskModel.Status;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AMQPEventQueueProperties.class)
Expand Down Expand Up @@ -68,7 +67,7 @@ public Map<Status, ObservableQueue> getQueues(
}
final boolean useExchange = properties.isUseExchange();

Status[] statuses = new Task.Status[] {Status.COMPLETED, Status.FAILED};
Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED};
Map<Status, ObservableQueue> queues = new HashMap<>();
for (Status status : statuses) {
String queuePrefix =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -22,11 +22,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.model.TaskModel.Status;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -29,13 +29,15 @@

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -44,34 +46,31 @@

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

@SuppressWarnings("unchecked")
@ContextConfiguration(classes = {TestObjectMapperConfiguration.class})
@RunWith(SpringRunner.class)
public class DefaultEventQueueProcessorTest {

private static SQSObservableQueue queue;
private static ExecutionService executionService;
private static WorkflowExecutor workflowExecutor;
private static ModelMapper modelMapper;
private DefaultEventQueueProcessor defaultEventQueueProcessor;

@Autowired private ObjectMapper objectMapper;

private static final List<Message> messages = new LinkedList<>();
private static final List<TaskResult> updatedTasks = new LinkedList<>();
private static final List<Task> mappedTasks = new LinkedList<>();

@Before
public void init() {
Map<Status, ObservableQueue> queues = new HashMap<>();
queues.put(Status.COMPLETED, queue);
defaultEventQueueProcessor =
new DefaultEventQueueProcessor(queues, executionService, objectMapper);
new DefaultEventQueueProcessor(queues, workflowExecutor, modelMapper, objectMapper);
}

@BeforeClass
Expand All @@ -93,20 +92,20 @@ public static void setup() {
when(queue.observe()).thenCallRealMethod();
when(queue.getName()).thenReturn(Status.COMPLETED.name());

Task task0 = new Task();
TaskModel task0 = new TaskModel();
task0.setStatus(Status.IN_PROGRESS);
task0.setTaskId("t0");
task0.setReferenceTaskName("t0");
task0.setTaskType(TASK_TYPE_WAIT);
Workflow workflow0 = new Workflow();
WorkflowModel workflow0 = new WorkflowModel();
workflow0.setWorkflowId("v_0");
workflow0.getTasks().add(task0);

Task task2 = new Task();
TaskModel task2 = new TaskModel();
task2.setStatus(Status.IN_PROGRESS);
task2.setTaskId("t2");
task2.setTaskType(TASK_TYPE_WAIT);
Workflow workflow2 = new Workflow();
WorkflowModel workflow2 = new WorkflowModel();
workflow2.setWorkflowId("v_2");
workflow2.getTasks().add(task2);

Expand All @@ -120,20 +119,35 @@ public static void setup() {
.when(queue)
.publish(any());

executionService = mock(ExecutionService.class);
assertNotNull(executionService);
workflowExecutor = mock(WorkflowExecutor.class);
assertNotNull(workflowExecutor);

doReturn(workflow0).when(executionService).getExecutionStatus(eq("v_0"), anyBoolean());
modelMapper = mock(ModelMapper.class);
when(modelMapper.mapToTaskStatus(any())).thenCallRealMethod();
when(modelMapper.getTask(any())).thenReturn(new Task());

doReturn(workflow2).when(executionService).getExecutionStatus(eq("v_2"), anyBoolean());
doReturn(workflow0).when(workflowExecutor).getWorkflow(eq("v_0"), anyBoolean());

doReturn(workflow2).when(workflowExecutor).getWorkflow(eq("v_2"), anyBoolean());

doAnswer(
(Answer<Task>)
invocation -> {
Task task = new Task();
task.setTaskId(
invocation.getArgument(0, TaskModel.class).getTaskId());
return task;
})
.when(modelMapper)
.getTask(any(TaskModel.class));

doAnswer(
(Answer<Void>)
invocation -> {
updatedTasks.add(invocation.getArgument(0, TaskResult.class));
return null;
})
.when(executionService)
.when(workflowExecutor)
.updateTask(any(TaskResult.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public Task getTask(TaskModel taskModel) {
return task;
}

public Task.Status mapToTaskStatus(TaskModel.Status status) {
return Task.Status.valueOf(status.name());
}

/**
* Populates the workflow input data and the tasks input/output data if stored in external
* payload storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
*/
package com.netflix.conductor.core.events;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -29,13 +24,13 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.dao.EventHandlerDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel.Status;

/**
* Manages the event queues registered in the system and sets up listeners for these.
Expand Down Expand Up @@ -108,8 +103,8 @@ public void doStart() {
(status, queue) -> {
LOGGER.info(
"Start listening on default queue {} for status {}",
status,
queue.getName());
queue.getName(),
status);
queue.start();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
*/
package com.netflix.conductor.core.events.queue;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.*;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -26,12 +21,14 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -55,16 +52,19 @@ public class DefaultEventQueueProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class);
private final Map<Status, ObservableQueue> queues;
private final ExecutionService executionService;
private final WorkflowExecutor workflowExecutor;
private final ModelMapper modelMapper;
private static final TypeReference<Map<String, Object>> _mapType = new TypeReference<>() {};
private final ObjectMapper objectMapper;

public DefaultEventQueueProcessor(
Map<Status, ObservableQueue> queues,
ExecutionService executionService,
WorkflowExecutor workflowExecutor,
ModelMapper modelMapper,
ObjectMapper objectMapper) {
this.queues = queues;
this.executionService = executionService;
this.workflowExecutor = workflowExecutor;
this.modelMapper = modelMapper;
this.objectMapper = objectMapper;
queues.forEach(this::startMonitor);
LOGGER.info(
Expand Down Expand Up @@ -98,9 +98,9 @@ private void startMonitor(Status status, ObservableQueue queue) {
queue.ack(Collections.singletonList(msg));
return;
}
Workflow workflow =
executionService.getExecutionStatus(workflowId, true);
Optional<Task> taskOptional;
WorkflowModel workflow =
workflowExecutor.getWorkflow(workflowId, true);
Optional<TaskModel> taskOptional;
if (StringUtils.isNotEmpty(taskId)) {
taskOptional =
workflow.getTasks().stream()
Expand Down Expand Up @@ -145,11 +145,11 @@ private void startMonitor(Status status, ObservableQueue queue) {
return;
}

Task task = taskOptional.get();
task.setStatus(status);
Task task = modelMapper.getTask(taskOptional.get());
task.setStatus(modelMapper.mapToTaskStatus(status));
task.getOutputData()
.putAll(objectMapper.convertValue(payloadJSON, _mapType));
executionService.updateTask(new TaskResult(task));
workflowExecutor.updateTask(new TaskResult(task));

List<String> failures = queue.ack(Collections.singletonList(msg));
if (!failures.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,7 @@ void updateWorkflowOutput(final WorkflowModel workflow, TaskModel task) {
workflow.setOutput(output);
}

@VisibleForTesting
boolean checkForWorkflowCompletion(final WorkflowModel workflow)
public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
throws TerminateWorkflowException {
List<TaskModel> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private boolean validateVariablesSize(
if (payloadSize > maxThreshold * 1024) {
String errorMsg =
String.format(
"The variables payload size: %dB of workflow: %s is greater than the permissible limit: %dKB",
"The variables payload size: %d of workflow: %s is greater than the permissible limit: %d bytes",
payloadSize, workflowId, maxThreshold);
LOGGER.error(errorMsg);
task.setReasonForIncompletion(errorMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
if (entity instanceof TaskModel) {
String errorMsg =
String.format(
"The payload size: %dB of task: %s in workflow: %s is greater than the permissible limit: %dKB",
"The payload size: %d of task: %s in workflow: %s is greater than the permissible limit: %d bytes",
payloadSize,
((TaskModel) entity).getTaskId(),
((TaskModel) entity).getWorkflowInstanceId(),
Expand All @@ -135,7 +135,7 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
} else {
String errorMsg =
String.format(
"The output payload size: %dB of workflow: %s is greater than the permissible limit: %dKB",
"The output payload size: %dB of workflow: %s is greater than the permissible limit: %d bytes",
payloadSize,
((WorkflowModel) entity).getWorkflowId(),
maxThreshold);
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

Expand Down Expand Up @@ -62,10 +61,6 @@ public boolean isSuccessful() {
public boolean isRetriable() {
return retriable;
}

public static Task.Status getTaskStatusDTO(Status status) {
return Task.Status.valueOf(status.name());
}
}

private String taskType;
Expand Down
Loading

0 comments on commit 5d23971

Please sign in to comment.