Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
separate event poller and processor
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Mar 30, 2021
1 parent ee45885 commit 5998fca
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.events.queue.QueueManager;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.service.ExecutionService;
import org.junit.Before;
Expand Down Expand Up @@ -51,11 +51,11 @@
@SuppressWarnings("unchecked")
@ContextConfiguration(classes = {ObjectMapperConfiguration.class})
@RunWith(SpringRunner.class)
public class QueueManagerTest {
public class DefaultEventQueueProcessorTest {

private static SQSObservableQueue queue;
private static ExecutionService executionService;
private QueueManager queueManager;
private DefaultEventQueueProcessor defaultEventQueueProcessor;

@Autowired
private ObjectMapper objectMapper;
Expand All @@ -67,7 +67,7 @@ public class QueueManagerTest {
public void init() {
Map<Status, ObservableQueue> queues = new HashMap<>();
queues.put(Status.COMPLETED, queue);
queueManager = new QueueManager(queues, executionService, objectMapper);
defaultEventQueueProcessor = new DefaultEventQueueProcessor(queues, executionService, objectMapper);
}

@BeforeClass
Expand Down Expand Up @@ -125,21 +125,21 @@ public static void setup() {

@Test
public void test() throws Exception {
queueManager.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED);
defaultEventQueueProcessor.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED);
Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS);

assertTrue(updatedTasks.stream().anyMatch(task -> task.getTaskId().equals("t0")));
}

@Test(expected = IllegalArgumentException.class)
public void testFailure() throws Exception {
queueManager.updateByTaskRefName("v_1", "t1", new HashMap<>(), Status.CANCELED);
defaultEventQueueProcessor.updateByTaskRefName("v_1", "t1", new HashMap<>(), Status.CANCELED);
Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS);
}

@Test
public void testWithTaskId() throws Exception {
queueManager.updateByTaskId("v_2", "t2", new HashMap<>(), Status.COMPLETED);
defaultEventQueueProcessor.updateByTaskId("v_2", "t2", new HashMap<>(), Status.COMPLETED);
Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS);
assertTrue(updatedTasks.stream().anyMatch(task -> task.getTaskId().equals("t2")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ public abstract class LifecycleAwareComponent implements SmartLifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(LifecycleAwareComponent.class);

@Override
public void start() {
public final void start() {
running = true;
LOGGER.info("{} started.", getClass().getSimpleName());
doStart();
}

@Override
public void stop() {
public final void stop() {
running = false;
LOGGER.info("{} stopped.", getClass().getSimpleName());
doStop();
}

@Override
public boolean isRunning() {
public final boolean isRunning() {
return running;
}

public void doStart() {
}

public void doStop() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,29 @@
package com.netflix.conductor.core.config;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

@Configuration(proxyBeanMethods = false)
@EnableScheduling
@EnableAsync
public class SchedulerConfiguration {
public class SchedulerConfiguration implements SchedulingConfigurer {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfiguration.class);
public static final String SWEEPER_EXECUTOR_NAME = "WorkflowSweeperExecutor";
public static final String EVENT_PROCESSOR_EXECUTOR_NAME = "EventProcessorExecutor";

/**
* Used by some {@link com.netflix.conductor.core.events.queue.ObservableQueue} implementations.
Expand All @@ -41,10 +47,10 @@ public class SchedulerConfiguration {
@Bean
public Scheduler scheduler(ConductorProperties properties) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("event-queue-poll-scheduler-thread-%d")
.build();
.setNameFormat("event-queue-poll-scheduler-thread-%d")
.build();
Executor executorService = Executors
.newFixedThreadPool(properties.getEventQueueSchedulerPollThreadCount(), threadFactory);
.newFixedThreadPool(properties.getEventQueueSchedulerPollThreadCount(), threadFactory);

return Schedulers.from(executorService);
}
Expand All @@ -53,11 +59,20 @@ public Scheduler scheduler(ConductorProperties properties) {
public Executor sweeperExecutor(ConductorProperties properties) {
if (properties.getSweeperThreadCount() <= 0) {
throw new IllegalStateException("Cannot set workflow sweeper thread count to <=0. To disable workflow "
+ "sweeper, set conductor.workflow-sweeper.enabled=false.");
+ "sweeper, set conductor.workflow-sweeper.enabled=false.");
}
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("sweeper-thread-%d")
.build();
.setNameFormat("sweeper-thread-%d")
.build();
return Executors.newFixedThreadPool(properties.getSweeperThreadCount(), threadFactory);
}

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(2); // equal to the number of scheduled jobs
threadPoolTaskScheduler.setThreadNamePrefix("scheduled-task-pool-");
threadPoolTaskScheduler.initialize();
taskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -13,13 +13,12 @@
package com.netflix.conductor.core.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.events.EventExecution.Status;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.events.EventHandler.Action;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
Expand All @@ -31,137 +30,64 @@
import com.spotify.futures.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.Lifecycle;
import org.springframework.stereotype.Component;

/**
* Event Processor is used to dispatch actions based on the incoming events to execution queue.
* Event Processor is used to dispatch actions configured in the event handlers, based on incoming events to the event
* queues.
*
* <p><code>Set conductor.default-event-processor.enabled=false</code> to disable event processing.</p>
*/
@Component
@ConditionalOnProperty(name = "conductor.default-event-processor.enabled", havingValue = "true", matchIfMissing = true)
public class SimpleEventProcessor extends LifecycleAwareComponent implements EventProcessor {
public class DefaultEventProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventProcessor.class);
private static final String CLASS_NAME = SimpleEventProcessor.class.getSimpleName();
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventProcessor.class);
private static final int RETRY_COUNT = 3;

private final MetadataService metadataService;
private final ExecutionService executionService;
private final ActionProcessor actionProcessor;
private final EventQueues eventQueues;

private final ExecutorService executorService;
private final Map<String, ObservableQueue> eventToQueueMap = new ConcurrentHashMap<>();
private final ExecutorService eventActionExecutorService;
private final ObjectMapper objectMapper;
private final JsonUtils jsonUtils;
private final boolean isEventMessageIndexingEnabled;

public SimpleEventProcessor(ExecutionService executionService, MetadataService metadataService,
ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, ConductorProperties properties,
public DefaultEventProcessor(ExecutionService executionService, MetadataService metadataService,
ActionProcessor actionProcessor, JsonUtils jsonUtils, ConductorProperties properties,
ObjectMapper objectMapper) {
this.executionService = executionService;
this.metadataService = metadataService;
this.actionProcessor = actionProcessor;
this.eventQueues = eventQueues;
this.objectMapper = objectMapper;
this.jsonUtils = jsonUtils;

this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled();
int executorThreadCount = properties.getEventProcessorThreadCount();
if (executorThreadCount <= 0) {
if (properties.getEventProcessorThreadCount() <= 0) {
throw new IllegalStateException("Cannot set event processor thread count to <=0. To disable event "
+ "processing, set conductor.default-event-processor.enabled=false.");
}
executorService = Executors.newFixedThreadPool(executorThreadCount);
refresh();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS);
LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
}

/**
* @return Returns a map of queues which are active. Key is event name and value is queue URI
*/
public Map<String, String> getQueues() {
Map<String, String> queues = new HashMap<>();
eventToQueueMap.forEach((key, value) -> queues.put(key, value.getName()));
return queues;
}

public Map<String, Map<String, Long>> getQueueSizes() {
Map<String, Map<String, Long>> queues = new HashMap<>();
eventToQueueMap.forEach((key, value) -> {
Map<String, Long> size = new HashMap<>();
size.put(value.getName(), value.size());
queues.put(key, size);
});
return queues;
}

@Override
public void start() {
eventToQueueMap.forEach((event, queue) -> {
LOGGER.debug("Start listening for events: {}", event);
queue.start();
});
}

@Override
public void stop() {
eventToQueueMap.forEach((event, queue) -> {
LOGGER.debug("Stop listening for events: {}", event);
queue.stop();
});
}

private void refresh() {
try {
Set<String> events = metadataService.getAllEventHandlers().stream()
.map(EventHandler::getEvent)
.collect(Collectors.toSet());

List<ObservableQueue> createdQueues = new LinkedList<>();
events.forEach(event -> eventToQueueMap.computeIfAbsent(event, s -> {
ObservableQueue q = eventQueues.getQueue(event);
createdQueues.add(q);
return q;
}
));

// start listening on all of the created queues
createdQueues.stream()
.filter(Objects::nonNull)
.peek(Lifecycle::start)
.forEach(this::listen);

} catch (Exception e) {
Monitors.error(CLASS_NAME, "refresh");
LOGGER.error("refresh event queues failed", e);
}
}
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("event-action-executor-thread-%d")
.build();
eventActionExecutorService = Executors
.newFixedThreadPool(properties.getEventProcessorThreadCount(), threadFactory);

private void listen(ObservableQueue queue) {
queue.observe().subscribe((Message msg) -> handle(queue, msg));
this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled();
LOGGER.info("Event Processing is ENABLED");
}

protected void handle(ObservableQueue queue, Message msg) {
public void handle(ObservableQueue queue, Message msg) {
try {
if (isEventMessageIndexingEnabled) {
executionService.addMessage(queue.getName(), msg);
Expand Down Expand Up @@ -264,7 +190,7 @@ protected CompletableFuture<List<EventExecution>> executeActionsForEventHandler(
if (executionService.addEventExecution(eventExecution)) {
futuresList.add(CompletableFuture
.supplyAsync(() -> execute(eventExecution, action, getPayloadObject(msg.getPayload())),
executorService));
eventActionExecutorService));
} else {
LOGGER.warn("Duplicate delivery/execution of message: {}", msg.getId());
}
Expand All @@ -279,7 +205,6 @@ protected CompletableFuture<List<EventExecution>> executeActionsForEventHandler(
* @return the event execution updated with execution output, if the execution is completed/failed with
* non-transient error the input event execution, if the execution failed due to transient error
*/
@VisibleForTesting
protected EventExecution execute(EventExecution eventExecution, Action action, Object payload) {
try {
String methodName = "executeEventAction";
Expand Down
Loading

0 comments on commit 5998fca

Please sign in to comment.