Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2141 from Netflix/observable_queue_lifecycle
Browse files Browse the repository at this point in the history
wire observable queues with lifecycle
  • Loading branch information
apanicker-nflx authored Mar 26, 2021
2 parents 1d54e1e + 5d4e29f commit 6474e2b
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
/**
* @author Ritu Parathody
*/
public class AMQPObservableQueue extends LifecycleAwareComponent implements ObservableQueue {
public class AMQPObservableQueue implements ObservableQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class);

Expand All @@ -67,6 +67,7 @@ public class AMQPObservableQueue extends LifecycleAwareComponent implements Obse
private Channel channel;
private final Address[] addresses;
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
private volatile boolean running;

public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange,
AMQPSettings settings, int batchSize, int pollTimeInMS) {
Expand Down Expand Up @@ -270,6 +271,23 @@ public void close() {
closeConnection();
}

@Override
public void start() {
LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), settings.getQueueOrExchangeName());
running = true;
}

@Override
public void stop() {
LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), settings.getQueueOrExchangeName());
running = false;
}

@Override
public boolean isRunning() {
return running;
}

public static class Builder {

private final Address[] addresses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
import com.netflix.conductor.contribs.queue.amqp.AMQPObservableQueue.Builder;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Ritu Parathody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/**
* @author Oleksiy Lysak
*/
public abstract class NATSAbstractQueue extends LifecycleAwareComponent implements ObservableQueue {
public abstract class NATSAbstractQueue implements ObservableQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(NATSAbstractQueue.class);
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
Expand All @@ -50,6 +50,7 @@ public abstract class NATSAbstractQueue extends LifecycleAwareComponent implemen
// Indicates that observe was called (Event Handler) and we must to re-initiate subscription upon reconnection
private boolean observable;
private boolean isOpened;
private volatile boolean running;

NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) {
this.queueURI = queueURI;
Expand Down Expand Up @@ -247,6 +248,23 @@ void ensureConnected() {
}
}

@Override
public void start() {
LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), queueURI);
running = true;
}

@Override
public void stop() {
LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), queueURI);
running = false;
}

@Override
public boolean isRunning() {
return running;
}

abstract void connect();

abstract boolean isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.ConnectionFactory;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import rx.Scheduler;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author Oleksiy Lysak
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author Oleksiy Lysak
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Scheduler;

public class SQSObservableQueue extends LifecycleAwareComponent implements ObservableQueue {
public class SQSObservableQueue implements ObservableQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(SQSObservableQueue.class);
private static final String QUEUE_TYPE = "sqs";
Expand All @@ -67,6 +68,7 @@ public class SQSObservableQueue extends LifecycleAwareComponent implements Obser
private final long pollTimeInMS;
private final String queueURL;
private final Scheduler scheduler;
private volatile boolean running;

private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize,
long pollTimeInMS, List<String> accountsToAuthorize, Scheduler scheduler) {
Expand Down Expand Up @@ -143,6 +145,23 @@ public int getVisibilityTimeoutInSeconds() {
return visibilityTimeoutInSeconds;
}

@Override
public void start() {
LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), queueName);
running = true;
}

@Override
public void stop() {
LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), queueName);
running = false;
}

@Override
public boolean isRunning() {
return running;
}

public static class Builder {

private String queueName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
package com.netflix.conductor.contribs.queue.sqs.config;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;

public class SQSEventQueueProvider implements EventQueueProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(SQSEventQueueProvider.class);
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
private final AmazonSQSClient client;
private final int batchSize;
Expand All @@ -44,15 +48,13 @@ public String getQueueType() {

@Override
public ObservableQueue getQueue(String queueURI) {
return queues.computeIfAbsent(queueURI, q -> {
Builder builder = new Builder();
return builder.withBatchSize(this.batchSize)
.withClient(client)
.withPollTimeInMS(this.pollTimeInMS)
.withQueueName(queueURI)
.withVisibilityTimeout(this.visibilityTimeoutInSeconds)
.withScheduler(scheduler)
.build();
});
return queues.computeIfAbsent(queueURI, q -> new Builder()
.withBatchSize(this.batchSize)
.withClient(client)
.withPollTimeInMS(this.pollTimeInMS)
.withQueueName(queueURI)
.withVisibilityTimeout(this.visibilityTimeoutInSeconds)
.withScheduler(scheduler)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.context.SmartLifecycle;

public abstract class LifecycleAwareComponent implements SmartLifecycle {

private volatile boolean running = false;

private static final Logger LOGGER = LoggerFactory.getLogger(LifecycleAwareComponent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.events.EventHandler.Action;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
Expand All @@ -28,12 +29,6 @@
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.spotify.futures.CompletableFutures;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -48,6 +43,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.Lifecycle;
import org.springframework.stereotype.Component;

/**
* Event Processor is used to dispatch actions based on the incoming events to execution queue.
Expand All @@ -56,7 +57,7 @@
*/
@Component
@ConditionalOnProperty(name = "conductor.default-event-processor.enabled", havingValue = "true", matchIfMissing = true)
public class SimpleEventProcessor implements EventProcessor {
public class SimpleEventProcessor extends LifecycleAwareComponent implements EventProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventProcessor.class);
private static final String CLASS_NAME = SimpleEventProcessor.class.getSimpleName();
Expand Down Expand Up @@ -85,16 +86,14 @@ public SimpleEventProcessor(ExecutionService executionService, MetadataService m

this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled();
int executorThreadCount = properties.getEventProcessorThreadCount();
if (executorThreadCount > 0) {
executorService = Executors.newFixedThreadPool(executorThreadCount);
refresh();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS);
LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
} else {
LOGGER.warn("workflow.event.processor.thread.count={} must be greater than 0. " +
"To disable event processing, set conductor.default-event-processor.enabled=false", executorThreadCount);
throw new IllegalStateException("workflow.event.processor.thread.count must be greater than 0");
if (executorThreadCount <= 0) {
throw new IllegalStateException("Cannot set event processor thread count to <=0. To disable event "
+ "processing, set conductor.default-event-processor.enabled=false.");
}
executorService = Executors.newFixedThreadPool(executorThreadCount);
refresh();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS);
LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
}

/**
Expand All @@ -116,6 +115,22 @@ public Map<String, Map<String, Long>> getQueueSizes() {
return queues;
}

@Override
public void start() {
eventToQueueMap.forEach((event, queue) -> {
LOGGER.debug("Start listening for events: {}", event);
queue.start();
});
}

@Override
public void stop() {
eventToQueueMap.forEach((event, queue) -> {
LOGGER.debug("Stop listening for events: {}", event);
queue.stop();
});
}

private void refresh() {
try {
Set<String> events = metadataService.getAllEventHandlers().stream()
Expand All @@ -133,6 +148,7 @@ private void refresh() {
// start listening on all of the created queues
createdQueues.stream()
.filter(Objects::nonNull)
.peek(Lifecycle::start)
.forEach(this::listen);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.netflix.conductor.dao.QueueDAO;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import rx.Scheduler;
Expand All @@ -29,10 +31,12 @@
*
* @see ConductorObservableQueue
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
@ConditionalOnProperty(name = "conductor.event-queues.default.enabled", havingValue = "true", matchIfMissing = true)
public class ConductorEventQueueProvider implements EventQueueProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(ConductorEventQueueProvider.class);
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
private final QueueDAO queueDAO;
private final ConductorProperties properties;
Expand Down
Loading

0 comments on commit 6474e2b

Please sign in to comment.