Skip to content

Commit

Permalink
Ruler: Add support for per-user external labels (#6340)
Browse files Browse the repository at this point in the history
* Ruler: Add support for per-user external labels

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>

* Add more test cases

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>

* Update the cache before updating the rule manager

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>

---------

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
  • Loading branch information
damnever authored Dec 3, 2024
1 parent 5e6907d commit bc1b5be
Show file tree
Hide file tree
Showing 16 changed files with 380 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3546,6 +3546,9 @@ query_rejection:
# CLI flag: -ruler.query-offset
[ruler_query_offset: <duration> | default = 0s]

# external labels for alerting rules
[ruler_external_labels: <map of string (labelName) to string (labelValue)> | default = []]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down
7 changes: 4 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
// no need to initialize module if load path is empty
return nil, nil
}
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
runtimeConfigLoader := runtimeConfigLoader{cfg: t.Cfg}
t.Cfg.RuntimeConfig.Loader = runtimeConfigLoader.load

// make sure to set default limits before we start loading configuration into memory
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
Expand Down Expand Up @@ -612,14 +613,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func (l *runtimeConfigTenantLimits) AllByUserID() map[string]*validation.Limits
return nil
}

func loadRuntimeConfig(r io.Reader) (interface{}, error) {
type runtimeConfigLoader struct {
cfg Config
}

func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
var overrides = &RuntimeConfigValues{}

decoder := yaml.NewDecoder(r)
Expand All @@ -74,6 +78,12 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
return nil, errMultipleDocuments
}

for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
return nil, err
}
}

return overrides, nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/cortex/runtime_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand All @@ -28,7 +29,8 @@ overrides:
'1235': *id001
'1236': *id001
`)
runtimeCfg, err := loadRuntimeConfig(yamlFile)
loader := runtimeConfigLoader{cfg: Config{Distributor: distributor.Config{ShardByAllLabels: true}}}
runtimeCfg, err := loader.load(yamlFile)
require.NoError(t, err)

limits := validation.Limits{
Expand All @@ -51,7 +53,7 @@ func TestLoadRuntimeConfig_ShouldLoadEmptyFile(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)
assert.Equal(t, &RuntimeConfigValues{}, actual)
}
Expand All @@ -60,7 +62,7 @@ func TestLoadRuntimeConfig_MissingPointerFieldsAreNil(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)

actualCfg, ok := actual.(*RuntimeConfigValues)
Expand Down Expand Up @@ -102,7 +104,7 @@ overrides:
}

for _, tc := range cases {
actual, err := loadRuntimeConfig(strings.NewReader(tc))
actual, err := runtimeConfigLoader{}.load(strings.NewReader(tc))
assert.Equal(t, errMultipleDocuments, err)
assert.Nil(t, actual)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
r := newTestRuler(t, cfg, store, nil)
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}
r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}

a := NewAPI(r, r.store, log.NewNopLogger())

Expand Down Expand Up @@ -508,7 +508,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
r := newTestRuler(t, cfg, store, nil)
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}
r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}

a := NewAPI(r, r.store, log.NewNopLogger())

Expand Down
1 change: 1 addition & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type RulesLimits interface {
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
RulerExternalLabels(userID string) labels.Labels
}

// EngineQueryFunc returns a new engine query function validating max queryLength.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestPusherErrors(t *testing.T) {
writes := prometheus.NewCounter(prometheus.CounterOpts{})
failures := prometheus.NewCounter(prometheus.CounterOpts{})

pa := NewPusherAppendable(pusher, "user-1", ruleLimits{}, writes, failures)
pa := NewPusherAppendable(pusher, "user-1", &ruleLimits{}, writes, failures)

lbls, err := parser.ParseMetric("foo_bar")
require.NoError(t, err)
Expand Down
68 changes: 68 additions & 0 deletions pkg/ruler/external_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ruler

import (
"sync"

"github.com/prometheus/prometheus/model/labels"
)

// userExternalLabels checks and merges per-user external labels with global external labels.
type userExternalLabels struct {
global labels.Labels
limits RulesLimits
builder *labels.Builder

mtx sync.Mutex
users map[string]labels.Labels
}

func newUserExternalLabels(global labels.Labels, limits RulesLimits) *userExternalLabels {
return &userExternalLabels{
global: global,
limits: limits,
builder: labels.NewBuilder(nil),

mtx: sync.Mutex{},
users: map[string]labels.Labels{},
}
}

func (e *userExternalLabels) get(userID string) (labels.Labels, bool) {
e.mtx.Lock()
defer e.mtx.Unlock()
lset, ok := e.users[userID]
return lset, ok
}

func (e *userExternalLabels) update(userID string) (labels.Labels, bool) {
lset := e.limits.RulerExternalLabels(userID)

e.mtx.Lock()
defer e.mtx.Unlock()

e.builder.Reset(e.global)
for _, l := range lset {
e.builder.Set(l.Name, l.Value)
}
lset = e.builder.Labels()

if !labels.Equal(e.users[userID], lset) {
e.users[userID] = lset
return lset, true
}
return lset, false
}

func (e *userExternalLabels) remove(user string) {
e.mtx.Lock()
defer e.mtx.Unlock()
delete(e.users, user)
}

func (e *userExternalLabels) cleanup() {
e.mtx.Lock()
defer e.mtx.Unlock()
for user := range e.users {
delete(e.users, user)
}
}
69 changes: 69 additions & 0 deletions pkg/ruler/external_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package ruler

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

func TestUserExternalLabels(t *testing.T) {
limits := ruleLimits{}
e := newUserExternalLabels(labels.FromStrings("from", "cortex"), &limits)

tests := []struct {
name string
removeBeforeTest bool
exists bool
userExternalLabels labels.Labels
expectedExternalLabels labels.Labels
}{
{
name: "global labels only",
removeBeforeTest: false,
exists: false,
userExternalLabels: nil,
expectedExternalLabels: labels.FromStrings("from", "cortex"),
},
{
name: "local labels without overriding",
removeBeforeTest: true,
exists: false,
userExternalLabels: labels.FromStrings("tag", "local"),
expectedExternalLabels: labels.FromStrings("from", "cortex", "tag", "local"),
},
{
name: "local labels that override globals",
removeBeforeTest: false,
exists: true,
userExternalLabels: labels.FromStrings("from", "cloud", "tag", "local"),
expectedExternalLabels: labels.FromStrings("from", "cloud", "tag", "local"),
},
}

const userID = "test-user"
for _, data := range tests {
data := data
t.Run(data.name, func(t *testing.T) {
if data.removeBeforeTest {
e.remove(userID)
}
_, exists := e.get(userID)
require.Equal(t, data.exists, exists)

limits.externalLabels = data.userExternalLabels
lset, ok := e.update(userID)
require.True(t, ok)
require.Equal(t, data.expectedExternalLabels, lset)
lset1, ok := e.update(userID)
require.False(t, ok) // Not updated.
require.Equal(t, data.expectedExternalLabels, lset1)
})
}

_, ok := e.get(userID)
require.True(t, ok)
e.cleanup()
_, ok = e.get(userID)
require.False(t, ok)
}
Loading

0 comments on commit bc1b5be

Please sign in to comment.