
feat(blooms): Prefetch bloom blocks as soon as they are built #15050

Merged: 13 commits, Nov 22, 2024
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3765,6 +3765,10 @@ shard_streams:
# CLI flag: -bloom-build.block-encoding
[bloom_block_encoding: <string> | default = "none"]

# Experimental. Prefetch blocks on bloom gateways as soon as they are built.
# CLI flag: -bloom-build.prefetch-blocks
[bloom_prefetch_blocks: <boolean> | default = false]

# Experimental. The maximum bloom block size. A value of 0 sets an unlimited
# size. Default is 200MB. The actual block size might exceed this limit since
# blooms will be added to blocks until the block exceeds the maximum block size.
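For reference, enabling the new experimental option in a Loki configuration file would look roughly like the sketch below. This is a minimal example under the assumption that the key sits in limits_config next to the neighboring bloom-build limits shown in the hunk above; the CLI flag -bloom-build.prefetch-blocks documented there sets the same option.

limits_config:
  # Experimental: have bloom gateways download blocks right after the builder uploads them.
  bloom_prefetch_blocks: true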
29 changes: 20 additions & 9 deletions pkg/bloombuild/builder/builder.go
@@ -23,6 +23,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
@@ -48,8 +49,9 @@ type Builder struct {
metrics *Metrics
logger log.Logger

bloomStore bloomshipper.Store
chunkLoader ChunkLoader
bloomStore bloomshipper.Store
chunkLoader ChunkLoader
bloomGateway bloomgateway.Client

client protos.PlannerForBuilderClient

@@ -66,6 +68,7 @@ func New(
_ storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.Store,
bloomGateway bloomgateway.Client,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
@@ -77,13 +80,14 @@

metrics := NewMetrics(r)
b := &Builder{
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
logger: logger,
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
bloomGateway: bloomGateway,
logger: logger,
}

if rm != nil {
@@ -519,6 +523,13 @@ func (b *Builder) processTask(
b.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta")
created = append(created, meta)

// Now that the meta is written and its blocks can be queried, prefetch them to the gateway
if b.bloomGateway != nil && b.limits.PrefetchBloomBlocks(tenant) {
if err := b.bloomGateway.PrefetchBloomBlocks(ctx, meta.Blocks); err != nil {
level.Error(logger).Log("msg", "failed to prefetch block on gateway", "err", err)
}
}
}

b.metrics.seriesPerTask.Observe(float64(totalSeries))
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -75,7 +75,7 @@ func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Lo
metrics := storage.NewClientMetrics()
metrics.Unregister()

builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil)
builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, nil, logger, prometheus.NewPedanticRegistry(), nil)
require.NoError(t, err)

return builder
1 change: 1 addition & 0 deletions pkg/bloombuild/builder/config.go
@@ -42,4 +42,5 @@ type Limits interface {
BloomMaxBlockSize(tenantID string) int
BloomMaxBloomSize(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
PrefetchBloomBlocks(tenantID string) bool
}
61 changes: 53 additions & 8 deletions pkg/bloomgateway/bloomgateway.go
@@ -161,6 +161,43 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

func (g *Gateway) PrefetchBloomBlocks(_ context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
Member commented:
Do we need prefetch-specific metrics so we can measure the impact of this? (for example, how many blocks were downloaded via a prefetch)

Contributor Author replied:
Added

refs, err := decodeBlockKeys(req.Blocks)
if err != nil {
return nil, err
}

bqs, err := g.bloomStore.FetchBlocks(
// We don't use the ctx passed to the handler since it's canceled when the handler returns
context.Background(),
@salvacorts (Contributor Author) commented on Nov 21, 2024:
Not passing a timeout here since you always need to call the cancel function before returning, which would trigger the download to stop. This is not an issue since when the gateway stops, the queue is closed and any pending block download is stopped.
(A standalone sketch illustrating this follows the function below.)
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithIgnoreNotFound(true),
bloomshipper.WithCacheGetOptions(
bloomshipper.WithSkipHitMissMetrics(true),
),
)
if err != nil {
g.metrics.prefetchedBlocks.WithLabelValues(typeError).Add(float64(len(refs)))
return nil, err
}

for _, bq := range bqs {
if bq == nil {
// This is the expected case: the block is not yet downloaded and the block querier is nil
continue
}

// Close any block queriers that were already downloaded
if err := bq.Close(); err != nil {
level.Warn(g.logger).Log("msg", "failed to close block querier", "err", err)
}
}

g.metrics.prefetchedBlocks.WithLabelValues(typeSuccess).Add(float64(len(refs)))
return &logproto.PrefetchBloomBlocksResponse{}, err
}
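The inline review note above (about deliberately using context.Background() instead of the handler context or a derived timeout) can be illustrated with a small, self-contained Go sketch. Everything below is illustrative only: the names, timings, and the startAsyncDownload stand-in are invented for the example and are not part of the PR.

package main

import (
	"context"
	"fmt"
	"time"
)

// startAsyncDownload stands in for the asynchronous block fetch kicked off by
// FetchBlocks with WithFetchAsync(true): the work keeps running after the
// caller returns, unless its context is canceled first.
func startAsyncDownload(ctx context.Context, done chan<- string) {
	go func() {
		select {
		case <-time.After(100 * time.Millisecond): // simulated download time
			done <- "download finished"
		case <-ctx.Done():
			done <- "download aborted: " + ctx.Err().Error()
		}
	}()
}

// handler shows the pitfall: deriving a context with a timeout forces a
// deferred cancel, and that cancel fires as soon as the handler returns,
// aborting the still-running prefetch. Passing context.Background() (as the
// PR does) avoids this; shutdown is instead handled by the gateway closing
// its download queue when it stops.
func handler(done chan<- string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // fires on return and cancels the async download below
	startAsyncDownload(ctx, done)
}

func main() {
	done := make(chan string, 1)
	handler(done)
	fmt.Println(<-done) // prints "download aborted: context canceled"
}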

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
@@ -204,14 +241,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
blocks, err := decodeBlockKeys(req.Blocks)
if err != nil {
stats.Status = labelFailure
return nil, err
}

// Shortcut if request does not contain blocks
@@ -470,3 +503,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR

cur.Refs = cur.Refs[:len(res)]
}

func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
for _, key := range keys {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}
return blocks, nil
}
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache.go
@@ -113,6 +113,7 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.

type ClientCache struct {
cache *resultscache.ResultsCache
next logproto.BloomGatewayClient
limits CacheLimits
logger log.Logger
}
@@ -149,12 +150,19 @@ func NewBloomGatewayClientCacheMiddleware(
)

return &ClientCache{
next: next,
cache: resultsCache,
limits: limits,
logger: logger,
}
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
return c.next.PrefetchBloomBlocks(ctx, in, opts...)
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache_test.go
@@ -468,6 +468,8 @@ type mockServer struct {
res *logproto.FilterChunkRefResponse
}

var _ logproto.BloomGatewayClient = &mockServer{}

func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
var calls int
return &mockServer{
@@ -480,11 +482,17 @@ func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
s.res = res
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
*s.calls++
return s.res, nil
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
panic("unimplemented")
}

type mockLimits struct {
cacheFreshness time.Duration
cacheInterval time.Duration
51 changes: 49 additions & 2 deletions pkg/bloomgateway/client.go
@@ -116,6 +116,7 @@ func (i *ClientConfig) Validate() error {

type Client interface {
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error
}

// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
@@ -204,6 +205,47 @@ func (c *GatewayClient) Close() {
c.dnsProvider.Stop()
}

func (c *GatewayClient) PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error {
if len(blocks) == 0 {
return nil
}

pos := make(map[string]int)
servers := make([]addrWithBlocks, 0, len(blocks))
for _, block := range blocks {
addr, err := c.pool.Addr(block.String())
if err != nil {
level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", block, "err", err)
continue
}

if idx, found := pos[addr]; found {
servers[idx].blocks = append(servers[idx].blocks, block.String())
} else {
pos[addr] = len(servers)
servers = append(servers, addrWithBlocks{
addr: addr,
blocks: []string{block.String()},
})
}
}

return concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.PrefetchBloomBlocksRequest{Blocks: rs.blocks}
_, err := client.PrefetchBloomBlocks(ctx, req)
if err != nil {
level.Error(c.logger).Log("msg", "block prefetch failed for instance, skipping", "addr", rs.addr, "blocks", len(rs.blocks), "err", err)
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeError).Inc()
} else {
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeSuccess).Inc()
}
return err
})
})
}
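PrefetchBloomBlocks groups blocks by the gateway instance returned from c.pool.Addr, which (per the clientPool comment above) is backed by the JumpHashClientPool. The point of that pairing is that a given block key deterministically maps to one instance, so the prefetch is expected to warm the same gateway that later requests for that block are routed to. Below is a standalone sketch of that property using the generic Lamping–Veach jump consistent hash; this is not Loki's actual pool code, and the block key and addresses are invented for the example.

package main

import (
	"fmt"
	"hash/fnv"
)

// jumpHash is the Lamping–Veach jump consistent hash: it maps a 64-bit key to
// one of numBuckets buckets, and a key's bucket only changes when the number
// of buckets changes.
func jumpHash(key uint64, numBuckets int) int {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

// ownerFor picks the gateway instance responsible for a block key.
func ownerFor(blockKey string, gateways []string) string {
	h := fnv.New64a()
	_, _ = h.Write([]byte(blockKey))
	return gateways[jumpHash(h.Sum64(), len(gateways))]
}

func main() {
	gateways := []string{"gw-0:9095", "gw-1:9095", "gw-2:9095"}
	block := "bloom-block-tenant-a-table-19000-0000-ffff" // invented key for illustration
	// The same key always resolves to the same instance, run after run.
	fmt.Println(ownerFor(block, gateways))
	fmt.Println(ownerFor(block, gateways))
}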

// FilterChunks implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
// no block and therefore no series with chunks
@@ -268,10 +310,10 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
"err", err,
)
// filter none of the results on failed request
c.metrics.clientRequests.WithLabelValues(typeError).Inc()
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeError).Inc()
results[i] = rs.groups
} else {
c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc()
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeSuccess).Inc()
results[i] = resp.ChunkRefs
}

@@ -390,6 +432,11 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}

type addrWithBlocks struct {
addr string
blocks []string
}

type addrWithGroups struct {
addr string
blocks []string
12 changes: 11 additions & 1 deletion pkg/bloomgateway/metrics.go
@@ -18,6 +18,9 @@ type metrics struct {
const (
typeSuccess = "success"
typeError = "error"

routeFilterChunks = "FilterChunks"
routePrefectBlocks = "PrefetchBloomBlocks"
)

type clientMetrics struct {
@@ -32,7 +35,7 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
Subsystem: "bloom_gateway_client",
Name: "requests_total",
Help: "Total number of requests made to the bloom gateway",
}, []string{"type"}),
}, []string{"route", "type"}),
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
@@ -50,6 +53,7 @@ type serverMetrics struct {
requestedChunks prometheus.Histogram
filteredChunks prometheus.Histogram
receivedMatchers prometheus.Histogram
prefetchedBlocks *prometheus.CounterVec
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -105,6 +109,12 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Help: "Number of matchers per request.",
Buckets: prometheus.ExponentialBuckets(1, 2, 9), // 1 -> 256
}),
prefetchedBlocks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "prefetched_blocks_total",
Help: "Total amount of blocks prefetched by the bloom-gateway",
}, []string{"status"}),
}
}

6 changes: 6 additions & 0 deletions pkg/bloomgateway/querier_test.go
@@ -26,6 +26,8 @@ type noopClient struct {
callCount int
}

var _ Client = &noopClient{}

// FilterChunks implements Client.
func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.Interval, blocks []blockWithSeries, _ plan.QueryPlan) (result []*logproto.GroupedChunkRefs, err error) {
for _, block := range blocks {
@@ -39,6 +41,10 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In
return result, c.err
}

func (c *noopClient) PrefetchBloomBlocks(_ context.Context, _ []bloomshipper.BlockRef) error {
return nil
}

type mockBlockResolver struct{}

// Resolve implements BlockResolver.