diff --git a/CHANGES.md b/CHANGES.md index e034542c5..346567151 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,6 +21,7 @@ Release Notes. - Add the "api version" service to gRPC and HTTP server. - Metadata: Wait for the existing registration to be removed before registering the node. - Stream: Introduce the batch scan to improve the performance of the query and limit the memory usage. +- Add memory protector to protect the memory usage of the system. It will limit the memory usage of the querying. ### Bug Fixes diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go index 62ae16f1d..c56f410ea 100644 --- a/banyand/dquery/dquery.go +++ b/banyand/dquery/dquery.go @@ -94,7 +94,7 @@ func (q *queryService) Name() string { func (q *queryService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("distributed-query") - fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow query threshold, 0 means no slow query log") + fs.DurationVar(&q.slowQuery, "dst-slow-query", 5*time.Second, "distributed slow query threshold, 0 means no slow query log") return fs } diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 721bdb835..888d683de 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -61,7 +61,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query" ) -const defaultQueryTimeout = 30 * time.Second +const defaultQueryTimeout = 10 * time.Second type propertyServer struct { propertyv1.UnimplementedPropertyServiceServer diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index b2b7f2865..797bf83c7 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -25,6 +25,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" @@ -53,6 +54,7 @@ type option struct { type measure struct { databaseSupplier schema.Supplier + pm *protector.Memory indexTagMap map[string]struct{} l *logger.Logger schema *databasev1.Measure @@ -122,7 +124,8 @@ type measureSpec struct { topNAggregations []*databasev1.TopNAggregation } -func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec, l *logger.Logger, pipeline queue.Queue, +func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec, + l *logger.Logger, pipeline queue.Queue, pm *protector.Memory, ) (*measure, error) { m := &measure{ shardNum: shardNum, @@ -130,6 +133,7 @@ func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec, l *logge indexRules: spec.indexRules, topNAggregations: spec.topNAggregations, l: l, + pm: pm, } if err := m.parseSpec(); err != nil { return nil, err diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index 363e4b305..ca55ec800 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -75,8 +76,9 @@ func setUp() (*services, func()) { gomega.Expect(err).NotTo(gomega.HaveOccurred()) metricSvc := observability.NewMetricService(metadataService, pipeline, "test", nil) + pm := protector.NewMemory(metricSvc) // Init Measure Service - measureService, err := measure.NewService(context.TODO(), metadataService, pipeline, nil, metricSvc) + measureService, err := measure.NewService(metadataService, pipeline, nil, metricSvc, pm) gomega.Expect(err).NotTo(gomega.HaveOccurred()) preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService} querySvc, err := query.NewService(context.TODO(), nil, measureService, metadataService, pipeline) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index f6871b313..229aa371d 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" @@ -267,6 +268,7 @@ type supplier struct { pipeline queue.Queue omr observability.MetricsRegistry l *logger.Logger + pm *protector.Memory path string option option } @@ -279,6 +281,7 @@ func newSupplier(path string, svc *service) *supplier { pipeline: svc.localPipeline, option: svc.option, omr: svc.omr, + pm: svc.pm, } } @@ -288,7 +291,7 @@ func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplie schema: measureSchema, indexRules: spec.IndexRules(), topNAggregations: spec.TopN(), - }, s.l, s.pipeline) + }, s.l, s.pipeline, s.pm) } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { @@ -351,5 +354,5 @@ func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Suppli schema: measureSchema, indexRules: spec.IndexRules(), topNAggregations: spec.TopN(), - }, s.l, nil) + }, s.l, nil, nil) } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 10f927593..4529c2989 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -353,6 +353,7 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } var hit int + var totalBlockBytes uint64 for tstIter.nextBlock() { if hit%checkDoneEvery == 0 { select { @@ -366,10 +367,14 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] p := tstIter.piHeap[0] bc.init(p.p, p.curBlock, qo) result.data = append(result.data, bc) + totalBlockBytes += bc.bm.uncompressedSizeBytes } if tstIter.Error() != nil { return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) } + if err := s.pm.AcquireResource(ctx, totalBlockBytes); err != nil { + return err + } result.sidToIndex = make(map[common.SeriesID]int) for i, si := range originalSids { result.sidToIndex[si] = i diff --git a/banyand/measure/service.go b/banyand/measure/service.go index e5eb353c4..5bc9383fd 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -28,6 +28,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -60,6 +61,7 @@ type service struct { omr observability.MetricsRegistry schemaRepo *schemaRepo l *logger.Logger + pm *protector.Memory root string option option maxDiskUsagePercent int @@ -147,11 +149,12 @@ func (s *service) GracefulStop() { } // NewService returns a new service. -func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Server, metricPipeline queue.Server, omr observability.MetricsRegistry) (Service, error) { +func NewService(metadata metadata.Repo, pipeline queue.Server, metricPipeline queue.Server, omr observability.MetricsRegistry, pm *protector.Memory) (Service, error) { return &service{ metadata: metadata, pipeline: pipeline, metricPipeline: metricPipeline, omr: omr, + pm: pm, }, nil } diff --git a/banyand/protector/protector.go b/banyand/protector/protector.go new file mode 100644 index 000000000..6cb7acd33 --- /dev/null +++ b/banyand/protector/protector.go @@ -0,0 +1,191 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package protector provides a set of protectors that stop the query services when the resource usage exceeds the limit. +package protector + +import ( + "context" + "errors" + "fmt" + "runtime/metrics" + "sync/atomic" + "time" + + "github.com/dustin/go-humanize" + + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +var scope = observability.RootScope.SubScope("memory_protector") + +// Memory is a protector that stops the query services when the memory usage exceeds the limit. +type Memory struct { + omr observability.MetricsRegistry + limitGauge meter.Gauge + usageGauge meter.Gauge + l *logger.Logger + closed chan struct{} + blockedChan chan struct{} + allowedPercent int + allowedBytes run.Bytes + limit uint64 + usage uint64 +} + +// NewMemory creates a new Memory protector. +func NewMemory(omr observability.MetricsRegistry) *Memory { + queueSize := cgroups.CPUs() + factory := omr.With(scope) + + return &Memory{ + omr: omr, + blockedChan: make(chan struct{}, queueSize), + closed: make(chan struct{}), + + limitGauge: factory.NewGauge("limit"), + usageGauge: factory.NewGauge("usage"), + } +} + +// AcquireResource attempts to acquire a `size` amount of memory. +func (m *Memory) AcquireResource(ctx context.Context, size uint64) error { + if m.limit == 0 { + return nil + } + start := time.Now() + + select { + case m.blockedChan <- struct{}{}: + defer func() { <-m.blockedChan }() + case <-ctx.Done(): + return fmt.Errorf("context canceled while waiting for blocked queue slot: %w", ctx.Err()) + } + + for { + currentUsage := atomic.LoadUint64(&m.usage) + if currentUsage+size <= m.limit { + return nil + } + + select { + case <-time.After(100 * time.Millisecond): + continue + case <-ctx.Done(): + return fmt.Errorf( + "context canceled: memory acquisition failed (currentUsage: %d, limit: %d, size: %d, blockedDuration: %v): %w", + currentUsage, m.limit, size, time.Since(start), ctx.Err(), + ) + } + } +} + +// Name returns the name of the protector. +func (m *Memory) Name() string { + return "memory-protector" +} + +// FlagSet returns the flag set for the protector. +func (m *Memory) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet(m.Name()) + flagS.IntVarP(&m.allowedPercent, "allowed-percent", "", 75, + "Allowed bytes of memory usage. If the memory usage exceeds this value, the query services will stop. "+ + "Setting a large value may evict data from the OS page cache, causing high disk I/O.") + flagS.VarP(&m.allowedBytes, "allowed-bytes", "", "Allowed percentage of total memory usage. If usage exceeds this value, the query services will stop. "+ + "This takes effect only if `allowed-bytes` is 0. If usage is too high, it may cause OS page cache eviction.") + return flagS +} + +// Validate validates the protector's flags. +func (m *Memory) Validate() error { + if m.allowedPercent <= 0 || m.allowedPercent > 100 { + if m.allowedBytes <= 0 { + return errors.New("allowed-bytes must be greater than 0") + } + return errors.New("allowed-percent must be in the range (0, 100]") + } + return nil +} + +// PreRun initializes the protector. +func (m *Memory) PreRun(context.Context) error { + m.l = logger.GetLogger(m.Name()) + if m.allowedBytes > 0 { + m.limit = uint64(m.allowedBytes) + m.l.Info(). + Str("limit", humanize.Bytes(m.limit)). + Msg("memory protector enabled") + } else { + cgLimit, err := cgroups.MemoryLimit() + if err != nil { + m.l.Warn().Err(err).Msg("failed to get memory limit from cgroups, disable memory protector") + return nil + } + if cgLimit <= 0 || cgLimit > 1e18 { + m.l.Warn().Int64("cgroup_memory_limit", cgLimit).Msg("cgroup memory limit is invalid, disable memory protector") + return nil + } + m.limit = uint64(cgLimit) * uint64(m.allowedPercent) / 100 + m.l.Info(). + Str("limit", humanize.Bytes(m.limit)). + Str("cgroup_limit", humanize.Bytes(uint64(cgLimit))). + Int("percent", m.allowedPercent). + Msg("memory protector enabled") + } + m.limitGauge.Set(float64(m.limit)) + return nil +} + +// GracefulStop stops the protector. +func (m *Memory) GracefulStop() { + close(m.closed) +} + +// Serve starts the protector. +func (m *Memory) Serve() run.StopNotify { + if m.limit == 0 { + return m.closed + } + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-m.closed: + return + case <-ticker.C: + samples := []metrics.Sample{ + {Name: "/memory/classes/total:bytes"}, + } + metrics.Read(samples) + usedBytes := samples[0].Value.Uint64() + + atomic.StoreUint64(&m.usage, usedBytes) + + if usedBytes > m.limit { + m.l.Warn().Str("used", humanize.Bytes(usedBytes)).Str("limit", humanize.Bytes(m.limit)).Msg("memory usage exceeds limit") + } + } + } + }() + return m.closed +} diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go index 9abaf6284..e8a54f030 100644 --- a/banyand/stream/block_scanner.go +++ b/banyand/stream/block_scanner.go @@ -26,9 +26,11 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/cgroups" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" @@ -80,6 +82,8 @@ var shardScanConcurrencyCh = make(chan struct{}, cgroups.CPUs()) type blockScanner struct { segment storage.Segment[*tsTable, *option] + pm *protector.Memory + l *logger.Logger series []*pbv1.Series seriesIDs []uint64 qo queryOptions @@ -165,6 +169,7 @@ func (q *blockScanner) scanBlocks(ctx context.Context, seriesList []uint64, tab case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) + q.l.Warn().Err(ti.Error()).Msg("cannot init tstIter") } return } @@ -178,10 +183,25 @@ func (q *blockScanner) scanBlocks(ctx context.Context, seriesList []uint64, tab bs.qo.elementFilter = filter bs.bm.copyFrom(p.curBlock) if len(batch.bss) >= cap(batch.bss) { + var totalBlockBytes uint64 + for i := range batch.bss { + totalBlockBytes += batch.bss[i].bm.uncompressedSizeBytes + } + if err := q.pm.AcquireResource(ctx, totalBlockBytes); err != nil { + batch.err = fmt.Errorf("cannot acquire resource: %w", err) + select { + case blockCh <- batch: + case <-ctx.Done(): + releaseBlockScanResultBatch(batch) + q.l.Warn().Err(err).Msg("cannot acquire resource") + } + return + } select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) + q.l.Warn().Int("batch.len", len(batch.bss)).Msg("context canceled while sending block") return } batch = generateBlockScanResultBatch() @@ -192,6 +212,7 @@ func (q *blockScanner) scanBlocks(ctx context.Context, seriesList []uint64, tab select { case blockCh <- batch: case <-ctx.Done(): + releaseBlockScanResultBatch(batch) } return diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index e566b4e02..07fdd8730 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" @@ -252,6 +253,7 @@ type supplier struct { pipeline queue.Queue omr observability.MetricsRegistry l *logger.Logger + pm *protector.Memory path string option option } @@ -264,6 +266,7 @@ func newSupplier(path string, svc *service) *supplier { pipeline: svc.localPipeline, option: svc.option, omr: svc.omr, + pm: svc.pm, } } @@ -272,7 +275,7 @@ func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplie return openStream(shardNum, supplier, streamSpec{ schema: streamSchema, indexRules: spec.IndexRules(), - }, s.l), nil + }, s.l, s.pm), nil } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { @@ -334,5 +337,5 @@ func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Suppli return openStream(shardNum, nil, streamSpec{ schema: streamSchema, indexRules: spec.IndexRules(), - }, s.l), nil + }, s.l, nil), nil } diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 062663c86..24658da33 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -82,6 +82,8 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m series: series, qo: qo, sm: s, + pm: s.pm, + l: s.l, } if sqo.Order == nil { result.asc = true @@ -91,6 +93,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m return result, nil } var result idxResult + result.pm = s.pm result.segments = segments result.sm = s result.qo = queryOptions{ diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go index 106e556c6..e49e0429f 100644 --- a/banyand/stream/query_by_idx.go +++ b/banyand/stream/query_by_idx.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" @@ -37,6 +38,7 @@ import ( type idxResult struct { sortingIter itersort.Iterator[*index.DocumentResult] sm *stream + pm *protector.Memory tabs []*tsTable elementIDsSorted []uint64 data []*blockCursor @@ -86,6 +88,7 @@ func (qr *idxResult) scanParts(ctx context.Context, qo queryOptions) error { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } var hit int + var totalBlockBytes uint64 for ti.nextBlock() { if hit%checkDoneEvery == 0 { select { @@ -99,10 +102,14 @@ func (qr *idxResult) scanParts(ctx context.Context, qo queryOptions) error { p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) qr.data = append(qr.data, bc) + totalBlockBytes += bc.bm.uncompressedSizeBytes } if ti.Error() != nil { return fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) } + if err := qr.pm.AcquireResource(ctx, totalBlockBytes); err != nil { + return fmt.Errorf("cannot acquire resource: %w", err) + } return nil } diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go index ef45f09a2..5f07f2eaa 100644 --- a/banyand/stream/query_by_ts.go +++ b/banyand/stream/query_by_ts.go @@ -25,7 +25,9 @@ import ( "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -35,6 +37,8 @@ var _ model.StreamQueryResult = (*tsResult)(nil) type tsResult struct { sm *stream + pm *protector.Memory + l *logger.Logger segments []storage.Segment[*tsTable, option] series []*pbv1.Series shards []*model.StreamResult @@ -75,6 +79,8 @@ func (t *tsResult) scanSegment(ctx context.Context) error { segment: segment, qo: t.qo, series: t.series, + pm: t.pm, + l: t.l, } defer bs.close() if err := bs.searchSeries(ctx); err != nil { diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 18775ea2f..a3be38268 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -28,6 +28,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -52,13 +53,14 @@ type Service interface { var _ Service = (*service)(nil) type service struct { - schemaRepo schemaRepo writeListener bus.MessageListener metadata metadata.Repo pipeline queue.Server localPipeline queue.Queue omr observability.MetricsRegistry l *logger.Logger + pm *protector.Memory + schemaRepo schemaRepo root string option option maxDiskUsagePercent int @@ -136,10 +138,11 @@ func (s *service) GracefulStop() { } // NewService returns a new service. -func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Server, omr observability.MetricsRegistry) (Service, error) { +func NewService(metadata metadata.Repo, pipeline queue.Server, omr observability.MetricsRegistry, pm *protector.Memory) (Service, error) { return &service{ metadata: metadata, pipeline: pipeline, omr: omr, + pm: pm, }, nil } diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 9e4ebaa72..0b27dca68 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -26,6 +26,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -72,6 +73,7 @@ type stream struct { schema *databasev1.Stream tagMap map[string]*databasev1.TagSpec entityMap map[string]int + pm *protector.Memory name string group string indexRuleLocators partition.IndexRuleLocator @@ -111,12 +113,15 @@ type streamSpec struct { indexRules []*databasev1.IndexRule } -func openStream(shardNum uint32, db schema.Supplier, spec streamSpec, l *logger.Logger) *stream { +func openStream(shardNum uint32, db schema.Supplier, + spec streamSpec, l *logger.Logger, pm *protector.Memory, +) *stream { s := &stream{ shardNum: shardNum, schema: spec.schema, indexRules: spec.indexRules, l: l, + pm: pm, } s.parseSpec() if db == nil { diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index 71c4ea026..9a16a3e2a 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" @@ -76,9 +77,9 @@ func setUp() (*services, func()) { gomega.Expect(err).NotTo(gomega.HaveOccurred()) metricSvc := observability.NewMetricService(metadataService, pipeline, "test", nil) - + pm := protector.NewMemory(metricSvc) // Init Stream Service - streamService, err := stream.NewService(context.TODO(), metadataService, pipeline, metricSvc) + streamService, err := stream.NewService(metadataService, pipeline, metricSvc, pm) gomega.Expect(err).NotTo(gomega.HaveOccurred()) preloadStreamSvc := &preloadStreamService{metaSvc: metadataService} querySvc, err := query.NewService(context.TODO(), streamService, nil, metadataService, pipeline) diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md index 3f19fd6a0..dd4a6c913 100644 --- a/docs/operation/configuration.md +++ b/docs/operation/configuration.md @@ -100,6 +100,11 @@ The following flags are used to configure the embedded etcd storage engine which - `--metadata-root-path string`: The root path of metadata (default: "/tmp"). +The following flags are used to configure the memory protector: + +- `--allowed-bytes bytes`: Allowed bytes of memory usage. If the memory usage exceeds this value, the query services will stop. Setting a large value may evict data from the OS page cache, causing high disk I/O. (default 0B) +- `--allowed-percent int`: Allowed percentage of total memory usage. If usage exceeds this value, the query services will stop. This takes effect only if `allowed-bytes` is 0. If usage is too high, it may cause OS page cache eviction. (default 75) + ### Observability - `--observability-listener-addr string`: Listen address for observability (default: ":2121"). diff --git a/docs/operation/troubleshooting/query.md b/docs/operation/troubleshooting/query.md index e231dca12..03cda77f9 100644 --- a/docs/operation/troubleshooting/query.md +++ b/docs/operation/troubleshooting/query.md @@ -125,3 +125,15 @@ If the `block_header` is: ``` The `PartID` is 377403, which means this block is in the data part `part_377403_/tmp/measure/measure-default/seg-20240923/shard-0/0000000000005c23b`. The `SeriesID` is 4570144289778100188, which means this block is for the series with the ID `4570144289778100188`. The `MinTimestamp` and `MaxTimestamp` are `Jun 16 23:08:08` and `Sep 24 23:08:08`, respectively. The `Count` is 1, which means there is only one data point in this block. The `UncompressedSize` is 16 B, which means the uncompressed size of this block is 16 bytes. + +### Memory Acquisition Failed + +When you faced the following error: + +```json +{"level":"error","module":"QUERY.MEASURE.MINUTE.SERVICE_CPM_MINUTE","error":"failed to query measure: context canceled: memory acquisition failed (currentUsage: 455081320, limit: 5, size: 1428, blockedDuration: 31.874045791s): context canceled","req":{"groups":["minute"], "name":"service_cpm_minute", "timeRange":{"begin":"2025-01-22T10:39:58Z", "end":"2025-01-22T11:09:58Z"}, "fieldProjection":{"names":["total", "value"]}},"time":"2025-01-22T11:11:38Z","message":"fail to query"} +``` + +It means the query service has reached the memory limit. The query service will stop if the memory usage exceeds the limit. The memory limit is controlled by the `allowed-bytes` or `allowed-percent` flags. If the memory is sucient, you can increase the memory limit by setting the `allowed-bytes` or `allowed-percent` flags. Please refer to the [Configuration](../configuration.md#data--storage) documentation for more information on setting the memory limit. + +BanyanDB get the cgroup memory limit. If the memory limit is not set, BanyanDB will ignore the memory limit. diff --git a/pkg/cgroups/memory.go b/pkg/cgroups/memory.go new file mode 100644 index 000000000..d4b6cbae7 --- /dev/null +++ b/pkg/cgroups/memory.go @@ -0,0 +1,142 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cgroups + +import ( + "bufio" + "fmt" + "io/fs" + "os" + "path/filepath" + "strconv" + "strings" +) + +const ( + procCgroupPath = "proc/self/cgroup" + procMountInfoPath = "proc/self/mountinfo" +) + +// MemoryLimit returns the memory limit in bytes for the current process. +func MemoryLimit() (int64, error) { + return getCgroupMemoryLimit(os.DirFS("/")) +} + +func getCgroupMemoryLimit(fsys fs.FS) (int64, error) { + isV2, err := isCGroupV2(fsys) + if err != nil { + return 0, fmt.Errorf("failed to determine cgroup version: %w", err) + } + + cgroupData, err := fs.ReadFile(fsys, procCgroupPath) + if err != nil { + return 0, fmt.Errorf("failed to read cgroup data: %w", err) + } + + memoryCgroupPath, err := findMemoryCgroupPath(string(cgroupData), isV2) + if err != nil { + return 0, fmt.Errorf("failed to find memory cgroup path: %w", err) + } + + var limitPath string + if isV2 { + limitPath = filepath.Join("sys/fs/cgroup", memoryCgroupPath, "memory.max") + } else { + limitPath = filepath.Join("sys/fs/cgroup/memory", memoryCgroupPath, "memory.limit_in_bytes") + } + + limit, err := parseMemoryLimit(fsys, limitPath) + if err != nil { + return 0, fmt.Errorf("failed to parse memory limit: %w", err) + } + + if !isV2 { + hierarchicalLimitPath := filepath.Join("sys/fs/cgroup/memory", memoryCgroupPath, "memory.hierarchical_memory_limit") + hierarchicalLimit, err := parseMemoryLimit(fsys, hierarchicalLimitPath) + if err != nil { + return 0, fmt.Errorf("failed to parse hierarchical memory limit: %w", err) + } + if hierarchicalLimit < limit { + limit = hierarchicalLimit + } + } + + return limit, nil +} + +func isCGroupV2(fsys fs.FS) (bool, error) { + file, err := fsys.Open(procMountInfoPath) + if err != nil { + return false, fmt.Errorf("failed to open mountinfo: %w", err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Fields(line) + if len(fields) < 5 { + continue + } + mountPoint := fields[4] + if mountPoint == "/sys/fs/cgroup" { + for _, field := range fields { + if strings.HasPrefix(field, "cgroup2") { + return true, nil + } + } + } + } + return false, nil +} + +func findMemoryCgroupPath(cgroupData string, isV2 bool) (string, error) { + lines := strings.Split(cgroupData, "\n") + for _, line := range lines { + fields := strings.Split(line, ":") + if len(fields) >= 3 { + if isV2 && fields[1] == "" { + // For cgroup v2, the second field is empty + return fields[2], nil + } else if !isV2 && fields[1] == "memory" { + // For cgroup v1, the second field is "memory" + return fields[2], nil + } + } + } + return "", fmt.Errorf("memory cgroup not found") +} + +func parseMemoryLimit(fsys fs.FS, path string) (int64, error) { + limitData, err := fs.ReadFile(fsys, path) + if err != nil { + return 0, fmt.Errorf("failed to read memory limit: %w", err) + } + + limitStr := strings.TrimSpace(string(limitData)) + if limitStr == "max" { + return -1, nil // -1 represents unlimited + } + + limit, err := strconv.ParseInt(limitStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse memory limit: %w", err) + } + + return limit, nil +} diff --git a/pkg/cgroups/memory_test.go b/pkg/cgroups/memory_test.go new file mode 100644 index 000000000..d7328f973 --- /dev/null +++ b/pkg/cgroups/memory_test.go @@ -0,0 +1,133 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cgroups + +import ( + "errors" + "io/fs" + "testing" + "testing/fstest" +) + +func TestGetCgroupMemoryLimit(t *testing.T) { + tests := []struct { + fs fs.FS + expectedError error + name string + expectedLimit int64 + }{ + { + name: "cgroup v1 with explicit memory limit", + fs: fstest.MapFS{ + "proc/self/cgroup": &fstest.MapFile{ + Data: []byte("0::/user.slice/user-1000.slice/session-1.scope\n1:memory:/user.slice/user-1000.slice/session-1.scope"), + }, + "proc/self/mountinfo": &fstest.MapFile{ + Data: []byte("36 35 0:33 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:18 - cgroup cgroup rw,memory"), + }, + "sys/fs/cgroup/memory/user.slice/user-1000.slice/session-1.scope/memory.limit_in_bytes": &fstest.MapFile{ + Data: []byte("1073741824"), + }, + "sys/fs/cgroup/memory/user.slice/user-1000.slice/session-1.scope/memory.hierarchical_memory_limit": &fstest.MapFile{ + Data: []byte("1073741824"), + }, + }, + expectedLimit: 1073741824, + expectedError: nil, + }, + { + name: "cgroup v1 with inherited memory limit", + fs: fstest.MapFS{ + "proc/self/cgroup": &fstest.MapFile{ + Data: []byte("0::/user.slice/user-1000.slice/session-1.scope\n1:memory:/user.slice/user-1000.slice/session-1.scope"), + }, + "proc/self/mountinfo": &fstest.MapFile{ + Data: []byte("36 35 0:33 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:18 - cgroup cgroup rw,memory"), + }, + "sys/fs/cgroup/memory/user.slice/user-1000.slice/session-1.scope/memory.limit_in_bytes": &fstest.MapFile{ + Data: []byte("9223372036854771712"), // Very high value + }, + "sys/fs/cgroup/memory/user.slice/user-1000.slice/session-1.scope/memory.hierarchical_memory_limit": &fstest.MapFile{ + Data: []byte("536870912"), + }, + }, + expectedLimit: 536870912, + expectedError: nil, + }, + { + name: "cgroup v2 with explicit memory limit", + fs: fstest.MapFS{ + "proc/self/cgroup": &fstest.MapFile{ + Data: []byte("0::/user.slice/user-1000.slice/session-1.scope"), + }, + "proc/self/mountinfo": &fstest.MapFile{ + Data: []byte("36 35 0:33 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:18 - cgroup2 cgroup rw"), + }, + "sys/fs/cgroup/user.slice/user-1000.slice/session-1.scope/memory.max": &fstest.MapFile{ + Data: []byte("2147483648"), + }, + }, + expectedLimit: 2147483648, + expectedError: nil, + }, + { + name: "cgroup v2 with unlimited memory", + fs: fstest.MapFS{ + "proc/self/cgroup": &fstest.MapFile{ + Data: []byte("0::/user.slice/user-1000.slice/session-1.scope"), + }, + "proc/self/mountinfo": &fstest.MapFile{ + Data: []byte("36 35 0:33 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:18 - cgroup2 cgroup rw"), + }, + "sys/fs/cgroup/user.slice/user-1000.slice/session-1.scope/memory.max": &fstest.MapFile{ + Data: []byte("max"), + }, + }, + expectedLimit: -1, + expectedError: nil, + }, + { + name: "missing cgroup file", + fs: fstest.MapFS{ + "proc/self/cgroup": &fstest.MapFile{ + Data: []byte("0::/user.slice/user-1000.slice/session-1.scope"), + }, + "proc/self/mountinfo": &fstest.MapFile{ + Data: []byte("36 35 0:33 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:18 - cgroup2 cgroup rw"), + }, + }, + expectedLimit: 0, + expectedError: errors.New("failed to read memory limit"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + limit, err := getCgroupMemoryLimit(tt.fs) + if err != nil && tt.expectedError == nil { + t.Fatalf("unexpected error: %v", err) + } + if err == nil && tt.expectedError != nil { + t.Fatalf("expected error: %v, got nil", tt.expectedError) + } + if limit != tt.expectedLimit { + t.Fatalf("expected limit: %d, got: %d", tt.expectedLimit, limit) + } + }) + } +} diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index 0c162f115..04feb9462 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/property" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/sub" @@ -46,16 +47,17 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { } localPipeline := queue.Local() metricSvc := observability.NewMetricService(metaSvc, localPipeline, "data", nil) + pm := protector.NewMemory(metricSvc) pipeline := sub.NewServer(metricSvc) propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc) if err != nil { l.Fatal().Err(err).Msg("failed to initiate property service") } - streamSvc, err := stream.NewService(ctx, metaSvc, pipeline, metricSvc) + streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } - measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, localPipeline, metricSvc) + measureSvc, err := measure.NewService(metaSvc, pipeline, localPipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } @@ -71,6 +73,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { metaSvc, localPipeline, metricSvc, + pm, pipeline, propertySvc, measureSvc, diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index e34ad0e13..89e362ec7 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/property" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" @@ -48,18 +49,19 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate metadata service") } metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) + pm := protector.NewMemory(metricSvc) propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc) if err != nil { l.Fatal().Err(err).Msg("failed to initiate property service") } - streamSvc, err := stream.NewService(ctx, metaSvc, pipeline, metricSvc) + streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } var srvMetrics *grpcprom.ServerMetrics srvMetrics.UnaryServerInterceptor() srvMetrics.UnaryServerInterceptor() - measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil, metricSvc) + measureSvc, err := measure.NewService(metaSvc, pipeline, nil, metricSvc, pm) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } @@ -82,6 +84,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { pipeline, metaSvc, metricSvc, + pm, propertySvc, measureSvc, streamSvc, diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 54280489f..3362e6cc7 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -40,7 +40,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -const defaultQueryTimeout = 30 * time.Second +const defaultQueryTimeout = 15 * time.Second var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml index 02f169768..9fcb81f95 100644 --- a/test/docker/base-compose.yml +++ b/test/docker/base-compose.yml @@ -59,8 +59,7 @@ services: - sw_agent:/skywalking-java-agent oap: - # TODO: Use the official image when it's available - image: "docker.io/hanahmily/oap:${SW_OAP_COMMIT}" + image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}" expose: - 11800 - 12800 diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index e30f60433..fc1b39434 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -26,5 +26,5 @@ SW_ROVER_COMMIT=0ae8f12d6eb6cc9fa125c603ee57d0b21fc8c6d0 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3 SW_CTL_COMMIT=6210a3b79089535af782ca51359ce6c5b68890b2 -SW_OAP_COMMIT=e4249a2 +SW_OAP_COMMIT=8ae16de51167c12a27a58795657a416c991c901d SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965 diff --git a/test/stress/classic/env b/test/stress/classic/env index 86418d59e..9d6d6731b 100644 --- a/test/stress/classic/env +++ b/test/stress/classic/env @@ -27,7 +27,7 @@ SW_ROVER_COMMIT=0ae8f12d6eb6cc9fa125c603ee57d0b21fc8c6d0 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3 SW_CTL_COMMIT=6210a3b79089535af782ca51359ce6c5b68890b2 -SW_OAP_COMMIT=6d262cce62e156bd197177abb3640ea65bb2d38e +SW_OAP_COMMIT=8ae16de51167c12a27a58795657a416c991c901d SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965 VUS=10 diff --git a/test/stress/classic/env.dev b/test/stress/classic/env.dev index f3905d93a..e0adf380e 100644 --- a/test/stress/classic/env.dev +++ b/test/stress/classic/env.dev @@ -26,7 +26,7 @@ SW_ROVER_COMMIT=0ae8f12d6eb6cc9fa125c603ee57d0b21fc8c6d0 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3 SW_CTL_COMMIT=6210a3b79089535af782ca51359ce6c5b68890b2 -SW_OAP_COMMIT=6d262cce62e156bd197177abb3640ea65bb2d38e +SW_OAP_COMMIT=8ae16de51167c12a27a58795657a416c991c901d SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=d3f8fe894d1a206164b73f5b523d2eb62d9e9965 VUS=1 diff --git a/test/stress/trace/env b/test/stress/trace/env index 37a1faae7..3d6719e7d 100644 --- a/test/stress/trace/env +++ b/test/stress/trace/env @@ -14,4 +14,4 @@ # limitations under the License. -SW_OAP_COMMIT=6d262cce62e156bd197177abb3640ea65bb2d38e +SW_OAP_COMMIT=8ae16de51167c12a27a58795657a416c991c901d