diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d5842e9b23..4012c1e57b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1415,6 +1415,10 @@ boolean scheduleTask(Workflow workflow, List tasks) { } } + // metric to track the distribution of number of tasks within a workflow + Monitors.recordNumTasksInWorkflow(workflow.getTasks().size() + tasks.size(), workflow.getWorkflowName(), + String.valueOf(workflow.getWorkflowVersion())); + // Save the tasks in the DAO createdTasks = executionDAOFacade.createTasks(tasks); diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 8f3a4055df..b224f8e95c 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -18,6 +18,7 @@ import com.netflix.servo.monitor.BasicStopwatch; import com.netflix.servo.monitor.Stopwatch; import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Gauge; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; @@ -43,6 +44,8 @@ public class Monitors { private static final Map, Gauge>> gauges = new ConcurrentHashMap<>(); + private static final Map, DistributionSummary>> distributionSummaries = new ConcurrentHashMap<>(); + public static final String classQualifier = "WorkflowMonitor"; private Monitors() { @@ -90,6 +93,18 @@ private static void gauge(String className, String name, long measurement, Strin getGauge(className, name, additionalTags).set(measurement); } + /** + * Records a value for an event as a distribution summary. Unlike a gauge, this is sampled multiple times during a + * minute or everytime a new value is recorded. + * + * @param className + * @param name + * @param additionalTags + */ + private static void distributionSummary(String className, String name, long value, String... additionalTags) { + getDistributionSummary(className, name, additionalTags).record(value); + } + private static Timer getTimer(String className, String name, String... additionalTags) { Map tags = toMap(className, additionalTags); return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> { @@ -116,6 +131,15 @@ private static Gauge getGauge(String className, String name, String... additiona }); } + private static DistributionSummary getDistributionSummary(String className, String name, String... additionalTags) { + Map tags = toMap(className, additionalTags); + + return distributionSummaries.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> { + Id id = registry.createId(name, tags); + return registry.distributionSummary(id); + }); + } + private static Map toMap(String className, String... additionalTags) { Map tags = new HashMap<>(); tags.put("class", className); @@ -182,6 +206,10 @@ public static void recordRunningWorkflows(long count, String name, String versio gauge(classQualifier, "workflow_running", count, "workflowName", name, "version", version, "ownerApp", ""+ownerApp); } + public static void recordNumTasksInWorkflow(long count, String name, String version) { + distributionSummary(classQualifier, "tasks_in_workflow", count, "workflowName", name, "version", version); + } + public static void recordTaskTimeout(String taskType) { counter(classQualifier, "task_timeout", "taskType", taskType); }