diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java index 5affe7df29..6559ae86bd 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java @@ -12,13 +12,21 @@ */ package com.netflix.conductor.contribs.queue.nats.config; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue; +import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import rx.Scheduler; +import java.util.HashMap; +import java.util.Map; + @Configuration @EnableConfigurationProperties(NATSStreamProperties.class) @ConditionalOnProperty(name = "conductor.event-queues.nats-stream.enabled", havingValue = "true") @@ -28,4 +36,33 @@ public class NATSStreamConfiguration { public EventQueueProvider natsEventQueueProvider(NATSStreamProperties properties, Scheduler scheduler) { return new NATSStreamEventQueueProvider(properties, scheduler); } + + @ConditionalOnProperty(name = "conductor.default-event-queue.type", havingValue = "nats_stream") + @Bean + public Map getQueues(ConductorProperties conductorProperties, + NATSStreamProperties properties, Scheduler scheduler) { + String stack = ""; + if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) { + stack = conductorProperties.getStack() + "_"; + } + Task.Status[] statuses = new Task.Status[]{ Task.Status.COMPLETED, Task.Status.FAILED}; + Map queues = new HashMap<>(); + for (Task.Status status : statuses) { + String queuePrefix = StringUtils.isBlank(properties.getListenerQueuePrefix()) + ? conductorProperties.getAppId() + "_nats_stream_notify_" + stack + : properties.getListenerQueuePrefix(); + + String queueName = queuePrefix + status.name(); + + ObservableQueue queue = new NATSStreamObservableQueue( + properties.getClusterId(), + properties.getUrl(), + properties.getDurableName(), + queueName, + scheduler); + queues.put(status, queue); + } + + return queues; + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java index f7c8ba8a33..d0b99b0ba7 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java @@ -33,6 +33,11 @@ public class NATSStreamProperties { */ private String url = Nats.DEFAULT_URL; + /** + * The prefix to be used for the default listener queues + */ + private String listenerQueuePrefix = ""; + public String getClusterId() { return clusterId; } @@ -56,4 +61,12 @@ public String getUrl() { public void setUrl(String url) { this.url = url; } + + public String getListenerQueuePrefix() { + return listenerQueuePrefix; + } + + public void setListenerQueuePrefix(String listenerQueuePrefix) { + this.listenerQueuePrefix = listenerQueuePrefix; + } } diff --git a/contribs/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/contribs/src/main/resources/META-INF/additional-spring-configuration-metadata.json index a9bbb827ba..3156ecbef3 100644 --- a/contribs/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/contribs/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -104,6 +104,10 @@ { "value": "amqp", "description": "Use RabbitMQ as the event queue to listen on for the WAIT task." + }, + { + "value": "nats_stream", + "description": "Use NATS Stream as the event queue to listen on for the WAIT task." } ] },