Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(aggregated_metrics): Add metrics to aggreated metrics #14986

Merged
merged 10 commits into from
Nov 22, 2024
105 changes: 100 additions & 5 deletions pkg/pattern/aggregation/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,121 @@ package aggregation
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/util/constants"
)

type ChunkMetrics struct {
type Metrics struct {
reg prometheus.Registerer

chunks *prometheus.GaugeVec
samples *prometheus.CounterVec

// push operation
pushErrors *prometheus.CounterVec
pushRetries *prometheus.CounterVec
pushSuccesses *prometheus.CounterVec
payloadSize *prometheus.HistogramVec

// Batch metrics
streamsPerPush *prometheus.HistogramVec
entriesPerPush *prometheus.HistogramVec
servicesTracked *prometheus.GaugeVec

writeTimeout *prometheus.CounterVec
}

func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics {
return &ChunkMetrics{
func NewMetrics(r prometheus.Registerer) *Metrics {
var m Metrics
m.reg = r

m = Metrics{
chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "metric_chunks",
Help: "The total number of chunks in memory.",
}, []string{"service_name"}),
samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "metric_samples",
Help: "The total number of samples in memory.",
}, []string{"service_name"}),
pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_errors_total",
Help: "Total number of errors when pushing metrics to Loki.",
}, []string{"tenant_id", "error_type"}),

pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_retries_total",
Help: "Total number of retries when pushing metrics to Loki.",
}, []string{"tenant_id"}),

pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_successes_total",
Help: "Total number of successful pushes to Loki.",
}, []string{"tenant_id"}),

// Batch metrics
payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "push_payload_bytes",
Help: "Size of push payloads in bytes.",
Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576},
}, []string{"tenant_id"}),

streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "streams_per_push",
Help: "Number of streams in each push request.",
Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
}, []string{"tenant_id"}),

entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "entries_per_push",
Help: "Number of entries in each push request.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"tenant_id"}),

servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "services_tracked",
Help: "Number of unique services being tracked.",
}, []string{"tenant_id"}),
writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "pattern_ingester",
Name: "write_timeouts_total",
Help: "Total number of write timeouts.",
}, []string{"tenant_id"}),
}

if m.reg != nil {
m.reg.MustRegister(
m.chunks,
m.samples,
m.pushErrors,
m.pushRetries,
m.pushSuccesses,
m.payloadSize,
m.streamsPerPush,
m.entriesPerPush,
m.servicesTracked,
m.writeTimeout,
)
}

return &m
}
28 changes: 23 additions & 5 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Push struct {
backoff *backoff.Config

entries entries

metrics *Metrics
}

type entry struct {
Expand Down Expand Up @@ -108,6 +112,7 @@ func NewPush(
useTLS bool,
backoffCfg *backoff.Config,
logger log.Logger,
registrer prometheus.Registerer,
) (*Push, error) {
client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled())
if err != nil {
Expand Down Expand Up @@ -142,6 +147,7 @@ func NewPush(
entries: entries{
entries: make([]entry, 0),
},
metrics: NewMetrics(registrer),
}

go p.run(pushPeriod)
Expand Down Expand Up @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {

payload = snappy.Encode(nil, payload)

p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams)))
p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries)))
p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(serviceLimit))

sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
Expand Down Expand Up @@ -267,7 +277,7 @@ func (p *Push) run(pushPeriod time.Duration) {
break
}

if status > 0 && status != 429 && status/100 != 5 {
if status > 0 && util.IsRateLimited(status) && !util.IsServerError(status) {
level.Error(p.logger).Log("msg", "failed to send entry, server rejected push with a non-retryable status code", "status", status, "err", err)
pushTicker.Reset(pushPeriod)
break
Expand Down Expand Up @@ -302,6 +312,8 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
defer sp.Finish()

req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload))
p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload)))

if err != nil {
return -1, fmt.Errorf("failed to create push request: %w", err)
}
Expand All @@ -320,23 +332,29 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {

resp, err = p.httpClient.Do(req)
if err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc()
}
return -1, fmt.Errorf("failed to push payload: %w", err)
}
status := resp.StatusCode
if status/100 != 2 {
statusCode := resp.StatusCode
if util.IsError(statusCode) {
errType := util.ErrorTypeFromHTTPStatus(statusCode)

scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line)
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, statusCode, line)
p.metrics.pushErrors.WithLabelValues(p.tenantID, errType).Inc()
}

if err := resp.Body.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close response body", "error", err)
}

return status, err
return statusCode, err
}

func AggregatedMetricEntry(
Expand Down
4 changes: 3 additions & 1 deletion pkg/pattern/aggregation/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) {
false,
&backoff,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand All @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) {
"user", "secret",
false,
&backoff,
log.NewNopLogger(),
log.NewNopLogger(), nil,
)
require.NoError(t, err)
ts, payload := testPayload()
Expand Down Expand Up @@ -123,6 +124,7 @@ func Test_Push(t *testing.T) {
quit: make(chan struct{}),
backoff: &backoff,
entries: entries{},
metrics: NewMetrics(nil),
}

lbls1 := labels.New(labels.Label{Name: "test", Value: "test"})
Expand Down
1 change: 1 addition & 0 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
aggCfg.UseTLS,
&aggCfg.BackoffConfig,
i.logger,
i.registerer,
)
if err != nil {
return nil, err
Expand Down
32 changes: 32 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (

const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"

const (
HTTPRateLimited = "rate_limited"
HTTPServerError = "server_error"
HTTPErrorUnknown = "unknown"
HTTPClientError = "client_error"
)

// IsRequestBodyTooLarge returns true if the error is "http: request body too large".
func IsRequestBodyTooLarge(err error) bool {
return err != nil && strings.Contains(err.Error(), "http: request body too large")
Expand Down Expand Up @@ -307,3 +314,28 @@ func IsValidURL(endpoint string) bool {

return u.Scheme != "" && u.Host != ""
}

func ErrorTypeFromHTTPStatus(status int) string {
errorType := HTTPErrorUnknown
if status == 429 {
errorType = HTTPRateLimited
} else if status/100 == 5 {
errorType = HTTPServerError
} else if status/100 != 2 {
errorType = HTTPClientError
}

return errorType
}

func IsError(status int) bool {
return status/200 != 0
}

func IsServerError(status int) bool {
return status/100 == 5
}

func IsRateLimited(status int) bool {
return status == 429
}
62 changes: 62 additions & 0 deletions pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,65 @@ func TestIsRequestBodyTooLargeRegression(t *testing.T) {
_, err := io.ReadAll(http.MaxBytesReader(httptest.NewRecorder(), io.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4})), 1))
assert.True(t, util.IsRequestBodyTooLarge(err))
}

func TestErrorTypeFromHTTPStatus(t *testing.T) {
tests := []struct {
name string
status int
expectedResult string
}{
{
name: "rate limited error",
status: 429,
expectedResult: util.HTTPRateLimited,
},
{
name: "server error - 500",
status: 500,
expectedResult: util.HTTPServerError,
},
{
name: "server error - 503",
status: 503,
expectedResult: util.HTTPServerError,
},
{
name: "client error - 400",
status: 400,
expectedResult: util.HTTPClientError,
},
{
name: "client error - 404",
status: 404,
expectedResult: util.HTTPClientError,
},
{
name: "success status should return unknown - 200",
status: 200,
expectedResult: util.HTTPErrorUnknown,
},
{
name: "success status should return unknown - 201",
status: 201,
expectedResult: util.HTTPErrorUnknown,
},
{
name: "invalid status should return unknown - 600",
status: 600,
expectedResult: util.HTTPClientError,
},
{
name: "invalid status should return unknown - -1",
status: -1,
expectedResult: util.HTTPClientError,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := util.ErrorTypeFromHTTPStatus(tt.status)
assert.Equal(t, tt.expectedResult, result, "ErrorTypeFromHTTPStatus(%d) = %s; want %s",
tt.status, result, tt.expectedResult)
})
}
}
Loading