feat(serializers.prometheusremotewrite): Log metric conversion errors #15893

Merged 7 commits on Oct 1, 2024
Changes from all commits
1 change: 1 addition & 0 deletions plugins/serializers/prometheusremotewrite/README.md
@@ -42,3 +42,4 @@ it is not included in the final metric name.
Prometheus labels are produced for each tag.

**Note:** String fields are ignored and do not produce Prometheus metrics.
Set **log_level** to `trace` to see all serialization issues.
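
For example, a minimal sketch of a configuration that enables trace logging (assuming `log_level` is set in the `[agent]` table; the output plugin and endpoint URL are illustrative, not part of this change):

```toml
[agent]
  ## Surface per-series serialization issues in the log output
  log_level = "trace"

[[outputs.http]]
  url = "https://remote-write.example.com/api/v1/write"
  data_format = "prometheusremotewrite"
```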
36 changes: 32 additions & 4 deletions plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
@@ -20,17 +20,25 @@ import (
type MetricKey uint64

type Serializer struct {
SortMetrics bool `toml:"prometheus_sort_metrics"`
StringAsLabel bool `toml:"prometheus_string_as_label"`
Log telegraf.Logger `toml:"-"`
}

func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeBatch([]telegraf.Metric{metric})
}

func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var lastErr error
// traceAndKeepErr logs every passed error at trace level.
// With each call it also updates lastErr, so the last error can later be logged at a higher level.
traceAndKeepErr := func(format string, a ...any) {
lastErr = fmt.Errorf(format, a...)
s.Log.Trace(lastErr)
}

var buf bytes.Buffer
var entries = make(map[MetricKey]prompb.TimeSeries)
var labels = make([]prompb.Label, 0)
for _, metric := range metrics {
@@ -41,6 +49,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
metricName, ok := prometheus.SanitizeMetricName(metricName)
if !ok {
traceAndKeepErr("failed to parse metric name %q", metricName)
continue
}

@@ -52,6 +61,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case telegraf.Untyped:
value, ok := prometheus.SampleValue(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
@@ -78,14 +88,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

le, ok := metric.GetTag("le")
if !ok {
traceAndKeepErr("failed to parse %q: can't find `le` label", metricName)
continue
}
bound, err := strconv.ParseFloat(le, 64)
if err != nil {
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, le, err)
continue
}
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

@@ -97,13 +110,15 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

@@ -119,35 +134,41 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
traceAndKeepErr("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
continue
}
case telegraf.Summary:
switch {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
quantileTag, ok := metric.GetTag("quantile")
if !ok {
traceAndKeepErr("failed to parse %q: can't find `quantile` label", metricName)
continue
}
quantile, err := strconv.ParseFloat(quantileTag, 64)
if err != nil {
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, quantileTag, err)
continue
}
value, ok := prometheus.SampleValue(field.Value)
if !ok {
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

@@ -162,18 +183,25 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
}

// A batch of metrics can contain multiple values for a single
// Prometheus sample. If this metric is older than the existing
// sample then we can skip over it.
m, ok := entries[metrickey]
if ok {
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
traceAndKeepErr("metric %q has a sample with timestamp %v older than the sample already registered", metric.Name(), metric.Time())
continue
}
}
entries[metrickey] = promts
}
}

if lastErr != nil {
// Log only the last recorded error in the batch: a batch may produce many errors, and logging
// each one at error level would be too verbose. The following line still gives the user enough information to act on.
s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr)
}

var promTS = make([]prompb.TimeSeries, len(entries))
var i int
for _, promts := range entries {
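For reference, the TOML-tagged options on the `Serializer` struct above are set through an output plugin's serializer settings, while the `Log` field (tagged `toml:"-"`) is expected to be wired up by Telegraf at runtime rather than from configuration. A minimal sketch, assuming the HTTP output and an illustrative endpoint:

```toml
[[outputs.http]]
  url = "https://remote-write.example.com/api/v1/write"
  data_format = "prometheusremotewrite"
  ## Options mapped from the Serializer struct's toml tags
  prometheus_sort_metrics = true
  prometheus_string_as_label = true
```
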
plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go
@@ -33,7 +33,7 @@ func BenchmarkRemoteWrite(b *testing.B) {
time.Unix(0, 0),
)
}
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
for n := 0; n < b.N; n++ {
//nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations
s.SerializeBatch(batch)
@@ -188,6 +188,7 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Serializer{
Log: &testutil.CaptureLogger{},
SortMetrics: true,
}
data, err := s.Serialize(tt.metric)
@@ -201,6 +202,83 @@
}
}

func TestRemoteWriteSerializeNegative(t *testing.T) {
clog := &testutil.CaptureLogger{}
s := &Serializer{Log: clog}

assert := func(msg string, err error) {
t.Helper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

lastMsg := clog.LastError()
if lastMsg == "" {
t.Fatal("expected non-empty last message")
}
if !strings.Contains(lastMsg, msg) {
t.Fatalf("expected to have log message %q; got %q instead", msg, lastMsg)
}
// reset the logger so it can be reused by the next case
clog.Clear()
}

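// A metric whose name cannot be sanitized into a valid Prometheus metric name: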
m := testutil.MustMetric("@@!!", nil, map[string]interface{}{"!!": "@@"}, time.Unix(0, 0))
_, err := s.Serialize(m)
assert("failed to parse metric name", err)

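// A string field value that cannot be converted to a float sample: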
m = testutil.MustMetric("prometheus", nil,
map[string]interface{}{
"http_requests_total": "asd",
},
time.Unix(0, 0),
)
_, err = s.Serialize(m)
assert("bad sample", err)

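// A histogram bucket whose count field is not numeric: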
m = testutil.MustMetric(
"prometheus",
map[string]string{
"le": "0.5",
},
map[string]interface{}{
"http_request_duration_seconds_bucket": "asd",
},
time.Unix(0, 0),
telegraf.Histogram,
)
_, err = s.Serialize(m)
assert("bad sample", err)

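// A batch where one of two fields has a non-numeric value; only that series is dropped: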
m = testutil.MustMetric(
"prometheus",
map[string]string{
"code": "400",
"method": "post",
},
map[string]interface{}{
"http_requests_total": 3.0,
"http_requests_errors_total": "3.0",
},
time.Unix(0, 0),
telegraf.Gauge,
)
_, err = s.Serialize(m)
assert("bad sample", err)

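// A summary whose quantile tag cannot be parsed as a float: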
m = testutil.MustMetric(
"prometheus",
map[string]string{"quantile": "0.01a"},
map[string]interface{}{
"rpc_duration_seconds": 3102.0,
},
time.Unix(0, 0),
telegraf.Summary,
)
_, err = s.Serialize(m)
assert("failed to parse", err)
}

func TestRemoteWriteSerializeBatch(t *testing.T) {
tests := []struct {
name string
@@ -679,6 +757,7 @@ rpc_duration_seconds_sum 17560473
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Serializer{
Log: &testutil.CaptureLogger{},
SortMetrics: true,
StringAsLabel: tt.stringAsLabel,
}
@@ -733,7 +812,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
}

func BenchmarkSerialize(b *testing.B) {
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
metrics := serializers.BenchmarkMetrics(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -743,7 +822,7 @@ func BenchmarkSerialize(b *testing.B) {
}

func BenchmarkSerializeBatch(b *testing.B) {
s := &Serializer{}
s := &Serializer{Log: &testutil.CaptureLogger{}}
m := serializers.BenchmarkMetrics(b)
metrics := m[:]
b.ResetTimer()