Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
added metric to track distribution of number of tasks in a workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Oct 23, 2020
1 parent f924d0e commit 118dd5d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,10 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
}
}

// metric to track the distribution of number of tasks within a workflow
Monitors.recordNumTasksInWorkflow(workflow.getTasks().size() + tasks.size(), workflow.getWorkflowName(),
String.valueOf(workflow.getWorkflowVersion()));

// Save the tasks in the DAO
createdTasks = executionDAOFacade.createTasks(tasks);

Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/com/netflix/conductor/metrics/Monitors.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.netflix.servo.monitor.BasicStopwatch;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
Expand All @@ -43,6 +44,8 @@ public class Monitors {

private static final Map<String, Map<Map<String, String>, Gauge>> gauges = new ConcurrentHashMap<>();

private static final Map<String, Map<Map<String, String>, DistributionSummary>> distributionSummaries = new ConcurrentHashMap<>();

public static final String classQualifier = "WorkflowMonitor";

private Monitors() {
Expand Down Expand Up @@ -90,6 +93,18 @@ private static void gauge(String className, String name, long measurement, Strin
getGauge(className, name, additionalTags).set(measurement);
}

/**
* Records a value for an event as a distribution summary. Unlike a gauge, this is sampled multiple times during a
* minute or everytime a new value is recorded.
*
* @param className
* @param name
* @param additionalTags
*/
private static void distributionSummary(String className, String name, long value, String... additionalTags) {
getDistributionSummary(className, name, additionalTags).record(value);
}

private static Timer getTimer(String className, String name, String... additionalTags) {
Map<String, String> tags = toMap(className, additionalTags);
return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Expand All @@ -116,6 +131,15 @@ private static Gauge getGauge(String className, String name, String... additiona
});
}

private static DistributionSummary getDistributionSummary(String className, String name, String... additionalTags) {
Map<String, String> tags = toMap(className, additionalTags);

return distributionSummaries.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Id id = registry.createId(name, tags);
return registry.distributionSummary(id);
});
}

private static Map<String, String> toMap(String className, String... additionalTags) {
Map<String, String> tags = new HashMap<>();
tags.put("class", className);
Expand Down Expand Up @@ -182,6 +206,10 @@ public static void recordRunningWorkflows(long count, String name, String versio
gauge(classQualifier, "workflow_running", count, "workflowName", name, "version", version, "ownerApp", ""+ownerApp);
}

public static void recordNumTasksInWorkflow(long count, String name, String version) {
distributionSummary(classQualifier, "tasks_in_workflow", count, "workflowName", name, "version", version);
}

public static void recordTaskTimeout(String taskType) {
counter(classQualifier, "task_timeout", "taskType", taskType);
}
Expand Down

0 comments on commit 118dd5d

Please sign in to comment.