From b813e04587735b3485528a30d17feebf2a459252 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 18 Dec 2024 20:23:29 +0900 Subject: [PATCH] Add native histogram for tracking push requests size Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/api/api.go | 10 ++++---- pkg/distributor/distributor.go | 36 ++++++++++++++++++++++++-- pkg/ingester/client/client_test.go | 4 +-- pkg/querier/remote_read.go | 2 +- pkg/util/http.go | 8 +++--- pkg/util/http_test.go | 2 +- pkg/util/push/otlp.go | 24 +++++++++++++---- pkg/util/push/otlp_test.go | 41 ++++++++++++++++++++++++++++-- pkg/util/push/push.go | 21 +++++++++++++-- pkg/util/push/push_test.go | 26 ++++++++++++++++--- 11 files changed, 147 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7da08bf6ea..69c43697d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [FEATURE] Ruler: Add support for per-user external labels #6340 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 +* [ENHANCEMENT] Distributor: Add a `cortex_distributor_push_requests_uncompressed_size_bytes` native histogram to track uncompressed push requests in bytes for tenant and format. #6384 * [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370 * [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355 * [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333 diff --git a/pkg/api/api.go b/pkg/api/api.go index 13843c3e64..692cc8bed9 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -277,8 +277,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") - a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST") + a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") @@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index eb25969978..5ec41eed5c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -129,6 +129,9 @@ type Distributor struct { asyncExecutor util.AsyncExecutor + // metrics passed to push handler + PushHandlerMetrics *PushHandlerMetrics + // Map to track label sets from user. labelSetTracker *labelSetTracker } @@ -180,6 +183,32 @@ type Config struct { OTLPConfig OTLPConfig `yaml:"otlp"` } +type PushHandlerMetrics struct { + pushRequestSizeBytes *prometheus.HistogramVec +} + +func NewPushHandlerMetrics(reg prometheus.Registerer) *PushHandlerMetrics { + return &PushHandlerMetrics{ + pushRequestSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_distributor_push_requests_uncompressed_size_bytes", + Help: "Histogram of push request's uncompressed size in bytes", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMinResetDuration: 1 * time.Hour, + NativeHistogramMaxBucketNumber: 100, + }, []string{"user", "format"}), + } +} + +func (m *PushHandlerMetrics) ObservePushRequestSize(user, format string, size float64) { + if m != nil { + m.pushRequestSizeBytes.WithLabelValues(user, format).Observe(size) + } +} + +func (m *PushHandlerMetrics) deleteUserMetrics(user string) { + m.pushRequestSizeBytes.DeleteLabelValues(user) +} + type InstanceLimits struct { MaxIngestionRate float64 `yaml:"max_ingestion_rate"` MaxInflightPushRequests int `yaml:"max_inflight_push_requests"` @@ -384,8 +413,9 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Help: "Unix timestamp of latest received sample per user.", }, []string{"user"}), - validateMetrics: validation.NewValidateMetrics(reg), - asyncExecutor: util.NewNoOpExecutor(), + validateMetrics: validation.NewValidateMetrics(reg), + PushHandlerMetrics: NewPushHandlerMetrics(reg), + asyncExecutor: util.NewNoOpExecutor(), } d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet) @@ -501,6 +531,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) { d.nonHASamples.DeleteLabelValues(userID) d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID) + d.PushHandlerMetrics.deleteUserMetrics(userID) + if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil { level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err) } diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 55e9a40a9e..866f6126f9 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -43,9 +43,9 @@ func TestMarshall(t *testing.T) { plentySize = 1024 * 1024 ) req := cortexpb.WriteRequest{} - err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy) + _, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy) require.Error(t, err) - err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy) + _, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy) require.NoError(t, err) require.Equal(t, numSeries, len(req.Timeseries)) } diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index a7e86b96e5..f5995a5947 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -21,7 +21,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler { ctx := r.Context() var req client.ReadRequest logger := util_log.WithContext(r.Context(), logger) - if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil { + if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil { level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/pkg/util/http.go b/pkg/util/http.go index 09b6aea9fe..f0877f514f 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -148,14 +148,14 @@ const ( ) // ParseProtoReader parses a compressed proto from an io.Reader. -func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error { +func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) (int, error) { sp := opentracing.SpanFromContext(ctx) if sp != nil { sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]")) } body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp) if err != nil { - return err + return 0, err } if sp != nil { @@ -171,10 +171,10 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi err = proto.NewBuffer(body).Unmarshal(req) } if err != nil { - return err + return 0, err } - return nil + return len(body), nil } func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) { diff --git a/pkg/util/http_test.go b/pkg/util/http_test.go index e4de5b6b96..2ce7e32124 100644 --- a/pkg/util/http_test.go +++ b/pkg/util/http_test.go @@ -193,7 +193,7 @@ func TestParseProtoReader(t *testing.T) { reader = bytesBuffered{Buffer: &buf} } - err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression) + _, err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression) if tt.expectErr { assert.NotNil(t, err) return diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 9d0bcb1fba..95e7016bd9 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -34,7 +34,7 @@ const ( ) // OTLPHandler is a http.Handler which accepts OTLP metrics. -func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, metrics *distributor.PushHandlerMetrics) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := util_log.WithContext(ctx, util_log.Logger) @@ -51,7 +51,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri return } - req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize) + req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize, userID, metrics) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -99,7 +99,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri }) } -func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) { +func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int, userID string, metrics *distributor.PushHandlerMetrics) (pmetricotlp.ExportRequest, error) { expectedSize := int(r.ContentLength) if expectedSize > maxSize { return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize) @@ -124,7 +124,17 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) ( decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) { req := pmetricotlp.NewExportRequest() otlpReqProto := otlpProtoMessage{req: &req} - return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType) + + bodySize, err := util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType) + if err != nil { + return req, err + } + + if metrics != nil { + metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize)) + } + + return req, nil } case jsonContentType: decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) { @@ -143,11 +153,15 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) ( if expectedSize > 0 { buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation } - _, err := buf.ReadFrom(reader) + bodySize, err := buf.ReadFrom(reader) if err != nil { return req, err } + if metrics != nil { + metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize)) + } + return req, req.UnmarshalJSON(buf.Bytes()) } default: diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index a16b62fae6..2e1d69d7bc 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -7,10 +7,13 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -305,7 +308,7 @@ func BenchmarkOTLPWriteHandler(b *testing.B) { mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { return &cortexpb.WriteResponse{}, nil } - handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc) + handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc, nil) b.Run("json with no compression", func(b *testing.B) { req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "") @@ -384,12 +387,20 @@ func TestOTLPWriteHandler(t *testing.T) { expectedStatusCode int expectedErrMsg string encodingType string + expectedMetrics string }{ { description: "Test proto format write with no compression", maxRecvMsgSize: 10000, contentType: pbContentType, expectedStatusCode: http.StatusOK, + expectedMetrics: ` + # HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes + # TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram + cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1 + cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665 + cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1 + `, }, { description: "Test proto format write with gzip", @@ -397,12 +408,26 @@ func TestOTLPWriteHandler(t *testing.T) { contentType: pbContentType, expectedStatusCode: http.StatusOK, encodingType: "gzip", + expectedMetrics: ` + # HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes + # TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram + cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1 + cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665 + cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1 + `, }, { description: "Test json format write with no compression", maxRecvMsgSize: 10000, contentType: jsonContentType, expectedStatusCode: http.StatusOK, + expectedMetrics: ` + # HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes + # TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram + cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1 + cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568 + cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1 + `, }, { description: "Test json format write with gzip", @@ -410,6 +435,13 @@ func TestOTLPWriteHandler(t *testing.T) { contentType: jsonContentType, expectedStatusCode: http.StatusOK, encodingType: "gzip", + expectedMetrics: ` + # HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes + # TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram + cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1 + cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568 + cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1 + `, }, { description: "request too big than maxRecvMsgSize (proto) with no compression", @@ -458,7 +490,8 @@ func TestOTLPWriteHandler(t *testing.T) { push := verifyOTLPWriteRequestHandler(t, cortexpb.API) overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) require.NoError(t, err) - handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push) + reg := prometheus.NewRegistry() + handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, distributor.NewPushHandlerMetrics(reg)) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -466,6 +499,10 @@ func TestOTLPWriteHandler(t *testing.T) { resp := recorder.Result() require.Equal(t, test.expectedStatusCode, resp.StatusCode) + if test.expectedMetrics != "" { + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_distributor_push_requests")) + } + if test.expectedErrMsg != "" { b, err := io.ReadAll(resp.Body) require.NoError(t, err) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 9cabb39522..6d6c749a0c 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -9,15 +9,22 @@ import ( "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/log" ) +const ( + formatRemoteWrite1 = "prw1" + formatOTLP = "otlp" +) + // Func defines the type of the push. It is similar to http.HandlerFunc. type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushMetrics *distributor.PushHandlerMetrics) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -28,14 +35,24 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F logger = log.WithSourceIPs(source, logger) } } + + userID, err := tenant.TenantID(ctx) + if err != nil { + return + } + var req cortexpb.PreallocWriteRequest - err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + bodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } + if pushMetrics != nil { + pushMetrics.ObservePushRequestSize(userID, formatRemoteWrite1, float64(bodySize)) + } + req.SkipLabelNameValidation = false if req.Source == 0 { req.Source = cortexpb.API diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index b806011a61..964b2701cd 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -5,31 +5,48 @@ import ( "context" "net/http" "net/http/httptest" + "strings" "testing" "time" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/distributor" ) func TestHandler_remoteWrite(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := distributor.NewPushHandlerMetrics(reg) + req := createRequest(t, createPrometheusRemoteWriteProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.API), metrics) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) + + expectedMetrics := ` + # HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes + # TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram + cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="prw1",user="fake",le="+Inf"} 1 + cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="prw1",user="fake"} 40 + cortex_distributor_push_requests_uncompressed_size_bytes_count{format="prw1",user="fake"} 1 + ` + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_distributor_push_requests")) } func TestHandler_cortexWriteRequest(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false)) resp := httptest.NewRecorder() sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.RULE), nil) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -40,7 +57,7 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { createRequest(t, createCortexWriteRequestProtobuf(t, false)), } { resp := httptest.NewRecorder() - handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -61,7 +78,8 @@ func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_ func createRequest(t *testing.T, protobuf []byte) *http.Request { t.Helper() inoutBytes := snappy.Encode(nil, protobuf) - req, err := http.NewRequest("POST", "http://localhost/", bytes.NewReader(inoutBytes)) + ctx := user.InjectOrgID(context.Background(), "fake") + req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost/", bytes.NewReader(inoutBytes)) require.NoError(t, err) req.Header.Add("Content-Encoding", "snappy") req.Header.Set("Content-Type", "application/x-protobuf")