From 5d4e29f2c6fc9e3c31f698681993aae580571053 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 25 Mar 2021 15:52:29 -0700 Subject: [PATCH] wire observable queues with lifecycle; prevent implicit type conversion when expanding json --- .../queue/amqp/AMQPObservableQueue.java | 20 +++++++- .../amqp/config/AMQPEventQueueProvider.java | 5 +- .../queue/nats/NATSAbstractQueue.java | 20 +++++++- .../nats/config/NATSEventQueueProvider.java | 7 ++- .../config/NATSStreamEventQueueProvider.java | 5 +- .../queue/sqs/SQSObservableQueue.java | 21 +++++++- .../sqs/config/SQSEventQueueProvider.java | 22 +++++---- .../core/LifecycleAwareComponent.java | 1 + .../core/events/SimpleEventProcessor.java | 48 ++++++++++++------- .../queue/ConductorEventQueueProvider.java | 4 ++ .../queue/ConductorObservableQueue.java | 32 +++++++++---- .../core/events/queue/ObservableQueue.java | 7 ++- .../conductor/core/utils/JsonUtils.java | 17 +++++-- .../core/events/MockObservableQueue.java | 15 ++++++ .../conductor/core/utils/JsonUtilsTest.java | 17 +++++++ .../conductor/grpc/AbstractProtoMapper.java | 6 ++- 16 files changed, 189 insertions(+), 58 deletions(-) diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 8a872a4a37..3af9797ecd 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -52,7 +52,7 @@ /** * @author Ritu Parathody */ -public class AMQPObservableQueue extends LifecycleAwareComponent implements ObservableQueue { +public class AMQPObservableQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class); @@ -67,6 +67,7 @@ public class AMQPObservableQueue extends LifecycleAwareComponent implements Obse private Channel channel; private final Address[] addresses; protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); + private volatile boolean running; public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange, AMQPSettings settings, int batchSize, int pollTimeInMS) { @@ -270,6 +271,23 @@ public void close() { closeConnection(); } + @Override + public void start() { + LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), settings.getQueueOrExchangeName()); + running = true; + } + + @Override + public void stop() { + LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), settings.getQueueOrExchangeName()); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + public static class Builder { private final Address[] addresses; diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProvider.java index d442534cfd..89b8bcf173 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProvider.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProvider.java @@ -16,11 +16,10 @@ import com.netflix.conductor.contribs.queue.amqp.AMQPObservableQueue.Builder; import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Ritu Parathody diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java index 3e1175e741..507f4b48da 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java @@ -34,7 +34,7 @@ /** * @author Oleksiy Lysak */ -public abstract class NATSAbstractQueue extends LifecycleAwareComponent implements ObservableQueue { +public abstract class NATSAbstractQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(NATSAbstractQueue.class); protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); @@ -50,6 +50,7 @@ public abstract class NATSAbstractQueue extends LifecycleAwareComponent implemen // Indicates that observe was called (Event Handler) and we must to re-initiate subscription upon reconnection private boolean observable; private boolean isOpened; + private volatile boolean running; NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) { this.queueURI = queueURI; @@ -247,6 +248,23 @@ void ensureConnected() { } } + @Override + public void start() { + LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), queueURI); + running = true; + } + + @Override + public void stop() { + LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), queueURI); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + abstract void connect(); abstract boolean isConnected(); diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java index 226499a750..ed0f7e4e1d 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java @@ -16,15 +16,14 @@ import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; import io.nats.client.ConnectionFactory; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.env.Environment; import rx.Scheduler; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - /** * @author Oleksiy Lysak */ diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamEventQueueProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamEventQueueProvider.java index a10b253324..83e088d6d7 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamEventQueueProvider.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamEventQueueProvider.java @@ -15,13 +15,12 @@ import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue; import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Scheduler; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * @author Oleksiy Lysak */ diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java index 9daf9bdb31..f6666a9a8f 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ import rx.Observable.OnSubscribe; import rx.Scheduler; -public class SQSObservableQueue extends LifecycleAwareComponent implements ObservableQueue { +public class SQSObservableQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(SQSObservableQueue.class); private static final String QUEUE_TYPE = "sqs"; @@ -67,6 +68,7 @@ public class SQSObservableQueue extends LifecycleAwareComponent implements Obser private final long pollTimeInMS; private final String queueURL; private final Scheduler scheduler; + private volatile boolean running; private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize, long pollTimeInMS, List accountsToAuthorize, Scheduler scheduler) { @@ -143,6 +145,23 @@ public int getVisibilityTimeoutInSeconds() { return visibilityTimeoutInSeconds; } + @Override + public void start() { + LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), queueName); + running = true; + } + + @Override + public void stop() { + LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), queueName); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + public static class Builder { private String queueName; diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueProvider.java index 51557b6f35..d2d73bf119 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueProvider.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueProvider.java @@ -13,15 +13,19 @@ package com.netflix.conductor.contribs.queue.sqs.config; import com.amazonaws.services.sqs.AmazonSQSClient; +import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue; import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder; import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; public class SQSEventQueueProvider implements EventQueueProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(SQSEventQueueProvider.class); private final Map queues = new ConcurrentHashMap<>(); private final AmazonSQSClient client; private final int batchSize; @@ -44,15 +48,13 @@ public String getQueueType() { @Override public ObservableQueue getQueue(String queueURI) { - return queues.computeIfAbsent(queueURI, q -> { - Builder builder = new Builder(); - return builder.withBatchSize(this.batchSize) - .withClient(client) - .withPollTimeInMS(this.pollTimeInMS) - .withQueueName(queueURI) - .withVisibilityTimeout(this.visibilityTimeoutInSeconds) - .withScheduler(scheduler) - .build(); - }); + return queues.computeIfAbsent(queueURI, q -> new Builder() + .withBatchSize(this.batchSize) + .withClient(client) + .withPollTimeInMS(this.pollTimeInMS) + .withQueueName(queueURI) + .withVisibilityTimeout(this.visibilityTimeoutInSeconds) + .withScheduler(scheduler) + .build()); } } diff --git a/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java index f2b4f4a6cf..eba224a3b5 100644 --- a/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java +++ b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java @@ -19,6 +19,7 @@ import org.springframework.context.SmartLifecycle; public abstract class LifecycleAwareComponent implements SmartLifecycle { + private volatile boolean running = false; private static final Logger LOGGER = LoggerFactory.getLogger(LifecycleAwareComponent.class); diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java index c9b640e237..616d47b97e 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java @@ -19,6 +19,7 @@ import com.netflix.conductor.common.metadata.events.EventHandler; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.utils.RetryUtil; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; @@ -28,12 +29,6 @@ import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; import com.spotify.futures.CompletableFutures; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,6 +43,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.Lifecycle; +import org.springframework.stereotype.Component; /** * Event Processor is used to dispatch actions based on the incoming events to execution queue. @@ -56,7 +57,7 @@ */ @Component @ConditionalOnProperty(name = "conductor.default-event-processor.enabled", havingValue = "true", matchIfMissing = true) -public class SimpleEventProcessor implements EventProcessor { +public class SimpleEventProcessor extends LifecycleAwareComponent implements EventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventProcessor.class); private static final String CLASS_NAME = SimpleEventProcessor.class.getSimpleName(); @@ -85,16 +86,14 @@ public SimpleEventProcessor(ExecutionService executionService, MetadataService m this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled(); int executorThreadCount = properties.getEventProcessorThreadCount(); - if (executorThreadCount > 0) { - executorService = Executors.newFixedThreadPool(executorThreadCount); - refresh(); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS); - LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount); - } else { - LOGGER.warn("workflow.event.processor.thread.count={} must be greater than 0. " + - "To disable event processing, set conductor.default-event-processor.enabled=false", executorThreadCount); - throw new IllegalStateException("workflow.event.processor.thread.count must be greater than 0"); + if (executorThreadCount <= 0) { + throw new IllegalStateException("Cannot set event processor thread count to <=0. To disable event " + + "processing, set conductor.default-event-processor.enabled=false."); } + executorService = Executors.newFixedThreadPool(executorThreadCount); + refresh(); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS); + LOGGER.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount); } /** @@ -116,6 +115,22 @@ public Map> getQueueSizes() { return queues; } + @Override + public void start() { + eventToQueueMap.forEach((event, queue) -> { + LOGGER.debug("Start listening for events: {}", event); + queue.start(); + }); + } + + @Override + public void stop() { + eventToQueueMap.forEach((event, queue) -> { + LOGGER.debug("Stop listening for events: {}", event); + queue.stop(); + }); + } + private void refresh() { try { Set events = metadataService.getAllEventHandlers().stream() @@ -133,6 +148,7 @@ private void refresh() { // start listening on all of the created queues createdQueues.stream() .filter(Objects::nonNull) + .peek(Lifecycle::start) .forEach(this::listen); } catch (Exception e) { diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorEventQueueProvider.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorEventQueueProvider.java index 31c888d336..8afb910c0b 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorEventQueueProvider.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorEventQueueProvider.java @@ -17,6 +17,8 @@ import com.netflix.conductor.dao.QueueDAO; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import rx.Scheduler; @@ -29,10 +31,12 @@ * * @see ConductorObservableQueue */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @ConditionalOnProperty(name = "conductor.event-queues.default.enabled", havingValue = "true", matchIfMissing = true) public class ConductorEventQueueProvider implements EventQueueProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(ConductorEventQueueProvider.class); private final Map queues = new ConcurrentHashMap<>(); private final QueueDAO queueDAO; private final ConductorProperties properties; diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java index 950470863e..a8d1eccc29 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java @@ -12,26 +12,24 @@ */ package com.netflix.conductor.core.events.queue; -import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Scheduler; - import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Scheduler; /** * An {@link ObservableQueue} implementation using the underlying {@link QueueDAO} implementation. */ -public class ConductorObservableQueue extends LifecycleAwareComponent implements ObservableQueue { +public class ConductorObservableQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(ConductorObservableQueue.class); @@ -43,6 +41,7 @@ public class ConductorObservableQueue extends LifecycleAwareComponent implements private final int longPollTimeout; private final int pollCount; private final Scheduler scheduler; + private volatile boolean running; ConductorObservableQueue(String queueName, QueueDAO queueDAO, ConductorProperties properties, Scheduler scheduler) { this.queueName = queueName; @@ -122,4 +121,21 @@ private OnSubscribe getOnSubscribe() { }).subscribe(subscriber::onNext, subscriber::onError); }; } + + @Override + public void start() { + LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), queueName); + running = true; + } + + @Override + public void stop() { + LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), queueName); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java index 707f85cee2..c58d4c64ba 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java @@ -12,12 +12,11 @@ */ package com.netflix.conductor.core.events.queue; -import org.springframework.context.SmartLifecycle; -import rx.Observable; - import java.util.List; +import org.springframework.context.Lifecycle; +import rx.Observable; -public interface ObservableQueue { +public interface ObservableQueue extends Lifecycle { /** * @return An observable for the given queue diff --git a/core/src/main/java/com/netflix/conductor/core/utils/JsonUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/JsonUtils.java index 1ce15ee9ef..cf7ddc1132 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/JsonUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/JsonUtils.java @@ -13,10 +13,9 @@ package com.netflix.conductor.core.utils; import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.stereotype.Component; - import java.util.List; import java.util.Map; +import org.springframework.stereotype.Component; /** * This class contains utility functions for parsing/expanding JSON. @@ -54,7 +53,9 @@ public Object expand(Object input) { private void expandList(List input) { for (Object value : input) { if (value instanceof String) { - value = getJson(value.toString()); + if (isJsonString(value.toString())) { + value = getJson(value.toString()); + } } else if (value instanceof Map) { expandMap((Map) value); } else if (value instanceof List) { @@ -67,8 +68,9 @@ private void expandMap(Map input) { for (Map.Entry entry : input.entrySet()) { Object value = entry.getValue(); if (value instanceof String) { - value = getJson(value.toString()); - entry.setValue(value); + if (isJsonString(value.toString())) { + entry.setValue(getJson(value.toString())); + } } else if (value instanceof Map) { expandMap((Map) value); } else if (value instanceof List) { @@ -91,4 +93,9 @@ private Object getJson(String jsonAsString) { return jsonAsString; } } + + private boolean isJsonString(String jsonAsString) { + jsonAsString = jsonAsString.trim(); + return jsonAsString.startsWith("{") || jsonAsString.startsWith("["); + } } diff --git a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java index 3cf3f0ea16..b3a9dc93df 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java +++ b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java @@ -77,4 +77,19 @@ public long size() { public String toString() { return "MockObservableQueue [uri=" + uri + ", name=" + name + ", type=" + type + "]"; } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public boolean isRunning() { + return false; + } } diff --git a/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java b/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java index c3e822c803..405e157261 100644 --- a/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java +++ b/core/src/test/java/com/netflix/conductor/core/utils/JsonUtilsTest.java @@ -102,4 +102,21 @@ public void testMultiLevelMap() { Object jsonObject = jsonUtils.expand(parentMap); assertNotNull(jsonObject); } + + // This test verifies that the types of the elements in the input are maintained upon expanding the JSON object + @Test + public void testTypes() throws Exception { + String map = "{\"requestId\":\"1375128656908832001\",\"workflowId\":\"fc147e1d-5408-4d41-b066-53cb2e551d0e\"," + + "\"inner\":{\"num\":42,\"status\":\"READY\"}}"; + jsonUtils.expand(map); + + Object jsonObject = jsonUtils.expand(map); + assertNotNull(jsonObject); + assertTrue(jsonObject instanceof LinkedHashMap); + assertTrue(((LinkedHashMap) jsonObject).get("requestId") instanceof String); + assertTrue(((LinkedHashMap) jsonObject).get("workflowId") instanceof String); + assertTrue(((LinkedHashMap) jsonObject).get("inner") instanceof LinkedHashMap); + assertTrue(((LinkedHashMap) ((LinkedHashMap) jsonObject).get("inner")).get("num") instanceof Integer); + assertTrue(((LinkedHashMap) ((LinkedHashMap) jsonObject).get("inner")).get("status") instanceof String); + } } diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 380e885a98..04067cd70c 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -38,14 +38,16 @@ import com.netflix.conductor.proto.WorkflowPb; import com.netflix.conductor.proto.WorkflowSummaryPb; import com.netflix.conductor.proto.WorkflowTaskPb; - -import javax.annotation.Generated; +import java.lang.IllegalArgumentException; +import java.lang.Object; +import java.lang.String; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Generated; @Generated("com.github.vmg.protogen.ProtoGen") public abstract class AbstractProtoMapper {