diff --git a/pkg/pattern/aggregation/metrics.go b/pkg/pattern/aggregation/metrics.go index d777af50b813..344d2255f6fb 100644 --- a/pkg/pattern/aggregation/metrics.go +++ b/pkg/pattern/aggregation/metrics.go @@ -3,26 +3,121 @@ package aggregation import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/util/constants" ) -type ChunkMetrics struct { +type Metrics struct { + reg prometheus.Registerer + chunks *prometheus.GaugeVec samples *prometheus.CounterVec + + // push operation + pushErrors *prometheus.CounterVec + pushRetries *prometheus.CounterVec + pushSuccesses *prometheus.CounterVec + payloadSize *prometheus.HistogramVec + + // Batch metrics + streamsPerPush *prometheus.HistogramVec + entriesPerPush *prometheus.HistogramVec + servicesTracked *prometheus.GaugeVec + + writeTimeout *prometheus.CounterVec } -func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics { - return &ChunkMetrics{ +func NewMetrics(r prometheus.Registerer) *Metrics { + var m Metrics + m.reg = r + + m = Metrics{ chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, + Namespace: constants.Loki, Subsystem: "pattern_ingester", Name: "metric_chunks", Help: "The total number of chunks in memory.", }, []string{"service_name"}), samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, + Namespace: constants.Loki, Subsystem: "pattern_ingester", Name: "metric_samples", Help: "The total number of samples in memory.", }, []string{"service_name"}), + pushErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "push_errors_total", + Help: "Total number of errors when pushing metrics to Loki.", + }, []string{"tenant_id", "error_type"}), + + pushRetries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "push_retries_total", + Help: "Total number of retries when pushing metrics to Loki.", + }, []string{"tenant_id"}), + + pushSuccesses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "push_successes_total", + Help: "Total number of successful pushes to Loki.", + }, []string{"tenant_id"}), + + // Batch metrics + payloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "push_payload_bytes", + Help: "Size of push payloads in bytes.", + Buckets: []float64{1024, 4096, 16384, 65536, 262144, 1048576}, + }, []string{"tenant_id"}), + + streamsPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "streams_per_push", + Help: "Number of streams in each push request.", + Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}, + }, []string{"tenant_id"}), + + entriesPerPush: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "entries_per_push", + Help: "Number of entries in each push request.", + Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000}, + }, []string{"tenant_id"}), + + servicesTracked: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "services_tracked", + Help: "Number of unique services being tracked.", + }, []string{"tenant_id"}), + writeTimeout: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "pattern_ingester", + Name: "write_timeouts_total", + Help: "Total number of write timeouts.", + }, []string{"tenant_id"}), } + + if m.reg != nil { + m.reg.MustRegister( + m.chunks, + m.samples, + m.pushErrors, + m.pushRetries, + m.pushSuccesses, + m.payloadSize, + m.streamsPerPush, + m.entriesPerPush, + m.servicesTracked, + m.writeTimeout, + ) + } + + return &m } diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index a282913fe508..52b291fda54f 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -16,6 +17,7 @@ import ( "github.com/go-kit/log/level" "github.com/golang/snappy" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -71,6 +73,8 @@ type Push struct { backoff *backoff.Config entries entries + + metrics *Metrics } type entry struct { @@ -108,6 +112,7 @@ func NewPush( useTLS bool, backoffCfg *backoff.Config, logger log.Logger, + registrer prometheus.Registerer, ) (*Push, error) { client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled()) if err != nil { @@ -142,6 +147,7 @@ func NewPush( entries: entries{ entries: make([]entry, 0), }, + metrics: NewMetrics(registrer), } go p.run(pushPeriod) @@ -222,6 +228,10 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) { payload = snappy.Encode(nil, payload) + p.metrics.streamsPerPush.WithLabelValues(p.tenantID).Observe(float64(len(streams))) + p.metrics.entriesPerPush.WithLabelValues(p.tenantID).Observe(float64(len(entries))) + p.metrics.servicesTracked.WithLabelValues(p.tenantID).Set(float64(serviceLimit)) + sp.LogKV( "event", "build aggregated metrics payload", "num_service", len(entriesByStream), @@ -267,7 +277,7 @@ func (p *Push) run(pushPeriod time.Duration) { break } - if status > 0 && status != 429 && status/100 != 5 { + if status > 0 && util.IsRateLimited(status) && !util.IsServerError(status) { level.Error(p.logger).Log("msg", "failed to send entry, server rejected push with a non-retryable status code", "status", status, "err", err) pushTicker.Reset(pushPeriod) break @@ -302,6 +312,8 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) { defer sp.Finish() req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload)) + p.metrics.payloadSize.WithLabelValues(p.tenantID).Observe(float64(len(payload))) + if err != nil { return -1, fmt.Errorf("failed to create push request: %w", err) } @@ -320,23 +332,29 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) { resp, err = p.httpClient.Do(req) if err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + p.metrics.writeTimeout.WithLabelValues(p.tenantID).Inc() + } return -1, fmt.Errorf("failed to push payload: %w", err) } - status := resp.StatusCode - if status/100 != 2 { + statusCode := resp.StatusCode + if util.IsError(statusCode) { + errType := util.ErrorTypeFromHTTPStatus(statusCode) + scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen)) line := "" if scanner.Scan() { line = scanner.Text() } - err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line) + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, statusCode, line) + p.metrics.pushErrors.WithLabelValues(p.tenantID, errType).Inc() } if err := resp.Body.Close(); err != nil { level.Error(p.logger).Log("msg", "failed to close response body", "error", err) } - return status, err + return statusCode, err } func AggregatedMetricEntry( diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go index 149b54a97715..a0a4974278bc 100644 --- a/pkg/pattern/aggregation/push_test.go +++ b/pkg/pattern/aggregation/push_test.go @@ -58,6 +58,7 @@ func Test_Push(t *testing.T) { false, &backoff, log.NewNopLogger(), + nil, ) require.NoError(t, err) ts, payload := testPayload() @@ -82,7 +83,7 @@ func Test_Push(t *testing.T) { "user", "secret", false, &backoff, - log.NewNopLogger(), + log.NewNopLogger(), nil, ) require.NoError(t, err) ts, payload := testPayload() @@ -123,6 +124,7 @@ func Test_Push(t *testing.T) { quit: make(chan struct{}), backoff: &backoff, entries: entries{}, + metrics: NewMetrics(nil), } lbls1 := labels.New(labels.Label{Name: "test", Value: "test"}) diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 90e69f843333..1b88387d7b55 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -403,6 +403,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / aggCfg.UseTLS, &aggCfg.BackoffConfig, i.logger, + i.registerer, ) if err != nil { return nil, err diff --git a/pkg/util/http.go b/pkg/util/http.go index 3fdfca6df24f..2d9b8a7e2903 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -23,6 +23,13 @@ import ( const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)" +const ( + HTTPRateLimited = "rate_limited" + HTTPServerError = "server_error" + HTTPErrorUnknown = "unknown" + HTTPClientError = "client_error" +) + // IsRequestBodyTooLarge returns true if the error is "http: request body too large". func IsRequestBodyTooLarge(err error) bool { return err != nil && strings.Contains(err.Error(), "http: request body too large") @@ -307,3 +314,28 @@ func IsValidURL(endpoint string) bool { return u.Scheme != "" && u.Host != "" } + +func ErrorTypeFromHTTPStatus(status int) string { + errorType := HTTPErrorUnknown + if status == 429 { + errorType = HTTPRateLimited + } else if status/100 == 5 { + errorType = HTTPServerError + } else if status/100 != 2 { + errorType = HTTPClientError + } + + return errorType +} + +func IsError(status int) bool { + return status/200 != 0 +} + +func IsServerError(status int) bool { + return status/100 == 5 +} + +func IsRateLimited(status int) bool { + return status == 429 +} diff --git a/pkg/util/http_test.go b/pkg/util/http_test.go index d032085db502..c41ba0eb1525 100644 --- a/pkg/util/http_test.go +++ b/pkg/util/http_test.go @@ -218,3 +218,65 @@ func TestIsRequestBodyTooLargeRegression(t *testing.T) { _, err := io.ReadAll(http.MaxBytesReader(httptest.NewRecorder(), io.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4})), 1)) assert.True(t, util.IsRequestBodyTooLarge(err)) } + +func TestErrorTypeFromHTTPStatus(t *testing.T) { + tests := []struct { + name string + status int + expectedResult string + }{ + { + name: "rate limited error", + status: 429, + expectedResult: util.HTTPRateLimited, + }, + { + name: "server error - 500", + status: 500, + expectedResult: util.HTTPServerError, + }, + { + name: "server error - 503", + status: 503, + expectedResult: util.HTTPServerError, + }, + { + name: "client error - 400", + status: 400, + expectedResult: util.HTTPClientError, + }, + { + name: "client error - 404", + status: 404, + expectedResult: util.HTTPClientError, + }, + { + name: "success status should return unknown - 200", + status: 200, + expectedResult: util.HTTPErrorUnknown, + }, + { + name: "success status should return unknown - 201", + status: 201, + expectedResult: util.HTTPErrorUnknown, + }, + { + name: "invalid status should return unknown - 600", + status: 600, + expectedResult: util.HTTPClientError, + }, + { + name: "invalid status should return unknown - -1", + status: -1, + expectedResult: util.HTTPClientError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := util.ErrorTypeFromHTTPStatus(tt.status) + assert.Equal(t, tt.expectedResult, result, "ErrorTypeFromHTTPStatus(%d) = %s; want %s", + tt.status, result, tt.expectedResult) + }) + } +}