From 9465a46e65f420cfcf291ef6be0d1215f42bd6c2 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 27 Mar 2023 13:18:04 -0700 Subject: [PATCH 1/2] support task domains in the worker spring configuration --- .../client/spring/SpringWorkerConfiguration.java | 6 ++++++ .../src/test/resources/application.properties | 3 ++- .../executor/task/AnnotatedWorkerExecutor.java | 14 ++++++++++++++ .../executor/task/WorkerConfiguration.java | 4 ++++ .../conductor/sdk/workflow/task/WorkerTask.java | 2 ++ 5 files changed, 28 insertions(+), 1 deletion(-) diff --git a/client-spring/src/main/java/com/netflix/conductor/client/spring/SpringWorkerConfiguration.java b/client-spring/src/main/java/com/netflix/conductor/client/spring/SpringWorkerConfiguration.java index e1c7bc6647..6d44e779c7 100644 --- a/client-spring/src/main/java/com/netflix/conductor/client/spring/SpringWorkerConfiguration.java +++ b/client-spring/src/main/java/com/netflix/conductor/client/spring/SpringWorkerConfiguration.java @@ -35,4 +35,10 @@ public int getThreadCount(String taskName) { String key = "conductor.worker." + taskName + ".threadCount"; return environment.getProperty(key, Integer.class, 0); } + + @Override + public String getDomain(String taskName) { + String key = "conductor.worker." + taskName + ".domain"; + return environment.getProperty(key, String.class, null); + } } diff --git a/client-spring/src/test/resources/application.properties b/client-spring/src/test/resources/application.properties index c2e163cda7..65c47dde42 100644 --- a/client-spring/src/test/resources/application.properties +++ b/client-spring/src/test/resources/application.properties @@ -1,2 +1,3 @@ conductor.client.rootUri=http://localhost:8080/api/ -conductor.worker.hello.threadCount=100 \ No newline at end of file +conductor.worker.hello.threadCount=100 +conductor.worker.hello_again.domain=test \ No newline at end of file diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java index a72cda27b0..cb9ccc548c 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java @@ -15,6 +15,8 @@ import java.lang.reflect.Method; import java.util.*; +import com.google.common.base.Strings; +import com.netflix.conductor.common.utils.EnvUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,8 @@ public class AnnotatedWorkerExecutor { private Map workerToPollingInterval = new HashMap<>(); + private Map workerDomains = new HashMap<>(); + private Map workerClassObjs = new HashMap<>(); private static Set scannedPackages = new HashSet<>(); @@ -163,6 +167,14 @@ private void addMethod(WorkerTask annotation, Method method, Object bean) { } workerToPollingInterval.put(name, pollingInterval); + String domain = workerConfiguration.getDomain(name); + if(Strings.isNullOrEmpty(domain)) { + domain = annotation.domain(); + } + if(!Strings.isNullOrEmpty(domain)) { + workerDomains.put(name, domain); + } + workerClassObjs.put(name, bean); workerExecutors.put(name, method); LOGGER.info( @@ -187,10 +199,12 @@ public void startPolling() { } LOGGER.info("Starting workers with threadCount {}", workerToThreadCount); + LOGGER.info("Worker domains {}", workerDomains); taskRunner = new TaskRunnerConfigurer.Builder(taskClient, executors) .withTaskThreadCount(workerToThreadCount) + .withTaskToDomain(workerDomains) .build(); taskRunner.init(); diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java index 81b3032c80..61e01c0a04 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/WorkerConfiguration.java @@ -29,4 +29,8 @@ public int getPollingInterval(String taskName) { public int getThreadCount(String taskName) { return 0; } + + public String getDomain(String taskName) { + return null; + } } diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/task/WorkerTask.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/task/WorkerTask.java index 032f55c208..409b8e74fc 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/task/WorkerTask.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/task/WorkerTask.java @@ -27,4 +27,6 @@ int threadCount() default 1; int pollingInterval() default 100; + + String domain() default ""; } From 544379834a8c15bd395dbb9f3c5c5bb19cc94fd9 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 27 Mar 2023 13:34:42 -0700 Subject: [PATCH 2/2] apply formatting --- .../workflow/executor/task/AnnotatedWorkerExecutor.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java index cb9ccc548c..4116098647 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java @@ -15,8 +15,6 @@ import java.lang.reflect.Method; import java.util.*; -import com.google.common.base.Strings; -import com.netflix.conductor.common.utils.EnvUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +24,7 @@ import com.netflix.conductor.sdk.workflow.task.WorkerTask; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.reflect.ClassPath; public class AnnotatedWorkerExecutor { @@ -168,10 +167,10 @@ private void addMethod(WorkerTask annotation, Method method, Object bean) { workerToPollingInterval.put(name, pollingInterval); String domain = workerConfiguration.getDomain(name); - if(Strings.isNullOrEmpty(domain)) { + if (Strings.isNullOrEmpty(domain)) { domain = annotation.domain(); } - if(!Strings.isNullOrEmpty(domain)) { + if (!Strings.isNullOrEmpty(domain)) { workerDomains.put(name, domain); }