Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
[Feature] Support to specify shutdown grace period time for worker (#…
Browse files Browse the repository at this point in the history
…2631)

* [Feature] Support to specify shutdown grace period time for worker

* Update client doc and spring client for shutdownGracePeriodSeconds
  • Loading branch information
Howaric Kown authored Dec 20, 2021
1 parent c1d68dc commit a3001ee
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClientProperties {

private Map<String, String> taskToDomain = new HashMap<>();

private int shutdownGracePeriodSeconds = 10;

public String getRootUri() {
return rootUri;
}
Expand Down Expand Up @@ -80,4 +82,12 @@ public Map<String, String> getTaskToDomain() {
public void setTaskToDomain(Map<String, String> taskToDomain) {
this.taskToDomain = taskToDomain;
}

public int getShutdownGracePeriodSeconds() {
return shutdownGracePeriodSeconds;
}

public void setShutdownGracePeriodSeconds(int shutdownGracePeriodSeconds) {
this.shutdownGracePeriodSeconds = shutdownGracePeriodSeconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer(
.withSleepWhenRetry((int)clientProperties.getSleepWhenRetryDuration().toMillis())
.withUpdateRetryCount(clientProperties.getUpdateRetryCount())
.withTaskToDomain(clientProperties.getTaskToDomain())
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
.withEurekaClient(eurekaClient)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,9 @@ void pollAndExecute(Worker worker) {
}
}

void shutdown() {
shutdownExecutorService(executorService);
}

void shutdownExecutorService(ExecutorService executorService) {
int timeout = 10;
void shutdownExecutorService(ExecutorService executorService, int timeout) {
try {
executorService.shutdown();
if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
LOGGER.debug("tasks completed, shutting down");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class TaskRunnerConfigurer {
private final int sleepWhenRetry;
private final int updateRetryCount;
private final int threadCount;
private final int shutdownGracePeriodSeconds;
private final String workerNamePrefix;
private final Map<String/*taskType*/, String/*domain*/> taskToDomain;

Expand All @@ -56,6 +57,7 @@ private TaskRunnerConfigurer(Builder builder) {
this.taskToDomain = builder.taskToDomain;
builder.workers.forEach(workers::add);
this.threadCount = (builder.threadCount == -1) ? workers.size() : builder.threadCount;
this.shutdownGracePeriodSeconds = builder.shutdownGracePeriodSeconds;
}

/**
Expand All @@ -67,6 +69,7 @@ public static class Builder {
private int sleepWhenRetry = 500;
private int updateRetryCount = 3;
private int threadCount = -1;
private int shutdownGracePeriodSeconds = 10;
private final Iterable<Worker> workers;
private EurekaClient eurekaClient;
private final TaskClient taskClient;
Expand Down Expand Up @@ -121,6 +124,18 @@ public Builder withThreadCount(int threadCount) {
return this;
}

/**
* @param shutdownGracePeriodSeconds waiting seconds before forcing shutdown of your worker
* @return Builder instance
*/
public Builder withShutdownGracePeriodSeconds(int shutdownGracePeriodSeconds) {
if (shutdownGracePeriodSeconds < 1) {
throw new IllegalArgumentException("Seconds of shutdownGracePeriod cannot be less than 1");
}
this.shutdownGracePeriodSeconds = shutdownGracePeriodSeconds;
return this;
}

/**
* @param eurekaClient Eureka client - used to identify if the server is in discovery or not. When the server
* goes out of discovery, the polling is terminated. If passed null, discovery check is not
Expand Down Expand Up @@ -156,6 +171,13 @@ public int getThreadCount() {
return threadCount;
}

/**
* @return seconds before forcing shutdown of worker
*/
public int getShutdownGracePeriodSeconds() {
return shutdownGracePeriodSeconds;
}

/**
* @return sleep time in millisecond before task update retry is done when receiving error from the Conductor server
*/
Expand Down Expand Up @@ -195,6 +217,6 @@ public synchronized void init() {
* worker, during process termination.
*/
public void shutdown() {
taskPollExecutor.shutdownExecutorService(scheduledExecutorService);
taskPollExecutor.shutdownExecutorService(scheduledExecutorService, shutdownGracePeriodSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ public void testThreadPool() {
assertEquals(3, configurer.getThreadCount());
assertEquals(500, configurer.getSleepWhenRetry());
assertEquals(3, configurer.getUpdateRetryCount());
assertEquals(10, configurer.getShutdownGracePeriodSeconds());

configurer = new TaskRunnerConfigurer.Builder(new TaskClient(), Collections.singletonList(worker))
.withThreadCount(100)
.withSleepWhenRetry(100)
.withUpdateRetryCount(10)
.withShutdownGracePeriodSeconds(15)
.withWorkerNamePrefix("test-worker-")
.build();
assertEquals(100, configurer.getThreadCount());
configurer.init();
assertEquals(100, configurer.getThreadCount());
assertEquals(100, configurer.getSleepWhenRetry());
assertEquals(10, configurer.getUpdateRetryCount());
assertEquals(15, configurer.getShutdownGracePeriodSeconds());
assertEquals("test-worker-", configurer.getWorkerNamePrefix());
}

Expand Down
1 change: 1 addition & 0 deletions docs/docs/gettingstarted/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Initialize the Builder with the following:
| withSleepWhenRetry | Time in milliseconds, for which the thread should sleep when task update call fails, before retrying the operation. | 500 |
| withUpdateRetryCount | Number of attempts to be made when updating task status when update status call fails. | 3 |
| withWorkerNamePrefix | String prefix that will be used for all the workers. | workflow-worker- |
| withShutdownGracePeriodSeconds | Waiting seconds before forcing shutdown of your worker | 10 |

Once an instance is created, call `init()` method to initialize the TaskPollExecutor and begin the polling and execution of tasks.

Expand Down

0 comments on commit a3001ee

Please sign in to comment.