diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 0adc65135a..a16b62fae6 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -220,87 +220,231 @@ func TestOTLPConvertToPromTS(t *testing.T) { } } +// for testing +type resetReader struct { + *bytes.Reader + body []byte +} + +func newResetReader(body []byte) *resetReader { + return &resetReader{ + Reader: bytes.NewReader(body), + body: body, + } +} + +func (r *resetReader) Reset() { + r.Reader.Reset(r.body) +} + +func (r *resetReader) Close() error { + return nil +} + +func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, encodingType string) (*http.Request, error) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + var body []byte + var err error + switch contentType { + case jsonContentType: + body, err = otlpRequest.MarshalJSON() + if err != nil { + return nil, err + } + case pbContentType: + body, err = otlpRequest.MarshalProto() + if err != nil { + return nil, err + } + } + + if encodingType == "gzip" { + var gzipBody bytes.Buffer + gz := gzip.NewWriter(&gzipBody) + _, err = gz.Write(body) + if err != nil { + return nil, err + } + if err = gz.Close(); err != nil { + return nil, err + } + body = gzipBody.Bytes() + } + + req, err := http.NewRequestWithContext(ctx, "", "", newResetReader(body)) + if err != nil { + return nil, err + } + + switch contentType { + case jsonContentType: + req.Header.Set("Content-Type", jsonContentType) + case pbContentType: + req.Header.Set("Content-Type", pbContentType) + } + + if encodingType != "" { + req.Header.Set("Content-Encoding", encodingType) + } + req.ContentLength = int64(len(body)) + + return req, nil +} + +func BenchmarkOTLPWriteHandler(b *testing.B) { + cfg := distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: false, + } + overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + require.NoError(b, err) + + exportRequest := generateOTLPWriteRequest() + mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil + } + handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc) + + b.Run("json with no compression", func(b *testing.B) { + req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "") + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(b, http.StatusOK, resp.StatusCode) + req.Body.(*resetReader).Reset() + } + }) + b.Run("json with gzip", func(b *testing.B) { + req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "gzip") + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(b, http.StatusOK, resp.StatusCode) + req.Body.(*resetReader).Reset() + } + }) + b.Run("proto with no compression", func(b *testing.B) { + req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "") + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(b, http.StatusOK, resp.StatusCode) + req.Body.(*resetReader).Reset() + } + }) + b.Run("proto with gzip", func(b *testing.B) { + req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip") + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(b, http.StatusOK, resp.StatusCode) + req.Body.(*resetReader).Reset() + } + }) +} + func TestOTLPWriteHandler(t *testing.T) { cfg := distributor.OTLPConfig{ ConvertAllAttributes: false, DisableTargetInfo: false, } - exportRequest := generateOTLPWriteRequest(t) + exportRequest := generateOTLPWriteRequest() tests := []struct { description string maxRecvMsgSize int - format string + contentType string expectedStatusCode int expectedErrMsg string - gzipCompression bool encodingType string }{ { description: "Test proto format write with no compression", maxRecvMsgSize: 10000, - format: pbContentType, + contentType: pbContentType, expectedStatusCode: http.StatusOK, }, { description: "Test proto format write with gzip", maxRecvMsgSize: 10000, - format: pbContentType, + contentType: pbContentType, expectedStatusCode: http.StatusOK, encodingType: "gzip", - gzipCompression: true, }, { description: "Test json format write with no compression", maxRecvMsgSize: 10000, - format: jsonContentType, + contentType: jsonContentType, expectedStatusCode: http.StatusOK, }, { description: "Test json format write with gzip", maxRecvMsgSize: 10000, - format: jsonContentType, + contentType: jsonContentType, expectedStatusCode: http.StatusOK, encodingType: "gzip", - gzipCompression: true, }, { description: "request too big than maxRecvMsgSize (proto) with no compression", maxRecvMsgSize: 10, - format: pbContentType, + contentType: pbContentType, expectedStatusCode: http.StatusBadRequest, expectedErrMsg: "received message larger than max", }, { description: "request too big than maxRecvMsgSize (proto) with gzip", maxRecvMsgSize: 10, - format: pbContentType, + contentType: pbContentType, expectedStatusCode: http.StatusBadRequest, expectedErrMsg: "received message larger than max", encodingType: "gzip", - gzipCompression: true, }, { description: "request too big than maxRecvMsgSize (json) with no compression", maxRecvMsgSize: 10, - format: jsonContentType, + contentType: jsonContentType, expectedStatusCode: http.StatusBadRequest, expectedErrMsg: "received message larger than max", }, { description: "request too big than maxRecvMsgSize (json) with gzip", maxRecvMsgSize: 10, - format: jsonContentType, + contentType: jsonContentType, expectedStatusCode: http.StatusBadRequest, expectedErrMsg: "received message larger than max", encodingType: "gzip", - gzipCompression: true, }, { description: "invalid encoding type: snappy", maxRecvMsgSize: 10000, - format: jsonContentType, + contentType: jsonContentType, expectedStatusCode: http.StatusBadRequest, encodingType: "snappy", }, @@ -308,45 +452,8 @@ func TestOTLPWriteHandler(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - ctx := context.Background() - ctx = user.InjectOrgID(ctx, "user-1") - var req *http.Request - - compressionFunc := func(t *testing.T, body []byte) []byte { - var b bytes.Buffer - gz := gzip.NewWriter(&b) - _, err := gz.Write(body) - require.NoError(t, err) - require.NoError(t, gz.Close()) - - return b.Bytes() - } - - if test.format == pbContentType { - buf, err := exportRequest.MarshalProto() - require.NoError(t, err) - - if test.gzipCompression { - buf = compressionFunc(t, buf) - } - - req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) - require.NoError(t, err) - req.Header.Set("Content-Type", pbContentType) - req.Header.Set("Content-Encoding", test.encodingType) - } else { - buf, err := exportRequest.MarshalJSON() - require.NoError(t, err) - - if test.gzipCompression { - buf = compressionFunc(t, buf) - } - - req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) - require.NoError(t, err) - req.Header.Set("Content-Type", jsonContentType) - req.Header.Set("Content-Encoding", test.encodingType) - } + req, err := getOTLPHttpRequest(&exportRequest, test.contentType, test.encodingType) + require.NoError(t, err) push := verifyOTLPWriteRequestHandler(t, cortexpb.API) overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) @@ -368,7 +475,7 @@ func TestOTLPWriteHandler(t *testing.T) { } } -func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest { +func generateOTLPWriteRequest() pmetricotlp.ExportRequest { d := pmetric.NewMetrics() // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram