From 7807477a938712f5315e9aaae7f7cd3f191c3771 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Mon, 19 Jan 2026 02:19:44 -0800 Subject: [PATCH 1/6] Resolve conflicts after rebase --- pkg/metrics_store/metrics_writer.go | 21 ++++++++- pkg/metrics_store/metrics_writer_test.go | 55 ++++++++++++++++++++++-- pkg/metricshandler/metrics_handler.go | 4 +- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 713e14b5c..ca89328bd 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -95,8 +95,25 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { // SanitizeHeaders sanitizes the headers of the given MetricsWriterList. func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) MetricsWriterList { - var lastHeader string + clonedWriters := make(MetricsWriterList, 0, len(writers)) for _, writer := range writers { + clonedStores := make([]*MetricsStore, 0, len(writer.stores)) + for _, store := range writer.stores { + clonedHeaders := make([]string, len(store.headers)) + copy(clonedHeaders, store.headers) + + clonedStore := &MetricsStore{ + headers: clonedHeaders, + metrics: store.metrics, + generateMetricsFunc: store.generateMetricsFunc, + } + clonedStores = append(clonedStores, clonedStore) + } + clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName}) + } + + var lastHeader string + for _, writer := range clonedWriters { if len(writer.stores) > 0 { for i := 0; i < len(writer.stores[0].headers); { header := writer.stores[0].headers[i] @@ -139,5 +156,5 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri } } - return writers + return clonedWriters } diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index dd7b56e68..01e8d059c 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -354,16 +354,63 @@ func TestSanitizeHeaders(t *testing.T) { } for _, testcase := range testcases { - writer := NewMetricsWriter("test", NewMetricsStore(testcase.headers, nil)) + originalStore := NewMetricsStore(testcase.headers, nil) + writer := NewMetricsWriter("test", originalStore) t.Run(testcase.name, func(t *testing.T) { - SanitizeHeaders(testcase.contentType, MetricsWriterList{writer}) - if !reflect.DeepEqual(testcase.expectedHeaders, writer.stores[0].headers) { - t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, writer.stores[0].headers)) + sanitizedWriters := SanitizeHeaders(testcase.contentType, MetricsWriterList{writer}) + if !reflect.DeepEqual(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers) { + t.Fatalf("(-want, +got):\n%s", cmp.Diff(testcase.expectedHeaders, sanitizedWriters[0].stores[0].headers)) + } + if !reflect.DeepEqual(testcase.headers, originalStore.headers) { + t.Fatalf("Original headers were mutated. Expected: %v, Got: %v", testcase.headers, originalStore.headers) } }) } } +func TestSanitizeHeadersImmutability(t *testing.T) { + originalHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info info", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + + store := NewMetricsStore(originalHeaders, nil) + writer := NewMetricsWriter("test", store) + + textPlainContentType := expfmt.NewFormat(expfmt.TypeTextPlain) + sanitizedWriters1 := SanitizeHeaders(textPlainContentType, MetricsWriterList{writer}) + + if !reflect.DeepEqual(originalHeaders, store.headers) { + t.Fatalf("Original headers were mutated after first request. Expected: %v, Got: %v", originalHeaders, store.headers) + } + + expectedTextHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info gauge", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset gauge", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + if !reflect.DeepEqual(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers) { + t.Fatalf("First request headers mismatch. (-want, +got):\n%s", cmp.Diff(expectedTextHeaders, sanitizedWriters1[0].stores[0].headers)) + } + + openMetricsContentType := expfmt.NewFormat(expfmt.TypeOpenMetrics) + sanitizedWriters2 := SanitizeHeaders(openMetricsContentType, MetricsWriterList{writer}) + + if !reflect.DeepEqual(originalHeaders, store.headers) { + t.Fatalf("Original headers were mutated after second request. Expected: %v, Got: %v", originalHeaders, store.headers) + } + + expectedOpenMetricsHeaders := []string{ + "# HELP foo_info foo_help\n# TYPE foo_info info", + "# HELP foo_stateset foo_help\n# TYPE foo_stateset stateset", + "# HELP foo_gauge foo_help\n# TYPE foo_gauge gauge", + } + if !reflect.DeepEqual(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers) { + t.Fatalf("Second request headers mismatch. Expected OpenMetrics to preserve info/stateset. (-want, +got):\n%s", cmp.Diff(expectedOpenMetricsHeaders, sanitizedWriters2[0].stores[0].headers)) + } +} + func BenchmarkSanitizeHeaders(b *testing.B) { benchmarks := []struct { name string diff --git a/pkg/metricshandler/metrics_handler.go b/pkg/metricshandler/metrics_handler.go index 4adc88f3b..ecdd9a5fd 100644 --- a/pkg/metricshandler/metrics_handler.go +++ b/pkg/metricshandler/metrics_handler.go @@ -220,12 +220,12 @@ func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - m.metricsWriters = metricsstore.SanitizeHeaders(contentType, m.metricsWriters) + sanitizedWriters := metricsstore.SanitizeHeaders(contentType, m.metricsWriters) requestedResources := parseResources(r.URL.Query()["resources"]) excludedResources := parseResources(r.URL.Query()["exclude_resources"]) - for _, w := range m.metricsWriters { + for _, w := range sanitizedWriters { if requestedResources != nil { if _, ok := requestedResources[w.ResourceName]; !ok { continue From 0cd26f9745caaf8e128e284137d04c0145661e58 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Mon, 19 Jan 2026 02:45:03 -0800 Subject: [PATCH 2/6] fix ci erros --- pkg/metrics_store/metrics_writer.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index ca89328bd..17cd47d54 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "strings" + "unsafe" "github.com/prometheus/common/expfmt" @@ -93,6 +94,13 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { return nil } +// shareMetricsMap shares the metrics map of the given MetricsStore +func shareMetricsMap(dst, src *MetricsStore) { + srcPtr := unsafe.Pointer(src) + dstPtr := unsafe.Pointer(dst) + *(*MetricsStore)(dstPtr) = *(*MetricsStore)(srcPtr) +} + // SanitizeHeaders sanitizes the headers of the given MetricsWriterList. func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) MetricsWriterList { clonedWriters := make(MetricsWriterList, 0, len(writers)) @@ -102,11 +110,9 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri clonedHeaders := make([]string, len(store.headers)) copy(clonedHeaders, store.headers) - clonedStore := &MetricsStore{ - headers: clonedHeaders, - metrics: store.metrics, - generateMetricsFunc: store.generateMetricsFunc, - } + clonedStore := &MetricsStore{} + shareMetricsMap(clonedStore, store) + clonedStore.headers = clonedHeaders clonedStores = append(clonedStores, clonedStore) } clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName}) From 5ba50075de283d913dd4547ceec02fbe93156c58 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Mon, 19 Jan 2026 09:47:06 -0800 Subject: [PATCH 3/6] refactor: use *sync.Map for MetricsStore.metrics to enable safe sharing --- pkg/metrics_store/metrics_store.go | 4 +-- pkg/metrics_store/metrics_writer.go | 32 +++++++++++------------- pkg/metrics_store/metrics_writer_test.go | 20 +++++++++++++++ 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/pkg/metrics_store/metrics_store.go b/pkg/metrics_store/metrics_store.go index d0be4bdff..36519bce8 100644 --- a/pkg/metrics_store/metrics_store.go +++ b/pkg/metrics_store/metrics_store.go @@ -32,7 +32,7 @@ type MetricsStore struct { // metric families, containing a slice of metrics. We need to keep metrics // grouped by metric families in order to zip families with their help text in // MetricsStore.WriteAll(). - metrics sync.Map + metrics *sync.Map // generateMetricsFunc generates metrics based on a given Kubernetes object // and returns them grouped by metric family. @@ -48,7 +48,7 @@ func NewMetricsStore(headers []string, generateFunc func(interface{}) []metric.F return &MetricsStore{ generateMetricsFunc: generateFunc, headers: headers, - metrics: sync.Map{}, + metrics: &sync.Map{}, } } diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 17cd47d54..44de96bb3 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "strings" - "unsafe" "github.com/prometheus/common/expfmt" @@ -60,7 +59,12 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { } for i, help := range m.stores[0].headers { - if help != "" && help != "\n" { + // Skip empty headers (set by SanitizeHeaders for duplicates) + if help == "" { + continue + } + + if help != "\n" { help += "\n" } @@ -94,13 +98,6 @@ func (m MetricsWriter) WriteAll(w io.Writer) error { return nil } -// shareMetricsMap shares the metrics map of the given MetricsStore -func shareMetricsMap(dst, src *MetricsStore) { - srcPtr := unsafe.Pointer(src) - dstPtr := unsafe.Pointer(dst) - *(*MetricsStore)(dstPtr) = *(*MetricsStore)(srcPtr) -} - // SanitizeHeaders sanitizes the headers of the given MetricsWriterList. func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) MetricsWriterList { clonedWriters := make(MetricsWriterList, 0, len(writers)) @@ -109,17 +106,18 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri for _, store := range writer.stores { clonedHeaders := make([]string, len(store.headers)) copy(clonedHeaders, store.headers) - - clonedStore := &MetricsStore{} - shareMetricsMap(clonedStore, store) - clonedStore.headers = clonedHeaders + clonedStore := &MetricsStore{ + headers: clonedHeaders, + } + // Share the metrics backing storage by sharing the pointer. + clonedStore.metrics = store.metrics clonedStores = append(clonedStores, clonedStore) } clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName}) } - var lastHeader string for _, writer := range clonedWriters { + var lastHeader string if len(writer.stores) > 0 { for i := 0; i < len(writer.stores[0].headers); { header := writer.stores[0].headers[i] @@ -146,10 +144,10 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri } // Nullify duplicate headers after the sanitization to not miss out on any new candidates. + // Set to empty string instead of deleting to preserve slice length alignment with metricFamilies. if header == lastHeader { - writer.stores[0].headers = append(writer.stores[0].headers[:i], writer.stores[0].headers[i+1:]...) - - // Do not increment the index, as the next header is now at the current index. + writer.stores[0].headers[i] = "" + i++ continue } diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index 01e8d059c..d91768bc2 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -286,6 +286,7 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", "# HELP foo foo_help\n# TYPE foo gauge", "# HELP foo foo_help\n# TYPE foo info", "# HELP foo foo_help\n# TYPE foo stateset", @@ -309,10 +310,17 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", + "", + "", "# HELP foo foo_help\n# TYPE foo gauge", + "", "# HELP foo foo_help\n# TYPE foo info", + "", "# HELP foo foo_help\n# TYPE foo stateset", + "", "# HELP foo foo_help\n# TYPE foo counter", + "", }, }, { @@ -326,7 +334,10 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", "# HELP foo foo_help\n# TYPE foo gauge", + "", + "", "# HELP foo foo_help\n# TYPE foo counter", }, }, @@ -347,8 +358,17 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo counter", }, expectedHeaders: []string{ + "", + "", + "", "# HELP foo foo_help\n# TYPE foo gauge", + "", + "", + "", + "", + "", "# HELP foo foo_help\n# TYPE foo counter", + "", }, }, } From 4b84297426daf5808ee6428d1fc0fc8b63b79984 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Mon, 19 Jan 2026 20:13:49 -0800 Subject: [PATCH 4/6] fix: deduplicate HELP/TYPE headers by metric name instead of adjacency --- pkg/metrics_store/metrics_writer.go | 83 +++++++++++++++--------- pkg/metrics_store/metrics_writer_test.go | 16 ++--- 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 44de96bb3..5329a36f0 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -116,46 +116,69 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri clonedWriters = append(clonedWriters, &MetricsWriter{stores: clonedStores, ResourceName: writer.ResourceName}) } + // Deduplicate by metric name across all writers to handle non-consecutive duplicates during CRS reload. + seenHELP := make(map[string]struct{}) + seenTYPE := make(map[string]struct{}) for _, writer := range clonedWriters { - var lastHeader string if len(writer.stores) > 0 { - for i := 0; i < len(writer.stores[0].headers); { + for i := 0; i < len(writer.stores[0].headers); i++ { header := writer.stores[0].headers[i] - - // Removes duplicate headers from the given MetricsWriterList for the same family (generated through CRS). - // These are expected to be consecutive since G** resolution generates groups of similar metrics with same headers before moving onto the next G** spec in the CRS configuration. - // Skip this step if we encounter a repeated header, as it will be removed. - if header != lastHeader && strings.HasPrefix(header, "# HELP") { - - // If the requested content type is text/plain, replace "info" and "statesets" with "gauge", as they are not recognized by Prometheus' plain text machinery. - // When Prometheus requests proto-based formats, this branch is also used because any requested format that is not OpenMetrics falls back to text/plain in metrics_handler.go - if contentType.FormatType() == expfmt.TypeTextPlain { - infoTypeString := string(metric.Info) - stateSetTypeString := string(metric.StateSet) - if strings.HasSuffix(header, infoTypeString) { - header = header[:len(header)-len(infoTypeString)] + string(metric.Gauge) - writer.stores[0].headers[i] = header + lines := strings.Split(header, "\n") + shouldRemove := false + modifiedLines := make([]string, 0, len(lines)) + + for _, line := range lines { + if strings.HasPrefix(line, "# HELP ") { + fields := strings.Fields(line) + if len(fields) >= 3 { + metricName := fields[2] + if _, seen := seenHELP[metricName]; seen { + shouldRemove = true + break + } else { + seenHELP[metricName] = struct{}{} + modifiedLines = append(modifiedLines, line) + } + } else { + modifiedLines = append(modifiedLines, line) + } + } else if strings.HasPrefix(line, "# TYPE ") { + if shouldRemove { + break } - if strings.HasSuffix(header, stateSetTypeString) { - header = header[:len(header)-len(stateSetTypeString)] + string(metric.Gauge) - writer.stores[0].headers[i] = header + fields := strings.Fields(line) + if len(fields) >= 3 { + metricName := fields[2] + modifiedLine := line + if contentType.FormatType() == expfmt.TypeTextPlain { + infoTypeString := string(metric.Info) + stateSetTypeString := string(metric.StateSet) + if strings.HasSuffix(line, infoTypeString) { + modifiedLine = line[:len(line)-len(infoTypeString)] + string(metric.Gauge) + } else if strings.HasSuffix(line, stateSetTypeString) { + modifiedLine = line[:len(line)-len(stateSetTypeString)] + string(metric.Gauge) + } + } + if _, seen := seenTYPE[metricName]; seen { + shouldRemove = true + break + } else { + seenTYPE[metricName] = struct{}{} + modifiedLines = append(modifiedLines, modifiedLine) + } + } else { + modifiedLines = append(modifiedLines, line) } + } else { + modifiedLines = append(modifiedLines, line) } } - // Nullify duplicate headers after the sanitization to not miss out on any new candidates. - // Set to empty string instead of deleting to preserve slice length alignment with metricFamilies. - if header == lastHeader { + if shouldRemove { writer.stores[0].headers[i] = "" - i++ - continue + } else if len(modifiedLines) > 0 { + writer.stores[0].headers[i] = strings.Join(modifiedLines, "\n") } - - // Update the last header. - lastHeader = header - - // Move to the next header. - i++ } } } diff --git a/pkg/metrics_store/metrics_writer_test.go b/pkg/metrics_store/metrics_writer_test.go index d91768bc2..5f61f08fd 100644 --- a/pkg/metrics_store/metrics_writer_test.go +++ b/pkg/metrics_store/metrics_writer_test.go @@ -288,9 +288,9 @@ func TestSanitizeHeaders(t *testing.T) { expectedHeaders: []string{ "", "# HELP foo foo_help\n# TYPE foo gauge", - "# HELP foo foo_help\n# TYPE foo info", - "# HELP foo foo_help\n# TYPE foo stateset", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", }, }, { @@ -315,11 +315,11 @@ func TestSanitizeHeaders(t *testing.T) { "", "# HELP foo foo_help\n# TYPE foo gauge", "", - "# HELP foo foo_help\n# TYPE foo info", "", - "# HELP foo foo_help\n# TYPE foo stateset", "", - "# HELP foo foo_help\n# TYPE foo counter", + "", + "", + "", "", }, }, @@ -338,7 +338,7 @@ func TestSanitizeHeaders(t *testing.T) { "# HELP foo foo_help\n# TYPE foo gauge", "", "", - "# HELP foo foo_help\n# TYPE foo counter", + "", }, }, { @@ -367,7 +367,7 @@ func TestSanitizeHeaders(t *testing.T) { "", "", "", - "# HELP foo foo_help\n# TYPE foo counter", + "", "", }, }, From f1a18f82a9144178fa09a665995a952744a0cd35 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Tue, 20 Jan 2026 00:24:03 -0800 Subject: [PATCH 5/6] metrics_writer.go: replace if blocks with switch --- pkg/metrics_store/metrics_writer.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/metrics_store/metrics_writer.go b/pkg/metrics_store/metrics_writer.go index 5329a36f0..c7a8c32ba 100644 --- a/pkg/metrics_store/metrics_writer.go +++ b/pkg/metrics_store/metrics_writer.go @@ -128,21 +128,21 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri modifiedLines := make([]string, 0, len(lines)) for _, line := range lines { - if strings.HasPrefix(line, "# HELP ") { + switch { + case strings.HasPrefix(line, "# HELP "): fields := strings.Fields(line) if len(fields) >= 3 { metricName := fields[2] if _, seen := seenHELP[metricName]; seen { shouldRemove = true break - } else { - seenHELP[metricName] = struct{}{} - modifiedLines = append(modifiedLines, line) } + seenHELP[metricName] = struct{}{} + modifiedLines = append(modifiedLines, line) } else { modifiedLines = append(modifiedLines, line) } - } else if strings.HasPrefix(line, "# TYPE ") { + case strings.HasPrefix(line, "# TYPE "): if shouldRemove { break } @@ -162,14 +162,13 @@ func SanitizeHeaders(contentType expfmt.Format, writers MetricsWriterList) Metri if _, seen := seenTYPE[metricName]; seen { shouldRemove = true break - } else { - seenTYPE[metricName] = struct{}{} - modifiedLines = append(modifiedLines, modifiedLine) } + seenTYPE[metricName] = struct{}{} + modifiedLines = append(modifiedLines, modifiedLine) } else { modifiedLines = append(modifiedLines, line) } - } else { + default: modifiedLines = append(modifiedLines, line) } } From 4ccef574c825a05e43082ed9136fcea20e19fb6f Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Fri, 30 Jan 2026 00:37:57 -0800 Subject: [PATCH 6/6] docs: clarify metrics sync.Map is a shared pointer --- pkg/metrics_store/metrics_store.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/metrics_store/metrics_store.go b/pkg/metrics_store/metrics_store.go index 36519bce8..912ab3277 100644 --- a/pkg/metrics_store/metrics_store.go +++ b/pkg/metrics_store/metrics_store.go @@ -28,10 +28,9 @@ import ( // interface. Instead of storing entire Kubernetes objects, it stores metrics // generated based on those objects. type MetricsStore struct { - // metrics is a map indexed by Kubernetes object id, containing a slice of - // metric families, containing a slice of metrics. We need to keep metrics - // grouped by metric families in order to zip families with their help text in - // MetricsStore.WriteAll(). + // metrics points to a sync.Map indexed by Kubernetes object id, containing a slice of + // metric families, containing a slice of metrics. It's a pointer so cloned stores can + // safely share the same backing storage without copying or mutating it. metrics *sync.Map // generateMetricsFunc generates metrics based on a given Kubernetes object