From 3a958c8a835bff81648d2f51a38f7118c9f68c05 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 6 Nov 2024 14:46:11 +0900 Subject: [PATCH] Add remote write v2 HA tracker and relabel Signed-off-by: SungJin1212 --- pkg/cortexpbv2/compatv2.go | 4 +- pkg/distributor/distributor.go | 84 ++++++- pkg/distributor/distributor_prw2_test.go | 296 ++++++++++++++++++++++- pkg/distributor/distributor_test.go | 3 +- pkg/ingester/ingester.go | 2 +- 5 files changed, 369 insertions(+), 20 deletions(-) diff --git a/pkg/cortexpbv2/compatv2.go b/pkg/cortexpbv2/compatv2.go index 99703f5686..1c02ccc456 100644 --- a/pkg/cortexpbv2/compatv2.go +++ b/pkg/cortexpbv2/compatv2.go @@ -8,14 +8,14 @@ import ( ) // ToWriteRequestV2 converts matched slices of Labels, Samples, and Histograms into a WriteRequest proto. -func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, additionalSymbols ...string) *WriteRequest { +func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, help ...string) *WriteRequest { st := writev2.NewSymbolTable() labelRefs := make([][]uint32, 0, len(lbls)) for _, lbl := range lbls { labelRefs = append(labelRefs, st.SymbolizeLabels(lbl, nil)) } - for _, s := range additionalSymbols { + for _, s := range help { st.Symbolize(s) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index fcf6f174ed..7c798956f9 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/httpgrpc" @@ -515,6 +516,17 @@ func 
shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 { return h } +// removeLabelV2 removes the first label named labelName from labels, if present. +func removeLabelV2(labelName string, labels *labels.Labels) { + for i := 0; i < len(*labels); i++ { + pair := (*labels)[i] + if pair.Name == labelName { + *labels = append((*labels)[:i], (*labels)[i+1:]...) + return + } + } +} + // Remove the label labelname from a slice of LabelPairs if it exists. func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) { for i := 0; i < len(*labels); i++ { @@ -619,11 +631,9 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri nil } -func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits) ([]uint32, []cortexpbv2.TimeSeries, int64, int64, int64, int64, error, error) { +func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits, b labels.ScratchBuilder, removeReplica bool) ([]uint32, []cortexpbv2.TimeSeries, int64, int64, int64, int64, error, error) { pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeysV2") defer pSpan.Finish() - - b := labels.NewScratchBuilder(0) // For each timeseries or samples, we compute a hash to distribute across ingesters; // check each sample/metadata and discard if outside limits. validatedTimeseries := make([]cortexpbv2.TimeSeries, 0, len(req.Timeseries)) @@ -643,6 +653,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W } }() + st := writev2.NewSymbolTable() // For each timeseries, compute a hash to distribute across ingesters; // check each sample and discard if outside limits. 
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation() @@ -656,12 +667,37 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W } lbs := ts.ToLabels(&b, req.Symbols) - las := cortexpb.FromLabelsToLabelAdapters(lbs) - // TODO(Sungjin1212): Implement relabel - // TODO(Sunghin1212): Implement ha tracker + if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 { + l, _ := relabel.Process(lbs, mrc...) + if len(l) == 0 { + // all labels are gone, samples will be discarded + d.validateMetrics.DiscardedSamples.WithLabelValues( + validation.DroppedByRelabelConfiguration, + userID, + ).Add(float64(len(ts.Samples) + len(ts.Histograms))) + + // all labels are gone, exemplars will be discarded + d.validateMetrics.DiscardedExemplars.WithLabelValues( + validation.DroppedByRelabelConfiguration, + userID, + ).Add(float64(len(ts.Exemplars))) + continue + } + lbs = l + } + + // If we found both the cluster and replica labels, we only want to include the cluster label when + // storing series in Cortex. If we kept the replica label we would end up with another series for the same + // series we're trying to dedupe when HA tracking moves over to a different replica. + if removeReplica { + removeLabelV2(limits.HAReplicaLabel, &lbs) + } - if len(las) == 0 { + for _, labelName := range limits.DropLabels { + removeLabelV2(labelName, &lbs) + } + if len(lbs) == 0 { d.validateMetrics.DiscardedSamples.WithLabelValues( validation.DroppedByUserConfigurationOverride, userID, @@ -674,6 +710,10 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W continue } + // update label refs + ts.LabelsRefs = st.SymbolizeLabels(lbs, nil) + las := cortexpb.FromLabelsToLabelAdapters(lbs) + // We rely on sorted labels in different places: // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns // different tokens, which is bad. 
@@ -714,6 +754,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W validatedHistogramSamples += len(ts.Histograms) validatedExemplars += len(ts.Exemplars) } + return seriesKeys, validatedTimeseries, int64(validatedMetadata), int64(validatedFloatSamples), int64(validatedHistogramSamples), int64(validatedExemplars), firstPartialErr, nil } @@ -917,12 +958,37 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) } } + b := labels.NewScratchBuilder(0) + removeReplica := false // Cache user limit with overrides so we spend less CPU doing locking. See issue #4904 limits := d.limits.GetOverridesForUser(userID) - // TODO(Sungjin1212): Add ha tracker + if limits.AcceptHASamples && len(req.Timeseries) > 0 { + cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, cortexpb.FromLabelsToLabelAdapters(req.Timeseries[0].ToLabels(&b, req.Symbols))) + removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits) + if err != nil { + // TODO(Sungjin1212): reuse timeseries slice + + if errors.Is(err, ha.ReplicasNotMatchError{}) { + // These samples have been deduped. + d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples)) + return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error()) + } + + if errors.Is(err, ha.TooManyReplicaGroupsError{}) { + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples)) + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + return nil, err + } + // If there wasn't an error but removeReplica is false that means we didn't find both HA labels. 
+ if !removeReplica { // checkSample returned removeReplica=false with a nil error + d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples)) + } + } - seriesKeys, validatedTimeseries, validatedMetadatas, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeysV2(ctx, req, userID, limits) + seriesKeys, validatedTimeseries, validatedMetadatas, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeysV2(ctx, req, userID, limits, b, removeReplica) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor_prw2_test.go b/pkg/distributor/distributor_prw2_test.go index 4feb7122c3..7b56391dbd 100644 --- a/pkg/distributor/distributor_prw2_test.go +++ b/pkg/distributor/distributor_prw2_test.go @@ -10,10 +10,6 @@ import ( "testing" "time" - ring_client "github.com/cortexproject/cortex/pkg/ring/client" - "github.com/cortexproject/cortex/pkg/ring/kv" - "github.com/cortexproject/cortex/pkg/ring/kv/consul" - "github.com/cortexproject/cortex/pkg/util/services" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -33,10 +29,15 @@ import ( "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + ring_client "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -45,8 +46,266 @@ var ( emptyResponseV2 = 
&cortexpbv2.WriteResponse{} ) -// TODO(Sungjin1212): Add PushHAInstances Test after implement PRW2 HA tracker -// TODO(Sungjin1212): Add TestDistributor_Push_LabelRemoval, TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError Test after implement PRW2 relabel +func TestDistributorPRW2_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "user") + type testcase struct { + inputSeries labels.Labels + expectedSeries labels.Labels + removeReplica bool + removeLabels []string + } + + tc := testcase{ + removeReplica: true, + removeLabels: []string{"__name__"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{}, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + limits.AcceptHASamples = tc.removeReplica + + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + // Push the series to the distributor + req := mockWriteRequestV2([]labels.Labels{tc.inputSeries}, 1, 1, false) + _, err = ds[0].PushV2(ctx, req) + require.Error(t, err) + assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error()) +} + +func TestDistributorPRW2_Push_LabelRemoval(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "user") + + type testcase struct { + inputSeries labels.Labels + expectedSeries labels.Labels + removeReplica bool + removeLabels []string + exemplars []cortexpbv2.Exemplar + } + + cases := []testcase{ + // Remove both cluster and replica label. 
+ { + removeReplica: true, + removeLabels: []string{"cluster"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + }, + }, + // Remove multiple labels and replica. + { + removeReplica: true, + removeLabels: []string{"foo", "some"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + {Name: "foo", Value: "bar"}, + {Name: "some", Value: "thing"}, + }, + expectedSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + }, + }, + // Don't remove any labels. + { + removeReplica: false, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "__replica__", Value: "two"}, + {Name: "cluster", Value: "one"}, + }, + expectedSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "__replica__", Value: "two"}, + {Name: "cluster", Value: "one"}, + }, + }, + // No labels left. 
+ { + removeReplica: true, + removeLabels: []string{"cluster"}, + inputSeries: labels.Labels{ + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{}, + exemplars: []cortexpbv2.Exemplar{ + {LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 0}, + {LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 0}, + }, + }, + } + + for _, tc := range cases { + for _, histogram := range []bool{true, false} { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + limits.AcceptHASamples = tc.removeReplica + + expectedDiscardedSamples := 0 + expectedDiscardedExemplars := 0 + if tc.expectedSeries.Len() == 0 { + expectedDiscardedSamples = 1 + expectedDiscardedExemplars = len(tc.exemplars) + // Allow series with no labels to ingest + limits.EnforceMetricName = false + } + + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + // Push the series to the distributor + req := mockWriteRequestV2([]labels.Labels{tc.inputSeries}, 1, 1, histogram) + req.Timeseries[0].Exemplars = tc.exemplars + _, err = ds[0].PushV2(ctx, req) + require.NoError(t, err) + + actualDiscardedSamples := testutil.ToFloat64(ds[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.DroppedByUserConfigurationOverride, "user")) + actualDiscardedExemplars := testutil.ToFloat64(ds[0].validateMetrics.DiscardedExemplars.WithLabelValues(validation.DroppedByUserConfigurationOverride, "user")) + require.Equal(t, float64(expectedDiscardedSamples), actualDiscardedSamples) + require.Equal(t, float64(expectedDiscardedExemplars), actualDiscardedExemplars) + + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + expectedSeries := 1 + if tc.expectedSeries.Len() == 0 { + expectedSeries 
= 0 + } + assert.Equal(t, expectedSeries, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + } + } + } + } +} + +func TestDistributorPRW2_PushHAInstances(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "user") + + for i, tc := range []struct { + enableTracker bool + acceptedReplica string + testReplica string + cluster string + samples int + expectedResponse *cortexpbv2.WriteResponse + expectedCode int32 + }{ + { + enableTracker: true, + acceptedReplica: "instance0", + testReplica: "instance0", + cluster: "cluster0", + samples: 5, + expectedResponse: emptyResponseV2, + }, + // The 202 indicates that we didn't accept this sample. + { + enableTracker: true, + acceptedReplica: "instance2", + testReplica: "instance0", + cluster: "cluster0", + samples: 5, + expectedCode: 202, + }, + // If the HA tracker is disabled we should still accept samples that have both labels. + { + enableTracker: false, + acceptedReplica: "instance0", + testReplica: "instance0", + cluster: "cluster0", + samples: 5, + expectedResponse: emptyResponseV2, + }, + // Using very long replica label value results in validation error. 
+ { + enableTracker: true, + acceptedReplica: "instance0", + testReplica: "instance1234567890123456789012345678901234567890", + cluster: "cluster0", + samples: 5, + expectedResponse: emptyResponseV2, + expectedCode: 400, + }, + } { + for _, shardByAllLabels := range []bool{true, false} { + tc := tc + shardByAllLabels := shardByAllLabels + for _, enableHistogram := range []bool{true, false} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v, histogram=%v)", i, shardByAllLabels, enableHistogram), func(t *testing.T) { + t.Parallel() + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.AcceptHASamples = true + limits.MaxLabelValueLength = 15 + + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: shardByAllLabels, + limits: &limits, + enableTracker: tc.enableTracker, + }) + + d := ds[0] + + userID, err := tenant.TenantID(ctx) + assert.NoError(t, err) + err = d.HATracker.CheckReplica(ctx, userID, tc.cluster, tc.acceptedReplica, time.Now()) + assert.NoError(t, err) + + request := makeWriteRequestHAV2(tc.samples, tc.testReplica, tc.cluster, enableHistogram) + response, err := d.PushV2(ctx, request) + assert.Equal(t, tc.expectedResponse, response) + + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + if ok { + assert.Equal(t, tc.expectedCode, httpResp.Code) + } else if tc.expectedCode != 0 { + assert.Fail(t, "expected HTTP status code", tc.expectedCode) + } + }) + } + } + } +} func BenchmarkDistributorPRW2_Push(b *testing.B) { const ( @@ -2228,3 +2487,28 @@ func mockWriteRequestV2(lbls []labels.Labels, value int64, timestamp int64, hist return cortexpbv2.ToWriteRequestV2(lbls, samples, histograms, nil, cortexpbv2.API) } + +func makeWriteRequestHAV2(samples int, replica, cluster string, histogram bool) *cortexpbv2.WriteRequest { + request := &cortexpbv2.WriteRequest{} + st := writev2.NewSymbolTable() + for i := 0; i < samples; i++ { + ts := 
cortexpbv2.TimeSeries{ + LabelsRefs: st.SymbolizeLabels(labels.Labels{{Name: "__name__", Value: "foo"}, {Name: "__replica__", Value: replica}, {Name: "bar", Value: "baz"}, {Name: "cluster", Value: cluster}, {Name: "sample", Value: fmt.Sprintf("%d", i)}}, nil), + } + if histogram { + ts.Histograms = []cortexpbv2.Histogram{ + cortexpbv2.HistogramToHistogramProto(int64(i), tsdbutil.GenerateTestHistogram(i)), + } + } else { + ts.Samples = []cortexpbv2.Sample{ + { + Value: float64(i), + Timestamp: int64(i), + }, + } + } + request.Timeseries = append(request.Timeseries, ts) + } + request.Symbols = st.Symbols() + return request +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b0a6c0d1b1..a04bc72333 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -3095,8 +3095,7 @@ func (i *mockIngester) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest, b := labels.NewScratchBuilder(0) - for j := range req.Timeseries { - series := req.Timeseries[j] + for _, series := range req.Timeseries { tsLabels := series.ToLabels(&b, req.Symbols) labels := cortexpb.FromLabelsToLabelAdapters(tsLabels) hash := shardByAllLabels(orgid, labels) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 760c809748..4d6fbe3e94 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1196,8 +1196,8 @@ func (i *Ingester) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*c // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + b := labels.NewScratchBuilder(0) for _, ts := range req.Timeseries { - b := labels.NewScratchBuilder(0) tsLabels := ts.ToLabels(&b, req.Symbols) seriesLabels := cortexpb.FromLabelsToLabelAdapters(tsLabels)