Skip to content

Commit

Permalink
Implement PrefetchBloomBlocks gRPC method on gateway
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 21, 2024
1 parent fd25955 commit 204acaa
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,18 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

func (g *Gateway) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
// TODO: Implement prefetching of bloom blocks
return &logproto.PrefetchBloomBlocksResponse{}, nil
func (g *Gateway) PrefetchBloomBlocks(ctx context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
refs, err := decodeBlockKeys(req.Blocks)
if err != nil {
return nil, err
}
_, err = g.bloomStore.FetchBlocks(
ctx,
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithIgnoreNotFound(true),
)
return &logproto.PrefetchBloomBlocksResponse{}, err
}

// FilterChunkRefs implements BloomGatewayServer
Expand Down Expand Up @@ -209,14 +218,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
blocks, err := decodeBlockKeys(req.Blocks)
if err != nil {
stats.Status = labelFailure
return nil, err
}

// Shortcut if request does not contain blocks
Expand Down Expand Up @@ -475,3 +480,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR

cur.Refs = cur.Refs[:len(res)]
}

func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
for _, key := range keys {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}
return blocks, nil
}

0 comments on commit 204acaa

Please sign in to comment.