From 204acaaf91fde7a121db84f82430de21f06c365c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 21 Nov 2024 10:06:40 +0100 Subject: [PATCH] Implement `PrefetchBloomBlocks` gRPC method on gateway Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 39 +++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 2de1eed18c6b9..5ab631868e256 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -161,9 +161,18 @@ func (g *Gateway) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } -func (g *Gateway) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) { - // TODO: Implement prefetching of bloom blocks - return &logproto.PrefetchBloomBlocksResponse{}, nil +func (g *Gateway) PrefetchBloomBlocks(ctx context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) { + refs, err := decodeBlockKeys(req.Blocks) + if err != nil { + return nil, err + } + _, err = g.bloomStore.FetchBlocks( + ctx, + refs, + bloomshipper.WithFetchAsync(true), + bloomshipper.WithIgnoreNotFound(true), + ) + return &logproto.PrefetchBloomBlocksResponse{}, err } // FilterChunkRefs implements BloomGatewayServer @@ -209,14 +218,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } - blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks)) - for _, key := range req.Blocks { - block, err := bloomshipper.BlockRefFromKey(key) - if err != nil { - stats.Status = labelFailure - return nil, errors.New("could not parse block key") - } - blocks = append(blocks, block) + blocks, err := decodeBlockKeys(req.Blocks) + if err != nil { + stats.Status = labelFailure + return nil, err } // Shortcut if request does not contain blocks @@ -475,3 +480,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR cur.Refs = cur.Refs[:len(res)] } + +func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) { + blocks := make([]bloomshipper.BlockRef, 0, len(keys)) + for _, key := range keys { + block, err := bloomshipper.BlockRefFromKey(key) + if err != nil { + return nil, errors.New("could not parse block key") + } + blocks = append(blocks, block) + } + return blocks, nil +}