Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
added nats stream support for WAIT task queues (#2610)
Browse files Browse the repository at this point in the history
* added nats stream support for WAIT task queues

* added description for nats_stream into additional-spring-configuration-metadata.json.
removed match_if_missing=true for nats_stream queue type.

Co-authored-by: astelmashenko <[email protected]>
  • Loading branch information
astelmashenko and astelmashenko authored Dec 15, 2021
1 parent 76d3b73 commit 206d913
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@
*/
package com.netflix.conductor.contribs.queue.nats.config;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rx.Scheduler;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(NATSStreamProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.nats-stream.enabled", havingValue = "true")
Expand All @@ -28,4 +36,33 @@ public class NATSStreamConfiguration {
public EventQueueProvider natsEventQueueProvider(NATSStreamProperties properties, Scheduler scheduler) {
return new NATSStreamEventQueueProvider(properties, scheduler);
}

@ConditionalOnProperty(name = "conductor.default-event-queue.type", havingValue = "nats_stream")
@Bean
public Map<Task.Status, ObservableQueue> getQueues(ConductorProperties conductorProperties,
NATSStreamProperties properties, Scheduler scheduler) {
String stack = "";
if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
stack = conductorProperties.getStack() + "_";
}
Task.Status[] statuses = new Task.Status[]{ Task.Status.COMPLETED, Task.Status.FAILED};
Map<Task.Status, ObservableQueue> queues = new HashMap<>();
for (Task.Status status : statuses) {
String queuePrefix = StringUtils.isBlank(properties.getListenerQueuePrefix())
? conductorProperties.getAppId() + "_nats_stream_notify_" + stack
: properties.getListenerQueuePrefix();

String queueName = queuePrefix + status.name();

ObservableQueue queue = new NATSStreamObservableQueue(
properties.getClusterId(),
properties.getUrl(),
properties.getDurableName(),
queueName,
scheduler);
queues.put(status, queue);
}

return queues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class NATSStreamProperties {
*/
private String url = Nats.DEFAULT_URL;

/**
* The prefix to be used for the default listener queues
*/
private String listenerQueuePrefix = "";

public String getClusterId() {
return clusterId;
}
Expand All @@ -56,4 +61,12 @@ public String getUrl() {
public void setUrl(String url) {
this.url = url;
}

public String getListenerQueuePrefix() {
return listenerQueuePrefix;
}

public void setListenerQueuePrefix(String listenerQueuePrefix) {
this.listenerQueuePrefix = listenerQueuePrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@
{
"value": "amqp",
"description": "Use RabbitMQ as the event queue to listen on for the WAIT task."
},
{
"value": "nats_stream",
"description": "Use NATS Stream as the event queue to listen on for the WAIT task."
}
]
},
Expand Down

0 comments on commit 206d913

Please sign in to comment.