Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs): Only copy metric if its not filtered out #15883

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,10 +873,10 @@ func (a *Agent) runOutputs(

for metric := range unit.src {
for i, output := range unit.outputs {
if i == len(a.Config.Outputs)-1 {
output.AddMetric(metric)
if i == len(unit.outputs)-1 {
output.AddMetricNoCopy(metric)
} else {
output.AddMetric(metric.Copy())
output.AddMetric(metric)
}
}
}
Expand Down
20 changes: 19 additions & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,22 @@ func (r *RunningOutput) Close() {
}

// AddMetric adds a metric to the output.
// Takes ownership of metric
// The given metric will be copied if the output selects the metric.
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
ok, err := r.Config.Filter.Select(metric)
if err != nil {
r.log.Errorf("filtering failed: %v", err)
} else if !ok {
r.MetricsFiltered.Incr(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Just one minor nitpick: is there any reason this isn't calling r.metricFiltered(metric) like the similar functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that function also calls Drop on the metric, which we should not do if we haven't copied it yet, since we haven't taken ownership of it until then.

return
}

r.add(metric.Copy())
}

// AddMetricNoCopy adds a metric to the output.
// Takes ownership of metric regardless of whether the output selects it for outputting.
func (r *RunningOutput) AddMetricNoCopy(metric telegraf.Metric) {
ok, err := r.Config.Filter.Select(metric)
if err != nil {
r.log.Errorf("filtering failed: %v", err)
Expand All @@ -217,6 +231,10 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
return
}

r.add(metric)
}

func (r *RunningOutput) add(metric telegraf.Metric) {
r.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
r.metricFiltered(metric)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestServeHTTP(t *testing.T) {
for m := range d {
ro.AddMetric(m)
ro.Write() //nolint:errcheck // test will fail anyway if the write fails
m.Accept()
}
}(dst)

Expand Down
Loading