From 5998fca69a91c34dacd6a305b257cac9418831f7 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 25 Mar 2021 15:52:29 -0700 Subject: [PATCH] separate event poller and processor --- ...va => DefaultEventQueueProcessorTest.java} | 14 +- .../core/LifecycleAwareComponent.java | 14 +- .../core/config/SchedulerConfiguration.java | 37 +++-- ...cessor.java => DefaultEventProcessor.java} | 115 +++----------- .../core/events/DefaultEventQueueManager.java | 147 ++++++++++++++++++ ...tProcessor.java => EventQueueManager.java} | 2 +- ...r.java => DefaultEventQueueProcessor.java} | 24 +-- .../core/execution/WorkflowPoller.java | 20 +-- .../tasks/SystemTaskWorkerCoordinator.java | 15 +- .../conductor/service/EventServiceImpl.java | 18 +-- .../service/MetadataServiceImpl.java | 11 +- ...or.java => TestDefaultEventProcessor.java} | 137 ++++++---------- .../conductor/core/utils/JsonUtilsTest.java | 21 ++- .../conductor/service/EventServiceTest.java | 27 ++-- .../service/MetadataServiceTest.java | 35 ++--- .../rest/controllers/QueueAdminResource.java | 16 +- 16 files changed, 345 insertions(+), 308 deletions(-) rename contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/{QueueManagerTest.java => DefaultEventQueueProcessorTest.java} (89%) rename core/src/main/java/com/netflix/conductor/core/events/{SimpleEventProcessor.java => DefaultEventProcessor.java} (76%) create mode 100644 core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java rename core/src/main/java/com/netflix/conductor/core/events/{EventProcessor.java => EventQueueManager.java} (95%) rename core/src/main/java/com/netflix/conductor/core/events/queue/{QueueManager.java => DefaultEventQueueProcessor.java} (94%) rename core/src/test/java/com/netflix/conductor/core/events/{TestSimpleEventProcessor.java => TestDefaultEventProcessor.java} (78%) diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java similarity index 89% rename from contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java rename to contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java index ed80065525..d65658cd57 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java @@ -20,7 +20,7 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import com.netflix.conductor.core.events.queue.QueueManager; +import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.service.ExecutionService; import org.junit.Before; @@ -51,11 +51,11 @@ @SuppressWarnings("unchecked") @ContextConfiguration(classes = {ObjectMapperConfiguration.class}) @RunWith(SpringRunner.class) -public class QueueManagerTest { +public class DefaultEventQueueProcessorTest { private static SQSObservableQueue queue; private static ExecutionService executionService; - private QueueManager queueManager; + private DefaultEventQueueProcessor defaultEventQueueProcessor; @Autowired private ObjectMapper objectMapper; @@ -67,7 +67,7 @@ public class QueueManagerTest { public void init() { Map queues = new HashMap<>(); queues.put(Status.COMPLETED, queue); - queueManager = new QueueManager(queues, executionService, objectMapper); + defaultEventQueueProcessor = new DefaultEventQueueProcessor(queues, executionService, objectMapper); } @BeforeClass @@ -125,7 +125,7 @@ public static void setup() { @Test public void test() throws Exception { - queueManager.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED); + defaultEventQueueProcessor.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED); Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS); assertTrue(updatedTasks.stream().anyMatch(task -> task.getTaskId().equals("t0"))); @@ -133,13 +133,13 @@ public void test() throws Exception { @Test(expected = IllegalArgumentException.class) public void testFailure() throws Exception { - queueManager.updateByTaskRefName("v_1", "t1", new HashMap<>(), Status.CANCELED); + defaultEventQueueProcessor.updateByTaskRefName("v_1", "t1", new HashMap<>(), Status.CANCELED); Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS); } @Test public void testWithTaskId() throws Exception { - queueManager.updateByTaskId("v_2", "t2", new HashMap<>(), Status.COMPLETED); + defaultEventQueueProcessor.updateByTaskId("v_2", "t2", new HashMap<>(), Status.COMPLETED); Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS); assertTrue(updatedTasks.stream().anyMatch(task -> task.getTaskId().equals("t2"))); } diff --git a/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java index eba224a3b5..1b468d8573 100644 --- a/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java +++ b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java @@ -25,19 +25,27 @@ public abstract class LifecycleAwareComponent implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(LifecycleAwareComponent.class); @Override - public void start() { + public final void start() { running = true; LOGGER.info("{} started.", getClass().getSimpleName()); + doStart(); } @Override - public void stop() { + public final void stop() { running = false; LOGGER.info("{} stopped.", getClass().getSimpleName()); + doStop(); } @Override - public boolean isRunning() { + public final boolean isRunning() { return running; } + + public void doStart() { + } + + public void doStop() { + } } diff --git a/core/src/main/java/com/netflix/conductor/core/config/SchedulerConfiguration.java b/core/src/main/java/com/netflix/conductor/core/config/SchedulerConfiguration.java index 8453685c97..f99c59f6f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/SchedulerConfiguration.java +++ b/core/src/main/java/com/netflix/conductor/core/config/SchedulerConfiguration.java @@ -15,23 +15,29 @@ package com.netflix.conductor.core.config; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; import rx.Scheduler; import rx.schedulers.Schedulers; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - @Configuration(proxyBeanMethods = false) @EnableScheduling @EnableAsync -public class SchedulerConfiguration { +public class SchedulerConfiguration implements SchedulingConfigurer { + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfiguration.class); public static final String SWEEPER_EXECUTOR_NAME = "WorkflowSweeperExecutor"; + public static final String EVENT_PROCESSOR_EXECUTOR_NAME = "EventProcessorExecutor"; /** * Used by some {@link com.netflix.conductor.core.events.queue.ObservableQueue} implementations. @@ -41,10 +47,10 @@ public class SchedulerConfiguration { @Bean public Scheduler scheduler(ConductorProperties properties) { ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("event-queue-poll-scheduler-thread-%d") - .build(); + .setNameFormat("event-queue-poll-scheduler-thread-%d") + .build(); Executor executorService = Executors - .newFixedThreadPool(properties.getEventQueueSchedulerPollThreadCount(), threadFactory); + .newFixedThreadPool(properties.getEventQueueSchedulerPollThreadCount(), threadFactory); return Schedulers.from(executorService); } @@ -53,11 +59,20 @@ public Scheduler scheduler(ConductorProperties properties) { public Executor sweeperExecutor(ConductorProperties properties) { if (properties.getSweeperThreadCount() <= 0) { throw new IllegalStateException("Cannot set workflow sweeper thread count to <=0. To disable workflow " - + "sweeper, set conductor.workflow-sweeper.enabled=false."); + + "sweeper, set conductor.workflow-sweeper.enabled=false."); } ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("sweeper-thread-%d") - .build(); + .setNameFormat("sweeper-thread-%d") + .build(); return Executors.newFixedThreadPool(properties.getSweeperThreadCount(), threadFactory); } + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + threadPoolTaskScheduler.setPoolSize(2); // equal to the number of scheduled jobs + threadPoolTaskScheduler.setThreadNamePrefix("scheduled-task-pool-"); + threadPoolTaskScheduler.initialize(); + taskRegistrar.setTaskScheduler(threadPoolTaskScheduler); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java similarity index 76% rename from core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java rename to core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java index 616d47b97e..bc3c064dda 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2021 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -13,13 +13,12 @@ package com.netflix.conductor.core.events; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.events.EventExecution.Status; import com.netflix.conductor.common.metadata.events.EventHandler; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.utils.RetryUtil; -import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; @@ -31,137 +30,64 @@ import com.spotify.futures.CompletableFutures; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.concurrent.ThreadFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.Lifecycle; import org.springframework.stereotype.Component; /** - * Event Processor is used to dispatch actions based on the incoming events to execution queue. + * Event Processor is used to dispatch actions configured in the event handlers, based on incoming events to the event + * queues. * *

Set conductor.default-event-processor.enabled=false to disable event processing.

*/ @Component @ConditionalOnProperty(name = "conductor.default-event-processor.enabled", havingValue = "true", matchIfMissing = true) -public class SimpleEventProcessor extends LifecycleAwareComponent implements EventProcessor { +public class DefaultEventProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventProcessor.class); - private static final String CLASS_NAME = SimpleEventProcessor.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventProcessor.class); private static final int RETRY_COUNT = 3; private final MetadataService metadataService; private final ExecutionService executionService; private final ActionProcessor actionProcessor; - private final EventQueues eventQueues; - private final ExecutorService executorService; - private final Map eventToQueueMap = new ConcurrentHashMap<>(); + private final ExecutorService eventActionExecutorService; private final ObjectMapper objectMapper; private final JsonUtils jsonUtils; private final boolean isEventMessageIndexingEnabled; - public SimpleEventProcessor(ExecutionService executionService, MetadataService metadataService, - ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, ConductorProperties properties, + public DefaultEventProcessor(ExecutionService executionService, MetadataService metadataService, + ActionProcessor actionProcessor, JsonUtils jsonUtils, ConductorProperties properties, ObjectMapper objectMapper) { this.executionService = executionService; this.metadataService = metadataService; this.actionProcessor = actionProcessor; - this.eventQueues = eventQueues; this.objectMapper = objectMapper; this.jsonUtils = jsonUtils; - this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled(); - int executorThreadCount = properties.getEventProcessorThreadCount(); - if (executorThreadCount <= 0) { + if (properties.getEventProcessorThreadCount() <= 0) { throw new IllegalStateException("Cannot set event processor thread count to <=0. To disable event " + "processing, set conductor.default-event-processor.enabled=false."); } - executorService = Executors.newFixedThreadPool(executorThreadCount); - refresh(); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS); - LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount); - } - - /** - * @return Returns a map of queues which are active. Key is event name and value is queue URI - */ - public Map getQueues() { - Map queues = new HashMap<>(); - eventToQueueMap.forEach((key, value) -> queues.put(key, value.getName())); - return queues; - } - - public Map> getQueueSizes() { - Map> queues = new HashMap<>(); - eventToQueueMap.forEach((key, value) -> { - Map size = new HashMap<>(); - size.put(value.getName(), value.size()); - queues.put(key, size); - }); - return queues; - } - - @Override - public void start() { - eventToQueueMap.forEach((event, queue) -> { - LOGGER.debug("Start listening for events: {}", event); - queue.start(); - }); - } - - @Override - public void stop() { - eventToQueueMap.forEach((event, queue) -> { - LOGGER.debug("Stop listening for events: {}", event); - queue.stop(); - }); - } - - private void refresh() { - try { - Set events = metadataService.getAllEventHandlers().stream() - .map(EventHandler::getEvent) - .collect(Collectors.toSet()); - - List createdQueues = new LinkedList<>(); - events.forEach(event -> eventToQueueMap.computeIfAbsent(event, s -> { - ObservableQueue q = eventQueues.getQueue(event); - createdQueues.add(q); - return q; - } - )); - - // start listening on all of the created queues - createdQueues.stream() - .filter(Objects::nonNull) - .peek(Lifecycle::start) - .forEach(this::listen); - - } catch (Exception e) { - Monitors.error(CLASS_NAME, "refresh"); - LOGGER.error("refresh event queues failed", e); - } - } + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("event-action-executor-thread-%d") + .build(); + eventActionExecutorService = Executors + .newFixedThreadPool(properties.getEventProcessorThreadCount(), threadFactory); - private void listen(ObservableQueue queue) { - queue.observe().subscribe((Message msg) -> handle(queue, msg)); + this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled(); + LOGGER.info("Event Processing is ENABLED"); } - protected void handle(ObservableQueue queue, Message msg) { + public void handle(ObservableQueue queue, Message msg) { try { if (isEventMessageIndexingEnabled) { executionService.addMessage(queue.getName(), msg); @@ -264,7 +190,7 @@ protected CompletableFuture> executeActionsForEventHandler( if (executionService.addEventExecution(eventExecution)) { futuresList.add(CompletableFuture .supplyAsync(() -> execute(eventExecution, action, getPayloadObject(msg.getPayload())), - executorService)); + eventActionExecutorService)); } else { LOGGER.warn("Duplicate delivery/execution of message: {}", msg.getId()); } @@ -279,7 +205,6 @@ protected CompletableFuture> executeActionsForEventHandler( * @return the event execution updated with execution output, if the execution is completed/failed with * non-transient error the input event execution, if the execution failed due to transient error */ - @VisibleForTesting protected EventExecution execute(EventExecution eventExecution, Action action, Object payload) { try { String methodName = "executeEventAction"; diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java new file mode 100644 index 0000000000..8a1589b02b --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventQueueManager.java @@ -0,0 +1,147 @@ +/* + * + * * Copyright 2021 Netflix, Inc. + * *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * *

+ * * http://www.apache.org/licenses/LICENSE-2.0 + * *

+ * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations under the License. + * + */ +package com.netflix.conductor.core.events; + +import com.netflix.conductor.common.metadata.events.EventHandler; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.core.LifecycleAwareComponent; +import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.dao.EventHandlerDAO; +import com.netflix.conductor.metrics.Monitors; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.Lifecycle; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * Manages the event queues registered in the system and sets up listeners for these. + *

+ * Manages the lifecycle of - + *

    + *
  • Queues registered with event handlers
  • + *
  • Default event queues that Conductor listens on
  • + *
+ * + * @see DefaultEventQueueProcessor + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +@ConditionalOnProperty(name = "conductor.default-event-processor.enabled", havingValue = "true", matchIfMissing = true) +public class DefaultEventQueueManager extends LifecycleAwareComponent implements EventQueueManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueManager.class); + + private final EventHandlerDAO eventHandlerDAO; + private final EventQueues eventQueues; + private final DefaultEventProcessor defaultEventProcessor; + private final Map eventToQueueMap = new ConcurrentHashMap<>(); + private final Map defaultQueues; + + public DefaultEventQueueManager(Map defaultQueues, EventHandlerDAO eventHandlerDAO, + EventQueues eventQueues, DefaultEventProcessor defaultEventProcessor) { + this.defaultQueues = defaultQueues; + this.eventHandlerDAO = eventHandlerDAO; + this.eventQueues = eventQueues; + this.defaultEventProcessor = defaultEventProcessor; + } + + /** + * @return Returns a map of queues which are active. Key is event name and value is queue URI + */ + @Override + public Map getQueues() { + Map queues = new HashMap<>(); + eventToQueueMap.forEach((key, value) -> queues.put(key, value.getName())); + return queues; + } + + @Override + public Map> getQueueSizes() { + Map> queues = new HashMap<>(); + eventToQueueMap.forEach((key, value) -> { + Map size = new HashMap<>(); + size.put(value.getName(), value.size()); + queues.put(key, size); + }); + return queues; + } + + @Override + public void doStart() { + eventToQueueMap.forEach((event, queue) -> { + LOGGER.info("Start listening for events: {}", event); + queue.start(); + }); + defaultQueues.forEach((status, queue) -> { + LOGGER.info("Start listening on default queue {} for status {}", status, queue.getName()); + queue.start(); + }); + } + + @Override + public void doStop() { + eventToQueueMap.forEach((event, queue) -> { + LOGGER.info("Stop listening for events: {}", event); + queue.stop(); + }); + defaultQueues.forEach((status, queue) -> { + LOGGER.info("Stop listening on default queue {} for status {}", status, queue.getName()); + queue.stop(); + }); + } + + @Scheduled(fixedDelay = 60_000) + public void refreshEventQueues() { + try { + Set events = eventHandlerDAO.getAllEventHandlers().stream() + .map(EventHandler::getEvent) + .collect(Collectors.toSet()); + + List createdQueues = new LinkedList<>(); + events.forEach(event -> eventToQueueMap.computeIfAbsent(event, s -> { + ObservableQueue q = eventQueues.getQueue(event); + createdQueues.add(q); + return q; + } + )); + + // start listening on all of the created queues + createdQueues.stream() + .filter(Objects::nonNull) + .peek(Lifecycle::start) + .forEach(this::listen); + + } catch (Exception e) { + Monitors.error(getClass().getSimpleName(), "refresh"); + LOGGER.error("refresh event queues failed", e); + } + } + + private void listen(ObservableQueue queue) { + queue.observe().subscribe((Message msg) -> defaultEventProcessor.handle(queue, msg)); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/EventQueueManager.java similarity index 95% rename from core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java rename to core/src/main/java/com/netflix/conductor/core/events/EventQueueManager.java index a8ab1a34d6..9b532c38d8 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/EventQueueManager.java @@ -14,7 +14,7 @@ import java.util.Map; -public interface EventProcessor { +public interface EventQueueManager { Map getQueues(); diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/QueueManager.java b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java similarity index 94% rename from core/src/main/java/com/netflix/conductor/core/events/queue/QueueManager.java rename to core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java index 97a70bf5bb..a647c72a63 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/QueueManager.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2021 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -23,31 +23,33 @@ import com.netflix.conductor.core.exception.ApplicationException.Code; import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.service.ExecutionService; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +/** + * Monitors and processes messages on the default event queues that Conductor listens on. + *

+ * The default event queue type is controlled using the property: conductor.default-event-queue.type + */ @Component -public class QueueManager { +public class DefaultEventQueueProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(QueueManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class); private final Map queues; private final ExecutionService executionService; private static final TypeReference> _mapType = new TypeReference>() { }; private final ObjectMapper objectMapper; - @Autowired - public QueueManager(Map queues, ExecutionService executionService, + public DefaultEventQueueProcessor(Map queues, ExecutionService executionService, ObjectMapper objectMapper) { this.queues = queues; this.executionService = executionService; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowPoller.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowPoller.java index 7c298a17f3..892ebb75b9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowPoller.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowPoller.java @@ -14,21 +14,21 @@ */ package com.netflix.conductor.core.execution; +import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE; + import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE; - +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @ConditionalOnProperty(name = "conductor.workflow-sweeper.enabled", havingValue = "true", matchIfMissing = true) public class WorkflowPoller extends LifecycleAwareComponent { @@ -53,14 +53,14 @@ public void pollAndSweep() { LOGGER.debug("Component stopped, skip workflow sweep"); } else { List workflowIds = queueDAO.pop(DECIDER_QUEUE, - properties.getSweeperThreadCount(), 2000); + properties.getSweeperThreadCount(), 2000); if (workflowIds != null) { // wait for all workflow ids to be "swept" CompletableFuture.allOf(workflowIds - .stream() - .map(workflowSweeper::sweepAsync) - .toArray(CompletableFuture[]::new)) - .get(); + .stream() + .map(workflowSweeper::sweepAsync) + .toArray(CompletableFuture[]::new)) + .get(); LOGGER.debug("Sweeper processed {} from the decider queue", String.join(",", workflowIds)); } //NOTE: Disabling the sweeper implicitly disables this metric. diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index 9987426d9b..efa5c75a26 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -20,14 +20,6 @@ import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionService; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,6 +30,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component diff --git a/core/src/main/java/com/netflix/conductor/service/EventServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/EventServiceImpl.java index 70a854c783..4ba8e05bbb 100644 --- a/core/src/main/java/com/netflix/conductor/service/EventServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/EventServiceImpl.java @@ -13,25 +13,25 @@ package com.netflix.conductor.service; import com.netflix.conductor.common.metadata.events.EventHandler; -import com.netflix.conductor.core.events.EventProcessor; +import com.netflix.conductor.core.events.EventQueueManager; import com.netflix.conductor.core.events.EventQueues; -import org.springframework.stereotype.Service; - import java.util.List; import java.util.Map; import java.util.Optional; +import org.springframework.stereotype.Service; @Service public class EventServiceImpl implements EventService { private final MetadataService metadataService; - private final EventProcessor eventProcessor; + private final EventQueueManager eventQueueManager; private final EventQueues eventQueues; - public EventServiceImpl(MetadataService metadataService, Optional optionalEventProcessor, EventQueues eventQueues) { + public EventServiceImpl(MetadataService metadataService, Optional optionalEventQueueManager, + EventQueues eventQueues) { this.metadataService = metadataService; - // EventProcessor is optional and may not be enabled - this.eventProcessor = optionalEventProcessor.orElse(null); + // EventQueueManager is optional and may not be enabled + this.eventQueueManager = optionalEventQueueManager.orElse(null); this.eventQueues = eventQueues; } @@ -89,10 +89,10 @@ public List getEventHandlersForEvent(String event, boolean activeO * @return map of event queues */ public Map getEventQueues(boolean verbose) { - if(eventProcessor == null) { + if (eventQueueManager == null) { throw new IllegalStateException("Event processing is DISABLED"); } - return (verbose ? eventProcessor.getQueueSizes() : eventProcessor.getQueues()); + return (verbose ? eventQueueManager.getQueueSizes() : eventQueueManager.getQueues()); } /** diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index 4430f1ccd4..fb7985485e 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -18,29 +18,26 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.events.EventQueues; import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.core.exception.ApplicationException.Code; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.validations.ValidationContext; -import org.springframework.stereotype.Service; - import java.util.List; import java.util.Optional; +import org.springframework.stereotype.Service; +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Service public class MetadataServiceImpl implements MetadataService { private final MetadataDAO metadataDAO; private final EventHandlerDAO eventHandlerDAO; - private final EventQueues eventQueues; - public MetadataServiceImpl(MetadataDAO metadataDAO, EventHandlerDAO eventHandlerDAO, EventQueues eventQueues, + public MetadataServiceImpl(MetadataDAO metadataDAO, EventHandlerDAO eventHandlerDAO, ConductorProperties properties) { this.metadataDAO = metadataDAO; this.eventHandlerDAO = eventHandlerDAO; - this.eventQueues = eventQueues; ValidationContext.initialize(metadataDAO); OwnerEmailMandatoryConstraint.WorkflowTaskValidValidator @@ -173,7 +170,6 @@ public void unregisterWorkflowDef(String name, Integer version) { * the name */ public void addEventHandler(EventHandler eventHandler) { - eventQueues.getQueue(eventHandler.getEvent()); eventHandlerDAO.addEventHandler(eventHandler); } @@ -181,7 +177,6 @@ public void addEventHandler(EventHandler eventHandler) { * @param eventHandler Event handler to be updated. */ public void updateEventHandler(EventHandler eventHandler) { - eventQueues.getQueue(eventHandler.getEvent()); eventHandlerDAO.updateEventHandler(eventHandler); } diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java similarity index 78% rename from core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java rename to core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java index 675f9979d0..6803afcf61 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Netflix, Inc. + * Copyright 2021 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -12,8 +12,21 @@ */ package com.netflix.conductor.core.events; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.conductor.common.config.ObjectMapperConfiguration; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.events.EventHandler; @@ -22,7 +35,6 @@ import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow; import com.netflix.conductor.common.metadata.events.EventHandler.TaskDetails; import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.queue.Message; @@ -33,45 +45,26 @@ import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.stubbing.Answer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; -import rx.Observable; - import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.stubbing.Answer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; @ContextConfiguration(classes = {ObjectMapperConfiguration.class}) @RunWith(SpringRunner.class) -public class TestSimpleEventProcessor { +public class TestDefaultEventProcessor { private String event; - private String queueURI; - private ObservableQueue queue; private MetadataService metadataService; private ExecutionService executionService; @@ -81,6 +74,7 @@ public class TestSimpleEventProcessor { private ParametersUtils parametersUtils; private JsonUtils jsonUtils; private ConductorProperties properties; + private Message message; @Autowired private ObjectMapper objectMapper; @@ -88,7 +82,7 @@ public class TestSimpleEventProcessor { @Before public void setup() { event = "sqs:arn:account090:sqstest1"; - queueURI = "arn:account090:sqstest1"; + String queueURI = "arn:account090:sqstest1"; metadataService = mock(MetadataService.class); executionService = mock(ExecutionService.class); @@ -97,23 +91,14 @@ public void setup() { parametersUtils = new ParametersUtils(objectMapper); jsonUtils = new JsonUtils(objectMapper); - EventQueueProvider provider = mock(EventQueueProvider.class); queue = mock(ObservableQueue.class); - Message[] messages = new Message[1]; - messages[0] = new Message("t0", + message = new Message("t0", "{\"Type\":\"Notification\",\"MessageId\":\"7e4e6415-01e9-5caf-abaa-37fd05d446ff\",\"Message\":\"{\\n \\\"testKey1\\\": \\\"level1\\\",\\n \\\"metadata\\\": {\\n \\\"testKey2\\\": 123456 }\\n }\",\"Timestamp\":\"2018-08-10T21:22:05.029Z\",\"SignatureVersion\":\"1\"}", "t0"); - Observable msgObservable = Observable.from(messages); - when(queue.observe()).thenReturn(msgObservable); when(queue.getURI()).thenReturn(queueURI); when(queue.getName()).thenReturn(queueURI); when(queue.getType()).thenReturn("sqs"); - when(provider.getQueue(queueURI)).thenReturn(queue); - - Map providers = new HashMap<>(); - providers.put("sqs", provider); - eventQueues = new EventQueues(providers, parametersUtils); properties = mock(ConductorProperties.class); when(properties.isEventMessageIndexingEnabled()).thenReturn(true); @@ -148,7 +133,6 @@ public void testEventProcessor() { eventHandler.setEvent(event); - when(metadataService.getAllEventHandlers()).thenReturn(Collections.singletonList(eventHandler)); when(metadataService.getEventHandlersForEvent(event, true)).thenReturn(Collections.singletonList(eventHandler)); when(executionService.addEventExecution(any())).thenReturn(true); when(queue.rePublishIfNoAck()).thenReturn(false); @@ -175,28 +159,13 @@ public void testEventProcessor() { when(workflowExecutor.getWorkflow(completeTaskAction.getComplete_task().getWorkflowId(), true)) .thenReturn(workflow); - WorkflowDef workflowDef = new WorkflowDef(); - workflowDef.setVersion(startWorkflowAction.getStart_workflow().getVersion()); - workflowDef.setName(startWorkflowAction.getStart_workflow().getName()); - when(metadataService.getWorkflowDef(any(), any())).thenReturn(workflowDef); - SimpleActionProcessor actionProcessor = new SimpleActionProcessor(workflowExecutor, parametersUtils, jsonUtils); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); - assertNotNull(eventProcessor.getQueues()); - assertEquals(1, eventProcessor.getQueues().size()); - - String queueEvent = eventProcessor.getQueues().keySet().iterator().next(); - assertEquals(eventHandler.getEvent(), queueEvent); - - String eventProcessorQueue = eventProcessor.getQueues().values().iterator().next(); - assertEquals(queueURI, eventProcessorQueue); - - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); + eventProcessor.handle(queue, message); assertTrue(started.get()); assertTrue(completed.get()); - verify(queue, atMost(1)).ack(any()); verify(queue, never()).publish(any()); } @@ -224,7 +193,6 @@ public void testEventHandlerWithCondition() { eventHandler.setEvent(event); - when(metadataService.getAllEventHandlers()).thenReturn(Collections.singletonList(eventHandler)); when(metadataService.getEventHandlersForEvent(event, true)).thenReturn(Collections.singletonList(eventHandler)); when(executionService.addEventExecution(any())).thenReturn(true); when(queue.rePublishIfNoAck()).thenReturn(false); @@ -238,18 +206,11 @@ public void testEventHandlerWithCondition() { eq(startWorkflowAction.getStart_workflow().getVersion()), eq(startWorkflowAction.getStart_workflow().getCorrelationId()), anyMap(), eq(null), eq(event), eq(null)); - WorkflowDef workflowDef = new WorkflowDef(); - workflowDef.setName(startWorkflowAction.getStart_workflow().getName()); - when(metadataService.getWorkflowDef(any(), any())).thenReturn(workflowDef); - SimpleActionProcessor actionProcessor = new SimpleActionProcessor(workflowExecutor, parametersUtils, jsonUtils); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); - assertNotNull(eventProcessor.getQueues()); - assertEquals(1, eventProcessor.getQueues().size()); - - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); + eventProcessor.handle(queue, message); assertTrue(started.get()); } @@ -269,19 +230,14 @@ public void testEventProcessorWithRetriableError() { eventHandler.getActions().add(completeTaskAction); when(queue.rePublishIfNoAck()).thenReturn(false); - when(metadataService.getAllEventHandlers()).thenReturn(Collections.singletonList(eventHandler)); when(metadataService.getEventHandlersForEvent(event, true)).thenReturn(Collections.singletonList(eventHandler)); when(executionService.addEventExecution(any())).thenReturn(true); when(actionProcessor.execute(any(), any(), any(), any())) .thenThrow(new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "some retriable error")); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); - assertNotNull(eventProcessor.getQueues()); - assertEquals(1, eventProcessor.getQueues().size()); - - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); + eventProcessor.handle(queue, message); verify(queue, never()).ack(any()); verify(queue, never()).publish(any()); } @@ -301,20 +257,15 @@ public void testEventProcessorWithNonRetriableError() { completeTaskAction.getComplete_task().setOutput(new HashMap<>()); eventHandler.getActions().add(completeTaskAction); - when(metadataService.getAllEventHandlers()).thenReturn(Collections.singletonList(eventHandler)); when(metadataService.getEventHandlersForEvent(event, true)).thenReturn(Collections.singletonList(eventHandler)); when(executionService.addEventExecution(any())).thenReturn(true); when(actionProcessor.execute(any(), any(), any(), any())) .thenThrow(new ApplicationException(ApplicationException.Code.INVALID_INPUT, "some non-retriable error")); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); - assertNotNull(eventProcessor.getQueues()); - assertEquals(1, eventProcessor.getQueues().size()); - - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); + eventProcessor.handle(queue, message); verify(queue, atMost(1)).ack(any()); verify(queue, never()).publish(any()); } @@ -327,8 +278,8 @@ public void testExecuteInvalidAction() { throw new UnsupportedOperationException("error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setName("handler"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); @@ -350,8 +301,8 @@ public void testExecuteNonRetriableApplicationException() { throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "some non-retriable error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); @@ -374,8 +325,8 @@ public void testExecuteRetriableApplicationException() { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "some retriable error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, - actionProcessor, eventQueues, jsonUtils, properties, objectMapper); + DefaultEventProcessor eventProcessor = new DefaultEventProcessor(executionService, metadataService, + actionProcessor, jsonUtils, properties, objectMapper); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); diff --git a/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java b/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java index 405e157261..98182187d4 100644 --- a/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java +++ b/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java @@ -12,25 +12,24 @@ */ package com.netflix.conductor.core.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.common.config.ObjectMapperConfiguration; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; - import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; @ContextConfiguration(classes = {ObjectMapperConfiguration.class}) @RunWith(SpringRunner.class) diff --git a/core/src/test/java/com/netflix/conductor/service/EventServiceTest.java b/core/src/test/java/com/netflix/conductor/service/EventServiceTest.java index 614e3b0c1c..52df4af5f9 100644 --- a/core/src/test/java/com/netflix/conductor/service/EventServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/EventServiceTest.java @@ -12,9 +12,18 @@ */ package com.netflix.conductor.service; -import com.netflix.conductor.core.events.EventProcessor; +import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import com.netflix.conductor.core.events.EventQueueManager; import com.netflix.conductor.core.events.EventQueues; -import com.netflix.conductor.core.events.SimpleEventProcessor; +import com.netflix.conductor.core.events.DefaultEventQueueManager; +import java.util.Optional; +import java.util.Set; +import javax.validation.ConstraintViolationException; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -23,16 +32,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.junit4.SpringRunner; -import javax.validation.ConstraintViolationException; -import java.util.Optional; -import java.util.Set; - -import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - @SuppressWarnings("SpringJavaAutowiredMembersInspection") @RunWith(SpringRunner.class) @EnableAutoConfiguration @@ -44,9 +43,9 @@ static class TestEventConfiguration { @Bean public EventService eventService() { MetadataService metadataService = mock(MetadataService.class); - EventProcessor eventProcessor = mock(SimpleEventProcessor.class); + EventQueueManager eventQueueManager = mock(DefaultEventQueueManager.class); EventQueues eventQueues = mock(EventQueues.class); - return new EventServiceImpl(metadataService, Optional.of(eventProcessor), eventQueues); + return new EventServiceImpl(metadataService, Optional.of(eventQueueManager), eventQueues); } } diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index e64fd1d53b..3f1a18dc08 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -12,15 +12,29 @@ */ package com.netflix.conductor.service; +import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.netflix.conductor.common.metadata.events.EventHandler; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.events.EventQueues; import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.dao.MetadataDAO; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import javax.validation.ConstraintViolationException; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -29,22 +43,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.junit4.SpringRunner; -import javax.validation.ConstraintViolationException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @SuppressWarnings("SpringJavaAutowiredMembersInspection") @RunWith(SpringRunner.class) @EnableAutoConfiguration @@ -68,8 +66,7 @@ public ConductorProperties properties() { @Bean public MetadataService metadataService(MetadataDAO metadataDAO, ConductorProperties properties) { EventHandlerDAO eventHandlerDAO = mock(EventHandlerDAO.class); - EventQueues eventQueues = mock(EventQueues.class); - return new MetadataServiceImpl(metadataDAO, eventHandlerDAO, eventQueues, properties); + return new MetadataServiceImpl(metadataDAO, eventHandlerDAO, properties); } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java index 271992f333..12dffce333 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java @@ -13,7 +13,7 @@ package com.netflix.conductor.rest.controllers; import com.netflix.conductor.common.metadata.tasks.Task.Status; -import com.netflix.conductor.core.events.queue.QueueManager; +import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor; import io.swagger.v3.oas.annotations.Operation; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -30,35 +30,35 @@ @RequestMapping(QUEUE) public class QueueAdminResource { - private final QueueManager queueManager; + private final DefaultEventQueueProcessor defaultEventQueueProcessor; - public QueueAdminResource(QueueManager queueManager) { - this.queueManager = queueManager; + public QueueAdminResource(DefaultEventQueueProcessor defaultEventQueueProcessor) { + this.defaultEventQueueProcessor = defaultEventQueueProcessor; } @Operation(summary = "Get the queue length") @GetMapping(value = "/size") public Map size() { - return queueManager.size(); + return defaultEventQueueProcessor.size(); } @Operation(summary = "Get Queue Names") @GetMapping(value = "/") public Map names() { - return queueManager.queues(); + return defaultEventQueueProcessor.queues(); } @Operation(summary = "Publish a message in queue to mark a wait task as completed.") @PostMapping(value = "/update/{workflowId}/{taskRefName}/{status}") public void update(@PathVariable("workflowId") String workflowId, @PathVariable("taskRefName") String taskRefName, @PathVariable("status") Status status, @RequestBody Map output) throws Exception { - queueManager.updateByTaskRefName(workflowId, taskRefName, output, status); + defaultEventQueueProcessor.updateByTaskRefName(workflowId, taskRefName, output, status); } @Operation(summary = "Publish a message in queue to mark a wait task (by taskId) as completed.") @PostMapping("/update/{workflowId}/task/{taskId}/{status}") public void updateByTaskId(@PathVariable("workflowId") String workflowId, @PathVariable("taskId") String taskId, @PathVariable("status") Status status, @RequestBody Map output) throws Exception { - queueManager.updateByTaskId(workflowId, taskId, output, status); + defaultEventQueueProcessor.updateByTaskId(workflowId, taskId, output, status); } }