Skip to content

Commit

Permalink
Add prw2 push distributor benchmark
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 6, 2024
1 parent fe26a6b commit ba60d7a
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 1 deletion.
309 changes: 308 additions & 1 deletion pkg/distributor/distributor_prw2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"testing"
"time"

ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -25,6 +31,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
Expand All @@ -40,7 +47,307 @@ var (

// TODO(Sungjin1212): Add PushHAInstances Test after implement PRW2 HA tracker
// TODO(Sungjin1212): Add TestDistributor_Push_LabelRemoval, TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError Test after implement PRW2 relabel
// TODO(Sungjin1212): Add BenchmarkDistributor_Push Benchmark

func BenchmarkDistributorPRW2_Push(b *testing.B) {
const (
numSeriesPerRequest = 1000
)
ctx := user.InjectOrgID(context.Background(), "user")

tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
prepareSeries func() ([]labels.Labels, []cortexpbv2.Sample)
expectedErr string
}{
"all samples successfully pushed": {
prepareConfig: func(limits *validation.Limits) {},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "",
},
"ingestion rate limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.IngestionRate = 1
limits.IngestionBurstSize = 1
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "ingestion rate limit",
},
"too many labels limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelNamesPerSeries = 30
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 1; i < 31; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "series has too many labels",
},
"max label name length limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelNameLength = 1024
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

// Add a label with a very long name.
lbls.Set(fmt.Sprintf("xxx_%0.2000d", 1), "xxx")

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "label name too long",
},
"max label value length limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelValueLength = 1024
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

// Add a label with a very long value.
lbls.Set("xxx", fmt.Sprintf("xxx_%0.2000d", 1))

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "label value too long",
},
"max label size bytes per series limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLabelsSizeBytes = 1024
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

// Add a label with a very long value.
lbls.Set("xxx", fmt.Sprintf("xxx_%0.2000d", 1))

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "labels size bytes exceeded",
},
"timestamp too old": {
prepareConfig: func(limits *validation.Limits) {
limits.RejectOldSamples = true
limits.RejectOldSamplesMaxAge = model.Duration(time.Hour)
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().Add(-2*time.Hour).UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "timestamp too old",
},
"timestamp too new": {
prepareConfig: func(limits *validation.Limits) {
limits.CreationGracePeriod = model.Duration(time.Minute)
},
prepareSeries: func() ([]labels.Labels, []cortexpbv2.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpbv2.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpbv2.Sample{
Value: float64(i),
Timestamp: time.Now().Add(time.Hour).UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "timestamp too new",
},
}

tg := ring.NewRandomTokenGenerator()

for testName, testData := range tests {
b.Run(testName, func(b *testing.B) {

// Create an in-memory KV store for the ring with 1 ingester registered.
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
b.Cleanup(func() { assert.NoError(b, closer.Close()) })

err := kvStore.CAS(context.Background(), ingester.RingKey,
func(_ interface{}) (interface{}, bool, error) {
d := &ring.Desc{}
d.AddIngester("ingester-1", "127.0.0.1", "", tg.GenerateTokens(d, "ingester-1", "", 128, true), ring.ACTIVE, time.Now())
return d, true, nil
},
)
require.NoError(b, err)

ingestersRing, err := ring.New(ring.Config{
KVStore: kv.Config{Mock: kvStore},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 1,
}, ingester.RingKey, ingester.RingKey, nil, nil)
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingestersRing))
b.Cleanup(func() {
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), ingestersRing))
})

test.Poll(b, time.Second, 1, func() interface{} {
return ingestersRing.InstancesCount()
})

// Prepare the distributor configuration.
var distributorCfg Config
var clientConfig client.Config
limits := validation.Limits{}
flagext.DefaultValues(&distributorCfg, &clientConfig, &limits)

limits.IngestionRate = 10000000 // Unlimited.
testData.prepareConfig(&limits)

distributorCfg.ShardByAllLabels = true
distributorCfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return &noopIngester{}, nil
}

overrides, err := validation.NewOverrides(limits, nil)
require.NoError(b, err)

// Start the distributor.
distributor, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor))

b.Cleanup(func() {
require.NoError(b, services.StopAndAwaitTerminated(context.Background(), distributor))
})

// Prepare the series to remote write before starting the benchmark.
metrics, samples := testData.prepareSeries()

// Run the benchmark.
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
_, err := distributor.PushV2(ctx, cortexpbv2.ToWriteRequestV2(metrics, samples, nil, nil, cortexpbv2.API))
if testData.expectedErr == "" && err != nil {
b.Fatalf("no error expected but got %v", err)
}
if testData.expectedErr != "" && (err == nil || !strings.Contains(err.Error(), testData.expectedErr)) {
b.Fatalf("expected %v error but got %v", testData.expectedErr, err)
}
}
})
}
}

func TestDistributorPRW2_Push(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,10 @@ func (i *noopIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWr
return nil, nil
}

func (i *noopIngester) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) {
return nil, nil
}

func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return nil, nil
}
Expand Down

0 comments on commit ba60d7a

Please sign in to comment.