Skip to content
This repository has been archived by the owner on Jan 12, 2023. It is now read-only.

Commit

Permalink
Add option to limit concurrent queries to Cassandra. (cortexproject#2562
Browse files Browse the repository at this point in the history
)

* Add option to limit concurrent queries to Cassandra.

Signed-off-by: Tom Wilkie <[email protected]>

* go mod vendor

Signed-off-by: Tom Wilkie <[email protected]>

* Add changelog entry.

Signed-off-by: Tom Wilkie <[email protected]>

* Review feedback.

Signed-off-by: Tom Wilkie <[email protected]>
  • Loading branch information
tomwilkie authored and gouthamve committed May 7, 2020
1 parent 9da7483 commit 5bf3618
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 7 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,10 @@ cassandra:
# CLI flag: -cassandra.retry-min-backoff
[retry_min_backoff: <duration> | default = 100ms]

# Limit number of concurrent queries to Cassandra. (Default is 0: no limit)
# CLI flag: -cassandra.query-concurrency
[query_concurrency: <int> | default = 0]

boltdb:
# Location of BoltDB index files.
# CLI flag: -boltdb.dir
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
go.uber.org/atomic v1.5.0
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/api v0.14.0
google.golang.org/grpc v1.25.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
36 changes: 30 additions & 6 deletions pkg/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
Expand Down Expand Up @@ -39,6 +40,7 @@ type Config struct {
Retries int `yaml:"max_retries"`
MaxBackoff time.Duration `yaml:"retry_max_backoff"`
MinBackoff time.Duration `yaml:"retry_min_backoff"`
QueryConcurrency int `yaml:"query_concurrency"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -62,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. (Default is 0: no retries)")
f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -179,9 +182,10 @@ func (cfg *Config) createKeyspace() error {

// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
type StorageClient struct {
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
querySemaphore *semaphore.Weighted
}

// NewStorageClient returns a new StorageClient.
Expand All @@ -193,10 +197,16 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
return nil, errors.WithStack(err)
}

var querySemaphore *semaphore.Weighted
if cfg.QueryConcurrency > 0 {
querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency))
}

client := &StorageClient{
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
querySemaphore: querySemaphore,
}
return client, nil
}
Expand Down Expand Up @@ -252,6 +262,13 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue
}

func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer s.querySemaphore.Release(1)
}

var q *gocql.Query

switch {
Expand Down Expand Up @@ -358,6 +375,13 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
}

func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return input, err
}
defer s.querySemaphore.Release(1)
}

tableName, err := s.schemaCfg.ChunkTableFor(input.From)
if err != nil {
return input, err
Expand Down
136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5bf3618

Please sign in to comment.