Skip to content

Commit 2b5ac96

Browse files
committed
Implement remote write v2
Signed-off-by: SungJin1212 <[email protected]>
1 parent d54383e commit 2b5ac96

24 files changed

+8990
-98
lines changed

.github/workflows/test-build-deploy.yml

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ jobs:
144144
- integration_querier
145145
- integration_ruler
146146
- integration_query_fuzz
147+
- integration_remote_write_v2
147148
steps:
148149
- name: Upgrade golang
149150
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
18+
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
1819
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
1920
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
2021
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228

integration/e2e/util.go

+76
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/prometheus/model/histogram"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/prompb"
21+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com/prometheus/prometheus/storage"
2223
"github.com/prometheus/prometheus/tsdb"
2324
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -149,6 +150,40 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
149150
return
150151
}
151152

153+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
154+
tsMillis := TimeToMilliseconds(ts)
155+
156+
st := writev2.NewSymbolTable()
157+
158+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
159+
for _, lbl := range additionalLabels {
160+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
161+
}
162+
163+
var (
164+
h *histogram.Histogram
165+
fh *histogram.FloatHistogram
166+
ph writev2.Histogram
167+
)
168+
if floatHistogram {
169+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
170+
ph = writev2.FromFloatHistogram(tsMillis, fh)
171+
} else {
172+
h = tsdbutil.GenerateTestHistogram(int(i))
173+
ph = writev2.FromIntHistogram(tsMillis, h)
174+
}
175+
176+
// Generate the series
177+
series = append(series, writev2.TimeSeries{
178+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
179+
Histograms: []writev2.Histogram{ph},
180+
})
181+
182+
symbols = st.Symbols()
183+
184+
return
185+
}
186+
152187
func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
153188
tsMillis := TimeToMilliseconds(ts)
154189

@@ -188,6 +223,47 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
188223
return
189224
}
190225

226+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
227+
tsMillis := TimeToMilliseconds(ts)
228+
value := rand.Float64()
229+
230+
st := writev2.NewSymbolTable()
231+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
232+
233+
for _, label := range additionalLabels {
234+
lbs = append(lbs, labels.Label{
235+
Name: label.Name,
236+
Value: label.Value,
237+
})
238+
}
239+
series = append(series, writev2.TimeSeries{
240+
// Generate the series
241+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
242+
Samples: []writev2.Sample{
243+
{Value: value, Timestamp: tsMillis},
244+
},
245+
Metadata: writev2.Metadata{
246+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
247+
},
248+
})
249+
symbols = st.Symbols()
250+
251+
// Generate the expected vector when querying it
252+
metric := model.Metric{}
253+
metric[labels.MetricName] = model.LabelValue(name)
254+
for _, lbl := range additionalLabels {
255+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
256+
}
257+
258+
vector = append(vector, &model.Sample{
259+
Metric: metric,
260+
Value: model.SampleValue(value),
261+
Timestamp: model.Time(tsMillis),
262+
})
263+
264+
return
265+
}
266+
191267
func GenerateSeriesWithSamples(
192268
name string,
193269
startTime time.Time,

integration/e2ecortex/client.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424
"github.com/prometheus/prometheus/model/labels"
2525
"github.com/prometheus/prometheus/model/rulefmt"
2626
"github.com/prometheus/prometheus/prompb"
27+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com/prometheus/prometheus/storage"
2829
"github.com/prometheus/prometheus/storage/remote"
29-
yaml "gopkg.in/yaml.v3"
30-
3130
"go.opentelemetry.io/collector/pdata/pcommon"
3231
"go.opentelemetry.io/collector/pdata/pmetric"
3332
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
33+
yaml "gopkg.in/yaml.v3"
3434

3535
"github.com/cortexproject/cortex/pkg/ruler"
3636
"github.com/cortexproject/cortex/pkg/util/backoff"
@@ -114,6 +114,39 @@ func NewPromQueryClient(address string) (*Client, error) {
114114
return c, nil
115115
}
116116

117+
// PushV2 the input timeseries to the remote endpoint
118+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
119+
// Create write request
120+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
121+
if err != nil {
122+
return nil, err
123+
}
124+
125+
// Create HTTP request
126+
compressed := snappy.Encode(nil, data)
127+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
128+
if err != nil {
129+
return nil, err
130+
}
131+
132+
req.Header.Add("Content-Encoding", "snappy")
133+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
134+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
135+
req.Header.Set("X-Scope-OrgID", c.orgID)
136+
137+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
138+
defer cancel()
139+
140+
// Execute HTTP request
141+
res, err := c.httpClient.Do(req.WithContext(ctx))
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
defer res.Body.Close()
147+
return res, nil
148+
}
149+
117150
// Push the input timeseries to the remote endpoint
118151
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
119152
// Create write request
@@ -356,6 +389,11 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
356389
return value, err
357390
}
358391

392+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
393+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
394+
return metadata, err
395+
}
396+
359397
// QueryExemplars runs an exemplars query
360398
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
361399
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

integration/remote_write_v2_test.go

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//go:build integration_remote_write_v2
2+
// +build integration_remote_write_v2
3+
4+
package integration
5+
6+
import (
7+
"math/rand"
8+
"net/http"
9+
"path"
10+
"testing"
11+
"time"
12+
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/prompb"
15+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
16+
"github.com/prometheus/prometheus/tsdb/tsdbutil"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
20+
"github.com/cortexproject/cortex/integration/e2e"
21+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22+
"github.com/cortexproject/cortex/integration/e2ecortex"
23+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24+
)
25+
26+
func TestIngest(t *testing.T) {
27+
const blockRangePeriod = 5 * time.Second
28+
29+
s, err := e2e.NewScenario(networkName)
30+
require.NoError(t, err)
31+
defer s.Close()
32+
33+
// Start dependencies.
34+
consul := e2edb.NewConsulWithName("consul")
35+
require.NoError(t, s.StartAndWaitReady(consul))
36+
37+
flags := mergeFlags(
38+
AlertmanagerLocalFlags(),
39+
map[string]string{
40+
"-store.engine": blocksStorageEngine,
41+
"-blocks-storage.backend": "filesystem",
42+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
43+
"-blocks-storage.bucket-store.sync-interval": "15m",
44+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
45+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
46+
"-querier.query-store-for-labels-enabled": "true",
47+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
48+
"-blocks-storage.tsdb.ship-interval": "1s",
49+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
50+
"-blocks-storage.tsdb.enable-native-histograms": "true",
51+
// Ingester.
52+
"-ring.store": "consul",
53+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
54+
// Distributor.
55+
"-distributor.replication-factor": "1",
56+
// Store-gateway.
57+
"-store-gateway.sharding-enabled": "false",
58+
// alert manager
59+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
60+
},
61+
)
62+
63+
// make alert manager config dir
64+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
65+
66+
path := path.Join(s.SharedDir(), "cortex-1")
67+
68+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
69+
// Start Cortex replicas.
70+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
71+
require.NoError(t, s.StartAndWaitReady(cortex))
72+
73+
// Wait until Cortex replicas have updated the ring state.
74+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
75+
76+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
77+
require.NoError(t, err)
78+
79+
now := time.Now()
80+
81+
// series push
82+
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
83+
res, err := c.PushV2(symbols1, series)
84+
require.NoError(t, err)
85+
require.Equal(t, 200, res.StatusCode)
86+
testPushHeader(t, res.Header, "1", "0", "0")
87+
88+
// sample
89+
result, err := c.Query("test_series", now)
90+
require.NoError(t, err)
91+
assert.Equal(t, expectedVector, result.(model.Vector))
92+
93+
// metadata
94+
metadata, err := c.Metadata("test_series", "")
95+
require.NoError(t, err)
96+
require.Equal(t, 1, len(metadata["test_series"]))
97+
98+
// histogram
99+
histogramIdx := rand.Uint32()
100+
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
101+
res, err = c.PushV2(symbols2, histogramSeries)
102+
require.NoError(t, err)
103+
require.Equal(t, 200, res.StatusCode)
104+
testPushHeader(t, res.Header, "1", "1", "0")
105+
106+
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
107+
res, err = c.PushV2(symbols3, histogramFloatSeries)
108+
require.NoError(t, err)
109+
require.Equal(t, 200, res.StatusCode)
110+
testPushHeader(t, res.Header, "1", "1", "0")
111+
112+
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
113+
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
114+
result, err = c.Query(`test_histogram`, testHistogramTimestamp)
115+
require.NoError(t, err)
116+
require.Equal(t, model.ValVector, result.Type())
117+
v := result.(model.Vector)
118+
require.Equal(t, 2, v.Len())
119+
for _, s := range v {
120+
require.NotNil(t, s.Histogram)
121+
require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count))
122+
require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum))
123+
}
124+
}
125+
126+
func TestExemplar(t *testing.T) {
127+
s, err := e2e.NewScenario(networkName)
128+
require.NoError(t, err)
129+
defer s.Close()
130+
131+
// Start dependencies.
132+
consul := e2edb.NewConsulWithName("consul")
133+
require.NoError(t, s.StartAndWaitReady(consul))
134+
135+
flags := mergeFlags(
136+
AlertmanagerLocalFlags(),
137+
map[string]string{
138+
"-store.engine": blocksStorageEngine,
139+
"-blocks-storage.backend": "filesystem",
140+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
141+
"-blocks-storage.bucket-store.sync-interval": "15m",
142+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
143+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
144+
"-querier.query-store-for-labels-enabled": "true",
145+
"-blocks-storage.tsdb.ship-interval": "1s",
146+
"-blocks-storage.tsdb.enable-native-histograms": "true",
147+
// Ingester.
148+
"-ring.store": "consul",
149+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
150+
"-ingester.max-exemplars": "100",
151+
// Distributor.
152+
"-distributor.replication-factor": "1",
153+
// Store-gateway.
154+
"-store-gateway.sharding-enabled": "false",
155+
// alert manager
156+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
157+
},
158+
)
159+
160+
// make alert manager config dir
161+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
162+
163+
path := path.Join(s.SharedDir(), "cortex-1")
164+
165+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
166+
// Start Cortex replicas.
167+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
168+
require.NoError(t, s.StartAndWaitReady(cortex))
169+
170+
// Wait until Cortex replicas have updated the ring state.
171+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
172+
173+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
174+
require.NoError(t, err)
175+
176+
now := time.Now()
177+
tsMillis := e2e.TimeToMilliseconds(now)
178+
179+
symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
180+
timeseries := []writev2.TimeSeries{
181+
{
182+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
183+
Metadata: writev2.Metadata{
184+
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.
185+
186+
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
187+
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
188+
},
189+
Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}},
190+
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
191+
},
192+
}
193+
194+
res, err := c.PushV2(symbols, timeseries)
195+
require.NoError(t, err)
196+
require.Equal(t, 200, res.StatusCode)
197+
testPushHeader(t, res.Header, "1", "0", "1")
198+
199+
start := time.Now().Add(-time.Minute)
200+
end := now.Add(time.Minute)
201+
202+
exemplars, err := c.QueryExemplars("test_metric", start, end)
203+
require.NoError(t, err)
204+
require.Equal(t, 1, len(exemplars))
205+
}
206+
207+
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
208+
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
209+
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
210+
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
211+
}

0 commit comments

Comments
 (0)