-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite.go
137 lines (109 loc) · 3.14 KB
/
write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Prometheus remote write client, the code is inspired from
// https://github.com/prometheus/prometheus/blob/main/storage/remote/queue_manager.go
package main
import (
"context"
"fmt"
"net/url"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/rs/zerolog/log"
)
// The timeout of requests sent to the API.
const clientTimeout = 10 * time.Second
// Writer allows writing to a Prometheus remote write API.
type Writer struct {
client remote.WriteClient
buf []byte
pBuf *proto.Buffer
}
type sample struct {
labels labels.Labels
value float64
// Timestamp in ms.
timestamp int64
}
// NewWriter returns a new initialized writer.
func NewWriter(rawURL string) *Writer {
u, err := url.Parse(rawURL)
if err != nil {
log.Fatal().Err(err).Msgf("Failed to parse remote write URL '%s'", rawURL)
}
conf := &remote.ClientConfig{
URL: &config.URL{URL: u},
Timeout: model.Duration(clientTimeout),
HTTPClientConfig: config.DefaultHTTPClientConfig,
}
client, err := remote.NewWriteClient("", conf)
if err != nil {
log.Fatal().Err(err).Msgf("Failed to create remote write client")
}
w := &Writer{
client: client,
pBuf: proto.NewBuffer(nil),
buf: []byte{},
}
return w
}
// Write samples to the configured prometheus remote Write endpoint.
func (w *Writer) Write(ctx context.Context, samples []sample) error {
series := samplesToTimeseries(samples)
req, err := w.buildWriteRequest(series)
if err != nil {
return err
}
if _, err = w.client.Store(ctx, req, 0); err != nil {
return fmt.Errorf("remote write: %w", err)
}
return nil
}
func (w *Writer) buildWriteRequest(samples []prompb.TimeSeries) ([]byte, error) {
req := &prompb.WriteRequest{
Timeseries: samples,
}
w.pBuf.Reset()
err := w.pBuf.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal write request: %w", err)
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
w.buf = w.buf[0:cap(w.buf)]
// Reuse the buffer allocated by snappy.
w.buf = snappy.Encode(w.buf, w.pBuf.Bytes())
return w.buf, nil
}
func samplesToTimeseries(samples []sample) []prompb.TimeSeries {
series := make([]prompb.TimeSeries, len(samples))
for nPending, d := range samples {
series[nPending].Labels = labelsToLabelsProto(d.labels, series[nPending].Labels)
series[nPending].Samples = []prompb.Sample{
{
Value: d.value,
Timestamp: d.timestamp,
},
}
}
return series
}
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelsProto(labels labels.Labels, buf []prompb.Label) []prompb.Label {
result := buf[:0]
if cap(buf) < len(labels) {
result = make([]prompb.Label, 0, len(labels))
}
for _, l := range labels {
result = append(result, prompb.Label{
Name: l.Name,
Value: l.Value,
})
}
return result
}