Skip to content

Commit 1ea7863

Browse files
authored
Add pivot and unpivot processors (influxdata#5991)
1 parent 7f04511 commit 1ea7863

File tree

8 files changed

+386
-2
lines changed

8 files changed

+386
-2
lines changed

metric/metric.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,11 @@ func (m *metric) Copy() telegraf.Metric {
240240
}
241241

242242
for i, tag := range m.tags {
243-
m2.tags[i] = tag
243+
m2.tags[i] = &telegraf.Tag{Key: tag.Key, Value: tag.Value}
244244
}
245245

246246
for i, field := range m.fields {
247-
m2.fields[i] = field
247+
m2.fields[i] = &telegraf.Field{Key: field.Key, Value: field.Value}
248248
}
249249
return m2
250250
}

plugins/processors/all/all.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
_ "github.com/influxdata/telegraf/plugins/processors/enum"
77
_ "github.com/influxdata/telegraf/plugins/processors/override"
88
_ "github.com/influxdata/telegraf/plugins/processors/parser"
9+
_ "github.com/influxdata/telegraf/plugins/processors/pivot"
910
_ "github.com/influxdata/telegraf/plugins/processors/printer"
1011
_ "github.com/influxdata/telegraf/plugins/processors/regex"
1112
_ "github.com/influxdata/telegraf/plugins/processors/rename"
1213
_ "github.com/influxdata/telegraf/plugins/processors/strings"
1314
_ "github.com/influxdata/telegraf/plugins/processors/topk"
15+
_ "github.com/influxdata/telegraf/plugins/processors/unpivot"
1416
)

plugins/processors/pivot/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Pivot Processor
2+
3+
You can use the `pivot` processor to rotate single valued metrics into a multi
4+
field metric. This transformation often results in data that is more easily
5+
to apply mathematical operators and comparisons between, and flatten into a
6+
more compact representation for write operations with some output data
7+
formats.
8+
9+
To perform the reverse operation use the [unpivot] processor.
10+
11+
### Configuration
12+
13+
```toml
14+
[[processors.pivot]]
15+
## Tag to use for naming the new field.
16+
tag_key = "name"
17+
## Field to use as the value of the new field.
18+
value_key = "value"
19+
```
20+
21+
### Example
22+
23+
```diff
24+
- cpu,cpu=cpu0,name=time_idle value=42i
25+
- cpu,cpu=cpu0,name=time_user value=43i
26+
+ cpu,cpu=cpu0 time_idle=42i
27+
+ cpu,cpu=cpu0 time_user=42i
28+
```
29+
30+
[unpivot]: /plugins/processors/unpivot/README.md

plugins/processors/pivot/pivot.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package pivot
2+
3+
import (
4+
"github.com/influxdata/telegraf"
5+
"github.com/influxdata/telegraf/plugins/processors"
6+
)
7+
8+
const (
9+
description = "Rotate a single valued metric into a multi field metric"
10+
sampleConfig = `
11+
## Tag to use for naming the new field.
12+
tag_key = "name"
13+
## Field to use as the value of the new field.
14+
value_key = "value"
15+
`
16+
)
17+
18+
type Pivot struct {
19+
TagKey string `toml:"tag_key"`
20+
ValueKey string `toml:"value_key"`
21+
}
22+
23+
func (p *Pivot) SampleConfig() string {
24+
return sampleConfig
25+
}
26+
27+
func (p *Pivot) Description() string {
28+
return description
29+
}
30+
31+
func (p *Pivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
32+
for _, m := range metrics {
33+
key, ok := m.GetTag(p.TagKey)
34+
if !ok {
35+
continue
36+
}
37+
38+
value, ok := m.GetField(p.ValueKey)
39+
if !ok {
40+
continue
41+
}
42+
43+
m.RemoveTag(p.TagKey)
44+
m.RemoveField(p.ValueKey)
45+
m.AddField(key, value)
46+
}
47+
return metrics
48+
}
49+
50+
func init() {
51+
processors.Add("pivot", func() telegraf.Processor {
52+
return &Pivot{}
53+
})
54+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package pivot
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/influxdata/telegraf"
8+
"github.com/influxdata/telegraf/testutil"
9+
)
10+
11+
func TestPivot(t *testing.T) {
12+
now := time.Now()
13+
tests := []struct {
14+
name string
15+
pivot *Pivot
16+
metrics []telegraf.Metric
17+
expected []telegraf.Metric
18+
}{
19+
{
20+
name: "simple",
21+
pivot: &Pivot{
22+
TagKey: "name",
23+
ValueKey: "value",
24+
},
25+
metrics: []telegraf.Metric{
26+
testutil.MustMetric("cpu",
27+
map[string]string{
28+
"name": "idle_time",
29+
},
30+
map[string]interface{}{
31+
"value": int64(42),
32+
},
33+
now,
34+
),
35+
},
36+
expected: []telegraf.Metric{
37+
testutil.MustMetric("cpu",
38+
map[string]string{},
39+
map[string]interface{}{
40+
"idle_time": int64(42),
41+
},
42+
now,
43+
),
44+
},
45+
},
46+
{
47+
name: "missing tag",
48+
pivot: &Pivot{
49+
TagKey: "name",
50+
ValueKey: "value",
51+
},
52+
metrics: []telegraf.Metric{
53+
testutil.MustMetric("cpu",
54+
map[string]string{
55+
"foo": "idle_time",
56+
},
57+
map[string]interface{}{
58+
"value": int64(42),
59+
},
60+
now,
61+
),
62+
},
63+
expected: []telegraf.Metric{
64+
testutil.MustMetric("cpu",
65+
map[string]string{
66+
"foo": "idle_time",
67+
},
68+
map[string]interface{}{
69+
"value": int64(42),
70+
},
71+
now,
72+
),
73+
},
74+
},
75+
{
76+
name: "missing field",
77+
pivot: &Pivot{
78+
TagKey: "name",
79+
ValueKey: "value",
80+
},
81+
metrics: []telegraf.Metric{
82+
testutil.MustMetric("cpu",
83+
map[string]string{
84+
"name": "idle_time",
85+
},
86+
map[string]interface{}{
87+
"foo": int64(42),
88+
},
89+
now,
90+
),
91+
},
92+
expected: []telegraf.Metric{
93+
testutil.MustMetric("cpu",
94+
map[string]string{
95+
"name": "idle_time",
96+
},
97+
map[string]interface{}{
98+
"foo": int64(42),
99+
},
100+
now,
101+
),
102+
},
103+
},
104+
}
105+
for _, tt := range tests {
106+
t.Run(tt.name, func(t *testing.T) {
107+
actual := tt.pivot.Apply(tt.metrics...)
108+
testutil.RequireMetricsEqual(t, tt.expected, actual)
109+
})
110+
}
111+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Unpivot Processor
2+
3+
You can use the `unpivot` processor to rotate a multi field series into single valued metrics. This transformation often results in data that is more easy to aggregate across fields.
4+
5+
To perform the reverse operation use the [pivot] processor.
6+
7+
### Configuration
8+
9+
```toml
10+
[[processors.unpivot]]
11+
## Tag to use for the name.
12+
tag_key = "name"
13+
## Field to use for the name of the value.
14+
value_key = "value"
15+
```
16+
17+
### Example
18+
19+
```diff
20+
- cpu,cpu=cpu0 time_idle=42i,time_user=43i
21+
+ cpu,cpu=cpu0,name=time_idle value=42i
22+
+ cpu,cpu=cpu0,name=time_user value=43i
23+
```
24+
25+
[pivot]: /plugins/processors/pivot/README.md
26+
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package unpivot
2+
3+
import (
4+
"github.com/influxdata/telegraf"
5+
"github.com/influxdata/telegraf/plugins/processors"
6+
)
7+
8+
const (
9+
description = "Rotate multi field metric into several single field metrics"
10+
sampleConfig = `
11+
## Tag to use for the name.
12+
tag_key = "name"
13+
## Field to use for the name of the value.
14+
value_key = "value"
15+
`
16+
)
17+
18+
type Unpivot struct {
19+
TagKey string `toml:"tag_key"`
20+
ValueKey string `toml:"value_key"`
21+
}
22+
23+
func (p *Unpivot) SampleConfig() string {
24+
return sampleConfig
25+
}
26+
27+
func (p *Unpivot) Description() string {
28+
return description
29+
}
30+
31+
func copyWithoutFields(metric telegraf.Metric) telegraf.Metric {
32+
m := metric.Copy()
33+
34+
fieldKeys := make([]string, 0, len(m.FieldList()))
35+
for _, field := range m.FieldList() {
36+
fieldKeys = append(fieldKeys, field.Key)
37+
}
38+
39+
for _, fk := range fieldKeys {
40+
m.RemoveField(fk)
41+
}
42+
43+
return m
44+
}
45+
46+
func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
47+
fieldCount := 0
48+
for _, m := range metrics {
49+
fieldCount += len(m.FieldList())
50+
}
51+
52+
results := make([]telegraf.Metric, 0, fieldCount)
53+
54+
for _, m := range metrics {
55+
base := copyWithoutFields(m)
56+
for _, field := range m.FieldList() {
57+
newMetric := base.Copy()
58+
newMetric.AddField(p.ValueKey, field.Value)
59+
newMetric.AddTag(p.TagKey, field.Key)
60+
results = append(results, newMetric)
61+
}
62+
m.Accept()
63+
}
64+
return results
65+
}
66+
67+
func init() {
68+
processors.Add("unpivot", func() telegraf.Processor {
69+
return &Unpivot{}
70+
})
71+
}

0 commit comments

Comments
 (0)