Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support remote write v2 #6292

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependencies And what goes into each exe
pkg/cortexpb/cortex.pb.go: pkg/cortexpb/cortex.proto
pkg/cortexpbv2/cortexv2.pb.go: pkg/cortexpbv2/cortexv2.proto
pkg/ingester/client/ingester.pb.go: pkg/ingester/client/ingester.proto
pkg/distributor/distributorpb/distributor.pb.go: pkg/distributor/distributorpb/distributor.proto
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
Expand Down
77 changes: 77 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

Expand Down Expand Up @@ -149,6 +151,40 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
return
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()

lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
for _, lbl := range additionalLabels {
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph cortexpbv2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h)
}

// Generate the series
series = append(series, cortexpbv2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []cortexpbv2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

Expand Down Expand Up @@ -188,6 +224,47 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
series = append(series, cortexpbv2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []cortexpbv2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: cortexpbv2.Metadata{
Type: cortexpbv2.METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}

func GenerateSeriesWithSamples(
name string,
startTime time.Time,
Expand Down
42 changes: 40 additions & 2 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
yaml "gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
)
Expand Down Expand Up @@ -113,6 +113,39 @@ func NewPromQueryClient(address string) (*Client, error) {
return c, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
// Create write request
Expand Down Expand Up @@ -336,6 +369,11 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
Loading
Loading