Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stale Prometheus metrics #311

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions backend/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,68 @@
package backend

import (
"fmt"
"log"
"net/http"
"regexp"
"strings"
"sync"

"github.com/buildkite/buildkite-agent-metrics/v5/collector"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
camel = regexp.MustCompile("(^[^A-Z0-9]*|[A-Z0-9]*)([A-Z0-9][^A-Z]+|$)")
)
var camelCaseRE = regexp.MustCompile("(^[^A-Z0-9]*|[A-Z0-9]*)([A-Z0-9][^A-Z]+|$)")

// Prometheus this holds a list of prometheus gauges which have been created, one for each metric
// that we want to expose. These are created on the fly as we receive metrics from the agent.
// Prometheus this holds a list of prometheus gauges which have been created,
// one for each metric that we want to expose. These are created and registered
// in NewPrometheusBackend.
//
// Note: these metrics are not unique to a cluster / queue, as these labels are added to the
// value when it is set.
// Note: these metrics are not unique to a cluster / queue, as these labels are
// added to the value when it is set.
type Prometheus struct {
totals map[string]*prometheus.GaugeVec
queues map[string]*prometheus.GaugeVec
oldQueues map[string]map[string]struct{} // cluster -> set of queues in cluster from last collect
}

var (
promSingletonOnce sync.Once
promSingleton *Prometheus
)

// NewPrometheusBackend creates an instance of Prometheus and creates and
// registers all the metrics gauges. Because Prometheus metrics must be unique,
// it manages a singleton instance rather than creating a new backend for each
// call.
func NewPrometheusBackend() *Prometheus {
return &Prometheus{
promSingletonOnce.Do(createPromSingleton)
return promSingleton
}

func createPromSingleton() {
promSingleton = &Prometheus{
totals: make(map[string]*prometheus.GaugeVec),
queues: make(map[string]*prometheus.GaugeVec),
oldQueues: make(map[string]map[string]struct{}),
}

for _, name := range collector.AllMetrics {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "buildkite_total_" + camelToUnderscore(name),
Help: "Buildkite Total: " + name,
}, []string{"cluster"})
prometheus.MustRegister(gauge)
promSingleton.totals[name] = gauge

gauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "buildkite_queues_" + camelToUnderscore(name),
Help: "Buildkite Queues: " + name,
}, []string{"queue", "cluster"})
prometheus.MustRegister(gauge)
promSingleton.queues[name] = gauge
}
}

// Serve runs a Prometheus metrics HTTP server.
Expand All @@ -43,23 +72,22 @@ func (p *Prometheus) Serve(path, addr string) {
log.Fatal(http.ListenAndServe(addr, m))
}

// Collect receives a set of metrics from the agent and creates or updates the prometheus gauges
// Collect receives a set of metrics from the agent and updates the gauges.
//
// Note: This is called once per agent token per interval
func (p *Prometheus) Collect(r *collector.Result) error {
for name, value := range r.Totals {
gauge, ok := p.totals[name]
if !ok { // first time this metric has been seen so create a new gauge
gauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: fmt.Sprintf("buildkite_total_%s", camelToUnderscore(name)),
Help: fmt.Sprintf("Buildkite Total: %s", name),
}, []string{"cluster"})
prometheus.MustRegister(gauge)
p.totals[name] = gauge
}

// note that r.Cluster will be empty for unclustered agents, this label will be dropped by prometheus
gauge.With(prometheus.Labels{"cluster": r.Cluster}).Set(float64(value))
// Ranging over all gauges and searching Totals / Queues for values ensures
// that metrics that are not in this collection are reset to 0.

for name, gauge := range p.totals {
value := r.Totals[name] // 0 if missing

// note that r.Cluster will be empty for unclustered agents, this label
// will be dropped by prometheus
gauge.With(prometheus.Labels{
"cluster": r.Cluster,
}).Set(float64(value))
}

currentQueues := make(map[string]struct{})
Expand All @@ -68,18 +96,11 @@ func (p *Prometheus) Collect(r *collector.Result) error {
currentQueues[queue] = struct{}{}
delete(oldQueues, queue) // still current

for name, value := range counts {
gauge, ok := p.queues[name]
if !ok { // first time this metric has been seen so create a new gauge
gauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: fmt.Sprintf("buildkite_queues_%s", camelToUnderscore(name)),
Help: fmt.Sprintf("Buildkite Queues: %s", name),
}, []string{"queue", "cluster"})
prometheus.MustRegister(gauge)
p.queues[name] = gauge
}

// note that r.Cluster will be empty for unclustered agents, this label will be dropped by prometheus
for name, gauge := range p.queues {
value := counts[name] // 0 if missing

// note that r.Cluster will be empty for unclustered agents, this
// label will be dropped by prometheus
gauge.With(prometheus.Labels{
"cluster": r.Cluster,
"queue": queue,
Expand All @@ -105,7 +126,7 @@ func (p *Prometheus) Collect(r *collector.Result) error {

func camelToUnderscore(s string) string {
var a []string
for _, sub := range camel.FindAllStringSubmatch(s, -1) {
for _, sub := range camelCaseRE.FindAllStringSubmatch(s, -1) {
if sub[1] != "" {
a = append(a, sub[1])
}
Expand Down
53 changes: 23 additions & 30 deletions backend/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,29 @@ import (
dto "github.com/prometheus/client_model/go"
)

const (
runningBuildsCount = iota
scheduledBuildsCount
runningJobsCount
scheduledJobsCount
unfinishedJobsCount
idleAgentCount
busyAgentCount
totalAgentCount
var (
fakeTotals = make(map[string]int)
fakeDefaultQueue = make(map[string]int)
fakeDeployQueue = make(map[string]int)
)

func init() {
for i, metric := range collector.AllMetrics {
fakeTotals[metric] = i
fakeDefaultQueue[metric] = i + 100
fakeDeployQueue[metric] = i + 200
}
}

func newTestResult(t *testing.T) *collector.Result {
t.Helper()
totals := map[string]int{
"RunningBuildsCount": runningBuildsCount,
"ScheduledBuildsCount": scheduledBuildsCount,
"RunningJobsCount": runningJobsCount,
"ScheduledJobsCount": scheduledJobsCount,
"UnfinishedJobsCount": unfinishedJobsCount,
"IdleAgentCount": idleAgentCount,
"BusyAgentCount": busyAgentCount,
"TotalAgentCount": totalAgentCount,
}

res := &collector.Result{
Totals: totals,
Totals: fakeTotals,
Cluster: "test_cluster",
Queues: map[string]map[string]int{
"default": totals,
"deploy": totals,
"default": fakeDefaultQueue,
"deploy": fakeDeployQueue,
},
}
return res
Expand Down Expand Up @@ -100,7 +93,7 @@ func TestCollect(t *testing.T) {
wantMetrics: []promMetric{
{
Labels: map[string]string{"cluster": "test_cluster"},
Value: runningJobsCount,
Value: float64(fakeTotals[collector.RunningJobsCount]),
},
},
},
Expand All @@ -112,29 +105,29 @@ func TestCollect(t *testing.T) {
wantMetrics: []promMetric{
{
Labels: map[string]string{"cluster": "test_cluster"},
Value: scheduledJobsCount,
Value: float64(fakeTotals[collector.ScheduledJobsCount]),
},
},
},
{
group: "Queues",
metricName: "buildkite_queues_scheduled_builds_count",
wantHelp: "Buildkite Queues: ScheduledBuildsCount",
metricName: "buildkite_queues_unfinished_jobs_count",
wantHelp: "Buildkite Queues: UnfinishedJobsCount",
wantType: dto.MetricType_GAUGE,
wantMetrics: []promMetric{
{
Labels: map[string]string{
"cluster": "test_cluster",
"queue": "default",
},
Value: scheduledBuildsCount,
Value: float64(fakeDefaultQueue[collector.UnfinishedJobsCount]),
},
{
Labels: map[string]string{
"cluster": "test_cluster",
"queue": "deploy",
},
Value: scheduledBuildsCount,
Value: float64(fakeDeployQueue[collector.UnfinishedJobsCount]),
},
},
},
Expand All @@ -149,14 +142,14 @@ func TestCollect(t *testing.T) {
"cluster": "test_cluster",
"queue": "default",
},
Value: idleAgentCount,
Value: float64(fakeDefaultQueue[collector.IdleAgentCount]),
},
{
Labels: map[string]string{
"cluster": "test_cluster",
"queue": "deploy",
},
Value: idleAgentCount,
Value: float64(fakeDeployQueue[collector.IdleAgentCount]),
},
},
},
Expand Down
11 changes: 11 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ const (
PollDurationHeader = `Buildkite-Agent-Metrics-Poll-Duration`
)

var AllMetrics = []string{
ScheduledJobsCount,
RunningJobsCount,
UnfinishedJobsCount,
WaitingJobsCount,
IdleAgentCount,
BusyAgentCount,
TotalAgentCount,
BusyAgentPercentage,
}

var ErrUnauthorized = errors.New("unauthorized")

var traceLog = log.New(os.Stderr, "TRACE", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile|log.Lmsgprefix)
Expand Down