From cbdd36a41282bf85ab1007ca1a6a8a227608c40b Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Nov 2024 02:41:38 -0800 Subject: [PATCH] feat: blockbuilder component (#14621) Signed-off-by: Owen Diehl Co-authored-by: Ashwanth Goli --- docs/sources/shared/configuration.md | 51 ++ pkg/blockbuilder/controller.go | 305 +++++++ pkg/blockbuilder/metrics.go | 152 ++++ pkg/blockbuilder/pipeline.go | 99 +++ pkg/blockbuilder/pipeline_test.go | 87 ++ pkg/blockbuilder/plan.txt | 34 + pkg/blockbuilder/slimgester.go | 811 ++++++++++++++++++ pkg/blockbuilder/storage.go | 154 ++++ pkg/blockbuilder/storage_test.go | 37 + pkg/blockbuilder/tsdb.go | 328 +++++++ pkg/kafka/partition/reader.go | 33 + pkg/kafka/partition/reader_service.go | 22 +- pkg/loki/loki.go | 9 + pkg/loki/modules.go | 65 ++ .../stores/shipper/indexshipper/shipper.go | 5 + .../indexshipper/tsdb/compactor_test.go | 24 +- .../shipper/indexshipper/tsdb/identifier.go | 10 +- .../shipper/indexshipper/tsdb/index/index.go | 4 + .../tsdb/index_shipper_querier.go | 4 +- .../shipper/indexshipper/tsdb/manager.go | 27 +- .../shipper/indexshipper/tsdb/manager_test.go | 71 ++ .../stores/shipper/indexshipper/tsdb/store.go | 13 + 22 files changed, 2300 insertions(+), 45 deletions(-) create mode 100644 pkg/blockbuilder/controller.go create mode 100644 pkg/blockbuilder/metrics.go create mode 100644 pkg/blockbuilder/pipeline.go create mode 100644 pkg/blockbuilder/pipeline_test.go create mode 100644 pkg/blockbuilder/plan.txt create mode 100644 pkg/blockbuilder/slimgester.go create mode 100644 pkg/blockbuilder/storage.go create mode 100644 pkg/blockbuilder/storage_test.go create mode 100644 pkg/blockbuilder/tsdb.go create mode 100644 pkg/storage/stores/shipper/indexshipper/tsdb/manager_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 203d7130232a..5fd2850f229d 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -137,6 +137,57 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set # itself to a key value store. [ingester: ] +block_builder: + # How many flushes can happen concurrently + # CLI flag: -blockbuilder.concurrent-flushes + [concurrent_flushes: | default = 1] + + # How many workers to process writes, defaults to number of available cpus + # CLI flag: -blockbuilder.concurrent-writers + [concurrent_writers: | default = 1] + + # The targeted _uncompressed_ size in bytes of a chunk block When this + # threshold is exceeded the head block will be cut and compressed inside the + # chunk. + # CLI flag: -blockbuilder.chunks-block-size + [chunk_block_size: | default = 256KB] + + # A target _compressed_ size in bytes for chunks. This is a desired size not + # an exact size, chunks may be slightly bigger or significantly smaller if + # they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 + # creates chunks with a fixed 10 blocks, a non zero value will create chunks + # with a variable number of blocks to meet the target size. + # CLI flag: -blockbuilder.chunk-target-size + [chunk_target_size: | default = 1536KB] + + # The algorithm to use for compressing chunk. (none, gzip, lz4-64k, snappy, + # lz4-256k, lz4-1M, lz4, flate, zstd) + # CLI flag: -blockbuilder.chunk-encoding + [chunk_encoding: | default = "snappy"] + + # The maximum duration of a timeseries chunk in memory. If a timeseries runs + # for longer than this, the current chunk will be flushed to the store and a + # new chunk created. 
+ # CLI flag: -blockbuilder.max-chunk-age + [max_chunk_age: | default = 2h] + + # The interval at which to run. + # CLI flag: -blockbuilder.interval + [interval: | default = 10m] + + backoff_config: + # Minimum delay when backing off. + # CLI flag: -blockbuilder.backoff..backoff-min-period + [min_period: | default = 100ms] + + # Maximum delay when backing off. + # CLI flag: -blockbuilder.backoff..backoff-max-period + [max_period: | default = 10s] + + # Number of times to backoff and retry before failing. + # CLI flag: -blockbuilder.backoff..backoff-retries + [max_retries: | default = 10] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/pkg/blockbuilder/controller.go b/pkg/blockbuilder/controller.go new file mode 100644 index 000000000000..f252b3d65744 --- /dev/null +++ b/pkg/blockbuilder/controller.go @@ -0,0 +1,305 @@ +package blockbuilder + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/dskit/backoff" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" + + "github.com/grafana/loki/pkg/push" +) + +// [min,max) +type Offsets struct { + Min, Max int64 +} + +type Job struct { + Partition int32 + Offsets Offsets +} + +// Interface required for interacting with queue partitions. +type PartitionController interface { + Topic() string + Partition() int32 + // Returns the highest committed offset from the consumer group + HighestCommittedOffset(ctx context.Context) (int64, error) + // Returns the highest available offset in the partition + HighestPartitionOffset(ctx context.Context) (int64, error) + // Returns the earliest available offset in the partition + EarliestPartitionOffset(ctx context.Context) (int64, error) + // Commits the offset to the consumer group. + Commit(context.Context, int64) error + // Process will run load batches at a time and send them to channel, + // so it's advised to not buffer the channel for natural backpressure. + // As a convenience, it returns the last seen offset, which matches + // the final record sent on the channel. + Process(context.Context, Offsets, chan<- []AppendInput) (int64, error) + + Close() error +} + +// PartitionJobController loads a single job a time, bound to a given +// * topic +// * partition +// * offset_step_len: the number of offsets each job to contain. e.g. "10" could yield a job w / min=15, max=25 +// +// At a high level, it watches a source topic/partition (where log data is ingested) and a "committed" topic/partition. +// The "committed" partition corresponds to the offsets from the source partition which have been committed to object storage. +// In essence, the following loop is performed +// 1. load the most recent record from the "committed" partition. This contains the highest msg offset in the "source" partition +// that has been committed to object storage. We'll call that $START_POS. +// 2. Create a job with `min=$START_POS+1,end=$START_POS+1+$STEP_LEN` +// 3. Sometime later when the job has been processed, we'll commit the final processed offset from the "source" partition (which +// will be <= $END_POS) to the "committed" partition. 
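+// For example, with the default step length of 1000 and a committed offset of 41,
+// the next job spans offsets [42, 1042), clamped to the partition's highest available offset.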
+// +// NB(owen-d): In our case, "source" is the partition +// +// containing log data and "committed" is the consumer group +type PartitionJobController struct { + stepLen int64 + part partition.ReaderIfc + backoff backoff.Config + decoder *kafka.Decoder +} + +func NewPartitionJobController( + controller partition.ReaderIfc, + backoff backoff.Config, +) (*PartitionJobController, error) { + decoder, err := kafka.NewDecoder() + if err != nil { + return nil, err + } + return &PartitionJobController{ + stepLen: 1000, // Default step length of 1000 offsets per job + part: controller, + backoff: backoff, + decoder: decoder, + }, nil +} + +func (l *PartitionJobController) HighestCommittedOffset(ctx context.Context) (int64, error) { + return withBackoff( + ctx, + l.backoff, + func() (int64, error) { + return l.part.FetchLastCommittedOffset(ctx) + }, + ) +} + +func (l *PartitionJobController) HighestPartitionOffset(ctx context.Context) (int64, error) { + return withBackoff( + ctx, + l.backoff, + func() (int64, error) { + return l.part.FetchPartitionOffset(ctx, partition.KafkaEndOffset) + }, + ) +} + +func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (int64, error) { + return withBackoff( + ctx, + l.backoff, + func() (int64, error) { + return l.part.FetchPartitionOffset(ctx, partition.KafkaStartOffset) + }, + ) +} + +func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) { + l.part.SetOffsetForConsumption(offsets.Min) + + var ( + lastOffset = offsets.Min - 1 + boff = backoff.New(ctx, l.backoff) + err error + ) + + for boff.Ongoing() { + var records []partition.Record + records, err = l.part.Poll(ctx) + if err != nil { + boff.Wait() + continue + } + + if len(records) == 0 { + // No more records available + break + } + + // Reset backoff on successful poll + boff.Reset() + + converted := make([]AppendInput, 0, len(records)) + for _, record := range records { + offset := records[len(records)-1].Offset + if offset >= offsets.Max { + break + } + lastOffset = offset + + stream, labels, err := l.decoder.Decode(record.Content) + if err != nil { + return 0, fmt.Errorf("failed to decode record: %w", err) + } + if len(stream.Entries) == 0 { + continue + } + + converted = append(converted, AppendInput{ + tenant: record.TenantID, + labels: labels, + labelsStr: stream.Labels, + entries: stream.Entries, + }) + + select { + case ch <- converted: + case <-ctx.Done(): + return 0, ctx.Err() + } + } + } + + return lastOffset, err +} + +// LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition +// Returns whether an applicable job exists, the job, and an error +func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) { + // Read the most recent committed offset + committedOffset, err := l.HighestCommittedOffset(ctx) + if err != nil { + return false, Job{}, err + } + + earliestOffset, err := l.EarliestPartitionOffset(ctx) + if err != nil { + return false, Job{}, err + } + + startOffset := committedOffset + 1 + if startOffset < earliestOffset { + startOffset = earliestOffset + } + + highestOffset, err := l.HighestPartitionOffset(ctx) + if err != nil { + return false, Job{}, err + } + if highestOffset == committedOffset { + return false, Job{}, nil + } + + // Create the job with the calculated offsets + job := Job{ + Partition: l.part.Partition(), + Offsets: Offsets{ + Min: startOffset, + Max: min(startOffset+l.stepLen, highestOffset), + }, + } + + return true, job, 
nil +} + +// implement a dummy controller which can be parameterized to +// deterministically simulate partitions +type dummyPartitionController struct { + topic string + partition int32 + committed int64 + highest int64 + numTenants int // number of unique tenants to simulate + streamsPerTenant int // number of streams per tenant + entriesPerOffset int // coefficient for entries per offset +} + +// used in testing +// nolint:revive +func NewDummyPartitionController(topic string, partition int32, highest int64) *dummyPartitionController { + return &dummyPartitionController{ + topic: topic, + partition: partition, + committed: 0, // always starts at zero + highest: highest, + numTenants: 2, // default number of tenants + streamsPerTenant: 2, // default streams per tenant + entriesPerOffset: 1, // default entries per offset coefficient + } +} + +func (d *dummyPartitionController) Topic() string { + return d.topic +} + +func (d *dummyPartitionController) Partition() int32 { + return d.partition +} + +func (d *dummyPartitionController) HighestCommittedOffset(_ context.Context) (int64, error) { + return d.committed, nil +} + +func (d *dummyPartitionController) HighestPartitionOffset(_ context.Context) (int64, error) { + return d.highest, nil +} + +func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error { + d.committed = offset + return nil +} + +func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) { + for i := int(offsets.Min); i < int(offsets.Max); i++ { + batch := d.createBatch(i) + select { + case <-ctx.Done(): + return int64(i - 1), ctx.Err() + case ch <- batch: + } + } + return offsets.Max - 1, nil +} + +// creates (tenants*streams) inputs +func (d *dummyPartitionController) createBatch(offset int) []AppendInput { + result := make([]AppendInput, 0, d.numTenants*d.streamsPerTenant) + for i := 0; i < d.numTenants; i++ { + tenant := fmt.Sprintf("tenant-%d", i) + for j := 0; j < d.streamsPerTenant; j++ { + lbls := labels.Labels{ + {Name: "stream", Value: fmt.Sprintf("stream-%d", j)}, + } + entries := make([]push.Entry, d.entriesPerOffset) + for k := 0; k < d.entriesPerOffset; k++ { + entries[k] = push.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("tenant=%d stream=%d line=%d offset=%d", i, j, k, offset), + } + } + result = append(result, AppendInput{ + tenant: tenant, + labels: lbls, + labelsStr: lbls.String(), + entries: entries, + }) + } + } + return result +} + +func (d *dummyPartitionController) Close() error { + return nil +} diff --git a/pkg/blockbuilder/metrics.go b/pkg/blockbuilder/metrics.go new file mode 100644 index 000000000000..31679e34f446 --- /dev/null +++ b/pkg/blockbuilder/metrics.go @@ -0,0 +1,152 @@ +package blockbuilder + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/analytics" + "github.com/grafana/loki/v3/pkg/util/constants" +) + +type SlimgesterMetrics struct { + chunkUtilization prometheus.Histogram + chunkEntries prometheus.Histogram + chunkSize prometheus.Histogram + chunkCompressionRatio prometheus.Histogram + chunksPerTenant *prometheus.CounterVec + chunkSizePerTenant *prometheus.CounterVec + chunkAge prometheus.Histogram + chunkEncodeTime prometheus.Histogram + chunksFlushFailures prometheus.Counter + chunksFlushedPerReason *prometheus.CounterVec + chunkLifespan prometheus.Histogram + chunksEncoded *prometheus.CounterVec + chunkDecodeFailures 
*prometheus.CounterVec + flushedChunksStats *analytics.Counter + flushedChunksBytesStats *analytics.Statistics + flushedChunksLinesStats *analytics.Statistics + flushedChunksAgeStats *analytics.Statistics + flushedChunksLifespanStats *analytics.Statistics + flushedChunksUtilizationStats *analytics.Statistics + + chunksCreatedTotal prometheus.Counter + samplesPerChunk prometheus.Histogram + blocksPerChunk prometheus.Histogram + chunkCreatedStats *analytics.Counter +} + +func NewSlimgesterMetrics(r prometheus.Registerer) *SlimgesterMetrics { + return &SlimgesterMetrics{ + chunkUtilization: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_utilization", + Help: "Distribution of stored chunk utilization (when stored).", + Buckets: prometheus.LinearBuckets(0, 0.2, 6), + }), + chunkEntries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_entries", + Help: "Distribution of stored lines per chunk (when stored).", + Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200 + }), + chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_size_bytes", + Help: "Distribution of stored chunk sizes (when stored).", + Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB) + }), + chunkCompressionRatio: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_compression_ratio", + Help: "Compression ratio of chunks (when stored).", + Buckets: prometheus.LinearBuckets(.75, 2, 10), + }), + chunksPerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunks_stored_total", + Help: "Total stored chunks per tenant.", + }, []string{"tenant"}), + chunkSizePerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_stored_bytes_total", + Help: "Total bytes stored in chunks per tenant.", + }, []string{"tenant"}), + chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_age_seconds", + Help: "Distribution of chunk ages (when stored).", + // with default settings chunks should flush between 5 min and 12 hours + // so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr + Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600}, + }), + chunkEncodeTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_encode_time_seconds", + Help: "Distribution of chunk encode times.", + // 10ms to 10s. 
+ Buckets: prometheus.ExponentialBuckets(0.01, 4, 6), + }), + chunksFlushFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunks_flush_failures_total", + Help: "Total number of flush failures.", + }), + chunksFlushedPerReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunks_flushed_total", + Help: "Total flushed chunks per reason.", + }, []string{"reason"}), + chunkLifespan: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_bounds_hours", + Help: "Distribution of chunk end-start durations.", + // 1h -> 8hr + Buckets: prometheus.LinearBuckets(1, 1, 8), + }), + chunksEncoded: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunks_encoded_total", + Help: "The total number of chunks encoded in the ingester.", + }, []string{"user"}), + chunkDecodeFailures: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunk_decode_failures_total", + Help: "The number of freshly encoded chunks that failed to decode.", + }, []string{"user"}), + flushedChunksStats: analytics.NewCounter("slimgester_flushed_chunks"), + flushedChunksBytesStats: analytics.NewStatistics("slimgester_flushed_chunks_bytes"), + flushedChunksLinesStats: analytics.NewStatistics("slimgester_flushed_chunks_lines"), + flushedChunksAgeStats: analytics.NewStatistics( + "slimgester_flushed_chunks_age_seconds", + ), + flushedChunksLifespanStats: analytics.NewStatistics( + "slimgester_flushed_chunks_lifespan_seconds", + ), + flushedChunksUtilizationStats: analytics.NewStatistics( + "slimgester_flushed_chunks_utilization", + ), + chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "slimgester_chunks_created_total", + Help: "The total number of chunks created in the ingester.", + }), + samplesPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "slimgester", + Name: "samples_per_chunk", + Help: "The number of samples in a chunk.", + + Buckets: prometheus.LinearBuckets(4096, 2048, 6), + }), + blocksPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "slimgester", + Name: "blocks_per_chunk", + Help: "The number of blocks in a chunk.", + + Buckets: prometheus.ExponentialBuckets(5, 2, 6), + }), + + chunkCreatedStats: analytics.NewCounter("slimgester_chunk_created"), + } +} diff --git a/pkg/blockbuilder/pipeline.go b/pkg/blockbuilder/pipeline.go new file mode 100644 index 000000000000..494763d8c83f --- /dev/null +++ b/pkg/blockbuilder/pipeline.go @@ -0,0 +1,99 @@ +package blockbuilder + +import ( + "context" + + "github.com/grafana/dskit/multierror" + "golang.org/x/sync/errgroup" +) + +type stage struct { + name string + parallelism int + grp *errgroup.Group + ctx context.Context + fn func(context.Context) error + cleanup func(context.Context) error // optional; will be called once the underlying group returns +} + +// pipeline is a sequence of n different stages. 
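+// A rough usage sketch (illustrative only; stage names and fns are placeholders):
+//
+//	p := newPipeline(ctx)
+//	p.AddStage("produce", 1, produceFn)                          // fn has signature func(context.Context) error
+//	p.AddStageWithCleanup("consume", 4, consumeFn, closeOutputs) // cleanup runs once all "consume" workers return
+//	err := p.Run()                                               // blocks until every stage and cleanup has finished
+//
+// Because each stage's errgroup is derived from the pipeline context, a failure in
+// any one stage cancels the contexts handed to the remaining stages.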
+type pipeline struct { + ctx context.Context // base context + // we use a separate errgroup for stage dispatch/collection + // and inherit stage-specific groups from this ctx to + // propagate cancellation + grp *errgroup.Group + stages []stage +} + +func newPipeline(ctx context.Context) *pipeline { + stagesGrp, ctx := errgroup.WithContext(ctx) + return &pipeline{ + ctx: ctx, + grp: stagesGrp, + } +} + +func (p *pipeline) AddStageWithCleanup( + name string, + parallelism int, + fn func(context.Context) error, + cleanup func(context.Context) error, +) { + grp, ctx := errgroup.WithContext(p.ctx) + p.stages = append(p.stages, stage{ + name: name, + parallelism: parallelism, + fn: fn, + cleanup: cleanup, + ctx: ctx, + grp: grp, + }) +} + +func (p *pipeline) AddStage( + name string, + parallelism int, + fn func(context.Context) error, +) { + p.AddStageWithCleanup(name, parallelism, fn, nil) +} + +func (p *pipeline) Run() error { + + for i := range p.stages { + // we're using this in subsequent async closures; + // assign it directly in-loop + s := p.stages[i] + + // spin up n workers for each stage using that stage's + // error group. + for j := 0; j < s.parallelism; j++ { + s.grp.Go(func() error { + return s.fn(s.ctx) + }) + } + + // Using the pipeline's err group, await the stage finish, + // calling any necessary cleanup fn + // NB: by using the pipeline's errgroup here, we propagate + // failures to downstream stage contexts, so once a single stage + // fails, the others will be notified. + p.grp.Go(func() error { + var errs multierror.MultiError + errs.Add(s.grp.Wait()) + if s.cleanup != nil { + // NB: we use the pipeline's context for the cleanup call b/c + // the stage's context is cancelled once `Wait` returns. + // That's ok. cleanup is always called for a relevant stage + // and just needs to know if _other_ stages failed at this point + errs.Add(s.cleanup(p.ctx)) + } + + return errs.Err() + }) + } + + // finish all stages + return p.grp.Wait() +} diff --git a/pkg/blockbuilder/pipeline_test.go b/pkg/blockbuilder/pipeline_test.go new file mode 100644 index 000000000000..9ec69d2006eb --- /dev/null +++ b/pkg/blockbuilder/pipeline_test.go @@ -0,0 +1,87 @@ +package blockbuilder + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +type testStage struct { + parallelism int + fn func(context.Context) error + cleanup func(context.Context) error +} + +func TestPipeline(t *testing.T) { + tests := []struct { + name string + stages []testStage + expectedErr error + }{ + { + name: "single stage success", + stages: []testStage{ + { + parallelism: 1, + fn: func(_ context.Context) error { + return nil + }, + }, + }, + }, + { + name: "multiple stages success", + stages: []testStage{ + { + parallelism: 2, + fn: func(_ context.Context) error { + return nil + }, + }, + { + parallelism: 1, + fn: func(_ context.Context) error { + return nil + }, + }, + }, + }, + { + name: "stage error propagates", + stages: []testStage{ + { + parallelism: 1, + fn: func(_ context.Context) error { + return errors.New("stage error") + }, + }, + }, + expectedErr: errors.New("stage error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := newPipeline(context.Background()) + + for i, stage := range tt.stages { + if stage.cleanup != nil { + p.AddStageWithCleanup(fmt.Sprint(i), stage.parallelism, stage.fn, stage.cleanup) + } else { + p.AddStage(fmt.Sprint(i), stage.parallelism, stage.fn) + } + } + + err := p.Run() + if tt.expectedErr != nil { + 
require.Error(t, err) + require.Equal(t, tt.expectedErr.Error(), err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/blockbuilder/plan.txt b/pkg/blockbuilder/plan.txt new file mode 100644 index 000000000000..a3a80a735fa4 --- /dev/null +++ b/pkg/blockbuilder/plan.txt @@ -0,0 +1,34 @@ +# Purpose +blockbuilder is responsible for consuming ingested data in the queue (kafka, etc) and writing it in an optimized form to long term storage. While this should always remain true, it can be built and iterated upon in phases. First, let's look at the simplest possible architecture: + +* [interface] loads "jobs": partitions+offset ranges in kafka +* For each job, process data, building the storage format +* [interface] Upon completion (inc flushing to storage), commit work + * e.g. update consumer group processed offset in kafka +* consumes + +# First Impl: Alongside existing multi-zone ingester writers +Goal: modify ingester architecture towards RF1, but don't actually write to storage yet, b/c we haven't solved coordinating interim reads/writes. +Deliverable: RF1 metrics proof +* run replicas==partitions (from ingesters) +* run every $INTERVAL (5m?), +* slim down ingester write path + * remove disk (all WALs). + * ignore limits if too complex (for now) + * /dev/null backend + + +# TODO improvements +* metadata store + * include offsets committed for coordination b/w ingester-readers & long term storage +* planner/scheduler+worker architecture +* shuffle sharding + + +# Things to solve +* limits application +* job sizing -- coordinate kafka offsets w/ underlying bytes added? + * ideally we can ask kafka for "the next 1GB" in a partition, but to do this we'd need the kafka offsets (auto-incremented integers for messages within a partition) to be derived from the message size. Right now, different batch sizes can cause kafka msgs to have very different sizes. + * idea: another set of partitions to store offsets->datasize? Sounds shitty tbh & breaks the consistency bound on writes (what if kafka acks first write but doesnt ack the second?) + * what if we stored byte counter metadata in kafka records so we could O(log(n)) seek an offset range w/ the closest $SIZE + * Likely a reasonable perf tradeoff as this isn't called often (only in job planner in the future). 
\ No newline at end of file diff --git a/pkg/blockbuilder/slimgester.go b/pkg/blockbuilder/slimgester.go new file mode 100644 index 000000000000..705b47444a6f --- /dev/null +++ b/pkg/blockbuilder/slimgester.go @@ -0,0 +1,811 @@ +package blockbuilder + +import ( + "bytes" + "context" + "flag" + "fmt" + "math" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" + "github.com/grafana/loki/v3/pkg/ingester" + "github.com/grafana/loki/v3/pkg/storage/chunk" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/util" + "github.com/grafana/loki/v3/pkg/util/flagext" + util_log "github.com/grafana/loki/v3/pkg/util/log" + + "github.com/grafana/loki/pkg/push" +) + +const ( + flushReasonFull = "full" + flushReasonMaxAge = "max_age" + onePointFiveMB = 3 << 19 +) + +type Config struct { + ConcurrentFlushes int `yaml:"concurrent_flushes"` + ConcurrentWriters int `yaml:"concurrent_writers"` + BlockSize flagext.ByteSize `yaml:"chunk_block_size"` + TargetChunkSize flagext.ByteSize `yaml:"chunk_target_size"` + ChunkEncoding string `yaml:"chunk_encoding"` + parsedEncoding compression.Codec `yaml:"-"` // placeholder for validated encoding + MaxChunkAge time.Duration `yaml:"max_chunk_age"` + Interval time.Duration `yaml:"interval"` + Backoff backoff.Config `yaml:"backoff_config"` +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.ConcurrentFlushes, prefix+"concurrent-flushes", 1, "How many flushes can happen concurrently") + f.IntVar(&cfg.ConcurrentWriters, prefix+"concurrent-writers", 1, "How many workers to process writes, defaults to number of available cpus") + _ = cfg.BlockSize.Set("256KB") + f.Var(&cfg.BlockSize, prefix+"chunks-block-size", "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") + _ = cfg.TargetChunkSize.Set(fmt.Sprint(onePointFiveMB)) + f.Var(&cfg.TargetChunkSize, prefix+"chunk-target-size", "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") + f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs())) + f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.") + f.DurationVar(&cfg.Interval, prefix+"interval", 10*time.Minute, "The interval at which to run.") + cfg.Backoff.RegisterFlagsWithPrefix(prefix+"backoff.", f) +} + +// RegisterFlags registers flags. 
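+// The top-level registration uses the "blockbuilder." prefix, so e.g. Interval is
+// configured via -blockbuilder.interval.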
+func (cfg *Config) RegisterFlags(flags *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("blockbuilder.", flags) +} + +func (cfg *Config) Validate() error { + enc, err := compression.ParseCodec(cfg.ChunkEncoding) + if err != nil { + return err + } + cfg.parsedEncoding = enc + return nil +} + +// BlockBuilder is a slimmed-down version of the ingester, intended to +// ingest logs without WALs. Broadly, it accumulates logs into per-tenant chunks in the same way the existing ingester does, +// without a WAL. Index (TSDB) creation is also not an out-of-band procedure and must be called directly. In essence, this +// allows us to buffer data, flushing chunks to storage as necessary, and then when ready to commit this, relevant TSDBs (one per period) are created and flushed to storage. This allows an external caller to prepare a batch of data, build relevant chunks+indices, ensure they're flushed, and then return. As long as chunk+index creation is deterministic, this operation is also +// idempotent, making retries simple and impossible to introduce duplicate data. +// It contains the following methods: +// - `Append(context.Context, logproto.PushRequest) error` +// Adds a push request to ingested data. May flush existing chunks when they're full/etc. +// - `Commit(context.Context) error` +// Serializes (cuts) any buffered data into chunks, flushes them to storage, then creates + flushes TSDB indices +// containing all chunk references. Finally, clears internal state. +type BlockBuilder struct { + services.Service + + id string + cfg Config + periodicConfigs []config.PeriodConfig + + metrics *SlimgesterMetrics + logger log.Logger + + store stores.ChunkWriter + objStore *MultiStore + jobController *PartitionJobController +} + +func NewBlockBuilder( + id string, + cfg Config, + periodicConfigs []config.PeriodConfig, + store stores.ChunkWriter, + objStore *MultiStore, + logger log.Logger, + reg prometheus.Registerer, + jobController *PartitionJobController, +) (*BlockBuilder, + error) { + i := &BlockBuilder{ + id: id, + cfg: cfg, + periodicConfigs: periodicConfigs, + metrics: NewSlimgesterMetrics(reg), + logger: logger, + store: store, + objStore: objStore, + jobController: jobController, + } + + i.Service = services.NewBasicService(nil, i.running, nil) + return i, nil +} + +func (i *BlockBuilder) running(ctx context.Context) error { + ticker := time.NewTicker(i.cfg.Interval) + defer ticker.Stop() + + // run once in beginning + select { + case <-ctx.Done(): + return nil + default: + _, err := i.runOne(ctx) + if err != nil { + return err + } + } + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + skipped, err := i.runOne(ctx) + level.Info(i.logger).Log( + "msg", "completed block builder run", "skipped", + "skipped", skipped, + "err", err, + ) + if err != nil { + return err + } + } + } +} + +// runOne performs a single +func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) { + + exists, job, err := i.jobController.LoadJob(ctx) + if err != nil { + return false, err + } + + if !exists { + level.Info(i.logger).Log("msg", "no available job to process") + return true, nil + } + + logger := log.With( + i.logger, + "partition", job.Partition, + "job_min_offset", job.Offsets.Min, + "job_max_offset", job.Offsets.Max, + ) + + level.Debug(logger).Log("msg", "beginning job") + + indexer := newTsdbCreator() + appender := newAppender(i.id, + i.cfg, + i.periodicConfigs, + i.store, + i.objStore, + logger, + i.metrics, + ) + + var lastOffset int64 + p := newPipeline(ctx) + 
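+	// The job is processed as a three-stage pipeline: load records from the
+	// partition, append them into per-tenant chunks, and flush the cut chunks
+	// to storage while recording their references in the TSDB indexer.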
+ // Pipeline stage 1: Process the job offsets and write records to inputCh + // This stage reads from the partition and feeds records into the input channel + // When complete, it stores the last processed offset and closes the channel + inputCh := make(chan []AppendInput) + p.AddStageWithCleanup( + "load records", + 1, + func(ctx context.Context) error { + lastOffset, err = i.jobController.Process(ctx, job.Offsets, inputCh) + return err + }, + func(ctx context.Context) error { + level.Debug(logger).Log( + "msg", "finished loading records", + "ctx_error", ctx.Err(), + ) + close(inputCh) + return nil + }, + ) + + // Stage 2: Process input records and generate chunks + // This stage receives AppendInput batches, appends them to appropriate instances, + // and forwards any cut chunks to the chunks channel for flushing. + // ConcurrentWriters workers process inputs in parallel to maximize throughput. + flush := make(chan *chunk.Chunk) + p.AddStageWithCleanup( + "appender", + i.cfg.ConcurrentWriters, + func(ctx context.Context) error { + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case inputs, ok := <-inputCh: + // inputs are finished; we're done + if !ok { + return nil + } + + for _, input := range inputs { + cut, err := appender.Append(ctx, input) + if err != nil { + level.Error(logger).Log("msg", "failed to append records", "err", err) + return err + } + + for _, chk := range cut { + select { + case <-ctx.Done(): + return ctx.Err() + case flush <- chk: + } + } + } + } + } + }, + func(ctx context.Context) (err error) { + defer func() { + level.Debug(logger).Log( + "msg", "finished appender", + "err", err, + "ctx_error", ctx.Err(), + ) + }() + defer close(flush) + + // once we're done appending, cut all remaining chunks. + chks, err := appender.CutRemainingChunks(ctx) + if err != nil { + return err + } + + for _, chk := range chks { + select { + case <-ctx.Done(): + return ctx.Err() + case flush <- chk: + } + } + return nil + }, + ) + + // Stage 3: Flush chunks to storage + // This stage receives chunks from the chunks channel and flushes them to storage + // using ConcurrentFlushes workers for parallel processing + p.AddStage( + "flusher", + i.cfg.ConcurrentFlushes, + func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case chk, ok := <-flush: + if !ok { + return nil + } + if _, err := withBackoff( + ctx, + i.cfg.Backoff, // retry forever + func() (res struct{}, err error) { + err = i.store.PutOne(ctx, chk.From, chk.Through, *chk) + if err != nil { + i.metrics.chunksFlushFailures.Inc() + return + } + appender.reportFlushedChunkStatistics(chk) + + // write flushed chunk to index + approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10)) + meta := index.ChunkMeta{ + Checksum: chk.ChunkRef.Checksum, + MinTime: int64(chk.ChunkRef.From), + MaxTime: int64(chk.ChunkRef.Through), + KB: uint32(approxKB), + Entries: uint32(chk.Data.Entries()), + } + err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta}) + return + }, + ); err != nil { + return err + } + } + } + }, + ) + + err = p.Run() + level.Debug(logger).Log( + "msg", "finished chunk creation", + "err", err, + ) + if err != nil { + return false, err + } + + var ( + nodeName = i.id + tableRanges = config.GetIndexStoreTableRanges(types.TSDBType, i.periodicConfigs) + ) + + built, err := indexer.create(ctx, nodeName, tableRanges) + if err != nil { + return false, err + } + + for _, db := range built { + u := newUploader(i.objStore) + 
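+		// Put gzips the serialized TSDB and writes it to the object store
+		// that owns the index period (bucket) this DB belongs to.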
if err := u.Put(ctx, db); err != nil { + level.Error(util_log.Logger).Log( + "msg", "failed to upload tsdb", + "path", db.id.Path(), + ) + + return false, err + } + + level.Debug(logger).Log( + "msg", "uploaded tsdb", + "name", db.id.Name(), + ) + } + + if lastOffset <= job.Offsets.Min { + return false, nil + } + + if err = i.jobController.part.Commit(ctx, lastOffset); err != nil { + level.Error(logger).Log( + "msg", "failed to commit offset", + "last_offset", lastOffset, + "err", err, + ) + return false, err + } + + // log success + level.Info(logger).Log( + "msg", "successfully processed and committed batch", + "last_offset", lastOffset, + ) + + return false, nil +} + +type Appender struct { + id string + cfg Config + periodicConfigs []config.PeriodConfig + + metrics *SlimgesterMetrics + logger log.Logger + + instances map[string]*instance + instancesMtx sync.RWMutex + + store stores.ChunkWriter + objStore *MultiStore +} + +// Writer is a single use construct for building chunks +// for from a set of records. It's an independent struct to ensure its +// state is not reused across jobs. +func newAppender( + id string, + cfg Config, + periodicConfigs []config.PeriodConfig, + store stores.ChunkWriter, + objStore *MultiStore, + logger log.Logger, + metrics *SlimgesterMetrics, +) *Appender { + return &Appender{ + id: id, + cfg: cfg, + periodicConfigs: periodicConfigs, + metrics: metrics, + logger: logger, + instances: make(map[string]*instance), + store: store, + objStore: objStore, + } +} + +// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process. +func (w *Appender) reportFlushedChunkStatistics( + ch *chunk.Chunk, +) { + byt, err := ch.Encoded() + if err != nil { + level.Error(w.logger).Log("msg", "failed to encode flushed wire chunk", "err", err) + return + } + sizePerTenant := w.metrics.chunkSizePerTenant.WithLabelValues(ch.UserID) + countPerTenant := w.metrics.chunksPerTenant.WithLabelValues(ch.UserID) + + reason := flushReasonFull + from, through := ch.From.Time(), ch.Through.Time() + if through.Sub(from) > w.cfg.MaxChunkAge { + reason = flushReasonMaxAge + } + + w.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1) + + compressedSize := float64(len(byt)) + uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data) + + if ok && compressedSize > 0 { + w.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) + } + + utilization := ch.Data.Utilization() + w.metrics.chunkUtilization.Observe(utilization) + + numEntries := ch.Data.Entries() + w.metrics.chunkEntries.Observe(float64(numEntries)) + w.metrics.chunkSize.Observe(compressedSize) + sizePerTenant.Add(compressedSize) + countPerTenant.Inc() + + w.metrics.chunkAge.Observe(time.Since(from).Seconds()) + w.metrics.chunkLifespan.Observe(through.Sub(from).Hours()) + + w.metrics.flushedChunksBytesStats.Record(compressedSize) + w.metrics.flushedChunksLinesStats.Record(float64(numEntries)) + w.metrics.flushedChunksUtilizationStats.Record(utilization) + w.metrics.flushedChunksAgeStats.Record(time.Since(from).Seconds()) + w.metrics.flushedChunksLifespanStats.Record(through.Sub(from).Seconds()) + w.metrics.flushedChunksStats.Inc(1) +} + +func (w *Appender) CutRemainingChunks(ctx context.Context) ([]*chunk.Chunk, error) { + var chunks []*chunk.Chunk + w.instancesMtx.Lock() + defer w.instancesMtx.Unlock() + + for _, inst := range w.instances { + + // wrap in anonymous fn to make lock release more straightforward + if err := func() error { + 
inst.streams.mtx.Lock() + defer inst.streams.mtx.Unlock() + + for _, stream := range inst.streams.byLabels { + + // wrap in anonymous fn to make lock release more straightforward + if err := func() error { + stream.chunkMtx.Lock() + defer stream.chunkMtx.Unlock() + if stream.chunk != nil { + cut, err := stream.closeChunk() + if err != nil { + return err + } + encoded, err := inst.encodeChunk(ctx, stream, cut) + if err != nil { + return err + } + chunks = append(chunks, encoded) + } + return nil + + }(); err != nil { + return err + } + + } + return nil + + }(); err != nil { + return nil, err + } + + } + + return chunks, nil +} + +type AppendInput struct { + tenant string + // both labels & labelsStr are populated to prevent duplicating conversion work in multiple places + labels labels.Labels + labelsStr string + entries []push.Entry +} + +func (w *Appender) Append(ctx context.Context, input AppendInput) ([]*chunk.Chunk, error) { + // use rlock so multiple appends can be called on same instance. + // re-check after using regular lock if it didnt exist. + w.instancesMtx.RLock() + inst, ok := w.instances[input.tenant] + w.instancesMtx.RUnlock() + if !ok { + w.instancesMtx.Lock() + inst, ok = w.instances[input.tenant] + if !ok { + inst = newInstance(w.cfg, input.tenant, w.metrics, w.periodicConfigs, w.logger) + w.instances[input.tenant] = inst + } + w.instancesMtx.Unlock() + } + + closed, err := inst.Push(ctx, input) + return closed, err +} + +// instance is a slimmed down version from the ingester pkg +type instance struct { + cfg Config + tenant string + buf []byte // buffer used to compute fps. + mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free + metrics *SlimgesterMetrics + streams *streamsMap + logger log.Logger + + periods []config.PeriodConfig +} + +func newInstance( + cfg Config, + tenant string, + metrics *SlimgesterMetrics, + periods []config.PeriodConfig, + logger log.Logger, +) *instance { + streams := newStreamsMap() + return &instance{ + cfg: cfg, + tenant: tenant, + buf: make([]byte, 0, 1024), + mapper: ingester.NewFPMapper(streams.getLabelsFromFingerprint), + metrics: metrics, + streams: streams, + logger: logger, + periods: periods, + } +} + +func newStreamsMap() *streamsMap { + return &streamsMap{ + byLabels: make(map[string]*stream), + byFp: make(map[model.Fingerprint]*stream), + } +} + +type streamsMap struct { + // labels -> stream + byLabels map[string]*stream + byFp map[model.Fingerprint]*stream + mtx sync.RWMutex +} + +// For performs an operation on an existing stream, creating it if it wasn't previously present. +func (m *streamsMap) For( + ls string, + createFn func() (*stream, error), + fn func(*stream) error, +) error { + // first use read lock in case the stream exists + m.mtx.RLock() + if s, ok := m.byLabels[ls]; ok { + err := fn(s) + m.mtx.RUnlock() + return err + } + m.mtx.RUnlock() + + // Stream wasn't found, acquire write lock to create it + m.mtx.Lock() + defer m.mtx.Unlock() + + // Double check it wasn't created while we were upgrading the lock + if s, ok := m.byLabels[ls]; ok { + return fn(s) + } + + // Create new stream + s, err := createFn() + if err != nil { + return err + } + + m.byLabels[ls] = s + m.byFp[s.fp] = s + return fn(s) +} + +// Return labels associated with given fingerprint. Used by fingerprint mapper. 
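+// It returns nil when no stream with the given fingerprint exists yet.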
+func (m *streamsMap) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels { + + if s, ok := m.byFp[fp]; ok { + return s.ls + } + return nil +} + +func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint { + var fp uint64 + fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...) + return i.mapper.MapFP(model.Fingerprint(fp), ls) +} + +// Push will iterate over the given streams present in the PushRequest and attempt to store them. +func (i *instance) Push( + ctx context.Context, + input AppendInput, +) (closed []*chunk.Chunk, err error) { + err = i.streams.For( + input.labelsStr, + func() (*stream, error) { + fp := i.getHashForLabels(input.labels) + return newStream(fp, input.labels, i.cfg, i.metrics), nil + }, + func(stream *stream) error { + xs, err := stream.Push(input.entries) + if err != nil { + return err + } + + if len(xs) > 0 { + for _, x := range xs { + // encodeChunk mutates the chunk so we must pass by reference + chk, err := i.encodeChunk(ctx, stream, x) + if err != nil { + return err + } + closed = append(closed, chk) + } + } + return err + }, + ) + + return closed, err +} + +// encodeChunk encodes a chunk.Chunk. +func (i *instance) encodeChunk(ctx context.Context, stream *stream, mc *chunkenc.MemChunk) (*chunk.Chunk, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + start := time.Now() + + firstTime, lastTime := util.RoundToMilliseconds(mc.Bounds()) + chk := chunk.NewChunk( + i.tenant, stream.fp, stream.ls, + chunkenc.NewFacade(mc, stream.blockSize, stream.targetChunkSize), + firstTime, + lastTime, + ) + + chunkBytesSize := mc.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header + if err := chk.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize)), i.logger); err != nil { + if !errors.Is(err, chunk.ErrChunkDecode) { + return nil, fmt.Errorf("chunk encoding: %w", err) + } + + i.metrics.chunkDecodeFailures.WithLabelValues(chk.UserID).Inc() + } + i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds()) + i.metrics.chunksEncoded.WithLabelValues(chk.UserID).Inc() + return &chk, nil +} + +type stream struct { + fp model.Fingerprint + ls labels.Labels + + chunkFormat byte + codec compression.Codec + blockSize int + targetChunkSize int + + chunkMtx sync.RWMutex + chunk *chunkenc.MemChunk + metrics *SlimgesterMetrics +} + +func newStream(fp model.Fingerprint, ls labels.Labels, cfg Config, metrics *SlimgesterMetrics) *stream { + return &stream{ + fp: fp, + ls: ls, + + chunkFormat: chunkenc.ChunkFormatV4, + codec: cfg.parsedEncoding, + blockSize: cfg.BlockSize.Val(), + targetChunkSize: cfg.TargetChunkSize.Val(), + + metrics: metrics, + } +} + +func (s *stream) Push(entries []push.Entry) (closed []*chunkenc.MemChunk, err error) { + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() + + if s.chunk == nil { + s.chunk = s.NewChunk() + } + + // bytesAdded, err := s.storeEntries(ctx, toStore, usageTracker) + for i := 0; i < len(entries); i++ { + + // cut the chunk if the new addition overflows target size + if !s.chunk.SpaceFor(&entries[i]) { + cut, err := s.closeChunk() + if err != nil { + return nil, err + } + closed = append(closed, cut) + } + + if _, err = s.chunk.Append(&entries[i]); err != nil { + return closed, errors.Wrap(err, "appending entry") + } + } + + return closed, nil +} + +func (s *stream) closeChunk() (*chunkenc.MemChunk, error) { + if err := s.chunk.Close(); err != nil { + return nil, errors.Wrap(err, "closing chunk") + } + + s.metrics.samplesPerChunk.Observe(float64(s.chunk.Size())) + 
s.metrics.blocksPerChunk.Observe(float64(s.chunk.BlockCount())) + s.metrics.chunksCreatedTotal.Inc() + s.metrics.chunkCreatedStats.Inc(1) + + // add a chunk + res := s.chunk + s.chunk = s.NewChunk() + return res, nil +} + +func (s *stream) NewChunk() *chunkenc.MemChunk { + return chunkenc.NewMemChunk( + s.chunkFormat, + s.codec, + chunkenc.ChunkHeadFormatFor(s.chunkFormat), + s.blockSize, + s.targetChunkSize, + ) +} + +func withBackoff[T any]( + ctx context.Context, + config backoff.Config, + fn func() (T, error), +) (T, error) { + var zero T + + var boff = backoff.New(ctx, config) + for boff.Ongoing() { + res, err := fn() + if err != nil { + boff.Wait() + continue + } + return res, nil + } + + return zero, boff.ErrCause() +} diff --git a/pkg/blockbuilder/storage.go b/pkg/blockbuilder/storage.go new file mode 100644 index 000000000000..2815b9b97ad8 --- /dev/null +++ b/pkg/blockbuilder/storage.go @@ -0,0 +1,154 @@ +package blockbuilder + +import ( + "context" + "fmt" + "io" + "sort" + + "github.com/opentracing/opentracing-go" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/storage" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/config" +) + +type MultiStore struct { + stores []*storeEntry + storageConfig storage.Config +} + +type storeEntry struct { + start model.Time + cfg config.PeriodConfig + objectClient client.ObjectClient +} + +var _ client.ObjectClient = (*MultiStore)(nil) + +func NewMultiStore( + periodicConfigs []config.PeriodConfig, + storageConfig storage.Config, + clientMetrics storage.ClientMetrics, +) (*MultiStore, error) { + store := &MultiStore{ + storageConfig: storageConfig, + } + // sort by From time + sort.Slice(periodicConfigs, func(i, j int) bool { + return periodicConfigs[i].From.Time.Before(periodicConfigs[j].From.Time) + }) + for _, periodicConfig := range periodicConfigs { + objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, "storage-rf1", storageConfig, clientMetrics) + if err != nil { + return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err) + } + store.stores = append(store.stores, &storeEntry{ + start: periodicConfig.From.Time, + cfg: periodicConfig, + objectClient: objectClient, + }) + } + return store, nil +} + +func (m *MultiStore) GetStoreFor(ts model.Time) (client.ObjectClient, error) { + // find the schema with the lowest start _after_ tm + j := sort.Search(len(m.stores), func(j int) bool { + return m.stores[j].start > ts + }) + + // reduce it by 1 because we want a schema with start <= tm + j-- + + if 0 <= j && j < len(m.stores) { + return m.stores[j].objectClient, nil + } + + // should in theory never happen + return nil, fmt.Errorf("no store found for timestamp %s", ts) +} + +func (m *MultiStore) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return client.ObjectAttributes{}, err + } + return s.GetAttributes(ctx, objectKey) +} + +func (m *MultiStore) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return false, err + } + return s.ObjectExists(ctx, objectKey) +} + +func (m *MultiStore) PutObject(ctx context.Context, objectKey string, object io.Reader) error { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return err + } + return s.PutObject(ctx, objectKey, object) +} + +func (m *MultiStore) GetObject(ctx context.Context, objectKey 
string) (io.ReadCloser, int64, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, 0, err + } + return s.GetObject(ctx, objectKey) +} + +func (m *MultiStore) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "GetObjectRange") + if sp != nil { + sp.LogKV("objectKey", objectKey, "off", off, "length", length) + } + defer sp.Finish() + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, err + } + return s.GetObjectRange(ctx, objectKey, off, length) +} + +func (m *MultiStore) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, nil, err + } + return s.List(ctx, prefix, delimiter) +} + +func (m *MultiStore) DeleteObject(ctx context.Context, objectKey string) error { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return err + } + return s.DeleteObject(ctx, objectKey) +} + +func (m *MultiStore) IsObjectNotFoundErr(err error) bool { + s, _ := m.GetStoreFor(model.Now()) + if s == nil { + return false + } + return s.IsObjectNotFoundErr(err) +} + +func (m *MultiStore) IsRetryableErr(err error) bool { + s, _ := m.GetStoreFor(model.Now()) + if s == nil { + return false + } + return s.IsRetryableErr(err) +} + +func (m *MultiStore) Stop() { + for _, s := range m.stores { + s.objectClient.Stop() + } +} diff --git a/pkg/blockbuilder/storage_test.go b/pkg/blockbuilder/storage_test.go new file mode 100644 index 000000000000..8fc6b237e132 --- /dev/null +++ b/pkg/blockbuilder/storage_test.go @@ -0,0 +1,37 @@ +package blockbuilder + +import ( + "os" + "testing" + + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/storage" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" + "github.com/grafana/loki/v3/pkg/storage/config" +) + +var metrics *storage.ClientMetrics + +func NewTestStorage(t testing.TB) (*MultiStore, error) { + if metrics == nil { + m := storage.NewClientMetrics() + metrics = &m + } + dir := t.TempDir() + t.Cleanup(func() { + os.RemoveAll(dir) + metrics.Unregister() + }) + cfg := storage.Config{ + FSConfig: local.FSConfig{ + Directory: dir, + }, + } + return NewMultiStore([]config.PeriodConfig{ + { + From: config.DayTime{Time: model.Now()}, + ObjectType: "filesystem", + }, + }, cfg, *metrics) +} diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go new file mode 100644 index 000000000000..8af463fcd27d --- /dev/null +++ b/pkg/blockbuilder/tsdb.go @@ -0,0 +1,328 @@ +package blockbuilder + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/compression" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + util_log "github.com/grafana/loki/v3/pkg/util/log" +) + +// TsdbCreator accepts writes and builds TSDBs. 
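+// Chunk references are accumulated per tenant via Append; create() turns the
+// accumulated heads into one TSDB per index period bucket and then resets the
+// in-memory state so the creator can be reused for the next job.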
+type TsdbCreator struct { + // Function to build a TSDB from the current state + + mtx sync.RWMutex + shards int + heads *tenantHeads +} + +// new creates a new HeadManager +func newTsdbCreator() *TsdbCreator { + m := &TsdbCreator{ + shards: 1 << 5, // 32 shards + } + m.reset() + + return m +} + +// reset updates heads +func (m *TsdbCreator) reset() { + m.heads = newTenantHeads(m.shards) +} + +// Append adds a new series for the given user +func (m *TsdbCreator) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) error { + m.mtx.RLock() + defer m.mtx.RUnlock() + + // TODO(owen-d): safe to remove? + // Remove __name__="logs" as it's not needed in TSDB + b := labels.NewBuilder(ls) + b.Del(labels.MetricName) + ls = b.Labels() + + // Just append to heads, no WAL needed + m.heads.Append(userID, ls, fprint, chks) + return nil +} + +type chunkInfo struct { + chunkMetas index.ChunkMetas + tsdbFormat int +} + +type tsdbWithID struct { + bucket model.Time + data []byte + id tsdb.Identifier +} + +// Create builds a TSDB from the current state using the provided mkTsdb function +func (m *TsdbCreator) create(ctx context.Context, nodeName string, tableRanges []config.TableRange) ([]tsdbWithID, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + type key struct { + bucket model.Time + prefix string + } + periods := make(map[key]*tsdb.Builder) + + if err := m.heads.forAll( + func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error { + // chunks may overlap index period bounds, in which case they're written to multiple + pds := make(map[key]chunkInfo) + for _, chk := range chks { + idxBuckets := tsdb.IndexBuckets(chk.From(), chk.Through(), tableRanges) + + for _, bucket := range idxBuckets { + k := key{ + bucket: bucket.BucketStart, + prefix: bucket.Prefix, + } + chkinfo := pds[k] + chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk) + chkinfo.tsdbFormat = bucket.TsdbFormat + pds[k] = chkinfo + } + } + + // Embed the tenant label into TSDB + lb := labels.NewBuilder(ls) + lb.Set(index.TenantLabel, user) + withTenant := lb.Labels() + + // Add the chunks to all relevant builders + for pd, chkinfo := range pds { + matchingChks := chkinfo.chunkMetas + b, ok := periods[pd] + if !ok { + b = tsdb.NewBuilder(chkinfo.tsdbFormat) + periods[pd] = b + } + + b.AddSeries( + withTenant, + // use the fingerprint without the added tenant label + // so queries route to the chunks which actually exist. 
+ model.Fingerprint(fp), + matchingChks, + ) + } + + return nil + }, + ); err != nil { + level.Error(util_log.Logger).Log("err", err.Error(), "msg", "building TSDB") + return nil, err + } + + now := time.Now() + res := make([]tsdbWithID, 0, len(periods)) + + for p, b := range periods { + + level.Debug(util_log.Logger).Log( + "msg", "building tsdb for period", + "pd", p, + ) + + // build+move tsdb to multitenant dir + start := time.Now() + dst, data, err := b.BuildInMemory( + ctx, + func(_, _ model.Time, _ uint32) tsdb.Identifier { + return tsdb.NewPrefixedIdentifier( + tsdb.MultitenantTSDBIdentifier{ + NodeName: nodeName, + Ts: now, + }, + p.prefix, + "", + ) + }, + ) + + if err != nil { + return nil, err + } + + level.Debug(util_log.Logger).Log( + "msg", "finished building tsdb for period", + "pd", p, + "dst", dst.Path(), + "duration", time.Since(start), + ) + res = append(res, tsdbWithID{ + bucket: p.bucket, + id: dst, + data: data, + }) + } + + m.reset() + return res, nil +} + +// tenantHeads manages per-tenant series +type tenantHeads struct { + shards int + locks []sync.RWMutex + tenants []map[string]*Head +} + +func newTenantHeads(shards int) *tenantHeads { + t := &tenantHeads{ + shards: shards, + locks: make([]sync.RWMutex, shards), + tenants: make([]map[string]*Head, shards), + } + for i := range t.tenants { + t.tenants[i] = make(map[string]*Head) + } + return t +} + +func (t *tenantHeads) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) { + head := t.getOrCreateTenantHead(userID) + head.Append(ls, fprint, chks) +} + +func (t *tenantHeads) getOrCreateTenantHead(userID string) *Head { + idx := t.shardForTenant(userID) + mtx := &t.locks[idx] + + // Fast path: return existing head + mtx.RLock() + head, ok := t.tenants[idx][userID] + mtx.RUnlock() + if ok { + return head + } + + // Slow path: create new head + mtx.Lock() + defer mtx.Unlock() + + head, ok = t.tenants[idx][userID] + if !ok { + head = NewHead(userID) + t.tenants[idx][userID] = head + } + return head +} + +func (t *tenantHeads) shardForTenant(userID string) uint64 { + return xxhash.Sum64String(userID) & uint64(t.shards-1) +} + +// forAll iterates through all series in all tenant heads +func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error) error { + for i, shard := range t.tenants { + t.locks[i].RLock() + defer t.locks[i].RUnlock() + + for user, tenant := range shard { + if err := tenant.forAll(func(ls labels.Labels, fp uint64, chks index.ChunkMetas) error { + return fn(user, ls, fp, chks) + }); err != nil { + return err + } + } + } + return nil +} + +// Head manages series for a single tenant +type Head struct { + userID string + series map[uint64]*series + mtx sync.RWMutex +} + +type series struct { + labels labels.Labels + chks []index.ChunkMeta +} + +func NewHead(userID string) *Head { + return &Head{ + userID: userID, + series: make(map[uint64]*series), + } +} + +func (h *Head) Append(ls labels.Labels, fp uint64, chks index.ChunkMetas) { + h.mtx.Lock() + defer h.mtx.Unlock() + + s, ok := h.series[fp] + if !ok { + s = &series{labels: ls} + h.series[fp] = s + } + s.chks = append(s.chks, chks...) 
+} + +func (h *Head) forAll(fn func(ls labels.Labels, fp uint64, chks index.ChunkMetas) error) error { + h.mtx.RLock() + defer h.mtx.RUnlock() + + for fp, s := range h.series { + if err := fn(s.labels, fp, s.chks); err != nil { + return err + } + } + return nil +} + +type uploader struct { + store *MultiStore +} + +func newUploader(store *MultiStore) *uploader { + return &uploader{store: store} +} + +func (u *uploader) Put(ctx context.Context, db tsdbWithID) error { + client, err := u.store.GetStoreFor(db.bucket) + if err != nil { + return err + } + + reader := bytes.NewReader(db.data) + gzipPool := compression.GetWriterPool(compression.GZIP) + buf := bytes.NewBuffer(make([]byte, 0, 1<<20)) + compressedWriter := gzipPool.GetWriter(buf) + defer gzipPool.PutWriter(compressedWriter) + + _, err = io.Copy(compressedWriter, reader) + if err != nil { + return err + } + + err = compressedWriter.Close() + if err != nil { + return err + } + + return client.PutObject(ctx, db.id.Path(), buf) +} + +func buildFileName(indexName string) string { + return fmt.Sprintf("%s.gz", indexName) +} diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 827ed079d56a..a0d360c4a065 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -16,6 +16,10 @@ import ( "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + + "github.com/grafana/loki/v3/pkg/kafka" + + "github.com/grafana/loki/v3/pkg/kafka/client" ) type SpecialOffset int @@ -97,6 +101,35 @@ type Reader struct { logger log.Logger } +func NewReader( + cfg kafka.Config, + partitionID int32, + instanceID string, + logger log.Logger, + reg prometheus.Registerer, +) (*Reader, error) { + // Create a new Kafka client for this reader + clientMetrics := client.NewReaderClientMetrics("partition-reader", reg) + c, err := client.NewReaderClient( + cfg, + clientMetrics, + log.With(logger, "component", "kafka-client"), + ) + if err != nil { + return nil, fmt.Errorf("creating kafka client: %w", err) + } + + // Create the reader + return newReader( + c, + cfg.Topic, + partitionID, + cfg.GetConsumerGroup(instanceID, partitionID), + logger, + reg, + ), nil +} + // NewReader creates a new Reader instance func newReader( client *kgo.Client, diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index 76898f1b9ef4..49dcda1c928b 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -15,7 +15,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/client" ) var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded") @@ -81,27 +80,20 @@ func NewReaderService( logger log.Logger, reg prometheus.Registerer, ) (*ReaderService, error) { - // Create a new Kafka client for this reader - clientMetrics := client.NewReaderClientMetrics("partition-reader", reg) - c, err := client.NewReaderClient( - kafkaCfg, - clientMetrics, - log.With(logger, "component", "kafka-client"), - ) - if err != nil { - return nil, fmt.Errorf("creating kafka client: %w", err) - } // Create the reader - reader := newReader( - c, - kafkaCfg.Topic, + reader, err := NewReader( + kafkaCfg, partitionID, - kafkaCfg.GetConsumerGroup(instanceID, partitionID), + instanceID, logger, reg, ) + if err != nil { + return nil, errors.Wrap(err, "creating kafka reader") + } + return newReaderServiceFromIfc( ReaderConfig{ 
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup, diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 3586e21b541d..ff5c6de1565b 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/v3/pkg/analytics" + "github.com/grafana/loki/v3/pkg/blockbuilder" "github.com/grafana/loki/v3/pkg/bloombuild" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" @@ -89,6 +90,7 @@ type Config struct { RulerStorage rulestore.Config `yaml:"ruler_storage,omitempty" doc:"hidden"` IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` + BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"` Pattern pattern.Config `yaml:"pattern_ingester,omitempty"` IndexGateway indexgateway.Config `yaml:"index_gateway"` BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"` @@ -183,6 +185,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.OperationalConfig.RegisterFlags(f) c.Profiling.RegisterFlags(f) c.KafkaConfig.RegisterFlags(f) + c.BlockBuilder.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -258,6 +261,9 @@ func (c *Config) Validate() error { if err := c.Ingester.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingester config")) } + if err := c.BlockBuilder.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_builder config")) + } if err := c.LimitsConfig.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config")) } @@ -372,6 +378,7 @@ type Loki struct { indexGatewayRingManager *lokiring.RingManager partitionRingWatcher *ring.PartitionRingWatcher partitionRing *ring.PartitionInstanceRing + blockBuilder *blockbuilder.BlockBuilder ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -682,6 +689,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(PatternIngesterTee, t.initPatternIngesterTee, modules.UserInvisibleModule) mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) + mm.RegisterModule(BlockBuilder, t.initBlockBuilder) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -719,6 +727,7 @@ func (t *Loki) setupModuleManager() error { IndexGatewayRing: {Overrides, MemberlistKV}, PartitionRing: {MemberlistKV, Server, Ring}, MemberlistKV: {Server}, + BlockBuilder: {PartitionRing, Store, Server}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b8ebc89b6475..726c8cfe52a6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/v3/pkg/analytics" + "github.com/grafana/loki/v3/pkg/blockbuilder" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -47,6 +48,8 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" + "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/kafka/partitionring" "github.com/grafana/loki/v3/pkg/logproto" 
"github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -134,6 +137,7 @@ const ( Analytics string = "analytics" InitCodec string = "init-codec" PartitionRing string = "partition-ring" + BlockBuilder string = "block-builder" ) const ( @@ -875,6 +879,12 @@ func (t *Loki) updateConfigForShipperStore() { t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly + case t.Cfg.isTarget(BlockBuilder): + // Blockbuilder handles index creation independently of the shipper. + // TODO: introduce Disabled mode for boltdb shipper and set it here. + t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly + t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeDisabled + default: // All other targets use the shipper store in RW mode t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadWrite @@ -1770,6 +1780,61 @@ func (t *Loki) initPartitionRing() (services.Service, error) { return t.partitionRingWatcher, nil } +func (t *Loki) initBlockBuilder() (services.Service, error) { + logger := log.With(util_log.Logger, "component", "block_builder") + + // TODO(owen-d): perhaps refactor to not use the ingester config? + id := t.Cfg.Ingester.LifecyclerConfig.ID + + ingestPartitionID, err := partitionring.ExtractIngesterPartitionID(id) + if err != nil { + return nil, fmt.Errorf("calculating block builder partition ID: %w", err) + } + + reader, err := partition.NewReader( + t.Cfg.KafkaConfig, + ingestPartitionID, + id, + logger, + prometheus.DefaultRegisterer, + ) + if err != nil { + return nil, err + } + + controller, err := blockbuilder.NewPartitionJobController( + reader, + t.Cfg.BlockBuilder.Backoff, + ) + + if err != nil { + return nil, err + } + + objectStore, err := blockbuilder.NewMultiStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics) + if err != nil { + return nil, err + } + + bb, err := blockbuilder.NewBlockBuilder( + id, + t.Cfg.BlockBuilder, + t.Cfg.SchemaConfig.Configs, + t.Store, + objectStore, + logger, + prometheus.DefaultRegisterer, + controller, + ) + + if err != nil { + return nil, err + } + + t.blockBuilder = bb + return t.blockBuilder, nil +} + func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil diff --git a/pkg/storage/stores/shipper/indexshipper/shipper.go b/pkg/storage/stores/shipper/indexshipper/shipper.go index 5b3037c45b08..2917b1fc7974 100644 --- a/pkg/storage/stores/shipper/indexshipper/shipper.go +++ b/pkg/storage/stores/shipper/indexshipper/shipper.go @@ -33,6 +33,9 @@ const ( ModeReadOnly = Mode("RO") // ModeWriteOnly is to allow only write operations ModeWriteOnly = Mode("WO") + // ModeDisabled is a no-op implementation which does nothing & does not error. + // It's used by the blockbuilder which handles index operations independently. 
+ ModeDisabled = Mode("NO") // FilesystemObjectStoreType holds the periodic config type for the filesystem store FilesystemObjectStoreType = "filesystem" @@ -142,6 +145,8 @@ type indexShipper struct { func NewIndexShipper(prefix string, cfg Config, storageClient client.ObjectClient, limits downloads.Limits, tenantFilter downloads.TenantFilter, open index.OpenIndexFileFunc, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (IndexShipper, error) { switch cfg.Mode { + case ModeDisabled: + return Noop{}, nil case ModeReadOnly, ModeWriteOnly, ModeReadWrite: default: return nil, fmt.Errorf("invalid mode: %v", cfg.Mode) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go index 23a951deacbd..be0a343309c5 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go @@ -134,8 +134,8 @@ func setupMultiTenantIndex(t *testing.T, indexFormat int, userStreams map[string dst := NewPrefixedIdentifier( MultitenantTSDBIdentifier{ - nodeName: "test", - ts: ts, + NodeName: "test", + Ts: ts, }, destDir, "", @@ -239,7 +239,7 @@ func TestCompactor_Compact(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, Schema: "v12", } - indexBkts := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) tableName := indexBkts[0] lbls1 := mustParseLabels(`{foo="bar", a="b"}`) @@ -497,8 +497,8 @@ func TestCompactor_Compact(t *testing.T) { t.Run(name, func(t *testing.T) { tempDir := t.TempDir() objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - tablePathInStorage := filepath.Join(objectStoragePath, tableName.prefix) - tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.prefix) + tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.Prefix) require.NoError(t, util.EnsureDirectory(objectStoragePath)) require.NoError(t, util.EnsureDirectory(tablePathInStorage)) @@ -551,7 +551,7 @@ func TestCompactor_Compact(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - _, commonPrefixes, err := objectClient.List(context.Background(), tableName.prefix, "/") + _, commonPrefixes, err := objectClient.List(context.Background(), tableName.Prefix, "/") require.NoError(t, err) initializedIndexSets := map[string]compactor.IndexSet{} @@ -559,19 +559,19 @@ func TestCompactor_Compact(t *testing.T) { existingUserIndexSets := make(map[string]compactor.IndexSet, len(commonPrefixes)) for _, commonPrefix := range commonPrefixes { userID := path.Base(string(commonPrefix)) - idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) + idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) require.NoError(t, err) existingUserIndexSets[userID] = idxSet initializedIndexSets[userID] = idxSet } - commonIndexSet, err := newMockIndexSet("", tableName.prefix, tableWorkingDirectory, objectClient) + commonIndexSet, err := newMockIndexSet("", tableName.Prefix, tableWorkingDirectory, objectClient) 
require.NoError(t, err) // build TableCompactor and compact the index tCompactor := newTableCompactor(context.Background(), commonIndexSet, existingUserIndexSets, func(userID string) (compactor.IndexSet, error) { - idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) + idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) require.NoError(t, err) initializedIndexSetsMtx.Lock() @@ -875,9 +875,9 @@ func setupCompactedIndex(t *testing.T) *testContext { schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{periodConfig}, } - indexBuckets := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + indexBuckets := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) tableName := indexBuckets[0] - tableInterval := retention.ExtractIntervalFromTableName(tableName.prefix) + tableInterval := retention.ExtractIntervalFromTableName(tableName.Prefix) // shiftTableStart shift tableInterval.Start by the given amount of milliseconds. // It is used for building chunkmetas relative to start time of the table. shiftTableStart := func(ms int64) int64 { @@ -900,7 +900,7 @@ func setupCompactedIndex(t *testing.T) *testContext { builder.FinalizeChunks() - return newCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder) + return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder) } expectedChunkEntries := map[string][]retention.ChunkEntry{ diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go index 149d41bfa944..eab26fe643d5 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go @@ -161,13 +161,13 @@ func ParseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, ok bool } type MultitenantTSDBIdentifier struct { - nodeName string - ts time.Time + NodeName string + Ts time.Time } // Name builds filename with format + `-` + ` func (id MultitenantTSDBIdentifier) Name() string { - return fmt.Sprintf("%d-%s.tsdb", id.ts.Unix(), id.nodeName) + return fmt.Sprintf("%d-%s.tsdb", id.Ts.Unix(), id.NodeName) } func (id MultitenantTSDBIdentifier) Path() string { @@ -200,7 +200,7 @@ func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifie } return MultitenantTSDBIdentifier{ - ts: time.Unix(int64(ts), 0), - nodeName: strings.Join(xs[1:], "-"), + Ts: time.Unix(int64(ts), 0), + NodeName: strings.Join(xs[1:], "-"), }, true } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index 756d354d1ed6..0766bd058fdf 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -62,6 +62,10 @@ const ( fingerprintInterval = 1 << 10 millisecondsInHour = int64(time.Hour / time.Millisecond) + + // reserved; used in multitenant indices to signal the tenant. Eventually compacted away when + // single tenant indices are created. 
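+	// The blockbuilder sets this label on each series before adding it to a
+	// multitenant TSDB builder (see pkg/blockbuilder/tsdb.go), while the series
+	// fingerprint remains that of the original, tenant-less stream labels.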
+ TenantLabel = "__loki_tenant__" ) type indexWriterStage uint8 diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go index b0d1824936d5..6ca252770169 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go @@ -40,9 +40,9 @@ func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Ind func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) { itr := indexIterFunc(func(f func(context.Context, Index) error) error { // Ensure we query both per tenant and multitenant TSDBs - idxBuckets := indexBuckets(from, through, []config.TableRange{i.tableRange}) + idxBuckets := IndexBuckets(from, through, []config.TableRange{i.tableRange}) for _, bkt := range idxBuckets { - if err := i.shipper.ForEachConcurrent(ctx, bkt.prefix, user, func(multitenant bool, idx shipperindex.Index) error { + if err := i.shipper.ForEachConcurrent(ctx, bkt.Prefix, user, func(multitenant bool, idx shipperindex.Index) error { impl, ok := idx.(Index) if !ok { return fmt.Errorf("unexpected shipper index type: %T", idx) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go index 96f56d7021f4..84c250eb7464 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go @@ -165,13 +165,13 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe // chunks may overlap index period bounds, in which case they're written to multiple pds := make(map[string]chunkInfo) for _, chk := range chks { - idxBuckets := indexBuckets(chk.From(), chk.Through(), tableRanges) + idxBuckets := IndexBuckets(chk.From(), chk.Through(), tableRanges) for _, bucket := range idxBuckets { - chkinfo := pds[bucket.prefix] + chkinfo := pds[bucket.Prefix] chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk) - chkinfo.tsdbFormat = bucket.tsdbFormat - pds[bucket.prefix] = chkinfo + chkinfo.tsdbFormat = bucket.TsdbFormat + pds[bucket.Prefix] = chkinfo } } @@ -208,8 +208,8 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) dst := NewPrefixedIdentifier( MultitenantTSDBIdentifier{ - nodeName: m.nodeName, - ts: heads.start, + NodeName: m.nodeName, + Ts: heads.start, }, dstDir, "", @@ -300,19 +300,24 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy boo return nil } -type indexInfo struct { - prefix string - tsdbFormat int +type IndexInfo struct { + BucketStart model.Time + Prefix string + TsdbFormat int } -func indexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []indexInfo) { +func IndexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []IndexInfo) { start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) for cur := start; cur <= end; cur++ { cfg := tableRanges.ConfigForTableNumber(cur) if cfg != nil { tsdbFormat, _ := cfg.TSDBFormat() // Ignoring error, as any valid period config should return valid format. 
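+			// Each bucket records the table's start time, its name (prefix plus
+			// table number), and the TSDB format configured for that period.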
- res = append(res, indexInfo{prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), tsdbFormat: tsdbFormat}) + res = append(res, IndexInfo{ + BucketStart: model.TimeFromUnixNano(cur * int64(config.ObjectStorageIndexRequiredPeriod)), + Prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), + TsdbFormat: tsdbFormat, + }) } } if len(res) == 0 { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/manager_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/manager_test.go new file mode 100644 index 000000000000..6816b0aeb208 --- /dev/null +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/manager_test.go @@ -0,0 +1,71 @@ +package tsdb + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/v3/pkg/storage/types" +) + +func TestIndexBuckets(t *testing.T) { + var ( + day0 = model.Time(0) + day1 = day0.Add(24 * time.Hour) + day2 = day1.Add(24 * time.Hour) + periods = []config.PeriodConfig{ + { + From: config.NewDayTime(day0), + Schema: "v12", + IndexType: "tsdb", + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: "index/", + Period: 24 * time.Hour, + }, + }, + }, + { + From: config.NewDayTime(day2), + Schema: "v13", + IndexType: "tsdb", + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: "index2/", + Period: 24 * time.Hour, + }, + }, + }, + } + + tableRanges = config.GetIndexStoreTableRanges(types.TSDBType, periods) + ) + tests := []struct { + name string + from model.Time + through model.Time + expectedInfo []IndexInfo + }{ + { + name: "single table range", + from: day0, + through: day2, + expectedInfo: []IndexInfo{ + {BucketStart: day0, TsdbFormat: index.FormatV2, Prefix: "index/0"}, + {BucketStart: day1, TsdbFormat: index.FormatV2, Prefix: "index/1"}, + {BucketStart: day2, TsdbFormat: index.FormatV3, Prefix: "index2/2"}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + res := IndexBuckets(tc.from, tc.through, tableRanges) + require.Equal(t, tc.expectedInfo, res) + }) + } +} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/store.go b/pkg/storage/stores/shipper/indexshipper/tsdb/store.go index 1ef58c32a1e5..8ca8a2489e6d 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/store.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/store.go @@ -85,6 +85,13 @@ func (s *store) init(name, prefix string, indexShipperCfg indexshipper.Config, s var indices []Index opts := DefaultIndexClientOptions() + // early return in case index shipper is disabled. + if indexShipperCfg.Mode == indexshipper.ModeDisabled { + s.indexWriter = noopIndexWriter{} + s.Reader = NewIndexClient(NoopIndex{}, opts, limits) + return nil + } + if indexShipperCfg.Mode == indexshipper.ModeWriteOnly { // We disable bloom filters on write nodes // for the Stats() methods as it's of relatively little @@ -172,3 +179,9 @@ type failingIndexWriter struct{} func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdbindex.ChunkMetas) error { return fmt.Errorf("index writer is not initialized due to tsdb store being initialized in read-only mode") } + +type noopIndexWriter struct{} + +func (f noopIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdbindex.ChunkMetas) error { + return nil +}
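For reviewers, the intended flow of the new index pieces is: accumulate chunk references per tenant, cut one TSDB per index table period, and upload each gzipped index to the object store configured for its period. The sketch below is illustrative only and not part of the diff; it assumes it lives inside pkg/blockbuilder (where the unexported types are visible), the package's existing imports, and an already-configured MultiStore.

// buildAndShipIndices shows how TsdbCreator, create, and uploader compose.
func buildAndShipIndices(
	ctx context.Context,
	nodeName string,
	tableRanges []config.TableRange,
	store *MultiStore,
	userID string,
	ls labels.Labels,
	fp uint64,
	chks index.ChunkMetas,
) error {
	creator := newTsdbCreator()

	// 1. Record chunk metadata as chunks are flushed for a tenant's stream.
	if err := creator.Append(userID, ls, fp, chks); err != nil {
		return err
	}

	// 2. Cut one in-memory TSDB per index table period touched by the chunks.
	dbs, err := creator.create(ctx, nodeName, tableRanges)
	if err != nil {
		return err
	}

	// 3. Gzip and upload each index to the object store for its period bucket.
	up := newUploader(store)
	for _, db := range dbs {
		if err := up.Put(ctx, db); err != nil {
			return err
		}
	}
	return nil
}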