diff --git a/client-spring/src/main/java/com/netflix/conductor/client/spring/ClientProperties.java b/client-spring/src/main/java/com/netflix/conductor/client/spring/ClientProperties.java index 5ca9fbdf0d..b53c8c1079 100644 --- a/client-spring/src/main/java/com/netflix/conductor/client/spring/ClientProperties.java +++ b/client-spring/src/main/java/com/netflix/conductor/client/spring/ClientProperties.java @@ -33,6 +33,8 @@ public class ClientProperties { private Map taskToDomain = new HashMap<>(); + private int shutdownGracePeriodSeconds = 10; + public String getRootUri() { return rootUri; } @@ -80,4 +82,12 @@ public Map getTaskToDomain() { public void setTaskToDomain(Map taskToDomain) { this.taskToDomain = taskToDomain; } + + public int getShutdownGracePeriodSeconds() { + return shutdownGracePeriodSeconds; + } + + public void setShutdownGracePeriodSeconds(int shutdownGracePeriodSeconds) { + this.shutdownGracePeriodSeconds = shutdownGracePeriodSeconds; + } } diff --git a/client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java b/client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java index 7d3610222e..2effe90901 100644 --- a/client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java +++ b/client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java @@ -58,6 +58,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer( .withSleepWhenRetry((int)clientProperties.getSleepWhenRetryDuration().toMillis()) .withUpdateRetryCount(clientProperties.getUpdateRetryCount()) .withTaskToDomain(clientProperties.getTaskToDomain()) + .withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds()) .withEurekaClient(eurekaClient) .build(); } diff --git a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java index 1feeb2882b..ed54841f0e 100644 --- a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java +++ b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java @@ -126,13 +126,9 @@ void pollAndExecute(Worker worker) { } } - void shutdown() { - shutdownExecutorService(executorService); - } - - void shutdownExecutorService(ExecutorService executorService) { - int timeout = 10; + void shutdownExecutorService(ExecutorService executorService, int timeout) { try { + executorService.shutdown(); if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) { LOGGER.debug("tasks completed, shutting down"); } else { diff --git a/client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java b/client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java index 3956330fb4..3519bf019f 100644 --- a/client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java +++ b/client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java @@ -38,6 +38,7 @@ public class TaskRunnerConfigurer { private final int sleepWhenRetry; private final int updateRetryCount; private final int threadCount; + private final int shutdownGracePeriodSeconds; private final String workerNamePrefix; private final Map taskToDomain; @@ -56,6 +57,7 @@ private TaskRunnerConfigurer(Builder builder) { this.taskToDomain = builder.taskToDomain; builder.workers.forEach(workers::add); this.threadCount = (builder.threadCount == -1) ? workers.size() : builder.threadCount; + this.shutdownGracePeriodSeconds = builder.shutdownGracePeriodSeconds; } /** @@ -67,6 +69,7 @@ public static class Builder { private int sleepWhenRetry = 500; private int updateRetryCount = 3; private int threadCount = -1; + private int shutdownGracePeriodSeconds = 10; private final Iterable workers; private EurekaClient eurekaClient; private final TaskClient taskClient; @@ -121,6 +124,18 @@ public Builder withThreadCount(int threadCount) { return this; } + /** + * @param shutdownGracePeriodSeconds waiting seconds before forcing shutdown of your worker + * @return Builder instance + */ + public Builder withShutdownGracePeriodSeconds(int shutdownGracePeriodSeconds) { + if (shutdownGracePeriodSeconds < 1) { + throw new IllegalArgumentException("Seconds of shutdownGracePeriod cannot be less than 1"); + } + this.shutdownGracePeriodSeconds = shutdownGracePeriodSeconds; + return this; + } + /** * @param eurekaClient Eureka client - used to identify if the server is in discovery or not. When the server * goes out of discovery, the polling is terminated. If passed null, discovery check is not @@ -156,6 +171,13 @@ public int getThreadCount() { return threadCount; } + /** + * @return seconds before forcing shutdown of worker + */ + public int getShutdownGracePeriodSeconds() { + return shutdownGracePeriodSeconds; + } + /** * @return sleep time in millisecond before task update retry is done when receiving error from the Conductor server */ @@ -195,6 +217,6 @@ public synchronized void init() { * worker, during process termination. */ public void shutdown() { - taskPollExecutor.shutdownExecutorService(scheduledExecutorService); + taskPollExecutor.shutdownExecutorService(scheduledExecutorService, shutdownGracePeriodSeconds); } } diff --git a/client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java b/client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java index 170de338dc..efb1d9ef84 100644 --- a/client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java +++ b/client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java @@ -53,11 +53,13 @@ public void testThreadPool() { assertEquals(3, configurer.getThreadCount()); assertEquals(500, configurer.getSleepWhenRetry()); assertEquals(3, configurer.getUpdateRetryCount()); + assertEquals(10, configurer.getShutdownGracePeriodSeconds()); configurer = new TaskRunnerConfigurer.Builder(new TaskClient(), Collections.singletonList(worker)) .withThreadCount(100) .withSleepWhenRetry(100) .withUpdateRetryCount(10) + .withShutdownGracePeriodSeconds(15) .withWorkerNamePrefix("test-worker-") .build(); assertEquals(100, configurer.getThreadCount()); @@ -65,6 +67,7 @@ public void testThreadPool() { assertEquals(100, configurer.getThreadCount()); assertEquals(100, configurer.getSleepWhenRetry()); assertEquals(10, configurer.getUpdateRetryCount()); + assertEquals(15, configurer.getShutdownGracePeriodSeconds()); assertEquals("test-worker-", configurer.getWorkerNamePrefix()); } diff --git a/docs/docs/gettingstarted/client.md b/docs/docs/gettingstarted/client.md index 25ad301248..a70a6135a1 100644 --- a/docs/docs/gettingstarted/client.md +++ b/docs/docs/gettingstarted/client.md @@ -34,6 +34,7 @@ Initialize the Builder with the following: | withSleepWhenRetry | Time in milliseconds, for which the thread should sleep when task update call fails, before retrying the operation. | 500 | | withUpdateRetryCount | Number of attempts to be made when updating task status when update status call fails. | 3 | | withWorkerNamePrefix | String prefix that will be used for all the workers. | workflow-worker- | +| withShutdownGracePeriodSeconds | Waiting seconds before forcing shutdown of your worker | 10 | Once an instance is created, call `init()` method to initialize the TaskPollExecutor and begin the polling and execution of tasks.