Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pkg/metrics_store/metrics_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import (
// interface. Instead of storing entire Kubernetes objects, it stores metrics
// generated based on those objects.
type MetricsStore struct {
// metrics is a map indexed by Kubernetes object id, containing a slice of
// metric families, containing a slice of metrics. We need to keep metrics
// grouped by metric families in order to zip families with their help text in
// MetricsStore.WriteAll().
metrics sync.Map
// metrics points to a sync.Map indexed by Kubernetes object id, containing a slice of
// metric families, containing a slice of metrics. It's a pointer so cloned stores can
// safely share the same backing storage without copying or mutating it.
metrics *sync.Map

// generateMetricsFunc generates metrics based on a given Kubernetes object
// and returns them grouped by metric family.
Expand All @@ -48,7 +47,7 @@ func NewMetricsStore(headers []string, generateFunc func(interface{}) []metric.F
return &MetricsStore{
generateMetricsFunc: generateFunc,
headers: headers,
metrics: sync.Map{},
metrics: &sync.Map{},
}
}

Expand Down
109 changes: 76 additions & 33 deletions pkg/metrics_store/metrics_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ func (m MetricsWriter) WriteAll(w io.Writer) error {
}

for i, help := range m.stores[0].headers {
if help != "" && help != "\n" {
// Skip empty headers (set by SanitizeHeaders for duplicates)
if help == "" {
continue
}

if help != "\n" {
help += "\n"
}

Expand Down Expand Up @@ -95,49 +100,87 @@ func (m MetricsWriter) WriteAll(w io.Writer) error {

// SanitizeHeaders sanitizes the headers of the given MetricsWriterList.
func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) MetricsWriterList {
var lastHeader string
clonedWriters := make(MetricsWriterList, 0, len(writers))
for _, writer := range writers {
clonedStores := make([]*MetricsStore, 0, len(writer.stores))
for _, store := range writer.stores {
clonedHeaders := make([]string, len(store.headers))
copy(clonedHeaders, store.headers)
clonedStore := &MetricsStore{
headers: clonedHeaders,
}
// Share the metrics backing storage by sharing the pointer.
clonedStore.metrics = store.metrics
clonedStores = append(clonedStores, clonedStore)
}
clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName})
}

// Deduplicate by metric name across all writers to handle non-consecutive duplicates during CRS reload.
seenHELP := make(map[string]struct{})
seenTYPE := make(map[string]struct{})
for _, writer := range clonedWriters {
if len(writer.stores) > 0 {
for i := 0; i < len(writer.stores[0].headers); {
for i := 0; i < len(writer.stores[0].headers); i++ {
header := writer.stores[0].headers[i]

// Removes duplicate headers from the given MetricsWriterList for the same family (generated through CRS).
// These are expected to be consecutive since G** resolution generates groups of similar metrics with same headers before moving onto the next G** spec in the CRS configuration.
// Skip this step if we encounter a repeated header, as it will be removed.
if header != lastHeader && strings.HasPrefix(header, "# HELP") {

// If the requested content type is text/plain, replace "info" and "statesets" with "gauge", as they are not recognized by Prometheus' plain text machinery.
// When Prometheus requests proto-based formats, this branch is also used because any requested format that is not OpenMetrics falls back to text/plain in metrics_handler.go
if contentType.FormatType() == expfmt.TypeTextPlain {
infoTypeString := string(metric.Info)
stateSetTypeString := string(metric.StateSet)
if strings.HasSuffix(header, infoTypeString) {
header = header[:len(header)-len(infoTypeString)] + string(metric.Gauge)
writer.stores[0].headers[i] = header
lines := strings.Split(header, "\n")
shouldRemove := false
modifiedLines := make([]string, 0, len(lines))

for _, line := range lines {
switch {
case strings.HasPrefix(line, "# HELP "):
fields := strings.Fields(line)
if len(fields) >= 3 {
metricName := fields[2]
if _, seen := seenHELP[metricName]; seen {
shouldRemove = true
break
}
seenHELP[metricName] = struct{}{}
modifiedLines = append(modifiedLines, line)
} else {
modifiedLines = append(modifiedLines, line)
}
case strings.HasPrefix(line, "# TYPE "):
if shouldRemove {
break
}
if strings.HasSuffix(header, stateSetTypeString) {
header = header[:len(header)-len(stateSetTypeString)] + string(metric.Gauge)
writer.stores[0].headers[i] = header
fields := strings.Fields(line)
if len(fields) >= 3 {
metricName := fields[2]
modifiedLine := line
if contentType.FormatType() == expfmt.TypeTextPlain {
infoTypeString := string(metric.Info)
stateSetTypeString := string(metric.StateSet)
if strings.HasSuffix(line, infoTypeString) {
modifiedLine = line[:len(line)-len(infoTypeString)] + string(metric.Gauge)
} else if strings.HasSuffix(line, stateSetTypeString) {
modifiedLine = line[:len(line)-len(stateSetTypeString)] + string(metric.Gauge)
}
}
if _, seen := seenTYPE[metricName]; seen {
shouldRemove = true
break
}
seenTYPE[metricName] = struct{}{}
modifiedLines = append(modifiedLines, modifiedLine)
} else {
modifiedLines = append(modifiedLines, line)
}
default:
modifiedLines = append(modifiedLines, line)
}
}

// Nullify duplicate headers after the sanitization to not miss out on any new candidates.
if header == lastHeader {
writer.stores[0].headers = append(writer.stores[0].headers[:i], writer.stores[0].headers[i+1:]...)

// Do not increment the index, as the next header is now at the current index.
continue
if shouldRemove {
writer.stores[0].headers[i] = ""
} else if len(modifiedLines) > 0 {
writer.stores[0].headers[i] = strings.Join(modifiedLines, "\n")
}

// Update the last header.
lastHeader = header

// Move to the next header.
i++
}
}
}

return writers
return clonedWriters
}
91 changes: 79 additions & 12 deletions pkg/metrics_store/metrics_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,11 @@ func TestSanitizeHeaders(t *testing.T) {
"# HELP foo foo_help\n# TYPE foo counter",
},
expectedHeaders: []string{
"",
"# HELP foo foo_help\n# TYPE foo gauge",
"# HELP foo foo_help\n# TYPE foo info",
"# HELP foo foo_help\n# TYPE foo stateset",
"# HELP foo foo_help\n# TYPE foo counter",
"",
"",
"",
},
},
{
Expand All @@ -309,10 +310,17 @@ func TestSanitizeHeaders(t *testing.T) {
"# HELP foo foo_help\n# TYPE foo counter",
},
expectedHeaders: []string{
"",
"",
"",
"# HELP foo foo_help\n# TYPE foo gauge",
"# HELP foo foo_help\n# TYPE foo info",
"# HELP foo foo_help\n# TYPE foo stateset",
"# HELP foo foo_help\n# TYPE foo counter",
"",
"",
"",
"",
"",
"",
"",
},
},
{
Expand All @@ -326,8 +334,11 @@ func TestSanitizeHeaders(t *testing.T) {
"# HELP foo foo_help\n# TYPE foo counter",
},
expectedHeaders: []string{
"",
"# HELP foo foo_help\n# TYPE foo gauge",
"# HELP foo foo_help\n# TYPE foo counter",
"",
"",
"",
},
},
{
Expand All @@ -347,23 +358,79 @@ func TestSanitizeHeaders(t *testing.T) {
"# HELP foo foo_help\n# TYPE foo counter",
},
expectedHeaders: []string{
"",
"",
"",
"# HELP foo foo_help\n# TYPE foo gauge",
"# HELP foo foo_help\n# TYPE foo counter",
"",
"",
"",
"",
"",
"",
"",
},
},
}

for _, testcase := range testcases {
writer := NewMetricsWriter("test", NewMetricsStore(testcase.headers, nil))
originalStore := NewMetricsStore(testcase.headers, nil)
writer := NewMetricsWriter("test", originalStore)
t.Run(testcase.name, func(t *testing.T) {
SanitizeHeaders(testcase.contentType, MetricsWriterList{writer})
if !reflect.DeepEqual(testcase.expectedHeaders, writer.stores[0].headers) {
t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, writer.stores[0].headers))
sanitizedWriters := SanitizeHeaders(testcase.contentType, MetricsWriterList{writer})
if !reflect.DeepEqual(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers) {
t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers))
}
if !reflect.DeepEqual(testcase.headers, originalStore.headers) {
t.Fatalf("Original headers were mutated. Expected: %v, Got: %v", testcase.headers, originalStore.headers)
}
})
}
}

func TestSanitizeHeadersImmutability(t *testing.T) {
originalHeaders := []string{
"# HELP foo_info foo_help\n# TYPE foo_info info",
"# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset",
"# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge",
}

store := NewMetricsStore(originalHeaders, nil)
writer := NewMetricsWriter("test", store)

textPlainContentType := expfmt.NewFormat(expfmt.TypeTextPlain)
sanitizedWriters1 := SanitizeHeaders(textPlainContentType, MetricsWriterList{writer})

if !reflect.DeepEqual(originalHeaders, store.headers) {
t.Fatalf("Original headers were mutated after first request. Expected: %v, Got: %v", originalHeaders, store.headers)
}

expectedTextHeaders := []string{
"# HELP foo_info foo_help\n# TYPE foo_info gauge",
"# HELP foo_stateset foo_help\n# TYPE foo_stateset gauge",
"# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge",
}
if !reflect.DeepEqual(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers) {
t.Fatalf("First request headers mismatch. (-want, +got):\n%s", cmp.Diff(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers))
}

openMetricsContentType := expfmt.NewFormat(expfmt.TypeOpenMetrics)
sanitizedWriters2 := SanitizeHeaders(openMetricsContentType, MetricsWriterList{writer})

if !reflect.DeepEqual(originalHeaders, store.headers) {
t.Fatalf("Original headers were mutated after second request. Expected: %v, Got: %v", originalHeaders, store.headers)
}

expectedOpenMetricsHeaders := []string{
"# HELP foo_info foo_help\n# TYPE foo_info info",
"# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset",
"# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge",
}
if !reflect.DeepEqual(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers) {
t.Fatalf("Second request headers mismatch. Expected OpenMetrics to preserve info/stateset. (-want, +got):\n%s", cmp.Diff(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers))
}
}

func BenchmarkSanitizeHeaders(b *testing.B) {
benchmarks := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions pkg/metricshandler/metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

m.metricsWriters = metricsstore.SanitizeHeaders(contentType, m.metricsWriters)
sanitizedWriters := metricsstore.SanitizeHeaders(contentType, m.metricsWriters)

requestedResources := parseResources(r.URL.Query()["resources"])
excludedResources := parseResources(r.URL.Query()["exclude_resources"])

for _, w := range m.metricsWriters {
for _, w := range sanitizedWriters {
if requestedResources != nil {
if _, ok := requestedResources[w.ResourceName]; !ok {
continue
Expand Down