diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java index 8190f8390..81c2ec474 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -30,6 +31,7 @@ import com.netflix.conductor.client.http.ConductorClient; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.client.metrics.MetricsCollector; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; @@ -79,7 +81,8 @@ public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) { public TaskRunnerConfigurer taskRunnerConfigurer(Environment env, TaskClient taskClient, ClientProperties clientProperties, - List workers) { + List workers, + Optional metricsCollector) { Map taskThreadCount = new HashMap<>(); for (Worker worker : workers) { String key = "conductor.worker." + worker.getTaskDefName() + ".threadCount"; @@ -94,15 +97,16 @@ public TaskRunnerConfigurer taskRunnerConfigurer(Environment env, clientProperties.setTaskThreadCount(taskThreadCount); } - return new TaskRunnerConfigurer.Builder(taskClient, workers) + TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, workers) .withTaskThreadCount(clientProperties.getTaskThreadCount()) .withThreadCount(clientProperties.getThreadCount()) .withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis()) .withUpdateRetryCount(clientProperties.getUpdateRetryCount()) .withTaskToDomain(clientProperties.getTaskToDomain()) .withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds()) - .withTaskPollTimeout(clientProperties.getTaskPollTimeout()) - .build(); + .withTaskPollTimeout(clientProperties.getTaskPollTimeout()); + metricsCollector.ifPresent(builder::withMetricsCollector); + return builder.build(); } @Bean diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorWorkerAutoConfiguration.java b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorWorkerAutoConfiguration.java index f40b1017d..399708e8d 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorWorkerAutoConfiguration.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorWorkerAutoConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.metrics.MetricsCollector; import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration; @@ -44,8 +45,12 @@ public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) { ApplicationContext applicationContext = refreshedEvent.getApplicationContext(); Environment environment = applicationContext.getEnvironment(); WorkerConfiguration configuration = new SpringWorkerConfiguration(environment); - AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration); + AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration); + String[] beanNames = applicationContext.getBeanNamesForType(MetricsCollector.class); + if (beanNames.length > 0) { + annotatedWorkerExecutor.setMetricsCollector(applicationContext.getBean(MetricsCollector.class)); + } Map beans = applicationContext.getBeansWithAnnotation(Component.class); beans.values().forEach(annotatedWorkerExecutor::addBean); annotatedWorkerExecutor.startPolling(); diff --git a/conductor-clients/java/conductor-java-sdk/gradle.properties b/conductor-clients/java/conductor-java-sdk/gradle.properties index d1de052e7..dfecf49a9 100644 --- a/conductor-clients/java/conductor-java-sdk/gradle.properties +++ b/conductor-clients/java/conductor-java-sdk/gradle.properties @@ -1 +1 @@ -version=4.0.2-beta +version=4.0.4 diff --git a/conductor-clients/java/conductor-java-sdk/sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java b/conductor-clients/java/conductor-java-sdk/sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java index f1b34aa13..2125e0f58 100644 --- a/conductor-clients/java/conductor-java-sdk/sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java +++ b/conductor-clients/java/conductor-java-sdk/sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java @@ -26,6 +26,7 @@ import com.netflix.conductor.client.automator.TaskRunnerConfigurer; import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.metrics.MetricsCollector; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.sdk.workflow.task.WorkerTask; @@ -50,6 +51,8 @@ public class AnnotatedWorkerExecutor { protected Map workerDomains = new HashMap<>(); + private MetricsCollector metricsCollector; + private static final Set scannedPackages = new HashSet<>(); private final WorkerConfiguration workerConfiguration; @@ -197,12 +200,14 @@ public void startPolling() { LOGGER.info("Starting workers with threadCount {}", workerToThreadCount); LOGGER.info("Worker domains {}", workerDomains); - taskRunner = - new TaskRunnerConfigurer.Builder(taskClient, workers) - .withTaskThreadCount(workerToThreadCount) - .withTaskToDomain(workerDomains) - .build(); + var builder = new TaskRunnerConfigurer.Builder(taskClient, workers) + .withTaskThreadCount(workerToThreadCount) + .withTaskToDomain(workerDomains); + if (metricsCollector != null) { + builder.withMetricsCollector(metricsCollector); + } + taskRunner = builder.build(); taskRunner.init(); } @@ -215,4 +220,12 @@ List getWorkers() { TaskRunnerConfigurer getTaskRunner() { return taskRunner; } + + public MetricsCollector getMetricsCollector() { + return metricsCollector; + } + + public void setMetricsCollector(MetricsCollector metricsCollector) { + this.metricsCollector = metricsCollector; + } }