Skip to content

Commit

Permalink
feat: flag to enable gauge replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
mcfedr committed Sep 1, 2023
1 parent b2868dd commit 1141a76
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 10 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func Execute() {
rootCmd.PersistentFlags().StringVar(&cfg.ApiListen, "apiListen", ":80", "Listen for API requests on this host/port.")
rootCmd.PersistentFlags().StringVar(&cfg.LifecycleListen, "lifecycleListen", ":8888", "Listen for lifecycle requests (health, metrics) on this host/port")
rootCmd.PersistentFlags().StringVar(&cfg.CorsDomain, "cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
rootCmd.PersistentFlags().StringVar(&cfg.GaugeBehavior, "gaugeBehavior", "sum", "How gauges are aggregated.")

if err := rootCmd.Execute(); err != nil {
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func startFunc(cmd *cobra.Command, args []string) error {
Accounts: cfg.AuthUsers,
}

routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen)
routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen, cfg.GaugeBehavior)

return nil
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Server struct {
LifecycleListen string
CorsDomain string
AuthUsers []string
GaugeBehavior string
}

const (
Expand Down
23 changes: 21 additions & 2 deletions metrics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

type metricFamily struct {
*dto.MetricFamily
lock sync.RWMutex
lock sync.RWMutex
options *aggregateOptions
}

type Aggregate struct {
Expand All @@ -28,9 +29,17 @@ type Aggregate struct {

type ignoredLabels []string

type gaugeBehavior string

const (
sumBehavior gaugeBehavior = "sum"
replaceBehavior = "replace"
)

type aggregateOptions struct {
ignoredLabels ignoredLabels
metricTTLDuration *time.Duration
gaugeBehavior gaugeBehavior
}

type aggregateOptionsFunc func(a *Aggregate)
Expand All @@ -47,6 +56,16 @@ func SetTTLMetricTime(duration *time.Duration) aggregateOptionsFunc {
}
}

func SetGaugeBehavior(behavior string) aggregateOptionsFunc {
return func(a *Aggregate) {
if behavior == replaceBehavior {
a.options.gaugeBehavior = replaceBehavior
} else {
a.options.gaugeBehavior = sumBehavior
}
}
}

func NewAggregate(opts ...aggregateOptionsFunc) *Aggregate {
a := &Aggregate{
families: map[string]*metricFamily{},
Expand Down Expand Up @@ -91,7 +110,7 @@ func (a *Aggregate) setFamilyOrGetExistingFamily(familyName string, family *dto.
defer a.familiesLock.Unlock()
existingFamily, ok := a.families[familyName]
if !ok {
a.families[familyName] = &metricFamily{MetricFamily: family}
a.families[familyName] = &metricFamily{MetricFamily: family, options: &a.options}
return nil
}
return existingFamily
Expand Down
15 changes: 10 additions & 5 deletions metrics/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
func mergeMetric(ty dto.MetricType, a, b *dto.Metric, options *aggregateOptions) *dto.Metric {
switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Expand All @@ -82,9 +82,14 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
}

case dto.MetricType_GAUGE:
// No very meaningful way for us to merge gauges. We'll sum them
// and clear out any gauges on scrape, as a best approximation, but
// this relies on client pushing with the same interval as we scrape.
if options.gaugeBehavior == replaceBehavior {
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Value: float64ptr(*b.Gauge.Value),
},
}
}
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Expand Down Expand Up @@ -143,7 +148,7 @@ func (mf *metricFamily) mergeFamily(b *dto.MetricFamily) error {
newMetric = append(newMetric, b.Metric[j])
j++
} else {
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j])
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j], mf.options)
if merged != nil {
newMetric = append(newMetric, merged)
}
Expand Down
4 changes: 2 additions & 2 deletions routers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/zapier/prom-aggregation-gateway/metrics"
)

func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string) {
func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string, gaugeBehavior string) {
sigChannel := make(chan os.Signal, 1)
signal.Notify(sigChannel, syscall.SIGTERM, syscall.SIGINT)

agg := metrics.NewAggregate()
agg := metrics.NewAggregate(metrics.SetGaugeBehavior(gaugeBehavior))

promMetricsConfig := promMetrics.Config{
Registry: metrics.PromRegistry,
Expand Down

0 comments on commit 1141a76

Please sign in to comment.