diff --git a/pkg/metrics_store/metrics_store.go b/pkg/metrics_store/metrics_store.go index d0be4bdff..912ab3277 100644 --- a/pkg/metrics_store/metrics_store.go +++ b/pkg/metrics_store/metrics_store.go @@ -28,11 +28,10 @@ import ( // interface. Instead of storing entire Kubernetes objects, it stores metrics // generated based on those objects. type MetricsStore struct { - // metrics is a map indexed by Kubernetes object id, containing a slice of - // metric families, containing a slice of metrics. We need to keep metrics - // grouped by metric families in order to zip families with their help text in - // MetricsStore.WriteAll(). - metrics sync.Map + // metrics points to a sync.Map indexed by Kubernetes object id, containing a slice of + // metric families, containing a slice of metrics. It's a pointer so cloned stores can + // safely share the same backing storage without copying or mutating it. + metrics *sync.Map // generateMetricsFunc generates metrics based on a given Kubernetes object // and returns them grouped by metric family. @@ -48,7 +47,7 @@ func NewMetricsStore(headers []string, generateFunc func(interface{}) []metric.F return &MetricsStore{ generateMetricsFunc: generateFunc, headers: headers, - metrics: sync.Map{}, + metrics: &sync.Map{}, } } diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 713e14b5c..c7a8c32ba 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -59,7 +59,12 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { } for i, help := range m.stores[0].headers { - if help != "" && help != "\n" { + // Skip empty headers (set by SanitizeHeaders for duplicates) + if help == "" { + continue + } + + if help != "\n" { help += "\n" } @@ -95,49 +100,87 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { // SanitizeHeaders sanitizes the headers of the given MetricsWriterList. func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) MetricsWriterList { - var lastHeader string + clonedWriters := make(MetricsWriterList, 0, len(writers)) for _, writer := range writers { + clonedStores := make([]*MetricsStore, 0, len(writer.stores)) + for _, store := range writer.stores { + clonedHeaders := make([]string, len(store.headers)) + copy(clonedHeaders, store.headers) + clonedStore := &MetricsStore{ + headers: clonedHeaders, + } + // Share the metrics backing storage by sharing the pointer. + clonedStore.metrics = store.metrics + clonedStores = append(clonedStores, clonedStore) + } + clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName}) + } + + // Deduplicate by metric name across all writers to handle non-consecutive duplicates during CRS reload. + seenHELP := make(map[string]struct{}) + seenTYPE := make(map[string]struct{}) + for _, writer := range clonedWriters { if len(writer.stores) > 0 { - for i := 0; i < len(writer.stores[0].headers); { + for i := 0; i < len(writer.stores[0].headers); i++ { header := writer.stores[0].headers[i] - - // Removes duplicate headers from the given MetricsWriterList for the same family (generated through CRS). - // These are expected to be consecutive since G** resolution generates groups of similar metrics with same headers before moving onto the next G** spec in the CRS configuration. - // Skip this step if we encounter a repeated header, as it will be removed. - if header != lastHeader && strings.HasPrefix(header, "# HELP") { - - // If the requested content type is text/plain, replace "info" and "statesets" with "gauge", as they are not recognized by Prometheus' plain text machinery. - // When Prometheus requests proto-based formats, this branch is also used because any requested format that is not OpenMetrics falls back to text/plain in metrics_handler.go - if contentType.FormatType() == expfmt.TypeTextPlain { - infoTypeString := string(metric.Info) - stateSetTypeString := string(metric.StateSet) - if strings.HasSuffix(header, infoTypeString) { - header = header[:len(header)-len(infoTypeString)] + string(metric.Gauge) - writer.stores[0].headers[i] = header + lines := strings.Split(header, "\n") + shouldRemove := false + modifiedLines := make([]string, 0, len(lines)) + + for _, line := range lines { + switch { + case strings.HasPrefix(line, "# HELP "): + fields := strings.Fields(line) + if len(fields) >= 3 { + metricName := fields[2] + if _, seen := seenHELP[metricName]; seen { + shouldRemove = true + break + } + seenHELP[metricName] = struct{}{} + modifiedLines = append(modifiedLines, line) + } else { + modifiedLines = append(modifiedLines, line) + } + case strings.HasPrefix(line, "# TYPE "): + if shouldRemove { + break } - if strings.HasSuffix(header, stateSetTypeString) { - header = header[:len(header)-len(stateSetTypeString)] + string(metric.Gauge) - writer.stores[0].headers[i] = header + fields := strings.Fields(line) + if len(fields) >= 3 { + metricName := fields[2] + modifiedLine := line + if contentType.FormatType() == expfmt.TypeTextPlain { + infoTypeString := string(metric.Info) + stateSetTypeString := string(metric.StateSet) + if strings.HasSuffix(line, infoTypeString) { + modifiedLine = line[:len(line)-len(infoTypeString)] + string(metric.Gauge) + } else if strings.HasSuffix(line, stateSetTypeString) { + modifiedLine = line[:len(line)-len(stateSetTypeString)] + string(metric.Gauge) + } + } + if _, seen := seenTYPE[metricName]; seen { + shouldRemove = true + break + } + seenTYPE[metricName] = struct{}{} + modifiedLines = append(modifiedLines, modifiedLine) + } else { + modifiedLines = append(modifiedLines, line) } + default: + modifiedLines = append(modifiedLines, line) } } - // Nullify duplicate headers after the sanitization to not miss out on any new candidates. - if header == lastHeader { - writer.stores[0].headers = append(writer.stores[0].headers[:i], writer.stores[0].headers[i+1:]...) - - // Do not increment the index, as the next header is now at the current index. - continue + if shouldRemove { + writer.stores[0].headers[i] = "" + } else if len(modifiedLines) > 0 { + writer.stores[0].headers[i] = strings.Join(modifiedLines, "\n") } - - // Update the last header. - lastHeader = header - - // Move to the next header. - i++ } } } - return writers + return clonedWriters } diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index dd7b56e68..5f61f08fd 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -286,10 +286,11 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", "# HELP foo foo_help\n# TYPE foo gauge", - "# HELP foo foo_help\n# TYPE foo info", - "# HELP foo foo_help\n# TYPE foo stateset", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", }, }, { @@ -309,10 +310,17 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", + "", + "", "# HELP foo foo_help\n# TYPE foo gauge", - "# HELP foo foo_help\n# TYPE foo info", - "# HELP foo foo_help\n# TYPE foo stateset", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", + "", + "", + "", + "", }, }, { @@ -326,8 +334,11 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", "# HELP foo foo_help\n# TYPE foo gauge", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", }, }, { @@ -347,23 +358,79 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", + "", + "", "# HELP foo foo_help\n# TYPE foo gauge", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", + "", + "", + "", + "", }, }, } for _, testcase := range testcases { - writer := NewMetricsWriter("test", NewMetricsStore(testcase.headers, nil)) + originalStore := NewMetricsStore(testcase.headers, nil) + writer := NewMetricsWriter("test", originalStore) t.Run(testcase.name, func(t *testing.T) { - SanitizeHeaders(testcase.contentType, MetricsWriterList{writer}) - if !reflect.DeepEqual(testcase.expectedHeaders, writer.stores[0].headers) { - t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, writer.stores[0].headers)) + sanitizedWriters := SanitizeHeaders(testcase.contentType, MetricsWriterList{writer}) + if !reflect.DeepEqual(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers) { + t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers)) + } + if !reflect.DeepEqual(testcase.headers, originalStore.headers) { + t.Fatalf("Original headers were mutated. Expected: %v, Got: %v", testcase.headers, originalStore.headers) } }) } } +func TestSanitizeHeadersImmutability(t *testing.T) { + originalHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info info", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + + store := NewMetricsStore(originalHeaders, nil) + writer := NewMetricsWriter("test", store) + + textPlainContentType := expfmt.NewFormat(expfmt.TypeTextPlain) + sanitizedWriters1 := SanitizeHeaders(textPlainContentType, MetricsWriterList{writer}) + + if !reflect.DeepEqual(originalHeaders, store.headers) { + t.Fatalf("Original headers were mutated after first request. Expected: %v, Got: %v", originalHeaders, store.headers) + } + + expectedTextHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info gauge", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset gauge", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + if !reflect.DeepEqual(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers) { + t.Fatalf("First request headers mismatch. (-want, +got):\n%s", cmp.Diff(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers)) + } + + openMetricsContentType := expfmt.NewFormat(expfmt.TypeOpenMetrics) + sanitizedWriters2 := SanitizeHeaders(openMetricsContentType, MetricsWriterList{writer}) + + if !reflect.DeepEqual(originalHeaders, store.headers) { + t.Fatalf("Original headers were mutated after second request. Expected: %v, Got: %v", originalHeaders, store.headers) + } + + expectedOpenMetricsHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info info", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + if !reflect.DeepEqual(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers) { + t.Fatalf("Second request headers mismatch. Expected OpenMetrics to preserve info/stateset. (-want, +got):\n%s", cmp.Diff(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers)) + } +} + func BenchmarkSanitizeHeaders(b *testing.B) { benchmarks := []struct { name string diff --git a/pkg/metricshandler/metrics_handler.go b/pkg/metricshandler/metrics_handler.go index 4adc88f3b..ecdd9a5fd 100644 --- a/pkg/metricshandler/metrics_handler.go +++ b/pkg/metricshandler/metrics_handler.go @@ -220,12 +220,12 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - m.metricsWriters = metricsstore.SanitizeHeaders(contentType, m.metricsWriters) + sanitizedWriters := metricsstore.SanitizeHeaders(contentType, m.metricsWriters) requestedResources := parseResources(r.URL.Query()["resources"]) excludedResources := parseResources(r.URL.Query()["exclude_resources"]) - for _, w := range m.metricsWriters { + for _, w := range sanitizedWriters { if requestedResources != nil { if _, ok := requestedResources[w.ResourceName]; !ok { continue