Skip to content

Commit 5459c0f

Browse files
author
Pavlo Sumkin
committed
GROUNDWORK-1690 Organize data processing
1 parent 67ea486 commit 5459c0f

File tree

6 files changed

+102
-85
lines changed

6 files changed

+102
-85
lines changed

connectors/checker-connector/main.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,31 +98,31 @@ func taskHandler(task ScheduleTask) func() {
9898
cmd := exec.Command(task.Command[0], task.Command[1:]...)
9999
cmd.Env = task.Environment
100100
var (
101-
handler func() ([]byte, error)
102-
err error
103-
res []byte
104-
span, spanN services.TraceSpan
105-
ctx, ctxN context.Context
101+
handler func() ([]byte, error)
102+
err error
103+
res []byte
104+
105+
monitoredResources *[]transit.DynamicMonitoredResource
106106
)
107107
if task.CombinedOutput {
108108
handler = cmd.CombinedOutput
109109
} else {
110110
handler = cmd.Output
111111
}
112112

113-
ctx, span = services.StartTraceSpan(context.Background(), "connectors", "taskHandler")
113+
ctx, span := services.StartTraceSpan(context.Background(), "connectors", "taskHandler")
114114
defer func() {
115115
services.EndTraceSpan(span,
116116
services.TraceAttrError(err),
117117
services.TraceAttrPayloadLen(res),
118118
services.TraceAttrString("task", task.String()),
119119
)
120120
}()
121-
_, spanN = services.StartTraceSpan(ctx, "connectors", "command")
122121

122+
_, span2 := services.StartTraceSpan(ctx, "connectors", "command")
123123
res, err = handler()
124124

125-
services.EndTraceSpan(spanN,
125+
services.EndTraceSpan(span2,
126126
services.TraceAttrError(err),
127127
services.TraceAttrPayloadLen(res),
128128
services.TraceAttrArray("command", task.Command),
@@ -140,18 +140,26 @@ func taskHandler(task ScheduleTask) func() {
140140
Bytes("res", res).
141141
Msg("task done")
142142

143-
ctxN, spanN = services.StartTraceSpan(ctx, "connectors", "processMetrics")
143+
_, span3 := services.StartTraceSpan(ctx, "connectors", "parse")
144+
monitoredResources, err = parser.Parse(res, task.DataFormat)
145+
146+
services.EndTraceSpan(span3,
147+
services.TraceAttrError(err),
148+
services.TraceAttrPayloadLen(res),
149+
)
144150

145-
if _, err = parser.ProcessMetrics(ctxN, res, task.DataFormat); err != nil {
151+
if err != nil {
146152
log.Warn().Err(err).
147153
Interface("task", task).
148154
Bytes("res", res).
149-
Msg("could not process metrics")
155+
Msg("could not parse metrics")
156+
return
157+
}
158+
if err = connectors.SendMetrics(ctx, *monitoredResources, nil); err != nil {
159+
log.Warn().Err(err).
160+
Interface("task", task).
161+
Bytes("res", res).
162+
Msg("could not send metrics")
150163
}
151-
152-
services.EndTraceSpan(spanN,
153-
services.TraceAttrError(err),
154-
services.TraceAttrPayloadLen(res),
155-
)
156164
}
157165
}

connectors/nsca-connector/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func main() {
4040

4141
ctx, cancel := context.WithCancel(context.Background())
4242
nscaCancel = cancel
43-
nsca.Start(ctx)
43+
nsca.Start(ctx, makeNSCAHandler())
4444

4545
/* return on quit signal */
4646
<-transitService.Quit()

connectors/nsca-connector/nsca/nsca.go

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,53 +13,54 @@ import (
1313
"strings"
1414
"time"
1515

16-
"github.com/gwos/tcg/connectors/nsca-connector/parser"
1716
"github.com/rs/zerolog"
1817
"github.com/rs/zerolog/log"
1918
"github.com/tubemogul/nscatools"
2019
)
2120

22-
func Handler(p *DataPacketExt) error {
21+
func AdaptHandler(h func([]byte) error) DataHandler {
2322
// It's unclear for now how to process multi-line metrics in the right way.
2423
// The nscatools.DataPacket provides host, service, and state info for the 1st line only.
2524
// And PluginOutput contains the rest.
2625
// The `fullPacket` payload that processed in the HandleClient() by nscatools.DataPacket.Read()
2726
// contains a lot of noise and looks like protocol defined structure.
2827
// So, here we try to reconstruct plain metrics payload and process it with own parser.
29-
s := strings.Replace(p.PluginOutput, `\n`, "\n", -1)
30-
buf := make([]byte, 0, 4+len(p.HostName)+len(p.Service)+len(s))
31-
buf = append(buf, p.HostName...)
32-
buf = append(buf, ';')
33-
buf = append(buf, p.Service...)
34-
buf = append(buf, ';')
35-
buf = strconv.AppendInt(buf, int64(p.State), 10)
36-
buf = append(buf, ';')
37-
buf = append(buf, s...)
38-
log.Debug().
39-
Int16("version", p.Version).
40-
Uint32("crc", p.Crc).
41-
Uint32("timestamp", p.Timestamp).
42-
Int16("state", p.State).
43-
Str("hostname", p.HostName).
44-
Str("service", p.Service).
45-
Str("pluginOutput", p.PluginOutput).
46-
Bytes("buf", buf).
47-
Func(func(e *zerolog.Event) {
48-
println("# NSCA plain packet #")
49-
println(string(buf))
50-
}).
51-
Msg("processing DataPacket")
52-
53-
_, err := parser.ProcessMetrics(context.Background(), buf, parser.NSCA)
54-
if err != nil {
55-
log.Warn().Err(err).
28+
return func(p *DataPacketExt) error {
29+
s := strings.Replace(p.PluginOutput, `\n`, "\n", -1)
30+
buf := make([]byte, 0, 4+len(p.HostName)+len(p.Service)+len(s))
31+
buf = append(buf, p.HostName...)
32+
buf = append(buf, ';')
33+
buf = append(buf, p.Service...)
34+
buf = append(buf, ';')
35+
buf = strconv.AppendInt(buf, int64(p.State), 10)
36+
buf = append(buf, ';')
37+
buf = append(buf, s...)
38+
log.Debug().
39+
Int16("version", p.Version).
40+
Uint32("crc", p.Crc).
41+
Uint32("timestamp", p.Timestamp).
42+
Int16("state", p.State).
43+
Str("hostname", p.HostName).
44+
Str("service", p.Service).
45+
Str("pluginOutput", p.PluginOutput).
5646
Bytes("buf", buf).
57-
Msg("could not process metrics")
47+
Func(func(e *zerolog.Event) {
48+
println("# NSCA plain packet #")
49+
println(string(buf))
50+
}).
51+
Msg("processing DataPacket")
52+
53+
err := h(buf)
54+
if err != nil {
55+
log.Warn().Err(err).
56+
Bytes("payload", buf).
57+
Msg("could not process incoming data")
58+
}
59+
return nil
5860
}
59-
return err
6061
}
6162

62-
func Start(ctx context.Context) {
63+
func Start(ctx context.Context, handler DataHandler) {
6364
nscaHost := "0.0.0.0"
6465
nscaPort := uint16(5667)
6566
nscaEncrypt := nscatools.EncryptNone
@@ -86,7 +87,7 @@ func Start(ctx context.Context) {
8687
}
8788

8889
go StartServerWithContext(ctx,
89-
NewConfigExt(nscaHost, nscaPort, nscaEncrypt, nscaPassword, Handler))
90+
NewConfigExt(nscaHost, nscaPort, nscaEncrypt, nscaPassword, handler))
9091
}
9192

9293
func StartServerWithContext(ctx context.Context, conf *ConfigExt) error {
@@ -156,14 +157,14 @@ func HandleClientExt(conf *ConfigExt, conn net.Conn) error {
156157
return err
157158
}
158159

159-
type dataHandler func(*DataPacketExt) error
160+
type DataHandler func(*DataPacketExt) error
160161

161162
type ConfigExt struct {
162163
nscatools.Config
163-
PacketHandler dataHandler
164+
PacketHandler DataHandler
164165
}
165166

166-
func NewConfigExt(host string, port uint16, encryption int, password string, handler dataHandler) *ConfigExt {
167+
func NewConfigExt(host string, port uint16, encryption int, password string, handler DataHandler) *ConfigExt {
167168
c := nscatools.NewConfig(host, port, encryption, password, nil)
168169
cfg := ConfigExt{*c, handler}
169170
return &cfg

connectors/nsca-connector/nscaConnector.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"net/http"
77

88
"github.com/gin-gonic/gin"
9+
"github.com/gwos/tcg/connectors"
10+
"github.com/gwos/tcg/connectors/nsca-connector/nsca"
911
"github.com/gwos/tcg/connectors/nsca-connector/parser"
1012
"github.com/gwos/tcg/services"
1113
"github.com/rs/zerolog/log"
@@ -46,7 +48,7 @@ func makeEntrypointHandler(dataFormat parser.DataFormat) func(*gin.Context) {
4648
c.JSON(http.StatusBadRequest, err.Error())
4749
return
4850
}
49-
if _, err = parser.ProcessMetrics(ctx, payload, dataFormat); err != nil {
51+
if err = processData(ctx, payload, dataFormat); err != nil {
5052
log.Warn().Err(err).
5153
Str("entrypoint", c.FullPath()).
5254
Str("dataFormat", string(dataFormat)).
@@ -58,3 +60,36 @@ func makeEntrypointHandler(dataFormat parser.DataFormat) func(*gin.Context) {
5860
c.JSON(http.StatusOK, nil)
5961
}
6062
}
63+
64+
func makeNSCAHandler() nsca.DataHandler {
65+
return nsca.AdaptHandler(func(p []byte) error {
66+
ctx, span := services.StartTraceSpan(context.Background(), "connectors", "EntrypointHandler")
67+
err := processData(ctx, p, parser.NSCA)
68+
if err != nil {
69+
log.Warn().Err(err).
70+
Str("entrypoint", "NSCA").
71+
Msg("could not process incoming request")
72+
}
73+
services.EndTraceSpan(span,
74+
services.TraceAttrError(err),
75+
services.TraceAttrPayloadLen(p),
76+
services.TraceAttrEntrypoint("NSCA"),
77+
)
78+
return err
79+
})
80+
}
81+
82+
func processData(ctx context.Context, payload []byte, dataFormat parser.DataFormat) error {
83+
ctxN, span := services.StartTraceSpan(ctx, "connectors", "processData")
84+
monitoredResources, err := parser.Parse(payload, dataFormat)
85+
86+
services.EndTraceSpan(span,
87+
services.TraceAttrError(err),
88+
services.TraceAttrPayloadLen(payload),
89+
)
90+
91+
if err != nil {
92+
return err
93+
}
94+
return connectors.SendMetrics(ctxN, *monitoredResources, nil)
95+
}

connectors/nsca-connector/parser/parser.go

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package parser
22

33
import (
44
"bytes"
5-
"context"
65
"errors"
76
"fmt"
87
"regexp"
@@ -12,7 +11,6 @@ import (
1211

1312
"github.com/gwos/tcg/connectors"
1413
"github.com/gwos/tcg/milliseconds"
15-
"github.com/gwos/tcg/services"
1614
"github.com/gwos/tcg/transit"
1715
)
1816

@@ -42,32 +40,7 @@ const (
4240
type MetricsMap map[string][]transit.TimeSeries
4341
type ServicesMap map[string][]transit.DynamicMonitoredService
4442

45-
func ProcessMetrics(ctx context.Context, payload []byte, dataFormat DataFormat) (*[]transit.DynamicMonitoredResource, error) {
46-
var (
47-
ctxN context.Context
48-
err error
49-
monitoredResources *[]transit.DynamicMonitoredResource
50-
span services.TraceSpan
51-
)
52-
53-
ctxN, span = services.StartTraceSpan(ctx, "connectors", "parseBody")
54-
monitoredResources, err = parse(payload, dataFormat)
55-
56-
services.EndTraceSpan(span,
57-
services.TraceAttrError(err),
58-
services.TraceAttrPayloadLen(payload),
59-
)
60-
61-
if err != nil {
62-
return nil, err
63-
}
64-
if err := connectors.SendMetrics(ctxN, *monitoredResources, nil); err != nil {
65-
return nil, err
66-
}
67-
return monitoredResources, nil
68-
}
69-
70-
func parse(payload []byte, dataFormat DataFormat) (*[]transit.DynamicMonitoredResource, error) {
43+
func Parse(payload []byte, dataFormat DataFormat) (*[]transit.DynamicMonitoredResource, error) {
7144
metricsLines := strings.Split(string(bytes.Trim(payload, " \n\r")), "\n")
7245

7346
var (

connectors/nsca-connector/parser/parser_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Server3;Disks3;2;CRITICAL - load average: 2.45, 2.32, 2.22|load1=2.450;0.000;0.0
1414
awips-demo-4;example-service-10;0;OK - example-service-10 (2021-08-09 15:47:38 :: 1628524058) | result=147ms;;;0;`,
1515
)
1616

17-
monitoredResources, err := parse(data, NSCA)
17+
monitoredResources, err := Parse(data, NSCA)
1818
assert.NoError(t, err)
1919

2020
assert.Equal(t, 4, len(*monitoredResources), "invalid count of monitored resources")
@@ -56,7 +56,7 @@ S;1628530909;awips-demo-2;example-service-6;0;OK - example-service-6 (2021-08-09
5656
S;1628530909;awips-demo-2;example-service-7;0;OK - example-service-7 (2021-08-09 17:41:49 :: 1628530909) | result=63ms;;;0;`,
5757
)
5858

59-
monitoredResources, err := parse(data, Bronx)
59+
monitoredResources, err := Parse(data, Bronx)
6060
assert.NoError(t, err)
6161

6262
assert.Equal(t, 4, len(*monitoredResources), "invalid count of monitored resources")
@@ -94,7 +94,7 @@ S;1628546296;awips-demo-2;example-service-4;0;OK - example-service-4 (2021-08-09
9494
S;1628546296;awips-demo-2;example-service-5;0;OK - example-service-5 (2021-08-09 21:58:16 :: 1628546296) | result=63ms;;;0;`,
9595
)
9696

97-
monitoredResources, err := parse(data, Bronx)
97+
monitoredResources, err := Parse(data, Bronx)
9898
assert.NoError(t, err)
9999

100100
assert.Equal(t, 1, len(*monitoredResources), "invalid count of monitored resources")

0 commit comments

Comments
 (0)