Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return an http error during scraping if metrics collide when escaped to underscores #1641

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion prometheus/desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Desc struct {
// must be unique among all registered descriptors and can therefore be
// used as an identifier of the descriptor.
id uint64
// escapedID is similar to id, but with the metric and label names escaped
// with underscores.
escapedID uint64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use id for escaped ID?

// dimHash is a hash of the label names (preset and variable) and the
// Help string. Each Desc with the same fqName must have the same
// dimHash.
Expand Down Expand Up @@ -142,11 +145,18 @@ func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, const
}

xxh := xxhash.New()
for _, val := range labelValues {
escapedXXH := xxhash.New()
for i, val := range labelValues {
xxh.WriteString(val)
xxh.Write(separatorByteSlice)
if i == 0 {
val = model.EscapeName(val, model.UnderscoreEscaping)
}
escapedXXH.WriteString(val)
escapedXXH.Write(separatorByteSlice)
}
d.id = xxh.Sum64()
d.escapedID = escapedXXH.Sum64()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, why not use id as escaped ID always?

// Sort labelNames so that order doesn't matter for the hash.
sort.Strings(labelNames)
// Now hash together (in this order) the help string and the sorted
Expand Down
16 changes: 16 additions & 0 deletions prometheus/promhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

"github.com/klauspost/compress/zstd"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -121,6 +122,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
if opts.MaxRequestsInFlight > 0 {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
}
var hasEscapedCollisions bool
if opts.Registry != nil {
// Initialize all possibilities that can occur below.
errCnt.WithLabelValues("gathering")
Expand All @@ -134,6 +136,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
}
}
}
hasEscapedCollisions = reg.HasEscapedCollision()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... should I use the singular or the plural? :)


// Select compression formats to offer based on default or user choice.
var compressions []string
Expand Down Expand Up @@ -190,6 +193,19 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
} else {
contentType = expfmt.Negotiate(req.Header)
}

if hasEscapedCollisions {
switch contentType.ToEscapingScheme() {
case model.UnderscoreEscaping, model.DotsEscaping:
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error: one or more metrics collide when escaped")
}
httpError(rsp, fmt.Errorf("one or more metrics collide when escaped"))
return
default:
}
}

rsp.Header().Set(contentTypeHeader, string(contentType))

w, encodingHeader, closeWriter, err := negotiateEncodingWriter(req, rsp, compressions)
Expand Down
50 changes: 50 additions & 0 deletions prometheus/promhttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"github.com/klauspost/compress/zstd"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -79,6 +81,10 @@ func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(),
return mfs, func() { g.doneInvoked++ }, err
}

// HasEscapedCollision forwards to the wrapped gatherer g.g and returns its
// answer unchanged; the mock adds no collision tracking of its own.
func (g *mockTransactionGatherer) HasEscapedCollision() bool {
return g.g.HasEscapedCollision()
}

func readCompressedBody(r io.Reader, comp Compression) (string, error) {
switch comp {
case Gzip:
Expand Down Expand Up @@ -548,6 +554,50 @@ func TestNegotiateEncodingWriter(t *testing.T) {
}
}

func TestEscapedCollisions(t *testing.T) {
oldScheme := model.NameValidationScheme
defer func() {
model.NameValidationScheme = oldScheme
}()
model.NameValidationScheme = model.UTF8Validation

reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
Help: "A test metric with underscores",
}))
reg.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{
Name: "test.metric",
Help: "A test metric with dots",
}))

handler := HandlerFor(reg, HandlerOpts{})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tada


t.Run("fail case", func(t *testing.T) {
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/metrics", nil)
request.Header.Add(acceptHeader, string(expfmt.NewFormat(expfmt.TypeTextPlain)))
handler.ServeHTTP(writer, request)
if writer.Code != 500 {
t.Errorf("wanted error 500, got %d", writer.Code)
}
expectErr := "An error has occurred while serving metrics:\n\none or more metrics collide when escaped\n"
if writer.Body.String() != expectErr {
t.Error("incorrect body returned, want " + expectErr + " got " + writer.Body.String())
}
})

t.Run("success case", func(t *testing.T) {
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/metrics", nil)
request.Header.Add(acceptHeader, string(expfmt.NewFormat(expfmt.TypeTextPlain).WithEscapingScheme(model.NoEscaping)))
handler.ServeHTTP(writer, request)
if writer.Code != 200 {
t.Errorf("wanted 200 OK, got %d", writer.Code)
}
})
}

func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
name string
Expand Down
129 changes: 115 additions & 14 deletions prometheus/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ func init() {
// pre-registered.
func NewRegistry() *Registry {
return &Registry{
collectorsByID: map[uint64]Collector{},
descIDs: map[uint64]struct{}{},
dimHashesByName: map[string]uint64{},
collectorsByID: map[uint64]Collector{},
collectorsByEscapedID: map[uint64]Collector{},
descIDs: map[uint64]struct{}{},
escapedDescIDs: map[uint64]struct{}{},
dimHashesByName: map[string]uint64{},
}
}

// HasEscapedCollision reports whether the registry has recorded that two
// registered metrics, distinct under UTF-8 names, would collide once their
// names are escaped to legacy underscore replacement.
//
// The flag is updated by Register, which may run concurrently with this call,
// so it is read under the registry's read lock rather than accessed bare.
// NOTE(review): this assumes Register mutates hasEscapedCollision while
// holding r.mtx, like the rest of the registry state — confirm.
func (r *Registry) HasEscapedCollision() bool {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	return r.hasEscapedCollision
}

// NewPedanticRegistry returns a registry that checks during collection if each
// collected Metric is consistent with its reported Desc, and if the Desc has
// actually been registered with the registry. Unchecked Collectors (those whose
Expand Down Expand Up @@ -158,6 +164,11 @@ type Gatherer interface {
// expose an incomplete result and instead disregard the returned
// MetricFamily protobufs in case the returned error is non-nil.
Gather() ([]*dto.MetricFamily, error)

// HasEscapedCollision returns true if any two of the registered metrics would
// be the same when escaped to underscores. This is needed to prevent
// duplicate metric issues when being scraped by a legacy system.
HasEscapedCollision() bool
}

// Register registers the provided Collector with the DefaultRegisterer.
Expand Down Expand Up @@ -194,6 +205,10 @@ func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
return gf()
}

func (gf GathererFunc) HasEscapedCollision() bool {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the most risky change. both mimir and avalanche use gathererfunc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return false
}

// AlreadyRegisteredError is returned by the Register method if the Collector to
// be registered has already been registered before, or a different Collector
// that collects the same metrics has been registered before. Registration fails
Expand Down Expand Up @@ -258,22 +273,36 @@ func (errs MultiError) MaybeUnwrap() error {
// Registry implements Collector to allow it to be used for creating groups of
// metrics. See the Grouping example for how this can be done.
type Registry struct {
mtx sync.RWMutex
collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
mtx sync.RWMutex
collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
// collectorsByEscapedID stores collectors by escapedID, only if the escaped ID
// is different (otherwise we can just do the lookup in the regular map).
collectorsByEscapedID map[uint64]Collector
descIDs map[uint64]struct{}
// escapedDescIDs records desc ids of the escaped version of the metric, only
// if different from the regular name.
escapedDescIDs map[uint64]struct{}
dimHashesByName map[string]uint64
uncheckedCollectors []Collector
pedanticChecksEnabled bool

// hasEscapedCollision is set to true if any two metrics that were not
// identical under UTF-8 would collide if scraped by a system that requires
// names to be escaped to legacy underscore replacement.
hasEscapedCollision bool
}

// Register implements Registerer.
func (r *Registry) Register(c Collector) error {
var (
descChan = make(chan *Desc, capDescChan)
newDescIDs = map[uint64]struct{}{}
newDimHashesByName = map[string]uint64{}
collectorID uint64 // All desc IDs XOR'd together.
duplicateDescErr error
descChan = make(chan *Desc, capDescChan)
newDescIDs = map[uint64]struct{}{}
newEscapedIDs = map[uint64]struct{}{}
newDimHashesByName = map[string]uint64{}
collectorID uint64 // All desc IDs XOR'd together.
escapedID uint64
duplicateDescErr error
duplicateEscapedDesc bool
)
go func() {
c.Describe(descChan)
Expand Down Expand Up @@ -307,6 +336,22 @@ func (r *Registry) Register(c Collector) error {
collectorID ^= desc.id
}

// Also check to see if the descID is unique when all the names are escaped
// to underscores. First check the primary map, then check the secondary
// map. We only officially log a collision later.
if _, exists := r.descIDs[desc.escapedID]; exists {
duplicateEscapedDesc = true
}
if _, exists := r.escapedDescIDs[desc.escapedID]; exists {
duplicateEscapedDesc = true
}
if _, exists := newEscapedIDs[desc.escapedID]; !exists {
if desc.escapedID != desc.id {
newEscapedIDs[desc.escapedID] = struct{}{}
}
escapedID ^= desc.escapedID
}

// Are all the label names and the help string consistent with
// previous descriptors of the same name?
// First check existing descriptors...
Expand All @@ -331,7 +376,17 @@ func (r *Registry) Register(c Collector) error {
r.uncheckedCollectors = append(r.uncheckedCollectors, c)
return nil
}
if existing, exists := r.collectorsByID[collectorID]; exists {

existing, collision := r.collectorsByID[collectorID]
// Also check whether the underscore-escaped versions of the IDs match.
if !collision {
_, escapedCollision := r.collectorsByID[escapedID]
r.hasEscapedCollision = r.hasEscapedCollision || escapedCollision
_, escapedCollision = r.collectorsByEscapedID[escapedID]
r.hasEscapedCollision = r.hasEscapedCollision || escapedCollision
}

if collision {
switch e := existing.(type) {
case *wrappingCollector:
return AlreadyRegisteredError{
Expand All @@ -351,23 +406,36 @@ func (r *Registry) Register(c Collector) error {
return duplicateDescErr
}

if duplicateEscapedDesc {
r.hasEscapedCollision = true
}

// Only after all tests have passed, actually register.
r.collectorsByID[collectorID] = c
// We only need to store the escapedID if it doesn't match the unescaped one.
if escapedID != collectorID {
r.collectorsByEscapedID[escapedID] = c
}
for hash := range newDescIDs {
r.descIDs[hash] = struct{}{}
}
for name, dimHash := range newDimHashesByName {
r.dimHashesByName[name] = dimHash
}
for hash := range newEscapedIDs {
r.escapedDescIDs[hash] = struct{}{}
}
return nil
}

// Unregister implements Registerer.
func (r *Registry) Unregister(c Collector) bool {
var (
descChan = make(chan *Desc, capDescChan)
descIDs = map[uint64]struct{}{}
collectorID uint64 // All desc IDs XOR'd together.
descChan = make(chan *Desc, capDescChan)
descIDs = map[uint64]struct{}{}
escapedDescIDs = map[uint64]struct{}{}
collectorID uint64 // All desc IDs XOR'd together.
collectorEscapedID uint64
)
go func() {
c.Describe(descChan)
Expand All @@ -377,6 +445,8 @@ func (r *Registry) Unregister(c Collector) bool {
if _, exists := descIDs[desc.id]; !exists {
collectorID ^= desc.id
descIDs[desc.id] = struct{}{}
collectorEscapedID ^= desc.escapedID
escapedDescIDs[desc.escapedID] = struct{}{}
}
}

Expand All @@ -391,9 +461,13 @@ func (r *Registry) Unregister(c Collector) bool {
defer r.mtx.Unlock()

delete(r.collectorsByID, collectorID)
delete(r.collectorsByEscapedID, collectorEscapedID)
for id := range descIDs {
delete(r.descIDs, id)
}
for id := range escapedDescIDs {
delete(r.escapedDescIDs, id)
}
// dimHashesByName is left untouched as those must be consistent
// throughout the lifetime of a program.
return true
Expand Down Expand Up @@ -802,6 +876,15 @@ func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
}

// HasEscapedCollision reports whether at least one Gatherer in gs reports an
// escaped-name collision. Members are asked in slice order and the scan stops
// at the first positive answer. Only each member's own report is consulted;
// names are not compared across different members here.
func (gs Gatherers) HasEscapedCollision() bool {
	for i := range gs {
		if member := gs[i]; member.HasEscapedCollision() {
			return true
		}
	}
	return false
}

// checkSuffixCollisions checks for collisions with the “magic” suffixes the
// Prometheus text format and the internal metric representation of the
// Prometheus server add while flattening Summaries and Histograms.
Expand Down Expand Up @@ -1033,6 +1116,15 @@ func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err err
}, errs.MaybeUnwrap()
}

// HasEscapedCollision reports whether any of the transactional gatherers held
// in r.tGatherers reports an escaped-name collision. The gatherers are asked
// in order and the first positive answer ends the scan; an empty list yields
// false.
func (r *MultiTRegistry) HasEscapedCollision() bool {
	for i := range r.tGatherers {
		if tg := r.tGatherers[i]; tg.HasEscapedCollision() {
			return true
		}
	}
	return false
}

// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
// used by metric family is no longer used by a caller. This allows implementations with cache.
type TransactionalGatherer interface {
Expand All @@ -1058,6 +1150,11 @@ type TransactionalGatherer interface {
// Important: done is expected to be triggered (even if the error occurs!)
// once caller does not need returned slice of dto.MetricFamily.
Gather() (_ []*dto.MetricFamily, done func(), err error)

// HasEscapedCollision returns true if any two of the registered metrics would
// be the same when escaped to underscores. This is needed to prevent
// duplicate metric issues when being scraped by a legacy system.
HasEscapedCollision() bool
}

// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
Expand All @@ -1074,3 +1171,7 @@ func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), er
mfs, err := g.g.Gather()
return mfs, func() {}, err
}

// HasEscapedCollision forwards to the wrapped gatherer g.g and returns its
// answer unchanged, mirroring how Gather on this type forwards to g.g.Gather.
func (g *noTransactionGatherer) HasEscapedCollision() bool {
return g.g.HasEscapedCollision()
}
Loading