From b4060154d198e17bef8ba0fbb1c99bb5c93a412d Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 22 Nov 2024 13:57:09 +0100 Subject: [PATCH] feat(blooms): Prefetch bloom blocks as soon as they are built (#15050) Signed-off-by: Christian Haudum Co-authored-by: Christian Haudum --- docs/sources/shared/configuration.md | 4 + pkg/bloombuild/builder/builder.go | 29 +- pkg/bloombuild/builder/builder_test.go | 2 +- pkg/bloombuild/builder/config.go | 1 + pkg/bloomgateway/bloomgateway.go | 61 ++- pkg/bloomgateway/cache.go | 8 + pkg/bloomgateway/cache_test.go | 8 + pkg/bloomgateway/client.go | 51 +- pkg/bloomgateway/metrics.go | 12 +- pkg/bloomgateway/querier_test.go | 6 + pkg/logproto/bloomgateway.pb.go | 494 ++++++++++++++++-- pkg/logproto/bloomgateway.proto | 7 + pkg/loki/modules.go | 17 + .../shipper/bloomshipper/blockscache.go | 38 +- .../stores/shipper/bloomshipper/fetcher.go | 16 +- pkg/validation/limits.go | 6 + 16 files changed, 694 insertions(+), 66 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 5fd2850f229d..40ee8f681fc1 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3816,6 +3816,10 @@ shard_streams: # CLI flag: -bloom-build.block-encoding [bloom_block_encoding: | default = "none"] +# Experimental. Prefetch blocks on bloom gateways as soon as they are built. +# CLI flag: -bloom-build.prefetch-blocks +[bloom_prefetch_blocks: | default = false] + # Experimental. The maximum bloom block size. A value of 0 sets an unlimited # size. Default is 200MB. The actual block size might exceed this limit since # blooms will be added to blocks until the block exceeds the maximum block size. diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 716b26ea8987..b5bb682e8efc 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" + "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage" @@ -48,8 +49,9 @@ type Builder struct { metrics *Metrics logger log.Logger - bloomStore bloomshipper.Store - chunkLoader ChunkLoader + bloomStore bloomshipper.Store + chunkLoader ChunkLoader + bloomGateway bloomgateway.Client client protos.PlannerForBuilderClient @@ -66,6 +68,7 @@ func New( _ storage.ClientMetrics, fetcherProvider stores.ChunkFetcherProvider, bloomStore bloomshipper.Store, + bloomGateway bloomgateway.Client, logger log.Logger, r prometheus.Registerer, rm *ring.RingManager, @@ -77,13 +80,14 @@ func New( metrics := NewMetrics(r) b := &Builder{ - ID: builderID, - cfg: cfg, - limits: limits, - metrics: metrics, - bloomStore: bloomStore, - chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics), - logger: logger, + ID: builderID, + cfg: cfg, + limits: limits, + metrics: metrics, + bloomStore: bloomStore, + chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics), + bloomGateway: bloomGateway, + logger: logger, } if rm != nil { @@ -519,6 +523,13 @@ func (b *Builder) processTask( b.metrics.metasCreated.Inc() level.Debug(logger).Log("msg", "uploaded meta") created = append(created, meta) + + // Now that the meta is written thus blocks can be queried, we prefetch them to the gateway + if b.bloomGateway != nil && b.limits.PrefetchBloomBlocks(tenant) { + if err := b.bloomGateway.PrefetchBloomBlocks(ctx, meta.Blocks); err != nil { + level.Error(logger).Log("msg", "failed to prefetch block on gateway", "err", err) + } + } } b.metrics.seriesPerTask.Observe(float64(totalSeries)) diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index fcd19179c142..7fdbf44ffdb8 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -75,7 +75,7 @@ func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Lo metrics := storage.NewClientMetrics() metrics.Unregister() - builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil) + builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, nil, logger, prometheus.NewPedanticRegistry(), nil) require.NoError(t, err) return builder diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 0a6444fa24cc..026a5b88d7b7 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -42,4 +42,5 @@ type Limits interface { BloomMaxBlockSize(tenantID string) int BloomMaxBloomSize(tenantID string) int BuilderResponseTimeout(tenantID string) time.Duration + PrefetchBloomBlocks(tenantID string) bool } diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 30624f9d3245..4242118e225c 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -161,6 +161,43 @@ func (g *Gateway) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } +func (g *Gateway) PrefetchBloomBlocks(_ context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) { + refs, err := decodeBlockKeys(req.Blocks) + if err != nil { + return nil, err + } + + bqs, err := g.bloomStore.FetchBlocks( + // We don't use the ctx passed to the handler since its canceled when the handler returns + context.Background(), + refs, + bloomshipper.WithFetchAsync(true), + bloomshipper.WithIgnoreNotFound(true), + bloomshipper.WithCacheGetOptions( + bloomshipper.WithSkipHitMissMetrics(true), + ), + ) + if err != nil { + g.metrics.prefetchedBlocks.WithLabelValues(typeError).Add(float64(len(refs))) + return nil, err + } + + for _, bq := range bqs { + if bq == nil { + // This is the expected case: the blocks is not yet downloaded and the block querier is nil + continue + } + + // Close any block querier that were already downloaded + if err := bq.Close(); err != nil { + level.Warn(g.logger).Log("msg", "failed to close block querier", "err", err) + } + } + + g.metrics.prefetchedBlocks.WithLabelValues(typeSuccess).Add(float64(len(refs))) + return &logproto.PrefetchBloomBlocksResponse{}, err +} + // FilterChunkRefs implements BloomGatewayServer func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) { tenantID, err := tenant.TenantID(ctx) @@ -204,14 +241,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } - blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks)) - for _, key := range req.Blocks { - block, err := bloomshipper.BlockRefFromKey(key) - if err != nil { - stats.Status = labelFailure - return nil, errors.New("could not parse block key") - } - blocks = append(blocks, block) + blocks, err := decodeBlockKeys(req.Blocks) + if err != nil { + stats.Status = labelFailure + return nil, err } // Shortcut if request does not contain blocks @@ -470,3 +503,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR cur.Refs = cur.Refs[:len(res)] } + +func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) { + blocks := make([]bloomshipper.BlockRef, 0, len(keys)) + for _, key := range keys { + block, err := bloomshipper.BlockRefFromKey(key) + if err != nil { + return nil, errors.New("could not parse block key") + } + blocks = append(blocks, block) + } + return blocks, nil +} diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index 77d5168ca303..acfe951ffa36 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -113,6 +113,7 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache. type ClientCache struct { cache *resultscache.ResultsCache + next logproto.BloomGatewayClient limits CacheLimits logger log.Logger } @@ -149,12 +150,19 @@ func NewBloomGatewayClientCacheMiddleware( ) return &ClientCache{ + next: next, cache: resultsCache, limits: limits, logger: logger, } } +// PrefetchBloomBlocks implements logproto.BloomGatewayClient. +func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) { + return c.next.PrefetchBloomBlocks(ctx, in, opts...) +} + +// FilterChunkRefs implements logproto.BloomGatewayClient. func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) { cacheReq := requestWithGrpcCallOptions{ FilterChunkRefRequest: req, diff --git a/pkg/bloomgateway/cache_test.go b/pkg/bloomgateway/cache_test.go index f85d366cc5aa..d0a524b4907d 100644 --- a/pkg/bloomgateway/cache_test.go +++ b/pkg/bloomgateway/cache_test.go @@ -468,6 +468,8 @@ type mockServer struct { res *logproto.FilterChunkRefResponse } +var _ logproto.BloomGatewayClient = &mockServer{} + func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) { var calls int return &mockServer{ @@ -480,11 +482,17 @@ func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) { s.res = res } +// FilterChunkRefs implements logproto.BloomGatewayClient. func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) { *s.calls++ return s.res, nil } +// PrefetchBloomBlocks implements logproto.BloomGatewayClient. +func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) { + panic("unimplemented") +} + type mockLimits struct { cacheFreshness time.Duration cacheInterval time.Duration diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 23f7026d0070..b085b849488e 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -116,6 +116,7 @@ func (i *ClientConfig) Validate() error { type Client interface { FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) + PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error } // clientPool is a minimal interface that is satisfied by the JumpHashClientPool. @@ -204,6 +205,47 @@ func (c *GatewayClient) Close() { c.dnsProvider.Stop() } +func (c *GatewayClient) PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error { + if len(blocks) == 0 { + return nil + } + + pos := make(map[string]int) + servers := make([]addrWithBlocks, 0, len(blocks)) + for _, block := range blocks { + addr, err := c.pool.Addr(block.String()) + if err != nil { + level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", block, "err", err) + continue + } + + if idx, found := pos[addr]; found { + servers[idx].blocks = append(servers[idx].blocks, block.String()) + } else { + pos[addr] = len(servers) + servers = append(servers, addrWithBlocks{ + addr: addr, + blocks: []string{block.String()}, + }) + } + } + + return concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { + rs := servers[i] + return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { + req := &logproto.PrefetchBloomBlocksRequest{Blocks: rs.blocks} + _, err := client.PrefetchBloomBlocks(ctx, req) + if err != nil { + level.Error(c.logger).Log("msg", "block prefetch failed for instance, skipping", "addr", rs.addr, "blocks", len(rs.blocks), "err", err) + c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeError).Inc() + } else { + c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeSuccess).Inc() + } + return err + }) + }) +} + // FilterChunks implements Client func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) { // no block and therefore no series with chunks @@ -268,10 +310,10 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo "err", err, ) // filter none of the results on failed request - c.metrics.clientRequests.WithLabelValues(typeError).Inc() + c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeError).Inc() results[i] = rs.groups } else { - c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() + c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeSuccess).Inc() results[i] = resp.ChunkRefs } @@ -390,6 +432,11 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway return err } +type addrWithBlocks struct { + addr string + blocks []string +} + type addrWithGroups struct { addr string blocks []string diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 690f95354a23..af04cc7e03bc 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -18,6 +18,9 @@ type metrics struct { const ( typeSuccess = "success" typeError = "error" + + routeFilterChunks = "FilterChunks" + routePrefectBlocks = "PrefetchBloomBlocks" ) type clientMetrics struct { @@ -32,7 +35,7 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { Subsystem: "bloom_gateway_client", Name: "requests_total", Help: "Total number of requests made to the bloom gateway", - }, []string{"type"}), + }, []string{"route", "type"}), requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway_client", @@ -50,6 +53,7 @@ type serverMetrics struct { requestedChunks prometheus.Histogram filteredChunks prometheus.Histogram receivedMatchers prometheus.Histogram + prefetchedBlocks *prometheus.CounterVec } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -105,6 +109,12 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str Help: "Number of matchers per request.", Buckets: prometheus.ExponentialBuckets(1, 2, 9), // 1 -> 256 }), + prefetchedBlocks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "prefetched_blocks_total", + Help: "Total amount of blocks prefetched by the bloom-gateway", + }, []string{"status"}), } } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index ab890ab5d84f..f06d9d199f0f 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -26,6 +26,8 @@ type noopClient struct { callCount int } +var _ Client = &noopClient{} + // FilterChunks implements Client. func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.Interval, blocks []blockWithSeries, _ plan.QueryPlan) (result []*logproto.GroupedChunkRefs, err error) { for _, block := range blocks { @@ -39,6 +41,10 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In return result, c.err } +func (c *noopClient) PrefetchBloomBlocks(_ context.Context, _ []bloomshipper.BlockRef) error { + return nil +} + type mockBlockResolver struct{} // Resolve implements BlockResolver. diff --git a/pkg/logproto/bloomgateway.pb.go b/pkg/logproto/bloomgateway.pb.go index e4841b8ae0bd..2c1bbb8f7f3b 100644 --- a/pkg/logproto/bloomgateway.pb.go +++ b/pkg/logproto/bloomgateway.pb.go @@ -243,51 +243,134 @@ func (m *GroupedChunkRefs) GetLabels() *IndexSeries { return nil } +type PrefetchBloomBlocksRequest struct { + Blocks []string `protobuf:"bytes,1,rep,name=blocks,proto3" json:"blocks,omitempty"` +} + +func (m *PrefetchBloomBlocksRequest) Reset() { *m = PrefetchBloomBlocksRequest{} } +func (*PrefetchBloomBlocksRequest) ProtoMessage() {} +func (*PrefetchBloomBlocksRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_a50b5dd1dbcd1415, []int{4} +} +func (m *PrefetchBloomBlocksRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PrefetchBloomBlocksRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PrefetchBloomBlocksRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PrefetchBloomBlocksRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PrefetchBloomBlocksRequest.Merge(m, src) +} +func (m *PrefetchBloomBlocksRequest) XXX_Size() int { + return m.Size() +} +func (m *PrefetchBloomBlocksRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PrefetchBloomBlocksRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PrefetchBloomBlocksRequest proto.InternalMessageInfo + +func (m *PrefetchBloomBlocksRequest) GetBlocks() []string { + if m != nil { + return m.Blocks + } + return nil +} + +type PrefetchBloomBlocksResponse struct { +} + +func (m *PrefetchBloomBlocksResponse) Reset() { *m = PrefetchBloomBlocksResponse{} } +func (*PrefetchBloomBlocksResponse) ProtoMessage() {} +func (*PrefetchBloomBlocksResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_a50b5dd1dbcd1415, []int{5} +} +func (m *PrefetchBloomBlocksResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PrefetchBloomBlocksResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PrefetchBloomBlocksResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PrefetchBloomBlocksResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PrefetchBloomBlocksResponse.Merge(m, src) +} +func (m *PrefetchBloomBlocksResponse) XXX_Size() int { + return m.Size() +} +func (m *PrefetchBloomBlocksResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PrefetchBloomBlocksResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PrefetchBloomBlocksResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*FilterChunkRefRequest)(nil), "logproto.FilterChunkRefRequest") proto.RegisterType((*FilterChunkRefResponse)(nil), "logproto.FilterChunkRefResponse") proto.RegisterType((*ShortRef)(nil), "logproto.ShortRef") proto.RegisterType((*GroupedChunkRefs)(nil), "logproto.GroupedChunkRefs") + proto.RegisterType((*PrefetchBloomBlocksRequest)(nil), "logproto.PrefetchBloomBlocksRequest") + proto.RegisterType((*PrefetchBloomBlocksResponse)(nil), "logproto.PrefetchBloomBlocksResponse") } func init() { proto.RegisterFile("pkg/logproto/bloomgateway.proto", fileDescriptor_a50b5dd1dbcd1415) } var fileDescriptor_a50b5dd1dbcd1415 = []byte{ - // 532 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0xa2, 0x95, 0x15, 0xa4, 0x8d, 0x95, 0x03, 0xcd, - 0x05, 0x5b, 0x4a, 0x85, 0xc4, 0x85, 0x4b, 0x2a, 0x51, 0xf5, 0x06, 0x5b, 0xc4, 0x01, 0x89, 0x83, - 0xe3, 0x8c, 0x7f, 0x14, 0x7b, 0xd7, 0xdd, 0x5d, 0x03, 0xbd, 0xf1, 0x08, 0xbc, 0x03, 0x17, 0x9e, - 0x80, 0x67, 0xe8, 0x31, 0xe2, 0x54, 0x71, 0xa8, 0x88, 0x73, 0xe1, 0xd8, 0x47, 0x40, 0x76, 0xe2, - 0xba, 0xa9, 0x40, 0x95, 0x38, 0x71, 0xf2, 0xee, 0xce, 0x37, 0xe3, 0x6f, 0xbe, 0x6f, 0x06, 0xf7, - 0xd3, 0x59, 0xe0, 0xc4, 0x22, 0x48, 0xa5, 0xd0, 0xc2, 0x99, 0xc4, 0x42, 0x24, 0x81, 0xab, 0xe1, - 0x83, 0x7b, 0x62, 0x97, 0x4f, 0xa4, 0x5d, 0x05, 0x7b, 0x0f, 0x03, 0x11, 0x88, 0x15, 0xae, 0x38, - 0xad, 0xe2, 0xbd, 0x47, 0x1b, 0x05, 0xaa, 0xc3, 0x2a, 0x38, 0xf8, 0xbe, 0x85, 0xb7, 0x5f, 0x44, - 0xb1, 0x06, 0xb9, 0x1f, 0x66, 0x7c, 0xc6, 0xc0, 0x67, 0x70, 0x9c, 0x81, 0xd2, 0x64, 0x1f, 0x37, - 0x7d, 0x29, 0x12, 0x13, 0x59, 0x68, 0xd8, 0x18, 0x3b, 0xa7, 0xe7, 0x7d, 0xe3, 0xc7, 0x79, 0x7f, - 0x37, 0x88, 0x74, 0x98, 0x4d, 0x6c, 0x4f, 0x24, 0x4e, 0x2a, 0x45, 0x02, 0x3a, 0x84, 0x4c, 0x39, - 0x9e, 0x48, 0x12, 0xc1, 0x9d, 0x44, 0x4c, 0x21, 0xb6, 0x5f, 0x47, 0x09, 0xb0, 0x32, 0x99, 0x1c, - 0xe2, 0xdb, 0x3a, 0x94, 0x22, 0x0b, 0x42, 0x73, 0xeb, 0xdf, 0xea, 0x54, 0xf9, 0xc4, 0xc6, 0x4d, - 0x09, 0xbe, 0x32, 0x1b, 0x56, 0x63, 0xd8, 0x1d, 0xf5, 0xec, 0xcb, 0x46, 0x0e, 0xa4, 0xc8, 0x52, - 0x98, 0x56, 0xfc, 0x15, 0x2b, 0x71, 0xc4, 0xc5, 0xcd, 0x34, 0x76, 0xb9, 0x79, 0xcb, 0x42, 0xc3, - 0xee, 0xe8, 0x5e, 0x8d, 0x7f, 0x19, 0xbb, 0x7c, 0xfc, 0x7c, 0xcd, 0xe3, 0xe9, 0x15, 0x1e, 0x81, - 0x74, 0x7d, 0x97, 0xbb, 0x4e, 0x2c, 0x66, 0x91, 0xf3, 0x7e, 0xcf, 0x29, 0x74, 0x3b, 0xce, 0x40, - 0x46, 0x20, 0x9d, 0xa2, 0x94, 0xfd, 0x2a, 0x03, 0x79, 0x52, 0xa4, 0xb3, 0xb2, 0x34, 0xd9, 0xc1, - 0xad, 0x49, 0x2c, 0xbc, 0x99, 0x32, 0x5b, 0x56, 0x63, 0xd8, 0x61, 0xeb, 0xdb, 0x80, 0xe1, 0x9d, - 0xeb, 0x9a, 0xaa, 0x54, 0x70, 0x05, 0xe4, 0x19, 0xee, 0x78, 0x15, 0x4f, 0x13, 0xdd, 0xd8, 0x49, - 0x0d, 0x1e, 0x7c, 0x43, 0xb8, 0x7d, 0x14, 0x0a, 0xa9, 0x19, 0xf8, 0xff, 0x9d, 0x37, 0x3d, 0xdc, - 0xf6, 0x42, 0xf0, 0x66, 0x2a, 0x4b, 0xcc, 0x86, 0x85, 0x86, 0x77, 0xd9, 0xe5, 0x7d, 0xf0, 0x05, - 0xe1, 0x07, 0xd7, 0x1b, 0x23, 0x16, 0xee, 0xfa, 0x11, 0x0f, 0x40, 0xa6, 0x32, 0xe2, 0xba, 0xec, - 0xa3, 0xc9, 0xae, 0x3e, 0x15, 0xda, 0x6a, 0xe0, 0x2e, 0xd7, 0x25, 0xb9, 0x0e, 0x5b, 0xdf, 0xc8, - 0xe3, 0x8d, 0x31, 0x20, 0xb5, 0x78, 0x95, 0x38, 0x6b, 0xfb, 0x9f, 0xe0, 0x56, 0xec, 0x4e, 0x20, - 0x56, 0x66, 0xb3, 0x1c, 0x80, 0xed, 0x1a, 0x79, 0xc8, 0xa7, 0xf0, 0xf1, 0xa8, 0xf0, 0x55, 0xb1, - 0x35, 0x68, 0xe4, 0xe3, 0x3b, 0xe3, 0x62, 0xb5, 0x0e, 0x56, 0xab, 0x45, 0xde, 0xe0, 0xfb, 0x9b, - 0x16, 0x2a, 0xd2, 0xaf, 0x2b, 0xfc, 0x71, 0x63, 0x7a, 0xd6, 0xdf, 0x01, 0x2b, 0xfb, 0x07, 0xc6, - 0xf8, 0xdd, 0x7c, 0x41, 0x8d, 0xb3, 0x05, 0x35, 0x2e, 0x16, 0x14, 0x7d, 0xca, 0x29, 0xfa, 0x9a, - 0x53, 0x74, 0x9a, 0x53, 0x34, 0xcf, 0x29, 0xfa, 0x99, 0x53, 0xf4, 0x2b, 0xa7, 0xc6, 0x45, 0x4e, - 0xd1, 0xe7, 0x25, 0x35, 0xe6, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0xf1, 0x76, 0xf7, 0x86, 0x29, 0xad, - 0xfe, 0x3b, 0x69, 0x95, 0x9f, 0xbd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x05, 0x31, 0x08, - 0x35, 0x04, 0x00, 0x00, + // 581 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xbd, 0x6e, 0xd4, 0x40, + 0x10, 0xf6, 0xe6, 0x8e, 0x23, 0xd9, 0xf0, 0xa7, 0x85, 0x44, 0x96, 0x23, 0xf6, 0x2c, 0x0b, 0xc8, + 0x35, 0xd8, 0x52, 0x02, 0x12, 0x0d, 0xcd, 0x45, 0x22, 0x4a, 0x17, 0x36, 0x88, 0x02, 0x89, 0xc2, + 0xe7, 0x1b, 0xff, 0xe8, 0x6c, 0xaf, 0xb3, 0x5e, 0x03, 0xe9, 0x78, 0x04, 0xde, 0x81, 0x86, 0x27, + 0xe0, 0x19, 0x52, 0x50, 0x44, 0x54, 0x11, 0x45, 0x44, 0x7c, 0x0d, 0x65, 0x1e, 0x01, 0xd9, 0x3e, + 0xc7, 0x77, 0xd1, 0x45, 0x27, 0x51, 0x51, 0xd9, 0x3b, 0xf3, 0xcd, 0xcc, 0x37, 0xf3, 0xcd, 0xe0, + 0x6e, 0x32, 0xf2, 0xac, 0x90, 0x7b, 0x89, 0xe0, 0x92, 0x5b, 0x83, 0x90, 0xf3, 0xc8, 0xb3, 0x25, + 0x7c, 0xb4, 0x8f, 0xcc, 0xd2, 0x44, 0x96, 0x6b, 0xa7, 0xf6, 0xc0, 0xe3, 0x1e, 0xaf, 0x70, 0xc5, + 0x5f, 0xe5, 0xd7, 0x36, 0x66, 0x12, 0xd4, 0x3f, 0x95, 0xd3, 0xf8, 0xb9, 0x84, 0xd7, 0x5e, 0x05, + 0xa1, 0x04, 0xb1, 0xe3, 0x67, 0xf1, 0x88, 0x81, 0xcb, 0xe0, 0x30, 0x83, 0x54, 0x92, 0x1d, 0xdc, + 0x76, 0x05, 0x8f, 0x54, 0xa4, 0xa3, 0x5e, 0xab, 0x6f, 0x1d, 0x9f, 0x75, 0x95, 0x5f, 0x67, 0xdd, + 0x4d, 0x2f, 0x90, 0x7e, 0x36, 0x30, 0x1d, 0x1e, 0x59, 0x89, 0xe0, 0x11, 0x48, 0x1f, 0xb2, 0xd4, + 0x72, 0x78, 0x14, 0xf1, 0xd8, 0x8a, 0xf8, 0x10, 0x42, 0xf3, 0x4d, 0x10, 0x01, 0x2b, 0x83, 0xc9, + 0x1e, 0xbe, 0x29, 0x7d, 0xc1, 0x33, 0xcf, 0x57, 0x97, 0xfe, 0x2d, 0x4f, 0x1d, 0x4f, 0x4c, 0xdc, + 0x16, 0xe0, 0xa6, 0x6a, 0x4b, 0x6f, 0xf5, 0x56, 0xb7, 0x34, 0xf3, 0xb2, 0x91, 0x5d, 0xc1, 0xb3, + 0x04, 0x86, 0x35, 0xff, 0x94, 0x95, 0x38, 0x62, 0xe3, 0x76, 0x12, 0xda, 0xb1, 0x7a, 0x43, 0x47, + 0xbd, 0xd5, 0xad, 0x3b, 0x0d, 0x7e, 0x3f, 0xb4, 0xe3, 0xfe, 0xcb, 0x09, 0x8f, 0xe7, 0x53, 0x3c, + 0x3c, 0x61, 0xbb, 0x76, 0x6c, 0x5b, 0x21, 0x1f, 0x05, 0xd6, 0x87, 0x6d, 0xab, 0x98, 0xdb, 0x61, + 0x06, 0x22, 0x00, 0x61, 0x15, 0xa9, 0xcc, 0xd7, 0x19, 0x88, 0xa3, 0x22, 0x9c, 0x95, 0xa9, 0xc9, + 0x3a, 0xee, 0x0c, 0x42, 0xee, 0x8c, 0x52, 0xb5, 0xa3, 0xb7, 0x7a, 0x2b, 0x6c, 0xf2, 0x32, 0x18, + 0x5e, 0xbf, 0x3a, 0xd3, 0x34, 0xe1, 0x71, 0x0a, 0xe4, 0x05, 0x5e, 0x71, 0x6a, 0x9e, 0x2a, 0x5a, + 0xd8, 0x49, 0x03, 0x36, 0xbe, 0x23, 0xbc, 0x7c, 0xe0, 0x73, 0x21, 0x19, 0xb8, 0xff, 0x9d, 0x36, + 0x1a, 0x5e, 0x76, 0x7c, 0x70, 0x46, 0x69, 0x16, 0xa9, 0x2d, 0x1d, 0xf5, 0x6e, 0xb3, 0xcb, 0xb7, + 0xf1, 0x15, 0xe1, 0x7b, 0x57, 0x1b, 0x23, 0x3a, 0x5e, 0x75, 0x83, 0xd8, 0x03, 0x91, 0x88, 0x20, + 0x96, 0x65, 0x1f, 0x6d, 0x36, 0x6d, 0x2a, 0x66, 0x2b, 0x21, 0xb6, 0x63, 0x59, 0x92, 0x5b, 0x61, + 0x93, 0x17, 0x79, 0x32, 0xb3, 0x06, 0xa4, 0x19, 0x5e, 0x3d, 0x9c, 0x89, 0xfc, 0x4f, 0x71, 0x27, + 0xb4, 0x07, 0x10, 0xa6, 0x6a, 0xbb, 0x5c, 0x80, 0xb5, 0x06, 0xb9, 0x17, 0x0f, 0xe1, 0xd3, 0x41, + 0xa1, 0x6b, 0xca, 0x26, 0x20, 0xe3, 0x19, 0xd6, 0xf6, 0x05, 0xb8, 0x20, 0x1d, 0xbf, 0x5f, 0x9c, + 0x58, 0xbf, 0x54, 0xb2, 0xbe, 0x85, 0x46, 0x68, 0x34, 0x23, 0xf4, 0x43, 0xbc, 0x31, 0x37, 0xaa, + 0x52, 0x7b, 0xeb, 0x07, 0xc2, 0xb7, 0x4a, 0xfb, 0x6e, 0x75, 0xb0, 0xe4, 0x2d, 0xbe, 0x3b, 0xbb, + 0x18, 0x29, 0xe9, 0x36, 0xbc, 0xe6, 0xde, 0xa1, 0xa6, 0x5f, 0x0f, 0xa8, 0xca, 0x18, 0x0a, 0x19, + 0xe2, 0xfb, 0x73, 0x78, 0x90, 0x47, 0x53, 0x4b, 0x7f, 0x6d, 0x73, 0xda, 0xe3, 0x05, 0xa8, 0xba, + 0x4a, 0xff, 0xfd, 0xc9, 0x39, 0x55, 0x4e, 0xcf, 0xa9, 0x72, 0x71, 0x4e, 0xd1, 0xe7, 0x9c, 0xa2, + 0x6f, 0x39, 0x45, 0xc7, 0x39, 0x45, 0x27, 0x39, 0x45, 0xbf, 0x73, 0x8a, 0xfe, 0xe4, 0x54, 0xb9, + 0xc8, 0x29, 0xfa, 0x32, 0xa6, 0xca, 0xc9, 0x98, 0x2a, 0xa7, 0x63, 0xaa, 0xbc, 0xdb, 0x5c, 0x70, + 0x61, 0x75, 0xf1, 0x41, 0xa7, 0xfc, 0x6c, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x6b, 0x23, + 0x31, 0xf1, 0x04, 0x00, 0x00, } func (this *FilterChunkRefRequest) Equal(that interface{}) bool { @@ -433,6 +516,56 @@ func (this *GroupedChunkRefs) Equal(that interface{}) bool { } return true } +func (this *PrefetchBloomBlocksRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrefetchBloomBlocksRequest) + if !ok { + that2, ok := that.(PrefetchBloomBlocksRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Blocks) != len(that1.Blocks) { + return false + } + for i := range this.Blocks { + if this.Blocks[i] != that1.Blocks[i] { + return false + } + } + return true +} +func (this *PrefetchBloomBlocksResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrefetchBloomBlocksResponse) + if !ok { + that2, ok := that.(PrefetchBloomBlocksResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} func (this *FilterChunkRefRequest) GoString() string { if this == nil { return "nil" @@ -490,6 +623,25 @@ func (this *GroupedChunkRefs) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *PrefetchBloomBlocksRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.PrefetchBloomBlocksRequest{") + s = append(s, "Blocks: "+fmt.Sprintf("%#v", this.Blocks)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *PrefetchBloomBlocksResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&logproto.PrefetchBloomBlocksResponse{") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringBloomgateway(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -512,6 +664,7 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type BloomGatewayClient interface { FilterChunkRefs(ctx context.Context, in *FilterChunkRefRequest, opts ...grpc.CallOption) (*FilterChunkRefResponse, error) + PrefetchBloomBlocks(ctx context.Context, in *PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*PrefetchBloomBlocksResponse, error) } type bloomGatewayClient struct { @@ -531,9 +684,19 @@ func (c *bloomGatewayClient) FilterChunkRefs(ctx context.Context, in *FilterChun return out, nil } +func (c *bloomGatewayClient) PrefetchBloomBlocks(ctx context.Context, in *PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*PrefetchBloomBlocksResponse, error) { + out := new(PrefetchBloomBlocksResponse) + err := c.cc.Invoke(ctx, "/logproto.BloomGateway/PrefetchBloomBlocks", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BloomGatewayServer is the server API for BloomGateway service. type BloomGatewayServer interface { FilterChunkRefs(context.Context, *FilterChunkRefRequest) (*FilterChunkRefResponse, error) + PrefetchBloomBlocks(context.Context, *PrefetchBloomBlocksRequest) (*PrefetchBloomBlocksResponse, error) } // UnimplementedBloomGatewayServer can be embedded to have forward compatible implementations. @@ -543,6 +706,9 @@ type UnimplementedBloomGatewayServer struct { func (*UnimplementedBloomGatewayServer) FilterChunkRefs(ctx context.Context, req *FilterChunkRefRequest) (*FilterChunkRefResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method FilterChunkRefs not implemented") } +func (*UnimplementedBloomGatewayServer) PrefetchBloomBlocks(ctx context.Context, req *PrefetchBloomBlocksRequest) (*PrefetchBloomBlocksResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PrefetchBloomBlocks not implemented") +} func RegisterBloomGatewayServer(s *grpc.Server, srv BloomGatewayServer) { s.RegisterService(&_BloomGateway_serviceDesc, srv) @@ -566,6 +732,24 @@ func _BloomGateway_FilterChunkRefs_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _BloomGateway_PrefetchBloomBlocks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PrefetchBloomBlocksRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BloomGatewayServer).PrefetchBloomBlocks(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/logproto.BloomGateway/PrefetchBloomBlocks", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BloomGatewayServer).PrefetchBloomBlocks(ctx, req.(*PrefetchBloomBlocksRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _BloomGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "logproto.BloomGateway", HandlerType: (*BloomGatewayServer)(nil), @@ -574,6 +758,10 @@ var _BloomGateway_serviceDesc = grpc.ServiceDesc{ MethodName: "FilterChunkRefs", Handler: _BloomGateway_FilterChunkRefs_Handler, }, + { + MethodName: "PrefetchBloomBlocks", + Handler: _BloomGateway_PrefetchBloomBlocks_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/logproto/bloomgateway.proto", @@ -781,6 +969,61 @@ func (m *GroupedChunkRefs) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *PrefetchBloomBlocksRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PrefetchBloomBlocksRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PrefetchBloomBlocksRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Blocks) > 0 { + for iNdEx := len(m.Blocks) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Blocks[iNdEx]) + copy(dAtA[i:], m.Blocks[iNdEx]) + i = encodeVarintBloomgateway(dAtA, i, uint64(len(m.Blocks[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *PrefetchBloomBlocksResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PrefetchBloomBlocksResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PrefetchBloomBlocksResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func encodeVarintBloomgateway(dAtA []byte, offset int, v uint64) int { offset -= sovBloomgateway(v) base := offset @@ -880,6 +1123,30 @@ func (m *GroupedChunkRefs) Size() (n int) { return n } +func (m *PrefetchBloomBlocksRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Blocks) > 0 { + for _, s := range m.Blocks { + l = len(s) + n += 1 + l + sovBloomgateway(uint64(l)) + } + } + return n +} + +func (m *PrefetchBloomBlocksResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovBloomgateway(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -950,6 +1217,25 @@ func (this *GroupedChunkRefs) String() string { }, "") return s } +func (this *PrefetchBloomBlocksRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PrefetchBloomBlocksRequest{`, + `Blocks:` + fmt.Sprintf("%v", this.Blocks) + `,`, + `}`, + }, "") + return s +} +func (this *PrefetchBloomBlocksResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PrefetchBloomBlocksResponse{`, + `}`, + }, "") + return s +} func valueToStringBloomgateway(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1519,6 +1805,144 @@ func (m *GroupedChunkRefs) Unmarshal(dAtA []byte) error { } return nil } +func (m *PrefetchBloomBlocksRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBloomgateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrefetchBloomBlocksRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrefetchBloomBlocksRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Blocks", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBloomgateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBloomgateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthBloomgateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Blocks = append(m.Blocks, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBloomgateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBloomgateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBloomgateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PrefetchBloomBlocksResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBloomgateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrefetchBloomBlocksResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrefetchBloomBlocksResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipBloomgateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBloomgateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBloomgateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipBloomgateway(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/logproto/bloomgateway.proto b/pkg/logproto/bloomgateway.proto index 809afb90b7b5..c34027e6cbc6 100644 --- a/pkg/logproto/bloomgateway.proto +++ b/pkg/logproto/bloomgateway.proto @@ -50,6 +50,13 @@ message GroupedChunkRefs { IndexSeries labels = 4; } +message PrefetchBloomBlocksRequest { + repeated string blocks = 1; +} + +message PrefetchBloomBlocksResponse {} + service BloomGateway { rpc FilterChunkRefs(FilterChunkRefRequest) returns (FilterChunkRefResponse) {} + rpc PrefetchBloomBlocks(PrefetchBloomBlocksRequest) returns (PrefetchBloomBlocksResponse) {} } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 726c8cfe52a6..0e53cbd00e4a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1644,6 +1644,22 @@ func (t *Loki) initBloomBuilder() (services.Service, error) { ringManager = t.indexGatewayRingManager } + var bloomGatewayClient bloomgateway.Client + if t.Cfg.BloomGateway.Enabled { + var err error + bloomGatewayClient, err = bloomgateway.NewClient( + t.Cfg.BloomGateway.Client, + t.Overrides, + prometheus.DefaultRegisterer, + logger, + t.cacheGenerationLoader, + t.Cfg.CompactorConfig.RetentionEnabled, + ) + if err != nil { + return nil, err + } + } + return builder.New( t.Cfg.BloomBuild.Builder, t.Overrides, @@ -1652,6 +1668,7 @@ func (t *Loki) initBloomBuilder() (services.Service, error) { t.ClientMetrics, t.Store, t.BloomStore, + bloomGatewayClient, logger, prometheus.DefaultRegisterer, ringManager, diff --git a/pkg/storage/stores/shipper/bloomshipper/blockscache.go b/pkg/storage/stores/shipper/bloomshipper/blockscache.go index a7992af267c5..3e1c1f656c5d 100644 --- a/pkg/storage/stores/shipper/bloomshipper/blockscache.go +++ b/pkg/storage/stores/shipper/bloomshipper/blockscache.go @@ -36,7 +36,7 @@ type Cache interface { Put(ctx context.Context, key string, value BlockDirectory) error PutInc(ctx context.Context, key string, value BlockDirectory) error PutMany(ctx context.Context, keys []string, values []BlockDirectory) error - Get(ctx context.Context, key string) (BlockDirectory, bool) + Get(ctx context.Context, key string, opts ...CacheGetOption) (BlockDirectory, bool) Release(ctx context.Context, key string) error Stop() } @@ -288,27 +288,50 @@ func (c *BlocksCache) evict(key string, element *list.Element, reason string) { c.metrics.entriesEvicted.WithLabelValues(reason).Inc() } +type cacheGetOptions struct { + ReportHitMiss bool +} + +func (o *cacheGetOptions) apply(options ...CacheGetOption) { + for _, opt := range options { + opt(o) + } +} + +type CacheGetOption func(opts *cacheGetOptions) + +func WithSkipHitMissMetrics(v bool) CacheGetOption { + return func(opts *cacheGetOptions) { + opts.ReportHitMiss = !v + } +} + // Get implements Cache. // Get returns the stored value against the given key. -func (c *BlocksCache) Get(ctx context.Context, key string) (BlockDirectory, bool) { +func (c *BlocksCache) Get(ctx context.Context, key string, opts ...CacheGetOption) (BlockDirectory, bool) { if ctx.Err() != nil { return BlockDirectory{}, false } + opt := &cacheGetOptions{ReportHitMiss: true} + opt.apply(opts...) + c.lock.Lock() defer c.lock.Unlock() - entry := c.get(key) + entry := c.get(key, opt) if entry == nil { return BlockDirectory{}, false } return entry.Value, true } -func (c *BlocksCache) get(key string) *Entry { +func (c *BlocksCache) get(key string, opt *cacheGetOptions) *Entry { element, exists := c.entries[key] if !exists { - c.metrics.misses.Inc() + if opt.ReportHitMiss { + c.metrics.misses.Inc() + } return nil } @@ -317,7 +340,10 @@ func (c *BlocksCache) get(key string) *Entry { c.lru.MoveToFront(element) - c.metrics.hits.Inc() + if opt.ReportHitMiss { + c.metrics.hits.Inc() + } + return entry } diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 3e81e2da5b1d..fb22c89869a0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -22,7 +22,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/spanlogger" ) -var downloadQueueCapacity = 10000 +var downloadQueueCapacity = 100000 type options struct { ignoreNotFound bool // ignore 404s from object storage; default=true @@ -31,6 +31,8 @@ type options struct { // NB(owen-d): this can only be safely used when blooms are not captured outside // of iteration or it can introduce use-after-free bugs usePool mempool.Allocator + + CacheGetOptions []CacheGetOption } func (o *options) apply(opts ...FetchOption) { @@ -59,6 +61,12 @@ func WithPool(v mempool.Allocator) FetchOption { } } +func WithCacheGetOptions(cacheOpts ...CacheGetOption) FetchOption { + return func(opts *options) { + opts.CacheGetOptions = cacheOpts + } +} + type fetcher interface { FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...FetchOption) ([]*CloseableBlockQuerier, error) @@ -244,7 +252,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc var enqueueTime time.Duration for i := 0; i < n; i++ { key := cacheKey(refs[i]) - dir, isFound, err := f.fromCache(ctx, key) + dir, isFound, err := f.fromCache(ctx, key, cfg.CacheGetOptions...) if err != nil { return results, err } @@ -376,14 +384,14 @@ func (f *Fetcher) processTask(ctx context.Context, task downloadRequest[BlockRef } } -func (f *Fetcher) fromCache(ctx context.Context, key string) (BlockDirectory, bool, error) { +func (f *Fetcher) fromCache(ctx context.Context, key string, opts ...CacheGetOption) (BlockDirectory, bool, error) { var zero BlockDirectory if ctx.Err() != nil { return zero, false, errors.Wrap(ctx.Err(), "from cache") } - item, found := f.blocksCache.Get(ctx, key) + item, found := f.blocksCache.Get(ctx, key, opts...) // item wasn't found if !found { diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 7387cc567c6a..23c3b4939f84 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -214,6 +214,7 @@ type Limits struct { BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` BloomTaskTargetSeriesChunkSize flagext.ByteSize `yaml:"bloom_task_target_series_chunk_size" json:"bloom_task_target_series_chunk_size" category:"experimental"` BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"` + BloomPrefetchBlocks bool `yaml:"bloom_prefetch_blocks" json:"bloom_prefetch_blocks" category:"experimental"` BloomMaxBlockSize flagext.ByteSize `yaml:"bloom_max_block_size" json:"bloom_max_block_size" category:"experimental"` BloomMaxBloomSize flagext.ByteSize `yaml:"bloom_max_bloom_size" json:"bloom_max_bloom_size" category:"experimental"` @@ -399,6 +400,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Experimental. Interval for computing the cache key in the Bloom Gateway.") f.StringVar(&l.BloomBlockEncoding, "bloom-build.block-encoding", "none", "Experimental. Compression algorithm for bloom block pages.") + f.BoolVar(&l.BloomPrefetchBlocks, "bloom-build.prefetch-blocks", false, "Experimental. Prefetch blocks on bloom gateways as soon as they are built.") _ = l.BloomMaxBlockSize.Set(defaultBloomBuildMaxBlockSize) f.Var(&l.BloomMaxBlockSize, "bloom-build.max-block-size", @@ -1055,6 +1057,10 @@ func (o *Overrides) BuilderResponseTimeout(userID string) time.Duration { return o.getOverridesForUser(userID).BloomBuilderResponseTimeout } +func (o *Overrides) PrefetchBloomBlocks(userID string) bool { + return o.getOverridesForUser(userID).BloomPrefetchBlocks +} + func (o *Overrides) BloomTaskMaxRetries(userID string) int { return o.getOverridesForUser(userID).BloomBuildTaskMaxRetries }