forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzipkin.go
182 lines (151 loc) · 5.05 KB
/
zipkin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package zipkin
import (
"context"
"fmt"
"log"
"net"
"net/http"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Defaults used by the zipkin input plugin.
const (
	// DefaultPort is the TCP port the plugin listens on unless configured
	// otherwise; it is the port zipkin implementations expect.
	DefaultPort = 9411

	// DefaultRoute is the URL path spans are posted to unless configured
	// otherwise; it is the route zipkin implementations expect.
	DefaultRoute = "/api/v1/spans"

	// DefaultShutdownTimeout bounds how long telegraf waits for the plugin
	// to shut down. It is an untyped constant that carries no time unit of
	// its own; whichever code converts it to a time.Duration decides the unit.
	DefaultShutdownTimeout = 5
)
// Recorder is the sink a Handler reports to: it accepts zipkin trace data
// for processing and is told about any errors that accompany that data.
type Recorder interface {
	// Record hands a Trace to the recorder and reports whether it could be
	// processed.
	Record(trace Trace) error
	// Error passes an error to the recorder.
	Error(err error)
}
// Handler is anything that can attach itself to a mux router for http
// routing and deliver the trace data it collects to a Recorder.
type Handler interface {
	// Register wires the handler into router and associates it with
	// recorder, returning an error if that cannot be done.
	Register(router *mux.Router, recorder Recorder) error
}
// BinaryAnnotation is the plugin's representation of a zipkin binary
// annotation. It carries the same fields as its zipkin counterpart, all
// held as strings.
type BinaryAnnotation struct {
	Key   string
	Value string
	// Host is annotation.endpoint.ipv4 + ":" + annotation.endpoint.port.
	Host        string
	ServiceName string
	Type        string
}
// Annotation is the plugin's representation of an ordinary zipkin
// annotation. Its data fields are what end up as fields/tags in influxdb.
type Annotation struct {
	Timestamp time.Time
	Value     string
	// Host is annotation.endpoint.ipv4 + ":" + annotation.endpoint.port.
	Host        string
	ServiceName string
}
// Span is the plugin's representation of a single zipkin span. It holds
// most of the data found in a span sent over the thrift protocol, laid out
// in a form that is more straightforward to store.
type Span struct {
	ID string
	// TraceID is the zipkin traceid high concatenated with the traceid.
	TraceID     string
	Name        string
	ParentID    string
	ServiceName string
	// Timestamp is time.Now() when the zipkin input is nil.
	Timestamp         time.Time
	Duration          time.Duration
	Annotations       []Annotation
	BinaryAnnotations []BinaryAnnotation
}
// Trace is a slice (a series) of Span values; it is the unit of data a
// Recorder accepts through Record.
type Trace []Span
// sampleConfig is the example configuration text returned by SampleConfig.
// Both settings are shown commented out.
const sampleConfig = `
# path = "/api/v1/spans" # URL path for span data
# port = 9411 # Port on which Telegraf listens
`
// Zipkin holds the telegraf configuration for the zipkin input plugin
// together with the runtime state of the separate, concurrently running
// zipkin http server that the plugin manages.
type Zipkin struct {
	// ServiceAddress is an exported setting.
	// NOTE(review): the code visible in this file never reads it — confirm
	// whether it is still used.
	ServiceAddress string

	// Port is the TCP port Start listens on.
	Port int

	// Path is the URL path handed to NewSpanHandler in Start.
	Path string

	address   string          // listener address recorded by Start
	handler   Handler         // span handler created in Start
	server    *http.Server    // http server created in Start, shut down in Stop
	waitGroup *sync.WaitGroup // lets Stop wait for the serving goroutine
}
// Description returns a one-sentence summary of what the plugin does. It is
// one of the methods required by telegraf.ServiceInput.
func (z Zipkin) Description() string {
	const summary = "This plugin implements the Zipkin http server to gather trace and timing data needed to troubleshoot latency problems in microservice architectures."
	return summary
}
// SampleConfig returns the plugin's example configuration text. It is one
// of the methods required by telegraf.ServiceInput.
func (z Zipkin) SampleConfig() string {
	example := sampleConfig
	return example
}
// Gather does nothing and always returns nil: the zipkin plugin collects
// its data in the separate goroutine launched by (*Zipkin).Start().
func (z *Zipkin) Gather(acc telegraf.Accumulator) error {
	return nil
}
// Start launches a separate goroutine that serves zipkin client http
// requests, feeding the collected data into acc.
//
// It builds a span handler for z.Path, registers it on a fresh router
// together with a line-protocol converter wrapping acc, binds a TCP listener
// on ":<z.Port>", and then serves on that listener in the background.
//
// An error is returned if the handler cannot be registered or the listener
// cannot be opened; in both cases no goroutine is started.
func (z *Zipkin) Start(acc telegraf.Accumulator) error {
	z.handler = NewSpanHandler(z.Path)

	var wg sync.WaitGroup
	z.waitGroup = &wg

	router := mux.NewRouter()
	// Create the server before anything can fail so that z.server is never
	// nil once Start has been called, even on an error return.
	z.server = &http.Server{
		Handler: router,
	}

	converter := NewLineProtocolConverter(acc)
	if err := z.handler.Register(router, converter); err != nil {
		return err
	}

	addr := ":" + strconv.Itoa(z.Port)
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	z.address = ln.Addr().String()
	log.Printf("I! Started the zipkin listener on %s", z.address)

	// Add must happen before the goroutine is spawned: if it ran inside the
	// goroutine, a concurrent Stop could call Wait first and return without
	// waiting for the listener to finish.
	wg.Add(1)
	go func() {
		defer wg.Done()
		z.Listen(ln, acc)
	}()

	return nil
}
// Stop shuts the internal http server down gracefully, giving it up to
// DefaultShutdownTimeout seconds to finish, and then waits for the serving
// goroutine started by Start to exit.
func (z *Zipkin) Stop() {
	// DefaultShutdownTimeout is an untyped number; without the explicit
	// time.Second factor it would be interpreted as nanoseconds and the
	// deadline would expire almost immediately.
	ctx, cancel := context.WithTimeout(context.Background(), DefaultShutdownTimeout*time.Second)

	// Deferred calls run last-in first-out: the context is cancelled first,
	// then we block until the serving goroutine has returned.
	defer z.waitGroup.Wait()
	defer cancel()

	if err := z.server.Shutdown(ctx); err != nil {
		log.Printf("E! Error shutting down the zipkin listener: %v", err)
	}
}
// Listen serves http on ln using the zipkin instance's server and blocks
// until serving stops, which normally happens when (*Zipkin).Stop() is
// called. Any unexpected serve error is reported through acc.
func (z *Zipkin) Listen(ln net.Listener, acc telegraf.Accumulator) {
	serveErr := z.server.Serve(ln)

	// The clean shutdown performed by (*Zipkin).Stop() makes Serve return
	// http.ErrServerClosed. That is expected, so it is not reported:
	// doing so would interfere with telegraf's internal data collection by
	// making it look as if a serious error had occurred.
	if serveErr == nil || serveErr == http.ErrServerClosed {
		return
	}

	acc.AddError(fmt.Errorf("E! Error listening: %v", serveErr))
}
func init() {
inputs.Add("zipkin", func() telegraf.Input {
return &Zipkin{
Path: DefaultRoute,
Port: DefaultPort,
}
})
}