Skip to content

Commit f32b064

Browse files
prydindanielnelson
authored andcommitted
Fix race condition in the Wavefront parser (influxdata#5764)
1 parent f5b44fd commit f32b064

File tree

1 file changed

+46
-23
lines changed

1 file changed

+46
-23
lines changed

plugins/parsers/wavefront/parser.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"log"
88
"strconv"
9+
"sync"
910
"time"
1011

1112
"github.com/influxdata/telegraf"
@@ -22,18 +23,23 @@ type Point struct {
2223
Tags map[string]string
2324
}
2425

25-
// Parser represents a parser.
26+
type WavefrontParser struct {
27+
parsers *sync.Pool
28+
defaultTags map[string]string
29+
}
30+
31+
// PointParser is a thread-unsafe parser and must be kept in a pool.
2632
type PointParser struct {
2733
s *PointScanner
2834
buf struct {
2935
tok []Token // last read n tokens
3036
lit []string // last read n literals
3137
n int // unscanned buffer size (max=2)
3238
}
33-
scanBuf bytes.Buffer // buffer reused for scanning tokens
34-
writeBuf bytes.Buffer // buffer reused for parsing elements
35-
Elements []ElementParser
36-
defaultTags map[string]string
39+
scanBuf bytes.Buffer // buffer reused for scanning tokens
40+
writeBuf bytes.Buffer // buffer reused for parsing elements
41+
Elements []ElementParser
42+
parent *WavefrontParser
3743
}
3844

3945
// Returns a slice of ElementParser's for the Graphite format
@@ -47,9 +53,40 @@ func NewWavefrontElements() []ElementParser {
4753
return elements
4854
}
4955

50-
func NewWavefrontParser(defaultTags map[string]string) *PointParser {
56+
func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser {
57+
wp := &WavefrontParser{defaultTags: defaultTags}
58+
wp.parsers = &sync.Pool{
59+
New: func() interface{} {
60+
return NewPointParser(wp)
61+
},
62+
}
63+
return wp
64+
}
65+
66+
func NewPointParser(parent *WavefrontParser) *PointParser {
5167
elements := NewWavefrontElements()
52-
return &PointParser{Elements: elements, defaultTags: defaultTags}
68+
return &PointParser{Elements: elements, parent: parent}
69+
}
70+
71+
func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
72+
buf := []byte(line)
73+
74+
metrics, err := p.Parse(buf)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
if len(metrics) > 0 {
80+
return metrics[0], nil
81+
}
82+
83+
return nil, nil
84+
}
85+
86+
func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) {
87+
pp := p.parsers.Get().(*PointParser)
88+
defer p.parsers.Put(pp)
89+
return pp.Parse(buf)
5390
}
5491

5592
func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
@@ -91,21 +128,7 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
91128
return metrics, nil
92129
}
93130

94-
func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) {
95-
buf := []byte(line)
96-
metrics, err := p.Parse(buf)
97-
if err != nil {
98-
return nil, err
99-
}
100-
101-
if len(metrics) > 0 {
102-
return metrics[0], nil
103-
}
104-
105-
return nil, nil
106-
}
107-
108-
func (p *PointParser) SetDefaultTags(tags map[string]string) {
131+
func (p *WavefrontParser) SetDefaultTags(tags map[string]string) {
109132
p.defaultTags = tags
110133
}
111134

@@ -119,7 +142,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M
119142
tags[k] = v
120143
}
121144
// apply default tags after parsed tags
122-
for k, v := range p.defaultTags {
145+
for k, v := range p.parent.defaultTags {
123146
tags[k] = v
124147
}
125148

0 commit comments

Comments
 (0)