Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native histogram for tracking push requests size #6384

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Distributor: Add a `cortex_distributor_push_requests_uncompressed_size_bytes` native histogram to track uncompressed push requests in bytes for tenant and format. #6384
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand All @@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand Down Expand Up @@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
36 changes: 34 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type Distributor struct {

asyncExecutor util.AsyncExecutor

// metrics passed to push handler
PushHandlerMetrics *PushHandlerMetrics

// Map to track label sets from user.
labelSetTracker *labelSetTracker
}
Expand Down Expand Up @@ -180,6 +183,32 @@ type Config struct {
OTLPConfig OTLPConfig `yaml:"otlp"`
}

type PushHandlerMetrics struct {
pushRequestSizeBytes *prometheus.HistogramVec
}

func NewPushHandlerMetrics(reg prometheus.Registerer) *PushHandlerMetrics {
return &PushHandlerMetrics{
pushRequestSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_push_requests_uncompressed_size_bytes",
Help: "Histogram of push request's uncompressed size in bytes",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user", "format"}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we really need the format parameter here. Why would we need different limits for different formats? Do we plan to have a new limit for RW 2.0?

I feel we should consolidate the request size limit into one #6333.

Copy link
Contributor Author

@SungJin1212 SungJin1212 Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no plan to something limit for PRW 2.0. It could be that we can add some limits. But, I thought different formats (prw1, prw2, and otlp) would have different body sizes.

So, I attached the format label so that the user can configure limit configs for different formats.

Copy link
Contributor

@yeya24 yeya24 Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to track different formats then for the same reason we need to differentiate compression formats as well as they have different sizes. OTLP has protobuf and JSON then we also need to differentiate that.

To simplify things I think it is good enough to use a single size limit and metric

Copy link
Contributor Author

@SungJin1212 SungJin1212 Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense.
Then, de-attach format and make 'cortex_distributor_push_requests_uncompressed_size_bytestrack the uncompressed size of all of the formats. Next step is to deletedistributor.otlp-max-recv-msg-size` flags.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hear from other maintainers...
@friedrichg @danielblando @alanprot @CharlieTLe ?

}
}

func (m *PushHandlerMetrics) ObservePushRequestSize(user, format string, size float64) {
if m != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this check?

m.pushRequestSizeBytes.WithLabelValues(user, format).Observe(size)
}
}

func (m *PushHandlerMetrics) deleteUserMetrics(user string) {
m.pushRequestSizeBytes.DeleteLabelValues(user)
}

type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
Expand Down Expand Up @@ -384,8 +413,9 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),

validateMetrics: validation.NewValidateMetrics(reg),
asyncExecutor: util.NewNoOpExecutor(),
validateMetrics: validation.NewValidateMetrics(reg),
PushHandlerMetrics: NewPushHandlerMetrics(reg),
asyncExecutor: util.NewNoOpExecutor(),
}

d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet)
Expand Down Expand Up @@ -501,6 +531,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

d.PushHandlerMetrics.deleteUserMetrics(userID)

if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestMarshall(t *testing.T) {
plentySize = 1024 * 1024
)
req := cortexpb.WriteRequest{}
err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
_, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
require.Error(t, err)
err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
_, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
require.NoError(t, err)
require.Equal(t, numSeries, len(req.Timeseries))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler {
ctx := r.Context()
var req client.ReadRequest
logger := util_log.WithContext(r.Context(), logger)
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ const (
)

// ParseProtoReader parses a compressed proto from an io.Reader.
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) (int, error) {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
}
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
if err != nil {
return err
return 0, err
}

if sp != nil {
Expand All @@ -171,10 +171,10 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
err = proto.NewBuffer(body).Unmarshal(req)
}
if err != nil {
return err
return 0, err
}

return nil
return len(body), nil
}

func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestParseProtoReader(t *testing.T) {
reader = bytesBuffered{Buffer: &buf}
}

err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
_, err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
if tt.expectErr {
assert.NotNil(t, err)
return
Expand Down
24 changes: 19 additions & 5 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, metrics *distributor.PushHandlerMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util_log.WithContext(ctx, util_log.Logger)
Expand All @@ -51,7 +51,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
return
}

req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize, userID, metrics)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -99,7 +99,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
})
}

func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int, userID string, metrics *distributor.PushHandlerMetrics) (pmetricotlp.ExportRequest, error) {
expectedSize := int(r.ContentLength)
if expectedSize > maxSize {
return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
Expand All @@ -124,7 +124,17 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
otlpReqProto := otlpProtoMessage{req: &req}
return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)

bodySize, err := util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
if err != nil {
return req, err
}

if metrics != nil {
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
}

return req, nil
}
case jsonContentType:
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
Expand All @@ -143,11 +153,15 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
if expectedSize > 0 {
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
_, err := buf.ReadFrom(reader)
bodySize, err := buf.ReadFrom(reader)
if err != nil {
return req, err
}

if metrics != nil {
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
}

return req, req.UnmarshalJSON(buf.Bytes())
}
default:
Expand Down
41 changes: 39 additions & 2 deletions pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -305,7 +308,7 @@ func BenchmarkOTLPWriteHandler(b *testing.B) {
mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc)
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc, nil)

b.Run("json with no compression", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "")
Expand Down Expand Up @@ -384,32 +387,61 @@ func TestOTLPWriteHandler(t *testing.T) {
expectedStatusCode int
expectedErrMsg string
encodingType string
expectedMetrics string
}{
{
description: "Test proto format write with no compression",
maxRecvMsgSize: 10000,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test proto format write with gzip",
maxRecvMsgSize: 10000,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test json format write with no compression",
maxRecvMsgSize: 10000,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test json format write with gzip",
maxRecvMsgSize: 10000,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "request too big than maxRecvMsgSize (proto) with no compression",
Expand Down Expand Up @@ -458,14 +490,19 @@ func TestOTLPWriteHandler(t *testing.T) {
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
require.NoError(t, err)
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push)
reg := prometheus.NewRegistry()
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, distributor.NewPushHandlerMetrics(reg))

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(t, test.expectedStatusCode, resp.StatusCode)

if test.expectedMetrics != "" {
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_distributor_push_requests"))
}

if test.expectedErrMsg != "" {
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
Expand Down
21 changes: 19 additions & 2 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ import (
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
)

const (
formatRemoteWrite1 = "prw1"
formatOTLP = "otlp"
)

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushMetrics *distributor.PushHandlerMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand All @@ -28,14 +35,24 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
logger = log.WithSourceIPs(source, logger)
}
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return
}

var req cortexpb.PreallocWriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
bodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if pushMetrics != nil {
pushMetrics.ObservePushRequestSize(userID, formatRemoteWrite1, float64(bodySize))
}

req.SkipLabelNameValidation = false
if req.Source == 0 {
req.Source = cortexpb.API
Expand Down
Loading
Loading