Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(serializers.prometheusremotewrite): Log metric conversion errors #15893

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
type MetricKey uint64

type Serializer struct {
SortMetrics bool `toml:"prometheus_sort_metrics"`
StringAsLabel bool `toml:"prometheus_string_as_label"`
SortMetrics bool `toml:"prometheus_sort_metrics"`
StringAsLabel bool `toml:"prometheus_string_as_label"`
Log telegraf.Logger `toml:"-"`
}

func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -30,6 +31,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {

func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var buf bytes.Buffer
var lastErr error

var entries = make(map[MetricKey]prompb.TimeSeries)
var labels = make([]prompb.Label, 0)
Expand All @@ -41,6 +43,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
metricName, ok := prometheus.SanitizeMetricName(metricName)
if !ok {
lastErr = fmt.Errorf("failed to parse metric name %q", metricName)
continue
}

Expand All @@ -52,6 +55,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case telegraf.Untyped:
value, ok := prometheus.SampleValue(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
Expand All @@ -78,14 +82,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

le, ok := metric.GetTag("le")
if !ok {
lastErr = fmt.Errorf("failed to parse %q: can't find `le` label", metricName)
continue
}
bound, err := strconv.ParseFloat(le, 64)
if err != nil {
lastErr = fmt.Errorf("failed to parse %q: can't parse %q value: %w", metricName, le, err)
continue
}
count, ok := prometheus.SampleCount(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -97,13 +104,15 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -119,35 +128,41 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
lastErr = fmt.Errorf("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
continue
}
case telegraf.Summary:
switch {
case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_sum", labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
quantileTag, ok := metric.GetTag("quantile")
if !ok {
lastErr = fmt.Errorf("failed to parse %q: can't find `quantile` label", metricName)
continue
}
quantile, err := strconv.ParseFloat(quantileTag, 64)
if err != nil {
lastErr = fmt.Errorf("failed to parse %q: can't parse %q value: %w", metricName, quantileTag, err)
continue
}
value, ok := prometheus.SampleValue(field.Value)
if !ok {
lastErr = fmt.Errorf("failed to parse %q: bad sample value %#v", metricName, field.Value)
continue
}

Expand All @@ -162,18 +177,24 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
}

// A batch of metrics can contain multiple values for a single
// Prometheus sample. If this metric is older than the existing
// Prometheus sample. If this metric is older than the existing
// sample then we can skip over it.
m, ok := entries[metrickey]
if ok {
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
lastErr = fmt.Errorf("metric %q has samples with timestamp %v older than already registered before", metric.Name(), metric.Time())
continue
}
}
entries[metrickey] = promts
}
}

if lastErr != nil && s.Log != nil {
// log only the last recorded error, as it could be too verbose to log every one
s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr)
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved

var promTS = make([]prompb.TimeSeries, len(entries))
var i int
for _, promts := range entries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -201,6 +202,98 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
}
}

// fakeLogger immitates telegraf.Logger but preserves
// the last recorded message.
type fakeLogger struct {
telegraf.Logger
lastMsg string
}

// Errorf overrides telegraf.Logger method to store the message in lastMsg.
// lastMsg can be then used for testing the output
func (fl *fakeLogger) Errorf(format string, args ...interface{}) {
fl.lastMsg = fmt.Sprintf(format, args...)
}

func (fl *fakeLogger) has(msg string) bool {
return strings.Contains(fl.lastMsg, msg)
}

srebhan marked this conversation as resolved.
Show resolved Hide resolved
func TestRemoteWriteSerializeNegative(t *testing.T) {
log := &fakeLogger{Logger: logger.New("", "", "")}
s := &Serializer{Log: log}

assert := func(msg string, err error) {
t.Helper()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if log.lastMsg == "" {
t.Fatal("expected non-empty last message")
}
if !log.has(msg) {
t.Fatalf("expected to have log message %q; got %q instead", msg, log.lastMsg)
}
// reset log message, so logger can be reused again
log.lastMsg = ""
}

m := testutil.MustMetric("@@!!", nil, map[string]interface{}{"!!": "@@"}, time.Unix(0, 0))
_, err := s.Serialize(m)
assert("failed to parse metric name", err)

m = testutil.MustMetric("prometheus", nil,
map[string]interface{}{
"http_requests_total": "asd",
},
time.Unix(0, 0),
)
_, err = s.Serialize(m)
assert("bad sample", err)

m = testutil.MustMetric(
"prometheus",
map[string]string{
"le": "0.5",
},
map[string]interface{}{
"http_request_duration_seconds_bucket": "asd",
},
time.Unix(0, 0),
telegraf.Histogram,
)
_, err = s.Serialize(m)
assert("bad sample", err)

m = testutil.MustMetric(
"prometheus",
map[string]string{
"code": "400",
"method": "post",
},
map[string]interface{}{
"http_requests_total": 3.0,
"http_requests_errors_total": "3.0",
},
time.Unix(0, 0),
telegraf.Gauge,
)
_, err = s.Serialize(m)
assert("bad sample", err)

m = testutil.MustMetric(
"prometheus",
map[string]string{"quantile": "0.01a"},
map[string]interface{}{
"rpc_duration_seconds": 3102.0,
},
time.Unix(0, 0),
telegraf.Summary,
)
_, err = s.Serialize(m)
assert("failed to parse", err)
}

func TestRemoteWriteSerializeBatch(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading