Skip to content

Commit

Permalink
chore(aggregated_metrics): Add metrics to aggreated metrics (#14986)
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi authored Nov 22, 2024
1 parent 62e7d61 commit d3d31f1
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 11 deletions.
105 changes: 100 additions & 5 deletions pkg/pattern/aggregation/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,121 @@ package aggregation
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/util/constants"
)

type ChunkMetrics struct {
type Metrics struct {
reg prometheus.Registerer

chunks *prometheus.GaugeVec
samples *prometheus.CounterVec

// push operation
pushErrors *prometheus.CounterVec
pushRetries *prometheus.CounterVec
pushSuccesses *prometheus.CounterVec
payloadSize *prometheus.HistogramVec

// Batch metrics
streamsPerPush *prometheus.HistogramVec
entriesPerPush *prometheus.HistogramVec
servicesTracked *prometheus.GaugeVec

writeTimeout *prometheus.CounterVec
}

func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics {
return &ChunkMetrics{
func NewMetrics(r prometheus.Registerer) *Metrics {
var m Metrics
m.reg = r

m = Metrics{
chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "metric_chunks",
Help: "The total number of chunks in memory.",
}, []string{"service_name"}),
samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "metric_samples",
Help: "The total number of samples in memory.",
}, []string{"service_name"}),
pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_errors_total",
Help: "Total number of errors when pushing metrics to Loki.",
}, []string{"tenant_id", "error_type"}),

pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_retries_total",
Help: "Total number of retries when pushing metrics to Loki.",
}, []string{"tenant_id"}),

pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_successes_total",
Help: "Total number of successful pushes to Loki.",
}, []string{"tenant_id"}),

// Batch metrics
payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_payload_bytes",
Help: "Size of push payloads in bytes.",
Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576},
}, []string{"tenant_id"}),

streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "streams_per_push",
Help: "Number of streams in each push request.",
Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
}, []string{"tenant_id"}),

entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "entries_per_push",
Help: "Number of entries in each push request.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"tenant_id"}),

servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "services_tracked",
Help: "Number of unique services being tracked.",
}, []string{"tenant_id"}),
writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "write_timeouts_total",
Help: "Total number of write timeouts.",
}, []string{"tenant_id"}),
}

if m.reg != nil {
m.reg.MustRegister(
m.chunks,
m.samples,
m.pushErrors,
m.pushRetries,
m.pushSuccesses,
m.payloadSize,
m.streamsPerPush,
m.entriesPerPush,
m.servicesTracked,
m.writeTimeout,
)
}

return &m
}
28 changes: 23 additions & 5 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Push struct {
backoff *backoff.Config

entries entries

metrics *Metrics
}

type entry struct {
Expand Down Expand Up @@ -108,6 +112,7 @@ func NewPush(
useTLS bool,
backoffCfg *backoff.Config,
logger log.Logger,
registrer prometheus.Registerer,
) (*Push, error) {
client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled())
if err != nil {
Expand Down Expand Up @@ -142,6 +147,7 @@ func NewPush(
entries: entries{
entries: make([]entry, 0),
},
metrics: NewMetrics(registrer),
}

go p.run(pushPeriod)
Expand Down Expand Up @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {

payload = snappy.Encode(nil, payload)

p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams)))
p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries)))
p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(serviceLimit))

sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
Expand Down Expand Up @@ -267,7 +277,7 @@ func (p *Push) run(pushPeriod time.Duration) {
break
}

if status > 0 && status != 429 && status/100 != 5 {
if status > 0 && util.IsRateLimited(status) && !util.IsServerError(status) {
level.Error(p.logger).Log("msg", "failed to send entry, server rejected push with a non-retryable status code", "status", status, "err", err)
pushTicker.Reset(pushPeriod)
break
Expand Down Expand Up @@ -302,6 +312,8 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
defer sp.Finish()

req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload))
p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload)))

if err != nil {
return -1, fmt.Errorf("failed to create push request: %w", err)
}
Expand All @@ -320,23 +332,29 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {

resp, err = p.httpClient.Do(req)
if err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc()
}
return -1, fmt.Errorf("failed to push payload: %w", err)
}
status := resp.StatusCode
if status/100 != 2 {
statusCode := resp.StatusCode
if util.IsError(statusCode) {
errType := util.ErrorTypeFromHTTPStatus(statusCode)

scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line)
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, statusCode, line)
p.metrics.pushErrors.WithLabelValues(p.tenantID, errType).Inc()
}

if err := resp.Body.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close response body", "error", err)
}

return status, err
return statusCode, err
}

func AggregatedMetricEntry(
Expand Down
4 changes: 3 additions & 1 deletion pkg/pattern/aggregation/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) {
false,
&backoff,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand All @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) {
"user", "secret",
false,
&backoff,
log.NewNopLogger(),
log.NewNopLogger(), nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand Down Expand Up @@ -123,6 +124,7 @@ func Test_Push(t *testing.T) {
quit: make(chan struct{}),
backoff: &backoff,
entries: entries{},
metrics: NewMetrics(nil),
}

lbls1 := labels.New(labels.Label{Name: "test", Value: "test"})
Expand Down
1 change: 1 addition & 0 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
aggCfg.UseTLS,
&aggCfg.BackoffConfig,
i.logger,
i.registerer,
)
if err != nil {
return nil, err
Expand Down
32 changes: 32 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (

const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"

const (
HTTPRateLimited = "rate_limited"
HTTPServerError = "server_error"
HTTPErrorUnknown = "unknown"
HTTPClientError = "client_error"
)

// IsRequestBodyTooLarge returns true if the error is "http: request body too large".
func IsRequestBodyTooLarge(err error) bool {
return err != nil && strings.Contains(err.Error(), "http: request body too large")
Expand Down Expand Up @@ -307,3 +314,28 @@ func IsValidURL(endpoint string) bool {

return u.Scheme != "" && u.Host != ""
}

func ErrorTypeFromHTTPStatus(status int) string {
errorType := HTTPErrorUnknown
if status == 429 {
errorType = HTTPRateLimited
} else if status/100 == 5 {
errorType = HTTPServerError
} else if status/100 != 2 {
errorType = HTTPClientError
}

return errorType
}

func IsError(status int) bool {
return status/200 != 0
}

func IsServerError(status int) bool {
return status/100 == 5
}

func IsRateLimited(status int) bool {
return status == 429
}
62 changes: 62 additions & 0 deletions pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,65 @@ func TestIsRequestBodyTooLargeRegression(t *testing.T) {
_, err := io.ReadAll(http.MaxBytesReader(httptest.NewRecorder(), io.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4})), 1))
assert.True(t, util.IsRequestBodyTooLarge(err))
}

func TestErrorTypeFromHTTPStatus(t *testing.T) {
tests := []struct {
name string
status int
expectedResult string
}{
{
name: "rate limited error",
status: 429,
expectedResult: util.HTTPRateLimited,
},
{
name: "server error - 500",
status: 500,
expectedResult: util.HTTPServerError,
},
{
name: "server error - 503",
status: 503,
expectedResult: util.HTTPServerError,
},
{
name: "client error - 400",
status: 400,
expectedResult: util.HTTPClientError,
},
{
name: "client error - 404",
status: 404,
expectedResult: util.HTTPClientError,
},
{
name: "success status should return unknown - 200",
status: 200,
expectedResult: util.HTTPErrorUnknown,
},
{
name: "success status should return unknown - 201",
status: 201,
expectedResult: util.HTTPErrorUnknown,
},
{
name: "invalid status should return unknown - 600",
status: 600,
expectedResult: util.HTTPClientError,
},
{
name: "invalid status should return unknown - -1",
status: -1,
expectedResult: util.HTTPClientError,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := util.ErrorTypeFromHTTPStatus(tt.status)
assert.Equal(t, tt.expectedResult, result, "ErrorTypeFromHTTPStatus(%d) = %s; want %s",
tt.status, result, tt.expectedResult)
})
}
}

0 comments on commit d3d31f1

Please sign in to comment.