Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(aggregated_metrics): Add metrics to aggreated metrics #14986

Merged
merged 10 commits into from
Nov 22, 2024
99 changes: 96 additions & 3 deletions pkg/pattern/aggregation/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,31 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type ChunkMetrics struct {
type Metrics struct {
reg prometheus.Registerer

chunks *prometheus.GaugeVec
samples *prometheus.CounterVec

// push operation
pushErrors *prometheus.CounterVec
pushRetries *prometheus.CounterVec
pushSuccesses *prometheus.CounterVec
payloadSize *prometheus.HistogramVec

// Batch metrics
streamsPerPush *prometheus.HistogramVec
entriesPerPush *prometheus.HistogramVec
servicesTracked *prometheus.GaugeVec

writeTimeout *prometheus.CounterVec
}

func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics {
return &ChunkMetrics{
func NewMetrics(r prometheus.Registerer, metricsNamespace string) *Metrics {
var m Metrics
m.reg = r

m = Metrics{
chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Expand All @@ -24,5 +42,80 @@ func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMet
Name: "metric_samples",
Help: "The total number of samples in memory.",
}, []string{"service_name"}),
pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_errors_total",
Help: "Total number of errors when pushing metrics to Loki.",
}, []string{"tenant_id", "error_type"}),

pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_retries_total",
Help: "Total number of retries when pushing metrics to Loki.",
}, []string{"tenant_id"}),

pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_successes_total",
Help: "Total number of successful pushes to Loki.",
}, []string{"tenant_id"}),

// Batch metrics
payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "push_payload_bytes",
Help: "Size of push payloads in bytes.",
Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576},
}, []string{"tenant_id"}),

streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "streams_per_push",
Help: "Number of streams in each push request.",
Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
}, []string{"tenant_id"}),

entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "entries_per_push",
Help: "Number of entries in each push request.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"tenant_id"}),

servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "services_tracked",
Help: "Number of unique services being tracked.",
}, []string{"tenant_id"}),
writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "write_timeouts_total",
Help: "Total number of write timeouts.",
}, []string{"tenant_id"}),
}

if m.reg != nil {
m.reg.MustRegister(
m.chunks,
m.samples,
m.pushErrors,
m.pushRetries,
m.pushSuccesses,
m.payloadSize,
m.streamsPerPush,
m.entriesPerPush,
m.servicesTracked,
m.writeTimeout,
)
}

return &m
}
38 changes: 38 additions & 0 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Push struct {
backoff *backoff.Config

entries entries

metrics *Metrics
}

type entry struct {
Expand Down Expand Up @@ -108,6 +112,7 @@ func NewPush(
useTLS bool,
backoffCfg *backoff.Config,
logger log.Logger,
registrer prometheus.Registerer,
) (*Push, error) {
client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled())
if err != nil {
Expand Down Expand Up @@ -142,6 +147,7 @@ func NewPush(
entries: entries{
entries: make([]entry, 0),
},
metrics: NewMetrics(registrer, "pattern_ingester"),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the namespace be loki here? we already have the subsystem as pattern_ingester, so I think this would make the metric have pattern_ingester in it twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, missed this one. Fixed.

}

go p.run(pushPeriod)
Expand Down Expand Up @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {

payload = snappy.Encode(nil, payload)

p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams)))
p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries)))
p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(len(entriesByStream)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between number of streams and the length of the entries by stream map? won't those always be the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, was supposed to count services tracked.


sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
Expand Down Expand Up @@ -287,6 +297,31 @@ func (p *Push) run(pushPeriod time.Duration) {
}
}

func (p *Push) sendPayload(ctx context.Context, payload []byte) (int, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any references to this new function, is it being used?


status, err := p.send(ctx, payload)
if err != nil {
errorType := "unknown"
if status == 429 {
errorType = "rate_limited"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we move these to constants so their easier to find, and test against?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, moved this to constants

} else if status/100 == 5 {
errorType = "server_error"
} else if status/100 != 2 {
errorType = "client_error"
}
p.metrics.pushErrors.WithLabelValues(p.tenantID, errorType).Inc()
} else {
p.metrics.pushSuccesses.WithLabelValues(p.tenantID).Inc()
}
if err != nil {
return 0, err
}

p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload)))

return status, err
}

// send makes one attempt to send the payload to Loki
func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
var (
Expand Down Expand Up @@ -320,6 +355,9 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {

resp, err = p.httpClient.Do(req)
if err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc()
}
return -1, fmt.Errorf("failed to push payload: %w", err)
}
status := resp.StatusCode
Expand Down
4 changes: 3 additions & 1 deletion pkg/pattern/aggregation/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) {
false,
&backoff,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand All @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) {
"user", "secret",
false,
&backoff,
log.NewNopLogger(),
log.NewNopLogger(), nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand Down Expand Up @@ -123,6 +124,7 @@ func Test_Push(t *testing.T) {
quit: make(chan struct{}),
backoff: &backoff,
entries: entries{},
metrics: NewMetrics(nil, ""),
}

lbls1 := labels.New(labels.Label{Name: "test", Value: "test"})
Expand Down
1 change: 1 addition & 0 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
aggCfg.UseTLS,
&aggCfg.BackoffConfig,
i.logger,
i.registerer,
)
if err != nil {
return nil, err
Expand Down
Loading