From 4b1162a80037e91cd0ffae5db3549e686c762d2a Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 4 Mar 2021 14:33:48 -0800 Subject: [PATCH 1/4] disable background processes when instance is disabled --- CHANGELOG.md | 2 +- buildViaTravis.sh | 6 +- .../queue/amqp/AMQPObservableQueue.java | 57 ++++++++++---- .../queue/nats/NATSAbstractQueue.java | 58 +++++++++----- .../queue/sqs/SQSObservableQueue.java | 25 +++++- .../contribs/queue/sqs/QueueManagerTest.java | 2 +- .../queue/sqs/SQSObservableQueueTest.java | 4 +- .../core/config/ConductorProperties.java | 13 ---- .../queue/ConductorObservableQueue.java | 35 +++++++-- .../core/events/queue/ObservableQueue.java | 5 +- .../core/execution/WorkflowExecutor.java | 3 +- .../core/execution/WorkflowSweeper.java | 78 +++++++++++-------- .../tasks/SystemTaskWorkerCoordinator.java | 22 +++++- .../core/events/MockObservableQueue.java | 16 +++- .../rest/controllers/QueueAdminResource.java | 5 +- .../rest/controllers/TaskResource.java | 2 +- .../rest/controllers/WorkflowResource.java | 4 +- .../application-integrationtest.properties | 2 +- 18 files changed, 231 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c26497b42..a5553928fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,7 +120,6 @@ Modified properties in the `core` module: | APP_ID | conductor.app.appId | conductor | | workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 | | decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s | -| decider.sweep.disable | conductor.app.sweepDisabled | false | | workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 | | workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 | | workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true | @@ -273,6 +272,7 @@ Modified properties that are used for configuring components: | --- | --- | --- | | db | conductor.db.type | "" | | workflow.indexing.enabled | conductor.indexing.enabled | true | +| decider.sweep.disable | conductor.workflow-sweeper.enabled | true | | conductor.grpc.server.enabled | conductor.grpc-server.enabled | false | | workflow.external.payload.storage | conductor.external-payload-storage.type | dummy | | workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true | diff --git a/buildViaTravis.sh b/buildViaTravis.sh index 3972967a76..6951163cc5 100755 --- a/buildViaTravis.sh +++ b/buildViaTravis.sh @@ -5,15 +5,15 @@ if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then ./gradlew build coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']' - ./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']' case "$TRAVIS_TAG" in *-rc\.*) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls ;; *) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls ;; esac else diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 58b09096f2..3bb2d32916 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -63,6 +65,7 @@ public class AMQPObservableQueue implements ObservableQueue { private Channel channel; private final Address[] addresses; protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); + private final AtomicBoolean running = new AtomicBoolean(); public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange, AMQPSettings settings, int batchSize, int pollTimeInMS) { @@ -128,24 +131,29 @@ public Observable observe() { Observable.OnSubscribe onSubscribe = subscriber -> { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach(msg -> { - buffer.append(msg.getId()).append("=").append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(), - buffer.toString())); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from RabbitMQ"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach(msg -> { + buffer.append(msg.getId()).append("=").append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() < available.size()) { + buffer.append(","); + } + }); + LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(), + buffer.toString())); + } + return Observable.from(available); } - return Observable.from(available); }).subscribe(subscriber::onNext, subscriber::onError); }; return Observable.create(onSubscribe); @@ -261,6 +269,21 @@ public void close() { closeConnection(); } + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } + public static class Builder { private final Address[] addresses; diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java index 5c1f404efa..1e2ad76814 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java @@ -15,6 +15,7 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import io.nats.client.NUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -51,6 +52,8 @@ public abstract class NATSAbstractQueue implements ObservableQueue { private boolean observable; private boolean isOpened; + private final AtomicBoolean running = new AtomicBoolean(); + NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) { this.queueURI = queueURI; this.queueType = queueType; @@ -93,25 +96,29 @@ public Observable observe() { Observable.OnSubscribe onSubscribe = subscriber -> { Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach(msg -> { - buffer.append(msg.getId()).append("=").append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - - LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString())); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from NATS Queue"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach(msg -> { + buffer.append(msg.getId()).append("=").append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() < available.size()) { + buffer.append(","); + } + }); + LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString())); + } + + return Observable.from(available); } - - return Observable.from(available); }).subscribe(subscriber::onNext, subscriber::onError); }; return Observable.create(onSubscribe); @@ -254,4 +261,19 @@ void ensureConnected() { abstract void closeSubs(); abstract void closeConn(); + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java index 61d4ba57e0..5360e11ccf 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java @@ -40,6 +40,7 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.metrics.Monitors; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -67,6 +68,7 @@ public class SQSObservableQueue implements ObservableQueue { private final long pollTimeInMS; private final String queueURL; private final Scheduler scheduler; + private final AtomicBoolean running = new AtomicBoolean(); private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize, long pollTimeInMS, List accountsToAuthorize, Scheduler scheduler) { @@ -143,6 +145,21 @@ public int getVisibilityTimeoutInSeconds() { return visibilityTimeoutInSeconds; } + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } + public static class Builder { private String queueName; @@ -296,8 +313,12 @@ OnSubscribe getOnSubscribe() { return subscriber -> { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { - List msgs = receiveMessages(); - return Observable.from(msgs); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from SQS"); + return Observable.from(Collections.emptyList()); + } + List messages = receiveMessages(); + return Observable.from(messages); }).subscribe(subscriber::onNext, subscriber::onError); }; } diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java index 815cd66a59..b1764bbc6e 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java @@ -75,6 +75,7 @@ public static void setup() { queue = mock(SQSObservableQueue.class); when(queue.getOrCreateQueue()).thenReturn("junit_queue_url"); + when(queue.isRunning()).thenReturn(true); Answer answer = (Answer>) invocation -> { List copy = new LinkedList<>(messages); messages.clear(); @@ -122,7 +123,6 @@ public static void setup() { }).when(executionService).updateTask(any(Task.class)); } - @Test public void test() throws Exception { queueManager.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java index 98e7fc0bf3..1e43e68e65 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,6 +48,7 @@ public void test() { when(queue.getOrCreateQueue()).thenReturn("junit_queue_url"); Answer answer = (Answer>) invocation -> Collections.emptyList(); when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer); + when(queue.isRunning()).thenReturn(true); when(queue.getOnSubscribe()).thenCallRealMethod(); when(queue.observe()).thenCallRealMethod(); @@ -80,6 +82,7 @@ public void testException() { SQSObservableQueue queue = new SQSObservableQueue.Builder() .withQueueName("junit") .withClient(client).build(); + queue.start(); List found = new LinkedList<>(); Observable observable = queue.observe(); @@ -87,7 +90,6 @@ public void testException() { observable.subscribe(found::add); Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); - assertEquals(1, found.size()); } } diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index af67c50615..f98427abb4 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -47,11 +47,6 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration sweepFrequency = Duration.ofSeconds(30); - /** - * Used to enable/disable the workflow sweeper. - */ - private boolean sweepDisabled = false; - /** * The number of threads to configure the threadpool in the workflow sweeper. */ @@ -289,14 +284,6 @@ public void setSweepFrequency(Duration sweepFrequency) { this.sweepFrequency = sweepFrequency; } - public boolean isSweepDisabled() { - return sweepDisabled; - } - - public void setSweepDisabled(boolean sweepDisabled) { - this.sweepDisabled = sweepDisabled; - } - public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java index fb22613c2d..60e96a4fa5 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java @@ -15,17 +15,18 @@ import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - /** * An {@link ObservableQueue} implementation using the underlying {@link QueueDAO} implementation. */ @@ -41,6 +42,7 @@ public class ConductorObservableQueue implements ObservableQueue { private final int longPollTimeout; private final int pollCount; private final Scheduler scheduler; + private final AtomicBoolean running = new AtomicBoolean(); ConductorObservableQueue(String queueName, QueueDAO queueDAO, ConductorProperties properties, Scheduler scheduler) { this.queueName = queueName; @@ -111,9 +113,28 @@ private OnSubscribe getOnSubscribe() { return subscriber -> { Observable interval = Observable.interval(pollTimeMS, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { - List msgs = receiveMessages(); - return Observable.from(msgs); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from Conductor Queue"); + return Observable.from(Collections.emptyList()); + } + List messages = receiveMessages(); + return Observable.from(messages); }).subscribe(subscriber::onNext, subscriber::onError); }; } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java index 4ab7174571..faa4082826 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java @@ -12,11 +12,12 @@ */ package com.netflix.conductor.core.events.queue; +import org.springframework.context.SmartLifecycle; import rx.Observable; import java.util.List; -public interface ObservableQueue { +public interface ObservableQueue extends SmartLifecycle { /** * @return An observable for the given queue @@ -39,7 +40,7 @@ public interface ObservableQueue { String getURI(); /** - * @param messages messages to be ack'ed + * @param messages to be ack'ed * @return the id of the ones which could not be ack'ed */ List ack(List messages); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 51d7d1eaf7..86387ec4a8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1695,6 +1695,8 @@ private boolean rerunWF(String workflowId, String taskId, Map ta rerunFromTask.setUpdateTime(0); rerunFromTask.setEndTime(0); rerunFromTask.setOutputData(null); + rerunFromTask.setRetried(false); + rerunFromTask.setExecuted(false); rerunFromTask.setExternalOutputPayloadStoragePath(null); if (rerunFromTask.getTaskType().equalsIgnoreCase(SubWorkflow.NAME)) { // if task is sub workflow set task as IN_PROGRESS and reset start time @@ -1708,7 +1710,6 @@ private boolean rerunWF(String workflowId, String taskId, Map ta } addTaskToQueue(rerunFromTask); } - rerunFromTask.setExecuted(false); executionDAOFacade.updateTask(rerunFromTask); decide(workflowId); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java index 1f5ac18eb8..c42c037598 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java @@ -17,11 +17,6 @@ import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -29,17 +24,26 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") -@Service -public class WorkflowSweeper { +@Component +@ConditionalOnProperty(name = "conductor.workflow-sweeper.enabled", havingValue = "true", matchIfMissing = true) +public class WorkflowSweeper implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowSweeper.class); - private ExecutorService executorService; + private final ExecutorService executorService; private final ConductorProperties properties; private final QueueDAO queueDAO; private final int executorThreadPoolSize; + private final AtomicBoolean running = new AtomicBoolean(); private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); @@ -48,36 +52,31 @@ public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService ConductorProperties properties, QueueDAO queueDAO) { this.properties = properties; this.queueDAO = queueDAO; - this.executorThreadPoolSize = properties.getSweeperThreadCount(); - if (this.executorThreadPoolSize > 0) { - this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize); - init(workflowExecutor, workflowRepairService); - LOGGER.info("Workflow Sweeper Initialized"); - } else { - LOGGER.warn("Workflow sweeper is DISABLED"); - } + this.executorThreadPoolSize = properties.getSweeperThreadCount() > 0 ? properties.getSweeperThreadCount() : 1; + this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize); + init(workflowExecutor, workflowRepairService); + LOGGER.info("Workflow Sweeper Initialized"); } public void init(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) { ScheduledExecutorService deciderPool = Executors.newScheduledThreadPool(1); deciderPool.scheduleWithFixedDelay(() -> { - try { - boolean disable = properties.isSweepDisabled(); - if (disable) { - LOGGER.info("Workflow sweep is disabled."); - return; + if (!running.get()) { + LOGGER.debug("Instance disabled, skip workflow sweep"); + } else { + try { + int currentQueueSize = queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE); + LOGGER.debug("Sweeper's current decider queue size: {}", currentQueueSize); + List workflowIds = queueDAO + .pop(WorkflowExecutor.DECIDER_QUEUE, 2 * executorThreadPoolSize, 2000); + if (workflowIds != null) { + LOGGER.debug("Sweeper retrieved {} workflows from the decider queue", workflowIds.size()); + sweep(workflowIds, workflowExecutor, workflowRepairService); + } + } catch (Exception e) { + Monitors.error(CLASS_NAME, "sweep"); + LOGGER.error("Error when sweeping workflow", e); } - List workflowIds = queueDAO - .pop(WorkflowExecutor.DECIDER_QUEUE, 2 * executorThreadPoolSize, 2000); - int currentQueueSize = queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE); - LOGGER.debug("Sweeper's current deciderqueue size: {}.", currentQueueSize); - int retrievedWorkflows = (workflowIds != null) ? workflowIds.size() : 0; - LOGGER.debug("Sweeper retrieved {} workflows from the decider queue.", retrievedWorkflows); - - sweep(workflowIds, workflowExecutor, workflowRepairService); - } catch (Exception e) { - Monitors.error(CLASS_NAME, "sweep"); - LOGGER.error("Error when sweeping workflow", e); } }, 500, 500, TimeUnit.MILLISECONDS); } @@ -130,4 +129,19 @@ public void sweep(List workflowIds, WorkflowExecutor workflowExecutor, future.get(); } } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index 85e13da915..27d72f05f9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -19,10 +19,12 @@ import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionService; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.SmartLifecycle; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -39,7 +41,7 @@ @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component -public class SystemTaskWorkerCoordinator { +public class SystemTaskWorkerCoordinator implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class); @@ -59,6 +61,7 @@ public class SystemTaskWorkerCoordinator { private final QueueDAO queueDAO; private final WorkflowExecutor workflowExecutor; private final ExecutionService executionService; + private final AtomicBoolean running = new AtomicBoolean(); public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, ConductorProperties properties, @@ -115,7 +118,7 @@ private void listen(String queueName) { } private void pollAndExecute(String queueName) { - if (properties.isSystemTaskWorkersDisabled()) { + if (properties.isSystemTaskWorkersDisabled() || !running.get()) { LOGGER.warn("System Task Worker is DISABLED. Not polling for system task in queue : {}", queueName); return; } @@ -142,4 +145,19 @@ boolean isAsyncSystemTask(String queue) { } return false; } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java index a3e4b926ba..490a3b26a7 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java +++ b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java @@ -14,13 +14,12 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import rx.Observable; - import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import rx.Observable; public class MockObservableQueue implements ObservableQueue { @@ -78,4 +77,17 @@ public long size() { public String toString() { return "MockObservableQueue [uri=" + uri + ", name=" + name + ", type=" + type + "]"; } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public boolean isRunning() { + return false; + } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java index 824fcefd12..bc52773c40 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java @@ -21,6 +21,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -49,14 +50,14 @@ public Map names() { @Operation(summary = "Publish a message in queue to mark a wait task as completed.") @PostMapping(value = "/update/{workflowId}/{taskRefName}/{status}") public void update(@PathVariable("workflowId") String workflowId, @PathVariable("taskRefName") String taskRefName, - @PathVariable("status") Status status, Map output) throws Exception { + @PathVariable("status") Status status, @RequestBody Map output) throws Exception { queueManager.updateByTaskRefName(workflowId, taskRefName, output, status); } @Operation(summary = "Publish a message in queue to mark a wait task (by taskId) as completed.") @PostMapping("/update/{workflowId}/task/{taskId}/{status}") public void updateByTaskId(@PathVariable("workflowId") String workflowId, @PathVariable("taskId") String taskId, - @PathVariable("status") Status status, Map output) throws Exception { + @PathVariable("status") Status status, @RequestBody Map output) throws Exception { queueManager.updateByTaskId(workflowId, taskId, output, status); } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index 81a84a831f..6566c6a15e 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -105,7 +105,7 @@ public String ack(@PathVariable("taskId") String taskId, @PostMapping("/{taskId}/log") @Operation(summary = "Log Task Execution Details") - public void log(@PathVariable("taskId") String taskId, String log) { + public void log(@PathVariable("taskId") String taskId, @RequestBody String log) { taskService.log(taskId, log); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java index e3addfb42f..dd7960fc83 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java @@ -81,7 +81,7 @@ public List getWorkflows(@PathVariable("name") String name, public Map> getWorkflows(@PathVariable("name") String name, @RequestParam(value = "includeClosed", defaultValue = "false", required = false) boolean includeClosed, @RequestParam(value = "includeTasks", defaultValue = "false", required = false) boolean includeTasks, - List correlationIds) { + @RequestBody List correlationIds) { return workflowService.getWorkflows(name, includeClosed, includeTasks, correlationIds); } @@ -137,7 +137,7 @@ public void skipTaskFromWorkflow(@PathVariable("workflowId") String workflowId, @PostMapping(value = "/{workflowId}/rerun", produces = TEXT_PLAIN_VALUE) @Operation(summary = "Reruns the workflow from a specific task") public String rerun(@PathVariable("workflowId") String workflowId, - RerunWorkflowRequest request) { + @RequestBody RerunWorkflowRequest request) { return workflowService.rerunWorkflow(workflowId, request); } diff --git a/test-harness/src/test/resources/application-integrationtest.properties b/test-harness/src/test/resources/application-integrationtest.properties index 66358d10ae..10a7c20012 100644 --- a/test-harness/src/test/resources/application-integrationtest.properties +++ b/test-harness/src/test/resources/application-integrationtest.properties @@ -7,7 +7,7 @@ conductor.indexing.enabled=false conductor.app.stack=test conductor.app.appId=conductor -conductor.app.sweepDisabled=false +conductor.workflow-sweeper.enabled=true conductor.app.sweepFrequency=30 conductor.app.systemTaskWorkersDisabled=true From cce4837701e50cca9a18b912a5b30f4aa0bc66cd Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Wed, 10 Mar 2021 15:32:31 -0800 Subject: [PATCH 2/4] Upgrade nebula.netflixoss to replace bintray publication and update TravisCI secrets --- .gitignore | 3 ++ .travis.yml | 20 ++++++----- CHANGELOG.md | 2 +- build.gradle | 4 +-- buildViaTravis.sh | 6 ++-- client/build.gradle | 29 +++++++++++++++ .../core/config/ConductorProperties.java | 13 ------- .../tasks/SystemTaskWorkerCoordinator.java | 33 +++++++++--------- grpc/build.gradle | 4 +-- installViaTravis.sh | 7 ++++ secrets/signing-key.enc | Bin 0 -> 6800 bytes .../AbstractResiliencySpecification.groovy | 2 +- .../application-integrationtest.properties | 3 +- 13 files changed, 76 insertions(+), 50 deletions(-) create mode 100755 installViaTravis.sh create mode 100644 secrets/signing-key.enc diff --git a/.gitignore b/.gitignore index b7b36adde6..cd458a3186 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ bin/ target/ .DS_Store .vscode/ + +# publishing secrets +secrets/signing-key diff --git a/.travis.yml b/.travis.yml index 1f58029bb6..2a207363f4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,9 @@ language: java jdk: - openjdk8 - after_success: - bash <(curl -s https://codecov.io/bash) -install: true +install: "./installViaTravis.sh" script: "./buildViaTravis.sh" git: depth: 150 @@ -14,12 +13,15 @@ addons: - redis-server cache: directories: - - "$HOME/.gradle/caches" + - "$HOME/.gradle/caches" env: global: - - secure: Klmmz8ctX995+JBE2BEnR0CQxe9IVMblIHrQMAim/j3Jdw9rgPX4H4UdDgNbt3yEbFOk54R/2g9eJst2hJVXzpsMiArFLo77HwEwl2cHUPgpCzhQLpp5nyw2e/giKBqaYqN99tzA4P/VsDL9EZR6JBdprG5pega2IdNj9pc9Q/QnG7OEwk/PU4CAhcL6iVvQVqmXf9wwgI8SH+6IKa1BVCzs5GQF3Hxtzk9tNIJAh0f/FMTyax6nLHY0cNJdu9ky75BS4X1BYIYRisSxcHKRLGaYBY6JGzWtba3p1o4hmKw6PKtd8f/aOeOLyZTOePGUVrBIn9WXG/R3UvvZgtkcv/0tq7sMjKRF5sDHetlgXTsPmXvM4RzxJYUx7JQKVdAmFKyRiUFtDFjUQKwbF2gdGSQQd9daZLm4X7KGb8tjiTCB9xSQg7wwbCUJrfGZbicA0sZ8y9Lu+fEbrmzkWqSpy+/I2P2gurKMLdM2y9FwWhcV4mN4u/U9r9B/jJHedm7pVoZBxVs32C6uhkEGeCBxy01Kr9h2EVyT/oZvzo0we2Le9pnQvJ/sNUnujbrxRHC/hWF1sZ2aBeitDLp+b7SK+IqiyX9yt1w5Y/1aduKOyttiRUNgpl1Ytg+cp0z1oz9uSe7JrGL1HSdb0zQg0Fw0ZgGcg+rwe8DyWFJYCjlAbD0= - - secure: WNhtLjX0pLoFdxJ5I5C4E98pPNgFnrpz5nuTc+DBNfX29Ip9wD3/L8+RU7GYr+da+f3ALdboOxaYo5akx6nwlQHGfdhMEHE5SDejS2gAktuKcLEH8WPrp4nXqUnrlpfwnQ7DXB/k01XB9yN4fagEiRFEApB2z0kOv/Q1va5d3M9TrFxT28RFsZnnUWy23VzgCjulqzWZchI24Ra2kWiAjif2EWjwAn3ZD/JKYGZghhtFcAHETg1kz65/AXHOnSFXCntED7AONmSXnpCxuk/5p7CqBoKfJ7IXXT8e98ajmrNvlBx/dBkAepspislc2hgLUSbHAjzuqwdQnz2MGm/XE/lMhhVU5uCDaPDnih9MgH/0ewjJtAc1uTKoc2NtjjzoBCTYhaFCSHzJuSWoi5NIVp9dvAM+thfTcm+/buNdGHkrg+1eLwiItwSHK8R1nBAvk76AqHRR7FdJVi919t+KjveiEd7ez1w9iWnaYh3kycHQCgIwQ9LXfzrR8sLULNq4KNManrL/iuDcEyRG/q6SDfaq5X/i5MFhZ9PuCSorrve7sRKiM4ncxwk2vwrRrqOuqps8QFKLAaf/KvBCdn/FE5vhh++/f3/9oMD6Mc8KqcsTYZFnAm/EKn2236dKBrRI8a2pSQQsMKaoGKOTAmaW2jV9+8j4ODMLi9qWQ3bOumY= - - secure: brK+NGTogesfjqwHso/dK5wqO2LxEUo9gEtUZADr9UlFDzcIU6JomGjeZzeaCsOBlFbZ4p0IIRfivGCi7yegQPWad3cAlyoAQ3+0b+ZxiumbJu5SVVr32x5NxfHNvnW9zIqFIOA9A6GjNq2AkdfBrr9bAerPYc8RjbU4PkH/+CM4HDrx2m/6eXrEbtElCi7IfdRLH9wu6D9/2ANdpK7bCjY2S9sMBvDUsUzGmcoUnJBdInjPYxL1tmAQlAMgWW8E/vKVdyjKq6JsYpwuVnztHlMryrXVQglwrbXtB0gl4Qvqdv0kXAlTf76wQsViEOIvoJV63o/cnFG2lZbVAJ+JGE7cCRaQpIzDf0il5XDkF86XQjaqWpfeEQu7CNj2yjXItn/2q2HaMu4uoEQwQSifRo0n44S7WOSkrZcYly4/Hax9SjiVvDDimlVqp0fURNpo41SMtlW0jXWIYEstft+0vWtPpwzHd9mWEqCQiXkDoAYpjPpfQFpcwFLAi+JO+4Y1Yhuw8NBMHTIDOzjbEwRo06yO/9pYICmg34a1mVAOTdAhpXR8HfTbFlTd80Xm3kYLmrOZrj5yWvP1+XgLDnMFEqw3nHYHQYGWKaVWs5OfYlkhWrpinMVdciLJEp20fUudiSfO7zcbjOXbN5Gg7E0X9kdIbKG9/h+m9buHJuD8QPA= - - secure: W03DuzGYB2tpW6cJJgeFwG7urNPxSbNrrDk3kOApDb216woJ9BVSBGF1Jhhje6o7yYK9k2C2z02ulMNRnfkZ4Zt5WrsiD5zljXKM9G5BOy4zMVesEj93hRq99pfMiidH4pd6N1SZpFCeybxHIIuGHl43lCeDlgxxvpavsnoRwwDLGeRdiMlSB2uChAa9j0CmPr28cYB1r1iXpQPyOjgApI7TzRB42+j+pR0GmZWdCUbKpUPeyg13jQ3d4udgRSPG7b2jUTdrSiVkOD71d/25tmLNWygt2O+mUfp6cKDxZrYpD+V6MFIxHd5AWg9Z3KY/QBUizPKAvpKNDQ8pVj6yqsePYShl4IpTUhzbeFkATvSNXZyzSlmHXkAwkO7Gb8yOOvFqbH8cSqfXqNtjBIoP8WnA4caeY1ZCQ8ec0IpIc3nqng/lTk89hJ+vlmmj1h2G0Yh7syaNxNd7+yno5BXoLXlobACPMUYeHifEjtzcngM9i91m9yFviv6n6WGTnbSz4QTB0Pr5IEzIrOAudBPS4MijR/PmsgEa5l1tiCSWiTf2VJwMcB7g5tAzZqGX/wp4C6A/gbfPUutZBbeVnFCzGP5f9R0QtOOZm07cmN1IoO9+uBvPI62K3TQefgIF0/XKfiRhGKnhwdgZl5RZwN0WkAPVEjoWYXY4QSAZg99H88E= - - secure: VAHbP/8nTAIl2UuT++C/BfSfBDxJPZOEgbCQcCyUpHsFa8SdstuB5Le6VZYaAzcs7wR9WFIHP6+llJyg76p1OhxHC/iG+5QFSqKSkA+RkPyBAmtNTw+Pt5i/0MMxNbBrlogPvFoGe9/wighYQKNwK3In431PSh5n4sEiXPc4XVSzaP+Qxpd1g4VQwQV950JTx97QMLwnR1RNNz/LhBaisE7XdTM561znmqhcRmfGZY7dlhdZxMp+60ngutIZUfSekFLY2nYecoWZv6kEMBxEMnnGBYamCUy856TIVgzGAtD5VScSiRxkwawBKN1OsgvEfwxg/duCTZ9GkQ1LFwxjNDX7bVUo3DsjlqteyJ8n1bh3oYlKgFN6XRiC/Tz8fh66N94AFM8+dc9aJFyBlPBPW1MxxjS+4Y9l3cHxTvyoixguKSHdOypT2PdWkWWSIPGE6j6S33sUJyJuuA/Eq4pG4bd1OfXcjdw+/UJlkLsb3p+ojPhlFqDtRlFTLeS2Mz565EOs/jTzUjuQFNrz3f4Ht+1JpWq9To8KjHzRelRxWR183cikTD9SCDRTQlBlMXcMJHXAasssU5BFr6ZprulKI4UNiU0b3CCVlofDiL/Zd/788TDyqCX/pqI/YmK62zP/EWxOZTCdbfbYetu/+b4c5z//ygfLbw2j3bmtB8ojnE0= - - secure: jiEHSPnbGaejrl6I9Aj4ZOmunzwBtLtnYLggB6W2KBVj115QLRTr2E/SkXrHINWLksV98oPs8J6E6v/LSJ7YwMQssyPmO2UjhakFNZCZpUIYeo+l8vP9LKRZhTbhav9dOG80RUIXUzqJl48GjaFrChYzdzNSXEwBhVqS/cPbEkfxZ+bPnPsuUseLjd/pFbn09CJduqhUWqv9OzjVa0cTjnVGIBDoqWp69p5M2Q8Kpf4wMsZ/gn1oww20YE/XpDrxo1bZyNLbPwsqRSK5lnwG8uqgohkFYAJfIzoriXK74pEPqqp99zmAIO8otdKeEVU6EA6NoK6LzAUa/6l8sa2cxcxNU6bbVEC/IbAWQYWGRDrUa0fNWYaNF/2aMSKbXCgH/KQQnBR8laVlNhhXArxUJGBaygSrLPL12l53tSAXPoPD6jYABtkPPkW95jyp4Zu7LrmjRCNJN/qMXl/DOl306WKzBHnftBeeICsFw6AEkoSHIEIrEJpk/jN1uLWhoOmE6o7sEn6mwVhq4/DqqCGnZZez6RwwqQ2Hiq2Agf7LXEzt5lfm3dKkaxVw4mFuieMWcxmrXYEe9MtrYwdUzssse/p5x2a+SeDgoSg2w17ZNoTUJD6ZSgxMuYJEIPzXgISqZh+ln3ZO0+Raa5yVALhrVY/FCKCuPhwDESE9i65MVlY= + - secure: VAHbP/8nTAIl2UuT++C/BfSfBDxJPZOEgbCQcCyUpHsFa8SdstuB5Le6VZYaAzcs7wR9WFIHP6+llJyg76p1OhxHC/iG+5QFSqKSkA+RkPyBAmtNTw+Pt5i/0MMxNbBrlogPvFoGe9/wighYQKNwK3In431PSh5n4sEiXPc4XVSzaP+Qxpd1g4VQwQV950JTx97QMLwnR1RNNz/LhBaisE7XdTM561znmqhcRmfGZY7dlhdZxMp+60ngutIZUfSekFLY2nYecoWZv6kEMBxEMnnGBYamCUy856TIVgzGAtD5VScSiRxkwawBKN1OsgvEfwxg/duCTZ9GkQ1LFwxjNDX7bVUo3DsjlqteyJ8n1bh3oYlKgFN6XRiC/Tz8fh66N94AFM8+dc9aJFyBlPBPW1MxxjS+4Y9l3cHxTvyoixguKSHdOypT2PdWkWWSIPGE6j6S33sUJyJuuA/Eq4pG4bd1OfXcjdw+/UJlkLsb3p+ojPhlFqDtRlFTLeS2Mz565EOs/jTzUjuQFNrz3f4Ht+1JpWq9To8KjHzRelRxWR183cikTD9SCDRTQlBlMXcMJHXAasssU5BFr6ZprulKI4UNiU0b3CCVlofDiL/Zd/788TDyqCX/pqI/YmK62zP/EWxOZTCdbfbYetu/+b4c5z//ygfLbw2j3bmtB8ojnE0= + - secure: jiEHSPnbGaejrl6I9Aj4ZOmunzwBtLtnYLggB6W2KBVj115QLRTr2E/SkXrHINWLksV98oPs8J6E6v/LSJ7YwMQssyPmO2UjhakFNZCZpUIYeo+l8vP9LKRZhTbhav9dOG80RUIXUzqJl48GjaFrChYzdzNSXEwBhVqS/cPbEkfxZ+bPnPsuUseLjd/pFbn09CJduqhUWqv9OzjVa0cTjnVGIBDoqWp69p5M2Q8Kpf4wMsZ/gn1oww20YE/XpDrxo1bZyNLbPwsqRSK5lnwG8uqgohkFYAJfIzoriXK74pEPqqp99zmAIO8otdKeEVU6EA6NoK6LzAUa/6l8sa2cxcxNU6bbVEC/IbAWQYWGRDrUa0fNWYaNF/2aMSKbXCgH/KQQnBR8laVlNhhXArxUJGBaygSrLPL12l53tSAXPoPD6jYABtkPPkW95jyp4Zu7LrmjRCNJN/qMXl/DOl306WKzBHnftBeeICsFw6AEkoSHIEIrEJpk/jN1uLWhoOmE6o7sEn6mwVhq4/DqqCGnZZez6RwwqQ2Hiq2Agf7LXEzt5lfm3dKkaxVw4mFuieMWcxmrXYEe9MtrYwdUzssse/p5x2a+SeDgoSg2w17ZNoTUJD6ZSgxMuYJEIPzXgISqZh+ln3ZO0+Raa5yVALhrVY/FCKCuPhwDESE9i65MVlY= + - secure: Aq29mY3fX5K4MkdYkGQG3iYYnIjT3yYvMJbTrOql8LD2320pdq4nD63xg6KKqVLql/JA89szH2hTI+XWPzhtOTHzViTOdjJjB/FS5bU1WpMt1nH4NoFOdHJ7yb5Y1GT97+7Mq0+njK2jL3Nh/nw0sNBMINvBuiT6KO12IFs3ciWBCTy/g5d2bxBx3wyQgqVm685SIqKOomGU+y3Lz4yo8Qjh75XYeanw2QoHhFcWzEMhy6uL2wOy1WGvcbPROaUI2HMCqOdF6KYJQl2WXyqCGzGbCxvKaYsZ43bMjB+ndCd4EAFDn7iO3BJkX1usgq4VIZ3KWPi18M2GYxHg9seE608Z22i2sOyS4rY+7gKs+WfuGxbEDODmEoKavQ/BSzIZYKgn2dA5qgeyLK83bf2AF/TEuQSfwW4JabtsHo86n1RRJvEggaGxEQMqenhX0IZkBmI1q90+gX4oSnghU6ilnNeZuDoJjdDnJKfxXL22LA71EZC5QiGr4lWOmZ3AQ67AuoTs44h6zuOpogQGWDtLgeb3FIGBbitjMVzUHHsSAeR7yiZQdgsbMb9s+ko+tG1p70JTZYyCINJGakSSsWBQPu9c+tvglRtK6aZlqk+3UBqrC+goNgJpBdZPYCIjztslu9p9doLMetemxeSTI3Lqd1Yjsw7YcQ9TW8B+lP14lzM= + - secure: uTP0jEFCBOiZ45wF65a2ZPvzxXLBp7ayZurQBR45ty3jlVAIw9GAyGm6zFK2Diw02HJDNFO8Jht5z9YoSz+B3SnYSn3Z5u/QqfWniaCGVmErU9BJkOsAora3j/o76mY1309B/Dl97wLkNOHMlUyXhGmxNq4DW15iERlqlVJlxREWduL/ol8VV2k69Cvjgl7jxuuKV9BsLJ0vssFsyj+LspRrK1S1lkdTMgQBMuBRdNNzQO6z4XPxJBeMS8ecAGHuuYsBeXlDwMVsEmSl9PPUOO8DNIHSxI/Zlbi7LJQDgxprI2oDMKGgNVCTNecpuoCllRDFx0gxxXYjl6CMxGdH6KtFdql961rMZ+Vtxkanrq7yJaX/1UaT9EnYnz5dFLXDP895S7psOfZifTMEl6Db0X6krrYYL+44LWsQUk5CbpUEkK281YGa6ln8lTvxAg4iR9YVjngMMGzGI4KmP7BqpPoEydsGYerJKfSxR5HumQOI0pRsQuP3Uj5NXxMjjMXpkBsvbvk7ijTdeV0E8U4DoeZRr2ntoCXSbZfPUFeED0g5aDfRgGgLZSuNCO6nGACzKcgMaR7NrQepB8dU7ecK8+wjyTM/6uWExPXuq56PTqG15PwAb9ko+kCCRWbtO9Ccj8bnuud1gKKLhFq/3AQjMFuqpmnAv6kgbI8gH9AaTwM= + - secure: fTrUSWExuF81XHxunu+oqUR94RtAfeikLyX64dNk6hucdNyieRw3MELTkoSiyEKGIRYlDk4DcAMRrpCzpob9RQZUDHxdP82bxIc/+ig4PkNq8gAIUsBjUq9CF6NVxYeewRWi2behZB0GVacWGfDJQUG0Ead4tjB6g7Rk9RqyZbaXiDxSfIMVbXdHSaEJ2eIni7KWb8IkL8zViZr2L/xD61PP+CAljpL+j1/JcT0NP/WJ8NVXaqm31mvNTgSuNSE1UwF+jINZEFPWAyBZlr6D3/r8KLKYnJSTky1yzmDX4paoOXTEf223qtRIvwB84U/cJVPANd8fBMGuGW2hRIEeCqqy/m/tXccg6fklzDKErGQLeuH6RVZlDNIlgWuwLRteU8JQsEmEmqllag0rkaW7gJXQcTqCmFooENfBI4vVQYvWbkBlPE0rWsRrBWKeXE1Q9fE2hvubb0V2lY2nT8Qj2SDHJnuCUbnj6jp5ZEb5rH8WT6JWUhlbML2u1UWStASyLaNy57AO3PpoSWysAGVW7y8nDg3HOZCl36BoqjTTgd7a8ijnYHo9WyVYdAICWFLZXAqO8dENWo5SFx+lMfYwHAJtZlC+2AGwL+jHLUfe5Qb6dJej9d5QsGL//pxXmsDNZtHzXlKhNf/WgAIFoTRpOz7Osj5QpUr+TtUg2mU3vq4= + - secure: M4pmOTHS6JVr86X80Kiy1P6Ls9BejcG2tpYdSrVUpDakv2xgt3Ri+M5FPoZcpkKeb4MPLB9FaAamB4uvr5yXqkkeERmmUHpN4XBfitY8XAKYp1oKPcvHEDSJvHx5zRWHX+G5SnBrSc9aQprluQzj28f0TI8hpAe/+xabqNqQmuOhtz77+6hgK+l6w8XqqaPUgdI1cFSlZXyi6DM7URDDiKtFJZZYl5Vt7NGB9xz+idzwqic6rKv2lC+KRKGxw+Mz0RtxvGQdkbRmbPd7tBJ86CmhDq43Hi/rhQpX9kyJsmFxPfdZpGWIDvNArp3E+NVw7v03/vY4IygxqETQbV3itgr4LRlbEaSj7l8V5rkJVknJVCXPSPuE+y811mva7E8z4+/VS2QEbWgKaYCiFbRpOigmclh5jJRPHWJyh9MgDTzbhBJ+k/Q8aLJi/Y6zKowAUBgISovtbcWqgEAfnBLIh/CStitJ0YWwSLHBFk0UoOyvst179WXr3DhkYr2/oP2R88Qfm9dtlMt2cU12bhsztIo4Wf5UPuNYIJRyblqyV03zRRs/nUjiM/vg0kPsWIo52iauxKNBRXJerj3Kdi0P+GRheIT4XyqOKh3zmBpDO2Pz2N1nCr6eLaKUrQHj1APjTctGhO/qtKj6BxsTngMXvobpilQD05Bx2j+4XB4r8/w= + - secure: uoXN6OtjFlYwDXCHsb6igV2hFN/GJrkhneI+veJj5hgAD7F/5w6V24d8QKav0H4kG6ErMglXF9KZZUcaGr/cKNJo5vuMpFiGjpj2FuT2NjRQ/adYv/qc3SVnF1xIEnQVt9TehxbpN13kwtPhaiedVxlHxTIFSzzMuIshuazZmbZIBLId2GwKAHEouvLM1jFbkbzLDbO6OfhsHW1C6vMLOOs/HHVGVqcZiI0pEuFDI1nRakQSbV55HIYyhuvVAXC71WiGof+0IXgokh1S3aZauxrkYfkM+mmcGuWJ9/cpyJjKmv0b7CEDMf6OOXRMUqClDO3GA/U6aN0XeMzbcdJDdRdKuopzDGgQFUONf/KsA79Ewt3y61w21+bR5FIxShRLN3wfZl9/Rvq5BUP6n+no9/UQwq8iLv+i/jQ4q5lvHk+BUG34MbpUKqFXiNI+tyuaTdnIThVIAB4U1twYjWvtKoLGX+0LKJouTAJXCZxVIlOQQmFVX9rEwo5+K+3CfZCFcOon9qgrlT+J0T6gVAyXNPYdCnhEd7SrTj1fbxq8zgwRgywfr0EJ0mGFmNh9H8eIItdQWC0QwszkZ5CqVSgxQkpnA3ItMrvY8Tog8FkaIaxeVvYjRgnDRHdkbskb1QLWDsPX/cSjjpLm5NKz0NUzSENDC4n9svn7CbfrOyjlThY= + - secure: doEhOSV2bJFjceHe8aiEFeCggg/wRE3LShqGleEk8P4oNoMqD1neQKWKtE4pVWM9ZVAUG/8QQsgpV9BI5uQSbw/bHdSv9x8lVl5gqIIQbdiBQNJ0zFbcpvu9HVbmHAS6lmmDnQzZOilPbwxMtazubLIS5py40yAEA7HkOUST/Ul4du7rxo+CB98FCzYAiueuZ61preOjVJ4l/Ko0ddAxWhtSOS8Z/fwsJu6MLGo+GVuAkR30t5gWfBeFUxSJpKZoQ8tagcW/4bVNeQ+msV0Bmtj6wzh7s8gEXS2wKOMjj8WO8wvbyHmgILCPKcBbH593YEZMMusXmvch0nGgaUcc3uHfkJkQICGboAkGj0vCDFoh5P9jdpGJUqr2ThASfWPYnnC0U2de/fA/2K5Lb60G4WWCwXg6DjmFGofVdYsxssaQXwx93eo8nxetoHSSqKgTPMLgH/xUcEPkrH/cqrsSGB6kGJ/DJ/76MjEvecwmj4vVZD2a4/VX91FgpXiO36O7Bgnvt3GSKBMiEpyrhxGwzd7w383SRWMyCX2K7a8NaokMwseOe7DI1dOyXAiB47x0MCMqduiSq9nlrEPP1aWFRfDPEHOhToAw4EQCxRhdMXxk0h9ADpAmnZqBvJgyPvaYZgwNrlblebqnUFU50RVopAELQf0KTo715Yq17gvUJ5Y= + - secure: s1BmNXlM5RhexGZX6ng36ixSuaDL7r2wVpYC/z1AcAZoRlAgk8nstBSYncAtYyN1wWQ9+vCFDv89tarmXG4fT68VeFEQykr+NJDylU6ZVQs3WeaWlLBU1ecv6iGBbCj5idxwiCf7AIFdHGWKmuvX4Bk+L/5IL9nVnjcNx709okO9VGTRCArE/DGgdRJA6tFE7dyl8n7BbHVuaQJj68Ylx8CDFRFUPho011DYYyyiOwhOGrgMWNXn7YbvEH/PVaUtqtGYA2pAZjbdOYO9ODIGPwO99bMDHHJHhyrcbltjmVUpVF5QAXnSWGwGO5pekyQxy3DYcdKe38CLr1V87qPwokq7chZB9R+zAMlKIf1+JslVsh+RJWF2R8gFAgfivZT4XbEdSOhJ1T0WysFpnHodbGjJkYjJv3akaUYUtluS8hpRVV79iZduZ0KYCVvUslKfgvFVvt9xdX3h5F7eoJVID/HpyR1PtPU6Ct+WM/sMk6kcDK2AWF8L7cjdAWQWjxQBdu7EXo5qLAEFWy2Sh2OlZtfaNBcc5QJ5Yd8Oas7KWWySzVNqDmj8GdR5r+YMYQHCWzkkuj9P8dmghhydre44K5YK9cjpfB2r3UJ2CNvq9JufJI+FW7na9flxLxZsRg02uwYBDjYDRuzra4pROj3yvO00febv9Vm556BT8gDsNHk= diff --git a/CHANGELOG.md b/CHANGELOG.md index a5553928fb..5bf6d143d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,7 +137,6 @@ Modified properties in the `core` module: | workflow.system.task.worker.executionNameSpace | conductor.app.systemTaskWorkerExecutionNamespace | "" | | workflow.isolated.system.task.worker.thread.count | conductor.app.isolatedSystemTaskWorkerThreadCount | 1 | | workflow.system.task.queue.pollCount | conductor.app.systemTaskMaxPollCount | 1 | -| conductor.disable.async.workers | conductor.app.systemTaskWorkersDisabled | false | | async.update.short.workflow.duration.seconds | conductor.app.asyncUpdateShortRunningWorkflowDuration | 30s | | async.update.delay.seconds | conductor.app.asyncUpdateDelay | 60s | | workflow.owner.email.mandatory | conductor.app.ownerEmailMandatory | true | @@ -272,6 +271,7 @@ Modified properties that are used for configuring components: | --- | --- | --- | | db | conductor.db.type | "" | | workflow.indexing.enabled | conductor.indexing.enabled | true | +| conductor.disable.async.workers | conductor.system-task-workers.enabled | true | | decider.sweep.disable | conductor.workflow-sweeper.enabled | true | | conductor.grpc.server.enabled | conductor.grpc-server.enabled | false | | workflow.external.payload.storage | conductor.external-payload-storage.type | dummy | diff --git a/build.gradle b/build.gradle index 2bcfa04d21..9afc32251a 100644 --- a/build.gradle +++ b/build.gradle @@ -4,7 +4,7 @@ buildscript { jcenter() } dependencies { - classpath 'com.netflix.nebula:gradle-extra-configurations-plugin:5.0.2' + classpath 'com.netflix.nebula:gradle-extra-configurations-plugin:5.0.3' classpath 'org.springframework.boot:spring-boot-gradle-plugin:2.3.1.RELEASE' } } @@ -14,7 +14,7 @@ plugins { id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' id 'application' - id 'nebula.netflixoss' version '8.8.1' + id 'nebula.netflixoss' version '9.2.2' id 'com.github.kt3k.coveralls' version '2.8.2' } diff --git a/buildViaTravis.sh b/buildViaTravis.sh index 6951163cc5..70eb5b5eb1 100755 --- a/buildViaTravis.sh +++ b/buildViaTravis.sh @@ -5,15 +5,15 @@ if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then ./gradlew build coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']' - ./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls + ./gradlew -Prelease.travisci=true -PnetflixOss.username=$NETFLIX_OSS_REPO_USERNAME -PnetflixOss.password=$NETFLIX_OSS_REPO_PASSWORD -Psonatype.signingPassword=$NETFLIX_OSS_SIGNING_PASSWORD -Prelease.scope=patch build snapshot coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']' case "$TRAVIS_TAG" in *-rc\.*) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PnetflixOss.username=$NETFLIX_OSS_REPO_USERNAME -PnetflixOss.password=$NETFLIX_OSS_REPO_PASSWORD -Psonatype.signingPassword=$NETFLIX_OSS_SIGNING_PASSWORD candidate coveralls ;; *) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PnetflixOss.username=$NETFLIX_OSS_REPO_USERNAME -PnetflixOss.password=$NETFLIX_OSS_REPO_PASSWORD -Psonatype.username=$NETFLIX_OSS_SONATYPE_USERNAME -Psonatype.password=$NETFLIX_OSS_SONATYPE_PASSWORD -Psonatype.signingPassword=$NETFLIX_OSS_SIGNING_PASSWORD final coveralls ;; esac else diff --git a/client/build.gradle b/client/build.gradle index 112a20c621..628f97c2f6 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -1,3 +1,17 @@ +buildscript { + repositories { + maven { + url "https://plugins.gradle.org/m2/" + } + } + dependencies { + classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.2" + } +} + +apply plugin: 'com.github.spotbugs' +apply plugin: 'pmd' + configurations.all { exclude group: 'amazon', module: 'aws-java-sdk' } @@ -25,3 +39,18 @@ dependencies { testImplementation "org.powermock:powermock-module-junit4:${revPowerMock}" testImplementation "org.powermock:powermock-api-mockito2:${revPowerMock}" } + +spotbugsMain { + reports { + xml { + enabled = false + } + html { + enabled = true + } + } +} + +pmd { + ignoreFailures = true +} \ No newline at end of file diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f98427abb4..808b9b6108 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -136,11 +136,6 @@ public class ConductorProperties { */ private int systemTaskMaxPollCount = 1; - /** - * Used to enable/disable the system task workers that execute async system tasks like HTTP, etc. - */ - private boolean systemTaskWorkersDisabled = false; - /** * The duration of workflow execution which qualifies a workflow as a short-running workflow when async indexing to * elasticsearch is enabled. @@ -420,14 +415,6 @@ public void setSystemTaskMaxPollCount(int systemTaskMaxPollCount) { this.systemTaskMaxPollCount = systemTaskMaxPollCount; } - public boolean isSystemTaskWorkersDisabled() { - return systemTaskWorkersDisabled; - } - - public void setSystemTaskWorkersDisabled(boolean systemTaskWorkersDisabled) { - this.systemTaskWorkersDisabled = systemTaskWorkersDisabled; - } - public Duration getAsyncUpdateShortRunningWorkflowDuration() { return asyncUpdateShortRunningWorkflowDuration; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index 27d72f05f9..4b82aa4e7e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.SmartLifecycle; import org.springframework.context.event.EventListener; @@ -41,6 +42,7 @@ @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component +@ConditionalOnProperty(name="conductor.system-task-workers.enabled", havingValue = "true", matchIfMissing = true) public class SystemTaskWorkerCoordinator implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class); @@ -64,9 +66,9 @@ public class SystemTaskWorkerCoordinator implements SmartLifecycle { private final AtomicBoolean running = new AtomicBoolean(); public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, - ConductorProperties properties, - ExecutionService executionService, - List workflowSystemTasks) { + ConductorProperties properties, + ExecutionService executionService, + List workflowSystemTasks) { this.properties = properties; this.workflowSystemTasks = workflowSystemTasks; this.executionNameSpace = properties.getSystemTaskWorkerExecutionNamespace(); @@ -78,15 +80,12 @@ public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowE @EventListener(ApplicationReadyEvent.class) public void initSystemTaskExecutor() { - int threadCount = properties.getSystemTaskWorkerThreadCount(); - if (threadCount > 0) { - this.workflowSystemTasks.forEach(this::add); - this.systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); - new Thread(this::listen).start(); - LOGGER.info("System Task Worker Coordinator initialized with poll interval: {}", pollInterval); - } else { - LOGGER.info("System Task Worker DISABLED"); - } + int threadCount = + properties.getSystemTaskWorkerThreadCount() > 0 ? properties.getSystemTaskWorkerThreadCount() : 1; + this.workflowSystemTasks.forEach(this::add); + this.systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + new Thread(this::listen).start(); + LOGGER.info("System Task Worker Coordinator initialized with poll interval: {}", pollInterval); } private void add(WorkflowSystemTask systemTask) { @@ -100,7 +99,7 @@ private void listen() { for (; ; ) { String workflowSystemTaskQueueName = queue.poll(60, TimeUnit.SECONDS); if (workflowSystemTaskQueueName != null && !listeningTaskQueues.contains(workflowSystemTaskQueueName) - && shouldListen(workflowSystemTaskQueueName)) { + && shouldListen(workflowSystemTaskQueueName)) { listen(workflowSystemTaskQueueName); listeningTaskQueues.add(workflowSystemTaskQueueName); } @@ -113,13 +112,13 @@ && shouldListen(workflowSystemTaskQueueName)) { private void listen(String queueName) { Executors.newSingleThreadScheduledExecutor() - .scheduleWithFixedDelay(() -> pollAndExecute(queueName), 1000, pollInterval, TimeUnit.MILLISECONDS); + .scheduleWithFixedDelay(() -> pollAndExecute(queueName), 1000, pollInterval, TimeUnit.MILLISECONDS); LOGGER.info("Started listening for queue: {}", queueName); } private void pollAndExecute(String queueName) { - if (properties.isSystemTaskWorkersDisabled() || !running.get()) { - LOGGER.warn("System Task Worker is DISABLED. Not polling for system task in queue : {}", queueName); + if (!running.get()) { + LOGGER.debug("System Task Worker is DISABLED. Not polling for system task in queue : {}", queueName); return; } systemTaskExecutor.pollAndExecute(queueName); @@ -133,7 +132,7 @@ boolean isFromCoordinatorExecutionNameSpace(String queueName) { private boolean shouldListen(String workflowSystemTaskQueueName) { return isFromCoordinatorExecutionNameSpace(workflowSystemTaskQueueName) - && isAsyncSystemTask(workflowSystemTaskQueueName); + && isAsyncSystemTask(workflowSystemTaskQueueName); } @VisibleForTesting diff --git a/grpc/build.gradle b/grpc/build.gradle index f7c0e04671..1d6edeee21 100644 --- a/grpc/build.gradle +++ b/grpc/build.gradle @@ -1,13 +1,13 @@ buildscript { dependencies { - classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.13' + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.15' } } plugins { id 'java' id 'idea' - id "com.google.protobuf" version "0.8.13" + id "com.google.protobuf" version "0.8.15" } repositories{ diff --git a/installViaTravis.sh b/installViaTravis.sh new file mode 100755 index 0000000000..82cf1b8803 --- /dev/null +++ b/installViaTravis.sh @@ -0,0 +1,7 @@ +#!/bin/bash +# This script will build the project. + +if [ "$TRAVIS_SECURE_ENV_VARS" = "true" ]; then + echo "Decrypting publishing credentials" + openssl aes-256-cbc -k "$NETFLIX_OSS_SIGNING_FILE_PASSWORD" -in secrets/signing-key.enc -out secrets/signing-key -d +fi diff --git a/secrets/signing-key.enc b/secrets/signing-key.enc new file mode 100644 index 0000000000000000000000000000000000000000..1f361de8a9d164ae73319abd734f60c1445d9d06 GIT binary patch literal 6800 zcmV;B8gJ!OVQh3|WM5zE^AC$fA2ls#bthS!_vi76n6f_34GPkSf6^S9{(tBUXj`^P z9rTov+FjApS!MMe1awpakR<(@znnCi-Q?v?5^!@v#)xs@i#FmruQlcg>UjPD2k>#ZTglIH zeT-UNBpfji2$QZ5!_-7d#iDJ?YECfkR_6$6BIiysC`vJwBnTdvS6b>N=<00ao1DzY zFW+@w?@e}kZI%=`WPH6nY^LE-n{NHM)Ek*Nsj6&`gOcVyvaNNVsS9K$NGQIu%*A`i z`kbu5YY5=CddumC5Ea8Am!tg>`%WG#bp(H;bI@&uDw+I!~A{qI8W;~c1%JADkHaIa#T$Il+ou&=cS zj2DZk)FLWL=N-Ivl)S*SFLE9{XfuOsm-$c(k^*CMln-=c>;}rj}{mFI!YvrmJ z1pBn#w{f@}(P;w34UM>ncZ)uFJ3$Cz>i*swz8NX!@UwJtu&)`*dStomzJ3*QGGax$ zl2NvVF*t-2-^r#u;Sd@(vGy%7Cb(F(0f_3#RP#OTK*AFaW2rD#7I#Edbg2mH@f8I3vI*16o2t4raA#HD2?AIiviTAodpb35vj_l~#E z$T&g-8)$iO)#=&}M3 zVyt3Ib6$=sBdfMt3>(qvxAjv#a25lmyBSHQ5%z8I)Qe~IUy_n=PRUF{xQT{qn_#^y zA$~E*BxtgM0ur?!7D7jFf_vv(R)Np$hJ=SGU2{^{IZp&w#R0)mHF%o{Vmu?!8qftV z#Z+|?rAK1|*!h_m&=eZy{JFWYl4`6-5M5V1 zW?J>{;c2NXW%KIB2%2)UAL6^o>6NI6ce1{LhqP1PyS4ZMIVbGL=yUb&cW~#NXBW_b zk^YAYsHBvbMCUf9{v?c|aK*j}6S!+cx;37y1^+&>_O@{d?8{+5^Jmti34e-xfli6j z$m-z8EF!2n3oyP1{#X;kES|&jo}4E1PF?FT8wSW@d}X;}A|}f3u}*Iq*c1XT^0-WqnHZ=Tp*!q)+GqOAu50qPA%jw#G5> zHhtzV3GM2@0~2wN;2w+mA4dk5VDoy$zQVq`59IDD9>nrSgf+yVY#qU?ySYlaZ6K9vn) zvn%PsJ3Xp<`fuA_xPCgL;@G!*x3P2l|*a2#hkzK5n=d zEu4VT=%6~%b9W#xKcWRg0%*RKbEYZSx+@GIs8ITuYbcYkZEMsyFm!eytyaVmaOX#3 zrm0sUqDhEhFGmx$d5Y(*k^TAJd8I+9?!Sum9?{JU>!HHWSFZkEARKgl&IeG6Y%CjC zDum5}?;F0aUkf0sc7~i;vuB=aKbs#Xb=4?_r87<|R25hSLCvLTBQ9<$^^LRn^MY}z z4Nx)65{1_duo{sePFOu8tf?5Z(Q{X>ECUb2K>#WM9#T3?H7Cxly^uv%)!YdE73fRf zv@W2}+##l|IaJ!Uqh|)OctzfBtxAFc{66+rTU$UQ!@vI&h|hUrH>NS%PwcCZq=F#KMS#y)q#6U=SWnppLhdm<}fyZOB4cEeHIP<3SWnI7ZW3pBFi3(tsd&u4~=e!cs*e3Z2MI#A{Uc zieEC3xsB3F0E1(t?-dfTkd<|OY!QV@A^#orFL@UxG%;Nk%VE8!?8&qmee%(G!dbQ ztHx<^*UElFONz;vvT&^-{DdWDMqcz)A3*MdK=nh7uCPZkoK4g6&N4GP@1>sdI+)iD*eAaNrf#G)NpV}_o@e@sSDZAVlFlqxw!_3+( z3Hc~%B-^h9K@6kbox(jylP&4xna`%4c*?EWlbPrxk}cCb+P2SmGrIkKD{C6*NsW~2wDIaF3FT*OS>MOZ@mUVt6H+>xnHt^^8aiEjX-8Gu z4dvB%0$kjXH^6=q!Mm}oaw;>i=H>3^J!hS*cT~D=Q^x-@6_zY}JZrsmEB(11b5Y$v zYzWLXHp5H^4ChXKTLqS?S3UlGtg}QNshS4?wr-wE+B6mV>CY#M-qjE$h!9&ot!?m| znma3x9JY(73a5vAId!c?{?X!X2$VUMvB%%`x-n>5B%K)Zsb|243alxM5v;M6`@#P( zB=JG@wz&59A19XlVEJ$a{pqD}l&s3J=y{ddg+^_M#s5mBF(fdC$azQ%o(|R`F z4`;i5VE&R9c^4={?RY|}76R6YG1ZU^#x|7s^5qQZsT1xZJ}wjMZP?Emqh-LhfDq!w zDMC~<`m%t#H7Amgl6C_^xsFg17Yj z3!$=a$LJ!IKeMSWI~Jg*I9oUS>+%)2x8&W4t&hI^oWkrsfEz9>03cguDIukcZav=C z$C?`_qLA{r3F0{02QCWtGA5OdUO@Eu};+?&5_hRxaM2Xs|dX zj*f@njbw3apU^Gvn5+3&Ti{{HxLhav;w5##{-1>!MVkHcJYTu)t4s|otdwQA3d`}Z zR#0gYp@)l}f+($PqYzKV5f2G9IYj7Gibh$>OnzILSe3T~m~WyCIV>(0R!3oJF_w=! z++PfgE(&k(B;4E%aWb@0UynI|aKTlfVJZ=S#!W{r;!p-e=sT|#(+j&C$?FBTl!Y4j z&I_nQ7mb)A?;R9pQfnu8!Gx@w&CV%^r(n>6;1Ueq^@JKzfs)0@;aFBrfQ6ovF+mh# zoxb(%V(Iv=md3>F*^_t(Y|K1hF6Ti)*vNL^ z9g+t(pXEVCujasuZ3Z|~@5dBs^X_bW$8nEQsZFQJpHXEn20ngbrwvEBm zJ&dK+wf0wAC)V--GQ2$%(R{dY?4orh6kxoyeN~0fwI%UPK@TtnRatqyW8PG`B=zOK zla=Hf{jO$-VOE)Rn89@O^F4?IO6Cr>iHVS+oFN?hL!zgio^7&WaT~*zn=Fu2&nWJg zUWVU0rx8bntZo!~6jPOcHRWeWHQ0)JnC&%UULVu;QgD$B>}f;IvsHE&fBujuCCSLXhmr~qz?E!6)5#tC__e0dohzm9d8N;w8GugNvryEL`S#3)WG23MGCA0vs z`${p0DqDex6cDmt#%?9X^ktPit&KhDJ{FPM@E#jmI$--{^bS2xN<$AQzjqGT=gP#r}j8mko3Pn(|7MwT?c_cxm3T$mMV4*ZI~KAo-EB-?|SxlHu&I3bvTBTdlcdL??E+c zS+s;nbEVz)%^Q+ z|JXFMFfrs^)yOFx&3sWCyQ4-Oz*ng`uBm#zC%dEM^msBbp!O5%i!bnEm z)=bG7R{!4B$XOyO?XzugEM$t$^Mu>4hdoH58pibQRgaX2h(kASf@6qog=J>I|JdbfUz($@h`U&Xqig@h5vYme_#0j6t6@Gx3_WW{@$bU8LB;^U`LNID z5Jg&%Lyw%hPo8T>IUk17Jx8ru{jAaZN{OxwsAnUn%3c~8trueV2<~l6CF5N)@9k>S z0=%mvr$&=cfIaooXFW~_(;50)&7h~-%W@XiUEZ2Fpg0iREb!FGM<|6K9p4YByxH(% zK+{F*Bw8Lt99Nc@sUoY#$<(_Ioz)F=Y=YqJbjP8zOUPfj1%N^C-7==*Hr$riV$&C5 z3`%E;2S&HJ5~cCieW-Ug3zJ%%Q_5y^#OnCb;!(56;F}Sd43?2m4Azi{FcpWlefq7^ z3^Ou-*RuQhhCKn{OGjob8PdW8Jb*k!%8R!I=^>JM&jMA~j>N@t%N+n(@KV8YBe2(r z!3&q?KDX$Pv`9K#wgA}-=3)8GNlJ@WM0>dNPc%TVnB^q>Wa5@DKRx;5DQQ0bbENPC zh))MsfC=ugb5x-*W`o!8?%lWdq^a-sl?6U~J%|0eZ$?ZTsmq6I1FmMhj@sZcS`8jk0T>1H_9_2T zx$5GVI`XAf0dB~B^nN9jvsI{?3c#I!yzmgm!Y=^rsQg=(!}PAGqj- zJJK<%&&bkFv2Gu7W1|S=FnW(C`{`zIVW~n`&4nsY(bmi;n8?tO^>PKFYdgw(ajU)S zh0Mj|!l7^ZYUfXa9!#0U7*wiFmw1)Z{);&-umua$oggwIp+2@wZuhx7Hk$0C|{nw1VvySdaRCkxhh(jfCCuw}v*8KsZz)#r7WRSP3 z#ffJJSVGpj)WoHhjcYL6!d;ZKjb=jZCGR7IsFc(NdE1`^#7I4 z&a2`$v4OscY~^;yDF+_>F*6ZvCNPl-$pIY_Ygzuy>xDMY$~xwI@=6>$(kNbg8_1*x z4X&fAp@JqQ(oZPTs3cNPu0PhhSxhn00!rAaY(G8Fddt_r8^va>g_S$Me^P_!L*v?w zB9G7X6AcE}$-=!t%+hwaErhP4*c(|j%RZ0aJ}Vm(ui6}bXmEPL`#{~7ds9WG<8ToY zT2_vkSf5*U^wi{085d3e%p1kxTq2Pgu;0!`)>wNBS=f}<8vnLbp#92@Ruvj?*q@Qs zIi4Lx(n~^R@~VtkWRhWEN!9J+5F>7Ckh=lghuG*aG%{+CvFT^eIKt{2H zT!hK+F(8UwCFP}OZs(}&MnS#+ByHpCNWrQ>%|mg*Lm!62NS0IaiWYB=qYIxeY!cKg z1202Qp2|j5nbZy_E>+zXP+#D%qzS~n%j8au`T*XKD?%ny4h01o%@Z5g61S=eW2#3v zezWStapS4UT_$|hIYE+CBaYk&FHx2`bgFDfA`cgTCq5gp?Y+F|mR3j@{9R_4C1z}{ z#0M&0$ihhQchcbfY8`eBD@`>NM3HK+`1qhP&%zT?wD_6Zd;6IQ)3*M4@wg-Hjy-F$ z`)jP?7J)5$jpUjey5BVi>XN7^uO(VxAo#Ag(m!Rg%5%mYP5_@q0|gfvCnXsL6qW)Ioz9q;{PX z8A1A#?cQng=T2~*IZILzO=VrLrZF26#A^yue59qD<*nIa09fIOP3;7JNi{qeY%=NK zEe~rPt`=v8cz1pdIHa&1W{p0+0c-+xmoHr z!I@8)=RvJFjLyq7@%m+>83Rq@-%W)qLPjia&^L&JL`zkrA>AU Date: Thu, 11 Mar 2021 12:16:49 -0800 Subject: [PATCH 3/4] added missing license headers --- .../PrometheusMetricsConfigurationTest.java | 12 ++++++++++++ .../db/migration/V7__new_queue_message_pk.sql | 16 ++++++++++++++++ .../resources/db/migration/V8__update_pk.sql | 16 ++++++++++++++++ .../V5__new_queue_message_pk.sql | 16 ++++++++++++++++ .../db/migration_postgres/V6__update_pk.sql | 16 ++++++++++++++++ 5 files changed, 76 insertions(+) diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/metrics/PrometheusMetricsConfigurationTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/metrics/PrometheusMetricsConfigurationTest.java index 6e6de3d047..b95f21dd54 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/metrics/PrometheusMetricsConfigurationTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/metrics/PrometheusMetricsConfigurationTest.java @@ -1,3 +1,15 @@ +/* + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package com.netflix.conductor.contribs.metrics; import java.lang.reflect.Field; diff --git a/mysql-persistence/src/main/resources/db/migration/V7__new_queue_message_pk.sql b/mysql-persistence/src/main/resources/db/migration/V7__new_queue_message_pk.sql index 641c2d9595..be18397f8c 100644 --- a/mysql-persistence/src/main/resources/db/migration/V7__new_queue_message_pk.sql +++ b/mysql-persistence/src/main/resources/db/migration/V7__new_queue_message_pk.sql @@ -1,3 +1,19 @@ +-- +-- Copyright 2021 Netflix, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + # no longer need separate index if pk is queue_name, message_id SET @idx_exists := (SELECT COUNT(INDEX_NAME) FROM information_schema.STATISTICS diff --git a/mysql-persistence/src/main/resources/db/migration/V8__update_pk.sql b/mysql-persistence/src/main/resources/db/migration/V8__update_pk.sql index f1ed4f7ad7..9b6f53c7a4 100644 --- a/mysql-persistence/src/main/resources/db/migration/V8__update_pk.sql +++ b/mysql-persistence/src/main/resources/db/migration/V8__update_pk.sql @@ -1,3 +1,19 @@ +-- +-- Copyright 2021 Netflix, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + DELIMITER $$ DROP PROCEDURE IF EXISTS `DropIndexIfExists`$$ CREATE PROCEDURE `DropIndexIfExists`(IN tableName VARCHAR(128), IN indexName VARCHAR(128)) diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V5__new_queue_message_pk.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V5__new_queue_message_pk.sql index 97e1e2a2fa..de0fc680d3 100644 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V5__new_queue_message_pk.sql +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V5__new_queue_message_pk.sql @@ -1,3 +1,19 @@ +-- +-- Copyright 2021 Netflix, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + -- no longer need separate index if pk is queue_name, message_id DROP INDEX IF EXISTS unique_queue_name_message_id; diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V6__update_pk.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V6__update_pk.sql index 3244598ae3..3d7d6cd239 100644 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V6__update_pk.sql +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V6__update_pk.sql @@ -1,3 +1,19 @@ +-- +-- Copyright 2021 Netflix, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + -- 1) queue_message DROP INDEX IF EXISTS unique_queue_name_message_id; ALTER TABLE queue_message DROP CONSTRAINT IF EXISTS queue_message_pkey; From 8b46925c61ea430d4b5f5b9a19350196ba43fae5 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 11 Mar 2021 15:00:56 -0800 Subject: [PATCH 4/4] removed unused method from IndexDAO --- CHANGELOG.md | 2 +- .../contribs/dao/index/NoopIndexDAO.java | 5 -- .../queue/amqp/AMQPObservableQueue.java | 21 ++------- .../queue/nats/NATSAbstractQueue.java | 32 +++---------- .../queue/sqs/SQSObservableQueue.java | 33 ++++--------- .../core/LifecycleAwareComponent.java | 34 ++++++++++++++ .../core/config/ConductorProperties.java | 2 +- .../queue/ConductorObservableQueue.java | 22 ++------- .../core/events/queue/ObservableQueue.java | 2 +- .../core/execution/WorkflowSweeper.java | 31 ++++-------- .../tasks/SystemTaskWorkerCoordinator.java | 47 +++++++------------ .../com/netflix/conductor/dao/IndexDAO.java | 9 ---- .../core/events/MockObservableQueue.java | 13 ----- .../es6/dao/index/ElasticSearchDAOV6.java | 19 -------- .../es6/dao/index/ElasticSearchRestDAOV6.java | 21 --------- .../es6/dao/index/TestElasticSearchDAOV6.java | 26 ---------- .../dao/index/TestElasticSearchRestDAOV6.java | 26 ---------- 17 files changed, 83 insertions(+), 262 deletions(-) create mode 100644 core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bf6d143d0..abea8d1ed1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,7 +131,7 @@ Modified properties in the `core` module: | task.queue.message.postponeSeconds | conductor.app.taskExecutionPostponeDuration | 60s | | workflow.taskExecLog.indexing.enabled | conductor.app.taskExecLogIndexingEnabled | true | | async.indexing.enabled | conductor.app.asyncIndexingEnabled | false | -| workflow.system.task.worker.thread.count | conductor.app.systemTaskWorkerThreadCount | 10 | +| workflow.system.task.worker.thread.count | conductor.app.systemTaskWorkerThreadCount | # available processors * 2 | | workflow.system.task.worker.callback.seconds | conductor.app.systemTaskWorkerCallbackDuration | 30s | | workflow.system.task.worker.poll.interval | conductor.app.systemTaskWorkerPollInterval | 50s | | workflow.system.task.worker.executionNameSpace | conductor.app.systemTaskWorkerExecutionNamespace | "" | diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java index 7e8103f6a3..58f95ddec1 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java @@ -138,9 +138,4 @@ public List getMessages(String queue) { public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { return Collections.emptyList(); } - - @Override - public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { - return Collections.emptyList(); - } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 3bb2d32916..5ead95fc4a 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -16,6 +16,7 @@ import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties; import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.metrics.Monitors; @@ -50,7 +51,7 @@ /** * @author Ritu Parathody */ -public class AMQPObservableQueue implements ObservableQueue { +public class AMQPObservableQueue extends LifecycleAwareComponent implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class); @@ -65,7 +66,6 @@ public class AMQPObservableQueue implements ObservableQueue { private Channel channel; private final Address[] addresses; protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); - private final AtomicBoolean running = new AtomicBoolean(); public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange, AMQPSettings settings, int batchSize, int pollTimeInMS) { @@ -132,7 +132,7 @@ public Observable observe() { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { if (!isRunning()) { - LOGGER.debug("Instance disabled, skip listening for messages from RabbitMQ"); + LOGGER.debug("Component stopped, skip listening for messages from RabbitMQ"); return Observable.from(Collections.emptyList()); } else { List available = new LinkedList<>(); @@ -269,21 +269,6 @@ public void close() { closeConnection(); } - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } - public static class Builder { private final Address[] addresses; diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java index 1e2ad76814..3e1175e741 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java @@ -12,15 +12,10 @@ */ package com.netflix.conductor.contribs.queue.nats; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import io.nats.client.NUID; -import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Scheduler; - import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -31,11 +26,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Scheduler; /** * @author Oleksiy Lysak */ -public abstract class NATSAbstractQueue implements ObservableQueue { +public abstract class NATSAbstractQueue extends LifecycleAwareComponent implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(NATSAbstractQueue.class); protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); @@ -52,8 +51,6 @@ public abstract class NATSAbstractQueue implements ObservableQueue { private boolean observable; private boolean isOpened; - private final AtomicBoolean running = new AtomicBoolean(); - NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) { this.queueURI = queueURI; this.queueType = queueType; @@ -97,7 +94,7 @@ public Observable observe() { Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { if (!isRunning()) { - LOGGER.debug("Instance disabled, skip listening for messages from NATS Queue"); + LOGGER.debug("Component stopped, skip listening for messages from NATS Queue"); return Observable.from(Collections.emptyList()); } else { List available = new LinkedList<>(); @@ -261,19 +258,4 @@ void ensureConnected() { abstract void closeSubs(); abstract void closeConn(); - - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java index 5360e11ccf..9daf9bdb31 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java @@ -37,16 +37,10 @@ import com.amazonaws.services.sqs.model.SendMessageBatchResult; import com.amazonaws.services.sqs.model.SetQueueAttributesResult; import com.google.common.annotations.VisibleForTesting; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.metrics.Monitors; -import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Scheduler; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,8 +49,13 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Scheduler; -public class SQSObservableQueue implements ObservableQueue { +public class SQSObservableQueue extends LifecycleAwareComponent implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(SQSObservableQueue.class); private static final String QUEUE_TYPE = "sqs"; @@ -68,7 +67,6 @@ public class SQSObservableQueue implements ObservableQueue { private final long pollTimeInMS; private final String queueURL; private final Scheduler scheduler; - private final AtomicBoolean running = new AtomicBoolean(); private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize, long pollTimeInMS, List accountsToAuthorize, Scheduler scheduler) { @@ -145,21 +143,6 @@ public int getVisibilityTimeoutInSeconds() { return visibilityTimeoutInSeconds; } - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } - public static class Builder { private String queueName; @@ -314,7 +297,7 @@ OnSubscribe getOnSubscribe() { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { if (!isRunning()) { - LOGGER.debug("Instance disabled, skip listening for messages from SQS"); + LOGGER.debug("Component stopped, skip listening for messages from SQS"); return Observable.from(Collections.emptyList()); } List messages = receiveMessages(); diff --git a/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java new file mode 100644 index 0000000000..f21bb47bd1 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core; + +import org.springframework.context.SmartLifecycle; + +public class LifecycleAwareComponent implements SmartLifecycle { + private volatile boolean running = false; + + @Override + public void start() { + running = true; + } + + @Override + public void stop() { + running = false; + } + + @Override + public boolean isRunning() { + return running; + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 808b9b6108..e3b30f178e 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -108,7 +108,7 @@ public class ConductorProperties { /** * The number of threads to be used within the threadpool for system task workers. */ - private int systemTaskWorkerThreadCount = 10; + private int systemTaskWorkerThreadCount = Runtime.getRuntime().availableProcessors() * 2; /** * The interval (in seconds) after which a system task will be checked by the system task worker for completion. diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java index 60e96a4fa5..cf00881d0b 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java @@ -12,6 +12,7 @@ */ package com.netflix.conductor.core.events.queue; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; @@ -19,7 +20,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ /** * An {@link ObservableQueue} implementation using the underlying {@link QueueDAO} implementation. */ -public class ConductorObservableQueue implements ObservableQueue { +public class ConductorObservableQueue extends LifecycleAwareComponent implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(ConductorObservableQueue.class); @@ -42,7 +42,6 @@ public class ConductorObservableQueue implements ObservableQueue { private final int longPollTimeout; private final int pollCount; private final Scheduler scheduler; - private final AtomicBoolean running = new AtomicBoolean(); ConductorObservableQueue(String queueName, QueueDAO queueDAO, ConductorProperties properties, Scheduler scheduler) { this.queueName = queueName; @@ -114,7 +113,7 @@ private OnSubscribe getOnSubscribe() { Observable interval = Observable.interval(pollTimeMS, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { if (!isRunning()) { - LOGGER.debug("Instance disabled, skip listening for messages from Conductor Queue"); + LOGGER.debug("Component stopped, skip listening for messages from Conductor Queue"); return Observable.from(Collections.emptyList()); } List messages = receiveMessages(); @@ -122,19 +121,4 @@ private OnSubscribe getOnSubscribe() { }).subscribe(subscriber::onNext, subscriber::onError); }; } - - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java index faa4082826..707f85cee2 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java @@ -17,7 +17,7 @@ import java.util.List; -public interface ObservableQueue extends SmartLifecycle { +public interface ObservableQueue { /** * @return An observable for the given queue diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java index c42c037598..b05e06a2ae 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java @@ -12,6 +12,7 @@ */ package com.netflix.conductor.core.execution; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.exception.ApplicationException; @@ -24,18 +25,16 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @ConditionalOnProperty(name = "conductor.workflow-sweeper.enabled", havingValue = "true", matchIfMissing = true) -public class WorkflowSweeper implements SmartLifecycle { +public class WorkflowSweeper extends LifecycleAwareComponent { private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowSweeper.class); @@ -43,7 +42,6 @@ public class WorkflowSweeper implements SmartLifecycle { private final ConductorProperties properties; private final QueueDAO queueDAO; private final int executorThreadPoolSize; - private final AtomicBoolean running = new AtomicBoolean(); private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); @@ -52,7 +50,11 @@ public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService ConductorProperties properties, QueueDAO queueDAO) { this.properties = properties; this.queueDAO = queueDAO; - this.executorThreadPoolSize = properties.getSweeperThreadCount() > 0 ? properties.getSweeperThreadCount() : 1; + this.executorThreadPoolSize = properties.getSweeperThreadCount(); + if (executorThreadPoolSize <= 0) { + throw new IllegalStateException("Cannot set workflow sweeper thread count to <=0. To disable workflow " + + "sweeper, set conductor.workflow-sweeper.enabled=false."); + } this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize); init(workflowExecutor, workflowRepairService); LOGGER.info("Workflow Sweeper Initialized"); @@ -61,8 +63,8 @@ public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService public void init(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) { ScheduledExecutorService deciderPool = Executors.newScheduledThreadPool(1); deciderPool.scheduleWithFixedDelay(() -> { - if (!running.get()) { - LOGGER.debug("Instance disabled, skip workflow sweep"); + if (!isRunning()) { + LOGGER.debug("Component stopped, skip workflow sweep"); } else { try { int currentQueueSize = queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE); @@ -129,19 +131,4 @@ public void sweep(List workflowIds, WorkflowExecutor workflowExecutor, future.get(); } } - - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index 4b82aa4e7e..0d3648a158 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -13,22 +13,13 @@ package com.netflix.conductor.core.execution.tasks; import com.google.common.annotations.VisibleForTesting; +import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionService; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.SmartLifecycle; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - import java.util.HashSet; import java.util.List; import java.util.Map; @@ -39,11 +30,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @ConditionalOnProperty(name="conductor.system-task-workers.enabled", havingValue = "true", matchIfMissing = true) -public class SystemTaskWorkerCoordinator implements SmartLifecycle { +public class SystemTaskWorkerCoordinator extends LifecycleAwareComponent { private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class); @@ -63,7 +61,6 @@ public class SystemTaskWorkerCoordinator implements SmartLifecycle { private final QueueDAO queueDAO; private final WorkflowExecutor workflowExecutor; private final ExecutionService executionService; - private final AtomicBoolean running = new AtomicBoolean(); public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, ConductorProperties properties, @@ -80,8 +77,11 @@ public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowE @EventListener(ApplicationReadyEvent.class) public void initSystemTaskExecutor() { - int threadCount = - properties.getSystemTaskWorkerThreadCount() > 0 ? properties.getSystemTaskWorkerThreadCount() : 1; + int threadCount = properties.getSystemTaskWorkerThreadCount(); + if (threadCount <= 0) { + throw new IllegalStateException("Cannot set system task worker thread count to <=0. To disable system " + + "task workers, set conductor.system-task-workers.enabled=false."); + } this.workflowSystemTasks.forEach(this::add); this.systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); new Thread(this::listen).start(); @@ -117,8 +117,8 @@ private void listen(String queueName) { } private void pollAndExecute(String queueName) { - if (!running.get()) { - LOGGER.debug("System Task Worker is DISABLED. Not polling for system task in queue : {}", queueName); + if (isRunning()) { + LOGGER.debug("Component stopped. Not polling for system task in queue : {}", queueName); return; } systemTaskExecutor.pollAndExecute(queueName); @@ -144,19 +144,4 @@ boolean isAsyncSystemTask(String queue) { } return false; } - - @Override - public void start() { - running.set(true); - } - - @Override - public void stop() { - running.set(false); - } - - @Override - public boolean isRunning() { - return running.get(); - } } diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java index dbba5ba554..4812e59328 100644 --- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java @@ -177,13 +177,4 @@ public interface IndexDAO { * @return List of worlflow Ids matching the pattern */ List searchArchivableWorkflows(String indexName, long archiveTtlDays); - - /** - * Search for RUNNING workflows changed in the last lastModifiedHoursAgoFrom to lastModifiedHoursAgoTo hours - * @param lastModifiedHoursAgoFrom - last updated date should be lastModifiedHoursAgoFrom hours ago or later - * @param lastModifiedHoursAgoTo - last updated date should be lastModifiedHoursAgoTo hours ago or earlier - * * - * @return List of workflow Ids matching the pattern - */ - List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo); } diff --git a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java index 490a3b26a7..3cf3f0ea16 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java +++ b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java @@ -77,17 +77,4 @@ public long size() { public String toString() { return "MockObservableQueue [uri=" + uri + ", name=" + name + ", type=" + type + "]"; } - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public boolean isRunning() { - return false; - } } diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java index e581c601fa..d7e14e8c5c 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java @@ -752,25 +752,6 @@ public List searchArchivableWorkflows(String indexName, long archiveTtlD return extractSearchIds(s); } - public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { - DateTime dateTime = new DateTime(); - QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("updateTime") - .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) - .must(QueryBuilders.rangeQuery("updateTime") - .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) - .must(QueryBuilders.termQuery("status", "RUNNING")); - - String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride; - SearchRequestBuilder s = elasticSearchClient.prepareSearch(workflowIndexName) - .setTypes(docType) - .setQuery(q) - .setSize(5000) - .addSort("updateTime", SortOrder.ASC); - - return extractSearchIds(s); - } - private UpdateRequest buildUpdateRequest(String id, byte[] doc, String indexName, String docType) { UpdateRequest req = new UpdateRequest(indexName, docType, id); req.doc(doc, XContentType.JSON); diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java index 8d2f61856f..9bcad334d0 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java @@ -887,27 +887,6 @@ public List searchArchivableWorkflows(String indexName, long archiveTtlD return workflowIds.getResults(); } - public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { - DateTime dateTime = new DateTime(); - QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("updateTime") - .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) - .must(QueryBuilders.rangeQuery("updateTime") - .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) - .must(QueryBuilders.termQuery("status", "RUNNING")); - - SearchResult workflowIds; - try { - workflowIds = searchObjectIds(workflowIndexName, q, 0, 5000, Collections.singletonList("updateTime:ASC"), - WORKFLOW_DOC_TYPE); - } catch (IOException e) { - LOGGER.error("Unable to communicate with ES to find recent running workflows", e); - return Collections.emptyList(); - } - - return workflowIds.getResults(); - } - private void indexObject(final String index, final String docType, final Object doc) { indexObject(index, docType, null, doc); } diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java index 9ce678cf86..e4832ba2b4 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java @@ -303,32 +303,6 @@ public void shouldAddIndexPrefixToIndexTemplate() throws Exception { assertEquals(json, content); } - @Test - public void shouldSearchRecentRunningWorkflows() throws Exception { - Workflow oldWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - oldWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - oldWorkflow.setUpdateTime(new DateTime().minusHours(2).toDate().getTime()); - - Workflow recentWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - recentWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - recentWorkflow.setUpdateTime(new DateTime().minusHours(1).toDate().getTime()); - - Workflow tooRecentWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - tooRecentWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - tooRecentWorkflow.setUpdateTime(new DateTime().toDate().getTime()); - - indexDAO.indexWorkflow(oldWorkflow); - indexDAO.indexWorkflow(recentWorkflow); - indexDAO.indexWorkflow(tooRecentWorkflow); - - Thread.sleep(1000); - - List ids = indexDAO.searchRecentRunningWorkflows(2, 1); - - assertEquals(1, ids.size()); - assertEquals(recentWorkflow.getWorkflowId(), ids.get(0)); - } - private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) { assertEquals(summary.getWorkflowType(), indexDAO.get(workflowId, "workflowType")); assertEquals(String.valueOf(summary.getVersion()), indexDAO.get(workflowId, "version")); diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java index 92ff756a9b..6316c6defd 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java @@ -282,32 +282,6 @@ public void shouldAddIndexPrefixToIndexTemplate() throws Exception { assertEquals(json, content); } - @Test - public void shouldSearchRecentRunningWorkflows() throws Exception { - Workflow oldWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - oldWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - oldWorkflow.setUpdateTime(new DateTime().minusHours(2).toDate().getTime()); - - Workflow recentWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - recentWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - recentWorkflow.setUpdateTime(new DateTime().minusHours(1).toDate().getTime()); - - Workflow tooRecentWorkflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow"); - tooRecentWorkflow.setStatus(Workflow.WorkflowStatus.RUNNING); - tooRecentWorkflow.setUpdateTime(new DateTime().toDate().getTime()); - - indexDAO.indexWorkflow(oldWorkflow); - indexDAO.indexWorkflow(recentWorkflow); - indexDAO.indexWorkflow(tooRecentWorkflow); - - Thread.sleep(1000); - - List ids = indexDAO.searchRecentRunningWorkflows(2, 1); - - assertEquals(1, ids.size()); - assertEquals(recentWorkflow.getWorkflowId(), ids.get(0)); - } - private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) { assertEquals(summary.getWorkflowType(), indexDAO.get(workflowId, "workflowType")); assertEquals(String.valueOf(summary.getVersion()), indexDAO.get(workflowId, "version"));