Skip to content

Commit

Permalink
feat(blooms): Add task timing and sizing metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Nov 20, 2024
1 parent a2b66d3 commit 31b902c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 24 deletions.
9 changes: 9 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Metrics struct {
tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
tenantTasksCompleted *prometheus.GaugeVec
tenantTasksTiming *prometheus.HistogramVec

// Retention metrics
retentionRunning prometheus.Gauge
Expand Down Expand Up @@ -166,6 +167,14 @@ func NewMetrics(
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant", "status"}),
tenantTasksTiming: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenant_tasks_time_seconds",
Help: "Time spent building tasks for a tenant during the current build iteration.",
// 1s --> 1h (steps of 1 minute)
Buckets: prometheus.LinearBuckets(1, 60, 60),
}, []string{"tenant", "status"}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Expand Down
30 changes: 18 additions & 12 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type Planner struct {
tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase

tasksQueue *queue.Queue
tasksQueue *queue.Queue
planFactory *strategies.Factory

metrics *Metrics
logger log.Logger
Expand Down Expand Up @@ -86,14 +87,15 @@ func New(
}

p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
planFactory: strategies.NewFactory(limits, strategies.NewMetrics(r), logger),
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}

p.retentionManager = NewRetentionManager(
Expand Down Expand Up @@ -370,7 +372,7 @@ func (p *Planner) computeTasks(
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
strategy, err := p.planFactory.GetStrategy(tenant)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}
Expand Down Expand Up @@ -770,8 +772,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
continue
}

startTime := time.Now()
result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusFailure).Observe(time.Since(startTime).Seconds())
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.tasksQueue.Release(task.ProtoTask)
Expand Down Expand Up @@ -811,10 +815,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

level.Debug(logger).Log(
"msg", "task completed",
"duration", time.Since(task.queueTime).Seconds(),
"timeSinceEnqueued", time.Since(task.queueTime).Seconds(),
"buildTime", time.Since(startTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.tasksQueue.Release(task.ProtoTask)
p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusSuccess).Observe(time.Since(startTime).Seconds())

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
Expand Down Expand Up @@ -866,7 +872,7 @@ func (p *Planner) forwardTaskToBuilder(
case err := <-errCh:
return nil, err
case <-timeout:
return nil, fmt.Errorf("timeout waiting for response from builder (%s)", builderID)
return nil, fmt.Errorf("timeout (%s) waiting for response from builder (%s)", taskTimeout, builderID)
}
}

Expand Down
37 changes: 32 additions & 5 deletions pkg/bloombuild/planner/strategies/chunksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand All @@ -21,22 +22,47 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

const (
	// Namespace/subsystem prefix shared by all metrics emitted by the
	// planning strategies in this package.
	metricsNamespace = "loki"
	metricsSubsystem = "bloomplanner"
)

// ChunkSizeStrategyMetrics holds the metrics emitted by the chunk-size
// planning strategy.
type ChunkSizeStrategyMetrics struct {
	// Histogram of planned task sizes in bytes, labeled by tenant.
	tenantTaskSize *prometheus.HistogramVec
}

func NewChunkSizeStrategyMetrics(reg prometheus.Registerer) *ChunkSizeStrategyMetrics {

Check warning on line 34 in pkg/bloombuild/planner/strategies/chunksize.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'reg' seems to be unused, consider removing or renaming it as _ (revive)
return &ChunkSizeStrategyMetrics{
tenantTaskSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenant_task_size_bytes",
Help: "Size of tasks generated by the chunk size strategy",
// 1GB --> 512GB
Buckets: prometheus.ExponentialBuckets(1e9, 2, 10),
}, []string{"tenant"}),
}
}

// ChunkSizeStrategyLimits exposes the per-tenant limit consulted by the
// chunk-size planning strategy.
type ChunkSizeStrategyLimits interface {
	// BloomTaskTargetSeriesChunksSizeBytes returns the target total chunk
	// size, in bytes, for the series grouped into a single task.
	BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64
}

// ChunkSizeStrategy plans bloom-build tasks by grouping series up to a
// per-tenant target chunk size (see ChunkSizeStrategyLimits).
type ChunkSizeStrategy struct {
	limits  ChunkSizeStrategyLimits
	metrics *ChunkSizeStrategyMetrics
	logger  log.Logger
}

// NewChunkSizeStrategy returns a chunk-size planning strategy wired with the
// given limits, metrics and logger. The error return is always nil; it is
// kept to satisfy the strategy-constructor shape used by the factory.
func NewChunkSizeStrategy(
	limits ChunkSizeStrategyLimits,
	metrics *ChunkSizeStrategyMetrics,
	logger log.Logger,
) (*ChunkSizeStrategy, error) {
	s := &ChunkSizeStrategy{
		limits:  limits,
		metrics: metrics,
		logger:  logger,
	}
	return s, nil
}

Expand Down Expand Up @@ -82,8 +108,9 @@ func (s *ChunkSizeStrategy) Plan(
continue
}

bounds := series.Bounds()
s.metrics.tenantTaskSize.WithLabelValues(tenant).Observe(float64(series.Size()))

bounds := series.Bounds()
blocks, err := getBlocksMatchingBounds(metas, bounds)
if err != nil {
return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/strategies/chunksize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
Expand Down Expand Up @@ -228,7 +229,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

strategy, err := NewChunkSizeStrategy(tc.limits, logger)
strategy, err := NewChunkSizeStrategy(tc.limits, NewChunkSizeStrategyMetrics(prometheus.NewPedanticRegistry()), logger)
require.NoError(t, err)

actual, err := strategy.Plan(context.Background(), plannertest.TestTable, "fake", tc.tsdbs, tc.originalMetas)
Expand Down
37 changes: 31 additions & 6 deletions pkg/bloombuild/planner/strategies/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand Down Expand Up @@ -32,18 +33,42 @@ type PlanningStrategy interface {
Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error)
}

func NewStrategy(
tenantID string,
// Metrics aggregates the metrics of all planning strategies so the planner
// can construct them once and hand the relevant subset to each strategy.
type Metrics struct {
	*ChunkSizeStrategyMetrics
}

// NewMetrics builds the aggregated strategy metrics, registering them with reg.
func NewMetrics(reg prometheus.Registerer) *Metrics {
	chunkSizeMetrics := NewChunkSizeStrategyMetrics(reg)
	return &Metrics{ChunkSizeStrategyMetrics: chunkSizeMetrics}
}

// Factory creates per-tenant planning strategies, sharing limits, metrics,
// and logger across every strategy it builds.
type Factory struct {
	limits  Limits
	logger  log.Logger
	metrics *Metrics
}

func NewFactory(
limits Limits,
metrics *Metrics,
logger log.Logger,
) (PlanningStrategy, error) {
strategy := limits.BloomPlanningStrategy(tenantID)
) *Factory {
return &Factory{
limits: limits,
logger: logger,
metrics: metrics,
}
}

// GetStrategy returns the planning strategy configured for the given tenant,
// or an error if the configured strategy name is unknown.
func (f *Factory) GetStrategy(tenantID string) (PlanningStrategy, error) {
	// Strategy selection is a per-tenant limit, so different tenants may use
	// different planning strategies within the same planner.
	strategy := f.limits.BloomPlanningStrategy(tenantID)

	switch strategy {
	case SplitKeyspaceStrategyName:
		return NewSplitKeyspaceStrategy(f.limits, f.logger)
	case SplitBySeriesChunkSizeStrategyName:
		// The chunk-size strategy is the only one that emits strategy
		// metrics, hence the extra metrics argument.
		return NewChunkSizeStrategy(f.limits, f.metrics.ChunkSizeStrategyMetrics, f.logger)
	default:
		return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy)
	}
Expand Down

0 comments on commit 31b902c

Please sign in to comment.