Skip to content

Commit 4e3244c

Browse files
authored
Fix aggregator window and shutdown of multiple aggregators (influxdata#5644)
1 parent 3045ffb commit 4e3244c

File tree

10 files changed

+219
-94
lines changed

10 files changed

+219
-94
lines changed

accumulator.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@ type Accumulator interface {
4141
// AddMetric adds a metric to the accumulator.
4242
AddMetric(Metric)
4343

44-
// SetPrecision takes two time.Duration objects. If the first is non-zero,
45-
// it sets that as the precision. Otherwise, it takes the second argument
46-
// as the order of time that the metrics should be rounded to, with the
47-
// maximum being 1s.
48-
SetPrecision(precision, interval time.Duration)
44+
// SetPrecision sets the timestamp rounding precision. All metrics
45+
// added to the accumulator will have their timestamp rounded to the
46+
// nearest multiple of precision.
47+
SetPrecision(precision time.Duration)
4948

5049
// Report an error.
5150
AddError(err error)

agent/accumulator.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,21 +114,8 @@ func (ac *accumulator) AddError(err error) {
114114
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
115115
}
116116

117-
func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
118-
if precision > 0 {
119-
ac.precision = precision
120-
return
121-
}
122-
switch {
123-
case interval >= time.Second:
124-
ac.precision = time.Second
125-
case interval >= time.Millisecond:
126-
ac.precision = time.Millisecond
127-
case interval >= time.Microsecond:
128-
ac.precision = time.Microsecond
129-
default:
130-
ac.precision = time.Nanosecond
131-
}
117+
func (ac *accumulator) SetPrecision(precision time.Duration) {
118+
ac.precision = precision
132119
}
133120

134121
func (ac *accumulator) getTime(t []time.Time) time.Time {

agent/accumulator_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ func TestSetPrecision(t *testing.T) {
7474
name string
7575
unset bool
7676
precision time.Duration
77-
interval time.Duration
7877
timestamp time.Time
7978
expected time.Time
8079
}{
@@ -86,13 +85,13 @@ func TestSetPrecision(t *testing.T) {
8685
},
8786
{
8887
name: "second interval",
89-
interval: time.Second,
88+
precision: time.Second,
9089
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
9190
expected: time.Date(2006, time.February, 10, 12, 0, 0, 0, time.UTC),
9291
},
9392
{
9493
name: "microsecond interval",
95-
interval: time.Microsecond,
94+
precision: time.Microsecond,
9695
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
9796
expected: time.Date(2006, time.February, 10, 12, 0, 0, 82913000, time.UTC),
9897
},
@@ -109,7 +108,7 @@ func TestSetPrecision(t *testing.T) {
109108

110109
a := NewAccumulator(&TestMetricMaker{}, metrics)
111110
if !tt.unset {
112-
a.SetPrecision(tt.precision, tt.interval)
111+
a.SetPrecision(tt.precision)
113112
}
114113

115114
a.AddFields("acctest",

agent/agent.go

Lines changed: 86 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,15 @@ func (a *Agent) Test(ctx context.Context) error {
180180
}
181181

182182
acc := NewAccumulator(input, metricC)
183-
acc.SetPrecision(a.Config.Agent.Precision.Duration,
184-
a.Config.Agent.Interval.Duration)
183+
acc.SetPrecision(a.Precision())
185184
input.SetDefaultTags(a.Config.Tags)
186185

187186
// Special instructions for some inputs. cpu, for example, needs to be
188187
// run twice in order to return cpu usage percentages.
189188
switch input.Name() {
190189
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
191190
nulAcc := NewAccumulator(input, nulC)
192-
nulAcc.SetPrecision(a.Config.Agent.Precision.Duration,
193-
a.Config.Agent.Interval.Duration)
191+
nulAcc.SetPrecision(a.Precision())
194192
if err := input.Input.Gather(nulAcc); err != nil {
195193
return err
196194
}
@@ -222,7 +220,6 @@ func (a *Agent) runInputs(
222220
var wg sync.WaitGroup
223221
for _, input := range a.Config.Inputs {
224222
interval := a.Config.Agent.Interval.Duration
225-
precision := a.Config.Agent.Precision.Duration
226223
jitter := a.Config.Agent.CollectionJitter.Duration
227224

228225
// Overwrite agent interval if this plugin has its own.
@@ -231,7 +228,7 @@ func (a *Agent) runInputs(
231228
}
232229

233230
acc := NewAccumulator(input, dst)
234-
acc.SetPrecision(precision, interval)
231+
acc.SetPrecision(a.Precision())
235232

236233
wg.Add(1)
237234
go func(input *models.RunningInput) {
@@ -339,17 +336,41 @@ func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
339336
return metrics
340337
}
341338

342-
// runAggregators triggers the periodic push for Aggregators.
339+
func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) {
340+
var until time.Time
341+
if roundInterval {
342+
until = internal.AlignTime(start, period)
343+
if until == start {
344+
until = internal.AlignTime(start.Add(time.Nanosecond), period)
345+
}
346+
} else {
347+
until = start.Add(period)
348+
}
349+
350+
since := until.Add(-period)
351+
352+
return since, until
353+
}
354+
355+
// runAggregators adds metrics to the aggregators and triggers their periodic
356+
// push call.
343357
//
344-
// When the context is done a final push will occur and then this function
345-
// will return.
358+
// Runs until src is closed and all metrics have been processed. Will call
359+
// push one final time before returning.
346360
func (a *Agent) runAggregators(
347361
startTime time.Time,
348362
src <-chan telegraf.Metric,
349363
dst chan<- telegraf.Metric,
350364
) error {
351365
ctx, cancel := context.WithCancel(context.Background())
352366

367+
// Before calling Add, initialize the aggregation window. This ensures
368+
// that any metric created after start time will be aggregated.
369+
for _, agg := range a.Config.Aggregators {
370+
since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
371+
agg.UpdateWindow(since, until)
372+
}
373+
353374
var wg sync.WaitGroup
354375
wg.Add(1)
355376
go func() {
@@ -371,73 +392,72 @@ func (a *Agent) runAggregators(
371392
cancel()
372393
}()
373394

374-
precision := a.Config.Agent.Precision.Duration
375-
interval := a.Config.Agent.Interval.Duration
376395
aggregations := make(chan telegraf.Metric, 100)
377-
for _, agg := range a.Config.Aggregators {
378-
wg.Add(1)
379-
go func(agg *models.RunningAggregator) {
380-
defer func() {
381-
wg.Done()
382-
close(aggregations)
383-
}()
384-
385-
if a.Config.Agent.RoundInterval {
386-
// Aggregators are aligned to the agent interval regardless of
387-
// their period.
388-
err := internal.SleepContext(ctx, internal.AlignDuration(startTime, interval))
389-
if err != nil {
390-
return
391-
}
392-
}
396+
wg.Add(1)
397+
go func() {
398+
defer wg.Done()
393399

394-
agg.SetPeriodStart(startTime)
400+
var aggWg sync.WaitGroup
401+
for _, agg := range a.Config.Aggregators {
402+
aggWg.Add(1)
403+
go func(agg *models.RunningAggregator) {
404+
defer aggWg.Done()
405+
406+
acc := NewAccumulator(agg, aggregations)
407+
acc.SetPrecision(a.Precision())
408+
fmt.Println(1)
409+
a.push(ctx, agg, acc)
410+
fmt.Println(2)
411+
}(agg)
412+
}
395413

396-
acc := NewAccumulator(agg, aggregations)
397-
acc.SetPrecision(precision, interval)
398-
a.push(ctx, agg, acc)
399-
}(agg)
400-
}
414+
aggWg.Wait()
415+
fmt.Println(3)
416+
close(aggregations)
417+
}()
401418

402419
for metric := range aggregations {
403420
metrics := a.applyProcessors(metric)
404421
for _, metric := range metrics {
405422
dst <- metric
406423
}
407424
}
425+
fmt.Println(4)
408426

409427
wg.Wait()
428+
fmt.Println(5)
410429
return nil
411430
}
412431

413-
// push runs the push for a single aggregator every period. More simple than
414-
// the output/input version as timeout should be less likely.... not really
415-
// because the output channel can block for now.
432+
// push runs the push for a single aggregator every period.
416433
func (a *Agent) push(
417434
ctx context.Context,
418435
aggregator *models.RunningAggregator,
419436
acc telegraf.Accumulator,
420437
) {
421-
ticker := time.NewTicker(aggregator.Period())
422-
defer ticker.Stop()
423-
424438
for {
439+
// Ensures that Push will be called for each period, even if it has
440+
// already elapsed before this function is called. This is guaranteed
441+
// so long as only Push updates the EndPeriod. This method
442+
// also avoids drift by not using a ticker.
443+
until := time.Until(aggregator.EndPeriod())
444+
425445
select {
426-
case <-ticker.C:
446+
case <-time.After(until):
447+
aggregator.Push(acc)
427448
break
428449
case <-ctx.Done():
429450
aggregator.Push(acc)
430451
return
431452
}
432-
433-
aggregator.Push(acc)
434453
}
435454
}
436455

437456
// runOutputs triggers the periodic write for Outputs.
438457
//
439-
// When the context is done, outputs continue to run until their buffer is
440-
// closed, afterwich they run flush once more.
458+
459+
// Runs until src is closed and all metrics have been processed. Will call
460+
// Write one final time before returning.
441461
func (a *Agent) runOutputs(
442462
startTime time.Time,
443463
src <-chan telegraf.Metric,
@@ -608,7 +628,7 @@ func (a *Agent) startServiceInputs(
608628
// Gather() accumulator does apply rounding according to the
609629
// precision agent setting.
610630
acc := NewAccumulator(input, dst)
611-
acc.SetPrecision(time.Nanosecond, 0)
631+
acc.SetPrecision(time.Nanosecond)
612632

613633
err := si.Start(acc)
614634
if err != nil {
@@ -638,6 +658,27 @@ func (a *Agent) stopServiceInputs() {
638658
}
639659
}
640660

661+
// Precision returns the rounding precision for metrics.
662+
func (a *Agent) Precision() time.Duration {
663+
precision := a.Config.Agent.Precision.Duration
664+
interval := a.Config.Agent.Interval.Duration
665+
666+
if precision > 0 {
667+
return precision
668+
}
669+
670+
switch {
671+
case interval >= time.Second:
672+
return time.Second
673+
case interval >= time.Millisecond:
674+
return time.Millisecond
675+
case interval >= time.Microsecond:
676+
return time.Microsecond
677+
default:
678+
return time.Nanosecond
679+
}
680+
}
681+
641682
// panicRecover displays an error if an input panics.
642683
func panicRecover(input *models.RunningInput) {
643684
if err := recover(); err != nil {

agent/agent_test.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ package agent
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/influxdata/telegraf/internal/config"
7-
8-
// needing to load the plugins
98
_ "github.com/influxdata/telegraf/plugins/inputs/all"
10-
// needing to load the outputs
119
_ "github.com/influxdata/telegraf/plugins/outputs/all"
12-
1310
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1412
)
1513

1614
func TestAgent_OmitHostname(t *testing.T) {
@@ -109,3 +107,62 @@ func TestAgent_LoadOutput(t *testing.T) {
109107
a, _ = NewAgent(c)
110108
assert.Equal(t, 3, len(a.Config.Outputs))
111109
}
110+
111+
func TestWindow(t *testing.T) {
112+
parse := func(s string) time.Time {
113+
tm, err := time.Parse(time.RFC3339, s)
114+
if err != nil {
115+
panic(err)
116+
}
117+
return tm
118+
}
119+
120+
tests := []struct {
121+
name string
122+
start time.Time
123+
roundInterval bool
124+
period time.Duration
125+
since time.Time
126+
until time.Time
127+
}{
128+
{
129+
name: "round with exact alignment",
130+
start: parse("2018-03-27T00:00:00Z"),
131+
roundInterval: true,
132+
period: 30 * time.Second,
133+
since: parse("2018-03-27T00:00:00Z"),
134+
until: parse("2018-03-27T00:00:30Z"),
135+
},
136+
{
137+
name: "round with alignment needed",
138+
start: parse("2018-03-27T00:00:05Z"),
139+
roundInterval: true,
140+
period: 30 * time.Second,
141+
since: parse("2018-03-27T00:00:00Z"),
142+
until: parse("2018-03-27T00:00:30Z"),
143+
},
144+
{
145+
name: "no round with exact alignment",
146+
start: parse("2018-03-27T00:00:00Z"),
147+
roundInterval: false,
148+
period: 30 * time.Second,
149+
since: parse("2018-03-27T00:00:00Z"),
150+
until: parse("2018-03-27T00:00:30Z"),
151+
},
152+
{
153+
name: "no found with alignment needed",
154+
start: parse("2018-03-27T00:00:05Z"),
155+
roundInterval: false,
156+
period: 30 * time.Second,
157+
since: parse("2018-03-27T00:00:05Z"),
158+
until: parse("2018-03-27T00:00:35Z"),
159+
},
160+
}
161+
for _, tt := range tests {
162+
t.Run(tt.name, func(t *testing.T) {
163+
since, until := updateWindow(tt.start, tt.roundInterval, tt.period)
164+
require.Equal(t, tt.since, since, "since")
165+
require.Equal(t, tt.until, until, "until")
166+
})
167+
}
168+
}

internal/internal.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,13 @@ func SleepContext(ctx context.Context, duration time.Duration) error {
288288
}
289289

290290
// AlignDuration returns the duration until next aligned interval.
291+
// If the current time is aligned, a 0 duration is returned.
291292
func AlignDuration(tm time.Time, interval time.Duration) time.Duration {
292293
return AlignTime(tm, interval).Sub(tm)
293294
}
294295

295296
// AlignTime returns the time of the next aligned interval.
297+
// If the current time is aligned, the current time is returned.
296298
func AlignTime(tm time.Time, interval time.Duration) time.Time {
297299
truncated := tm.Truncate(interval)
298300
if truncated == tm {

0 commit comments

Comments
 (0)