feat(blooms): Prefetch bloom blocks as soon as they are built (#15050)
Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Christian Haudum <[email protected]>
salvacorts and chaudum authored Nov 22, 2024
1 parent 2ae1ead commit b406015
Showing 16 changed files with 694 additions and 66 deletions.
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3816,6 +3816,10 @@ shard_streams:
# CLI flag: -bloom-build.block-encoding
[bloom_block_encoding: <string> | default = "none"]

# Experimental. Prefetch blocks on bloom gateways as soon as they are built.
# CLI flag: -bloom-build.prefetch-blocks
[bloom_prefetch_blocks: <boolean> | default = false]

# Experimental. The maximum bloom block size. A value of 0 sets an unlimited
# size. Default is 200MB. The actual block size might exceed this limit since
# blooms will be added to blocks until the block exceeds the maximum block size.
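For reference, a minimal sketch of how this experimental option could be enabled — the option name, default, and CLI flag come from the diff above; placing it under `limits_config` alongside the other per-tenant bloom-build settings is an assumption:

```yaml
# Hypothetical Loki config snippet: enable bloom block prefetching for all tenants.
# Assumes bloom_prefetch_blocks lives under limits_config with the other
# per-tenant bloom-build settings; equivalent CLI flag: -bloom-build.prefetch-blocks
limits_config:
  bloom_prefetch_blocks: true
```

With this enabled and a bloom gateway client configured on the builder, each uploaded meta triggers a `PrefetchBloomBlocks` call for its blocks, as shown in the `builder.go` change below.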
29 changes: 20 additions & 9 deletions pkg/bloombuild/builder/builder.go
@@ -23,6 +23,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
@@ -48,8 +49,9 @@ type Builder struct {
metrics *Metrics
logger log.Logger

bloomStore bloomshipper.Store
chunkLoader ChunkLoader
bloomStore bloomshipper.Store
chunkLoader ChunkLoader
bloomGateway bloomgateway.Client

client protos.PlannerForBuilderClient

@@ -66,6 +68,7 @@ func New(
_ storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.Store,
bloomGateway bloomgateway.Client,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
@@ -77,13 +80,14 @@ func New(

metrics := NewMetrics(r)
b := &Builder{
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
logger: logger,
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
bloomGateway: bloomGateway,
logger: logger,
}

if rm != nil {
@@ -519,6 +523,13 @@ func (b *Builder) processTask(
b.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta")
created = append(created, meta)

// Now that the meta is written and the blocks can be queried, we prefetch them to the gateway
if b.bloomGateway != nil && b.limits.PrefetchBloomBlocks(tenant) {
if err := b.bloomGateway.PrefetchBloomBlocks(ctx, meta.Blocks); err != nil {
level.Error(logger).Log("msg", "failed to prefetch block on gateway", "err", err)
}
}
}

b.metrics.seriesPerTask.Observe(float64(totalSeries))
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -75,7 +75,7 @@ func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Lo
metrics := storage.NewClientMetrics()
metrics.Unregister()

builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil)
builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, nil, logger, prometheus.NewPedanticRegistry(), nil)
require.NoError(t, err)

return builder
1 change: 1 addition & 0 deletions pkg/bloombuild/builder/config.go
@@ -42,4 +42,5 @@ type Limits interface {
BloomMaxBlockSize(tenantID string) int
BloomMaxBloomSize(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
PrefetchBloomBlocks(tenantID string) bool
}
61 changes: 53 additions & 8 deletions pkg/bloomgateway/bloomgateway.go
@@ -161,6 +161,43 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

func (g *Gateway) PrefetchBloomBlocks(_ context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
refs, err := decodeBlockKeys(req.Blocks)
if err != nil {
return nil, err
}

bqs, err := g.bloomStore.FetchBlocks(
// We don't use the ctx passed to the handler since it's canceled when the handler returns
context.Background(),
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithIgnoreNotFound(true),
bloomshipper.WithCacheGetOptions(
bloomshipper.WithSkipHitMissMetrics(true),
),
)
if err != nil {
g.metrics.prefetchedBlocks.WithLabelValues(typeError).Add(float64(len(refs)))
return nil, err
}

for _, bq := range bqs {
if bq == nil {
// This is the expected case: the block is not yet downloaded and the block querier is nil
continue
}

// Close any block queriers that were already downloaded
if err := bq.Close(); err != nil {
level.Warn(g.logger).Log("msg", "failed to close block querier", "err", err)
}
}

g.metrics.prefetchedBlocks.WithLabelValues(typeSuccess).Add(float64(len(refs)))
return &logproto.PrefetchBloomBlocksResponse{}, err
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
@@ -204,14 +241,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
blocks, err := decodeBlockKeys(req.Blocks)
if err != nil {
stats.Status = labelFailure
return nil, err
}

// Shortcut if request does not contain blocks
@@ -470,3 +503,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR

cur.Refs = cur.Refs[:len(res)]
}

func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
for _, key := range keys {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}
return blocks, nil
}
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache.go
@@ -113,6 +113,7 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.

type ClientCache struct {
cache *resultscache.ResultsCache
next logproto.BloomGatewayClient
limits CacheLimits
logger log.Logger
}
@@ -149,12 +150,19 @@ func NewBloomGatewayClientCacheMiddleware(
)

return &ClientCache{
next: next,
cache: resultsCache,
limits: limits,
logger: logger,
}
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
return c.next.PrefetchBloomBlocks(ctx, in, opts...)
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache_test.go
@@ -468,6 +468,8 @@ type mockServer struct {
res *logproto.FilterChunkRefResponse
}

var _ logproto.BloomGatewayClient = &mockServer{}

func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
var calls int
return &mockServer{
@@ -480,11 +482,17 @@ func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
s.res = res
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
*s.calls++
return s.res, nil
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
panic("unimplemented")
}

type mockLimits struct {
cacheFreshness time.Duration
cacheInterval time.Duration
51 changes: 49 additions & 2 deletions pkg/bloomgateway/client.go
@@ -116,6 +116,7 @@ func (i *ClientConfig) Validate() error {

type Client interface {
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error
}

// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
@@ -204,6 +205,47 @@ func (c *GatewayClient) Close() {
c.dnsProvider.Stop()
}

func (c *GatewayClient) PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error {
if len(blocks) == 0 {
return nil
}

pos := make(map[string]int)
servers := make([]addrWithBlocks, 0, len(blocks))
for _, block := range blocks {
addr, err := c.pool.Addr(block.String())
if err != nil {
level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", block, "err", err)
continue
}

if idx, found := pos[addr]; found {
servers[idx].blocks = append(servers[idx].blocks, block.String())
} else {
pos[addr] = len(servers)
servers = append(servers, addrWithBlocks{
addr: addr,
blocks: []string{block.String()},
})
}
}

return concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.PrefetchBloomBlocksRequest{Blocks: rs.blocks}
_, err := client.PrefetchBloomBlocks(ctx, req)
if err != nil {
level.Error(c.logger).Log("msg", "block prefetch failed for instance, skipping", "addr", rs.addr, "blocks", len(rs.blocks), "err", err)
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeError).Inc()
} else {
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeSuccess).Inc()
}
return err
})
})
}

// FilterChunks implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
// no block and therefore no series with chunks
@@ -268,10 +310,10 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
"err", err,
)
// filter none of the results on failed request
c.metrics.clientRequests.WithLabelValues(typeError).Inc()
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeError).Inc()
results[i] = rs.groups
} else {
c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc()
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeSuccess).Inc()
results[i] = resp.ChunkRefs
}

@@ -390,6 +432,11 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}

type addrWithBlocks struct {
addr string
blocks []string
}

type addrWithGroups struct {
addr string
blocks []string
12 changes: 11 additions & 1 deletion pkg/bloomgateway/metrics.go
@@ -18,6 +18,9 @@ type metrics struct {
const (
typeSuccess = "success"
typeError = "error"

routeFilterChunks = "FilterChunks"
routePrefectBlocks = "PrefetchBloomBlocks"
)

type clientMetrics struct {
@@ -32,7 +35,7 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
Subsystem: "bloom_gateway_client",
Name: "requests_total",
Help: "Total number of requests made to the bloom gateway",
}, []string{"type"}),
}, []string{"route", "type"}),
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
@@ -50,6 +53,7 @@ type serverMetrics struct {
requestedChunks prometheus.Histogram
filteredChunks prometheus.Histogram
receivedMatchers prometheus.Histogram
prefetchedBlocks *prometheus.CounterVec
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -105,6 +109,12 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Help: "Number of matchers per request.",
Buckets: prometheus.ExponentialBuckets(1, 2, 9), // 1 -> 256
}),
prefetchedBlocks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "prefetched_blocks_total",
Help: "Total amount of blocks prefetched by the bloom-gateway",
}, []string{"status"}),
}
}

6 changes: 6 additions & 0 deletions pkg/bloomgateway/querier_test.go
@@ -26,6 +26,8 @@ type noopClient struct {
callCount int
}

var _ Client = &noopClient{}

// FilterChunks implements Client.
func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.Interval, blocks []blockWithSeries, _ plan.QueryPlan) (result []*logproto.GroupedChunkRefs, err error) {
for _, block := range blocks {
@@ -39,6 +41,10 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In
return result, c.err
}

func (c *noopClient) PrefetchBloomBlocks(_ context.Context, _ []bloomshipper.BlockRef) error {
return nil
}

type mockBlockResolver struct{}

// Resolve implements BlockResolver.