diff --git a/ddtrace/opentelemetry/otel_test.go b/ddtrace/opentelemetry/otel_test.go index 81ea64d62a..78c17670f9 100644 --- a/ddtrace/opentelemetry/otel_test.go +++ b/ddtrace/opentelemetry/otel_test.go @@ -11,7 +11,6 @@ import ( "context" "net/http" "net/http/httptest" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -23,6 +22,7 @@ import ( ) func TestHttpDistributedTrace(t *testing.T) { + assert := assert.New(t) tp, payloads, cleanup := mockTracerProvider(t) defer cleanup() otel.SetTracerProvider(tp) @@ -33,11 +33,10 @@ func TestHttpDistributedTrace(t *testing.T) { w := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { receivedSpan := oteltrace.SpanFromContext(r.Context()) - assert.Equal(t, rootSpan.SpanContext().TraceID(), receivedSpan.SpanContext().TraceID()) + assert.Equal(rootSpan.SpanContext().TraceID(), receivedSpan.SpanContext().TraceID()) }), "testOperation") testServer := httptest.NewServer(w) defer testServer.Close() - c := http.Client{Transport: otelhttp.NewTransport(nil)} req, err := http.NewRequestWithContext(sctx, http.MethodGet, testServer.URL, nil) require.NoError(t, err) @@ -47,12 +46,11 @@ func TestHttpDistributedTrace(t *testing.T) { rootSpan.End() p := <-payloads - numSpans := strings.Count(p, "\"span_id\"") - assert.Equal(t, 3, numSpans) - assert.Contains(t, p, `"name":"internal"`) - assert.Contains(t, p, `"name":"server.request`) - assert.Contains(t, p, `"name":"client.request"`) - assert.Contains(t, p, `"resource":"testRootSpan"`) - assert.Contains(t, p, `"resource":"testOperation"`) - assert.Contains(t, p, `"resource":"HTTP GET"`) + assert.Len(p, 2) + assert.Equal("server.request", p[0][0]["name"]) + assert.Equal("internal", p[1][0]["name"]) + assert.Equal("client.request", p[1][1]["name"]) + assert.Equal("testOperation", p[0][0]["resource"]) + assert.Equal("testRootSpan", p[1][0]["resource"]) + assert.Equal("HTTP GET", p[1][1]["resource"]) } diff --git a/ddtrace/opentelemetry/span.go b/ddtrace/opentelemetry/span.go index 10c68433ce..cbfb76daa4 100644 --- a/ddtrace/opentelemetry/span.go +++ b/ddtrace/opentelemetry/span.go @@ -10,6 +10,7 @@ import ( "errors" "strconv" "strings" + "sync" "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" @@ -24,7 +25,8 @@ import ( var _ oteltrace.Span = (*span)(nil) type span struct { - noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations + noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations + mu sync.RWMutex `msg:"-"` // all fields are protected by this RWMutex DD *tracer.Span finished bool attributes map[string]interface{} @@ -37,10 +39,14 @@ type span struct { func (s *span) TracerProvider() oteltrace.TracerProvider { return s.oteltracer.provider } func (s *span) SetName(name string) { + s.mu.Lock() + defer s.mu.Unlock() s.attributes[ext.SpanName] = strings.ToLower(name) } func (s *span) End(options ...oteltrace.SpanEndOption) { + s.mu.Lock() + defer s.mu.Unlock() if s.finished { return } @@ -151,6 +157,8 @@ type statusInfo struct { // value before (OK > Error > Unset), the code will not be changed. // The code and description are set once when the span is finished. func (s *span) SetStatus(code otelcodes.Code, description string) { + s.mu.Lock() + defer s.mu.Unlock() if code >= s.statusInfo.code { s.statusInfo = statusInfo{code, description} } @@ -169,6 +177,8 @@ func (s *span) SetStatus(code otelcodes.Code, description string) { // The list of reserved tags might be extended in the future. // Any other non-reserved tags will be set as provided. func (s *span) SetAttributes(kv ...attribute.KeyValue) { + s.mu.Lock() + defer s.mu.Unlock() for _, kv := range kv { if k, v := toReservedAttributes(string(kv.Key), kv.Value); k != "" { s.attributes[k] = v diff --git a/ddtrace/opentelemetry/span_test.go b/ddtrace/opentelemetry/span_test.go index 051fa8e826..d8fa19af02 100644 --- a/ddtrace/opentelemetry/span_test.go +++ b/ddtrace/opentelemetry/span_test.go @@ -8,6 +8,7 @@ package opentelemetry import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -28,21 +29,41 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" ) -func mockTracerProvider(t *testing.T, opts ...tracer.StartOption) (tp *TracerProvider, payloads chan string, cleanup func()) { - payloads = make(chan string) +type traces [][]map[string]interface{} + +func mockTracerProvider(t *testing.T, opts ...tracer.StartOption) (tp *TracerProvider, payloads chan traces, cleanup func()) { + payloads = make(chan traces) s, c := httpmem.ServerAndClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/v0.4/traces": if h := r.Header.Get("X-Datadog-Trace-Count"); h == "0" { return } - buf, err := io.ReadAll(r.Body) + req := r.Clone(context.Background()) + defer req.Body.Close() + buf, err := io.ReadAll(req.Body) if err != nil || len(buf) == 0 { - t.Fatalf("Test agent: Error receiving traces") + t.Fatalf("Test agent: Error receiving traces: %v", err) + } + var payload bytes.Buffer + _, err = msgp.UnmarshalAsJSON(&payload, buf) + if err != nil { + t.Fatalf("Failed to unmarshal payload bytes as JSON: %v", err) + } + var tr [][]map[string]interface{} + err = json.Unmarshal(payload.Bytes(), &tr) + if err != nil || len(tr) == 0 { + t.Fatalf("Failed to unmarshal payload bytes as trace: %v", err) + } + payloads <- tr + default: + if r.Method == "GET" { + // Write an empty JSON object to the output, to avoid spurious decoding + // errors to be reported in the logs, which may lead someone + // investigating a test failure into the wrong direction. + w.Write([]byte("{}")) + return } - var js bytes.Buffer - msgp.UnmarshalAsJSON(&js, buf) - payloads <- js.String() } w.WriteHeader(200) })) @@ -50,24 +71,27 @@ func mockTracerProvider(t *testing.T, opts ...tracer.StartOption) (tp *TracerPro tp = NewTracerProvider(opts...) otel.SetTracerProvider(tp) return tp, payloads, func() { - s.Close() - tp.Shutdown() + if err := s.Close(); err != nil { + t.Fatalf("Test Agent server Close failure: %v", err) + } + if err := tp.Shutdown(); err != nil { + t.Fatalf("Tracer Provider shutdown failure: %v", err) + } } } -func waitForPayload(ctx context.Context, payloads chan string) (string, error) { +func waitForPayload(payloads chan traces) (traces, error) { select { - case <-ctx.Done(): - return "", fmt.Errorf("Timed out waiting for traces") case p := <-payloads: return p, nil + case <-time.After(10 * time.Second): + return nil, fmt.Errorf("Timed out waiting for traces") } } func TestSpanResourceNameDefault(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + ctx := context.Background() _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") @@ -77,39 +101,39 @@ func TestSpanResourceNameDefault(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, `"name":"internal"`) - assert.Contains(p, `"resource":"OperationName"`) + p := traces[0] + assert.Equal("internal", p[0]["name"]) + assert.Equal("OperationName", p[0]["resource"]) } func TestSpanSetName(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() _, sp := tr.Start(ctx, "OldName") sp.SetName("NewName") sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, strings.ToLower("NewName")) + p := traces[0] + assert.Equal(strings.ToLower("NewName"), p[0]["name"]) } func TestSpanEnd(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() @@ -139,22 +163,22 @@ func TestSpanEnd(t *testing.T) { } tracer.Flush() - payload, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } + p := traces[0] - assert.Contains(payload, name) - assert.NotContains(payload, ignoredName) - assert.Contains(payload, msg) - assert.NotContains(payload, ignoredMsg) - assert.Contains(payload, `"error":1`) // this should be an error span - + assert.Equal(name, p[0]["resource"]) + assert.Equal(ext.SpanKindInternal, p[0]["name"]) // default + assert.Equal(1.0, p[0]["error"]) // this should be an error span + meta := fmt.Sprintf("%v", p[0]["meta"]) + assert.Contains(meta, msg) for k, v := range attributes { - assert.Contains(payload, fmt.Sprintf("\"%s\":\"%s\"", k, v)) + assert.Contains(meta, fmt.Sprintf("%s:%s", k, v)) } for k, v := range ignoredAttributes { - assert.NotContains(payload, fmt.Sprintf("\"%s\":\"%s\"", k, v)) + assert.NotContains(meta, fmt.Sprintf("%s:%s", k, v)) } } @@ -193,26 +217,25 @@ func TestSpanSetStatus(t *testing.T) { for _, test := range testData { t.Run(fmt.Sprintf("Setting Code: %d", test.code), func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - var sp oteltrace.Span testStatus := func() { sp.End() tracer.Flush() - payload, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } + p := traces[0] // An error description is set IFF the span has an error // status code value. Messages related to any other status code // are ignored. + meta := fmt.Sprintf("%v", p[0]["meta"]) if test.code == codes.Error { - assert.Contains(payload, test.msg) + assert.Contains(meta, test.msg) } else { - assert.NotContains(payload, test.msg) + assert.NotContains(meta, test.msg) } - assert.NotContains(payload, test.ignoredCode) + assert.NotContains(meta, test.ignoredCode) } _, sp = tr.Start(context.Background(), "test") sp.SetStatus(test.code, test.msg) @@ -229,8 +252,6 @@ func TestSpanSetStatus(t *testing.T) { func TestSpanContextWithStartOptions(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() @@ -262,27 +283,28 @@ func TestSpanContextWithStartOptions(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - if strings.Count(p, "span_id") != 2 { - t.Fatalf("payload does not contain two spans\n%s", p) - } - assert.Contains(p, `"service":"persisted_srv"`) - assert.Contains(p, `"resource":"persisted_ctx_rsc"`) - assert.Contains(p, `"span.kind":"producer"`) - assert.Contains(p, fmt.Sprint(spanID)) - assert.Contains(p, fmt.Sprint(startTime.UnixNano())) - assert.Contains(p, fmt.Sprint(duration.Nanoseconds())) + p := traces[0] + t.Logf("%v", p[0]) + assert.Len(p, 2) + assert.Equal("persisted_srv", p[0]["service"]) + assert.Equal("persisted_ctx_rsc", p[0]["resource"]) + assert.Equal(1234567890.0, p[0]["span_id"]) + assert.Equal("producer", p[0]["name"]) + meta := fmt.Sprintf("%v", p[0]["meta"]) + assert.Contains(meta, "producer") + assert.Equal(float64(startTime.UnixNano()), p[0]["start"]) + assert.Equal(float64(duration.Nanoseconds()), p[0]["duration"]) assert.NotContains(p, "discarded") - assert.Equal(1, strings.Count(p, `"span_id":1234567890`)) + assert.NotEqual(1234567890.0, p[1]["span_id"]) } func TestSpanContextWithStartOptionsPriorityOrder(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() @@ -299,20 +321,21 @@ func TestSpanContextWithStartOptionsPriorityOrder(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, "persisted_ctx_rsc") - assert.Contains(p, "persisted_srv") - assert.Contains(p, `"span.kind":"producer"`) + p := traces[0] + assert.Equal("persisted_srv", p[0]["service"]) + assert.Equal("persisted_ctx_rsc", p[0]["resource"]) + meta := fmt.Sprintf("%v", p[0]["meta"]) + assert.Contains(meta, "producer") assert.NotContains(p, "discarded") } func TestSpanEndOptionsPriorityOrder(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() @@ -331,100 +354,112 @@ func TestSpanEndOptionsPriorityOrder(t *testing.T) { EndOptions(sp, tracer.FinishTime(startTime.Add(time.Second*5))) // EndOptions timestamp should prevail sp.End(oteltrace.WithTimestamp(startTime.Add(time.Second * 3))) + duration := time.Second * 5 // making sure end options don't have effect after the span has returned - EndOptions(sp, tracer.FinishTime(startTime.Add(time.Second*2))) + EndOptions(sp, tracer.FinishTime(startTime.Add(duration))) sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, `"duration":5000000000,`) - assert.NotContains(p, `"duration":2000000000,`) - assert.NotContains(p, `"duration":1000000000,`) - assert.NotContains(p, `"duration":3000000000,`) + p := traces[0] + assert.Equal(float64(duration.Nanoseconds()), p[0]["duration"]) } func TestSpanEndOptions(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() + spanID := uint64(1234567890) startTime := time.Now() + duration := time.Second * 5 _, sp := tr.Start( ContextWithStartOptions(context.Background(), tracer.ResourceName("ctx_rsc"), tracer.ServiceName("ctx_srv"), tracer.StartTime(startTime), - tracer.WithSpanID(1234567890), + tracer.WithSpanID(spanID), ), "op_name") - - EndOptions(sp, tracer.FinishTime(startTime.Add(time.Second*5)), + EndOptions(sp, tracer.FinishTime(startTime.Add(duration)), tracer.WithError(errors.New("persisted_option"))) sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, "ctx_srv") - assert.Contains(p, "ctx_rsc") - assert.Contains(p, "1234567890") - assert.Contains(p, fmt.Sprint(startTime.UnixNano())) - assert.Contains(p, `"duration":5000000000,`) - assert.Contains(p, `persisted_option`) - assert.Contains(p, `"error":1`) + p := traces[0] + assert.Equal("ctx_srv", p[0]["service"]) + assert.Equal("ctx_rsc", p[0]["resource"]) + assert.Equal(1234567890.0, p[0]["span_id"]) + assert.Equal(float64(startTime.UnixNano()), p[0]["start"]) + assert.Equal(float64(duration.Nanoseconds()), p[0]["duration"]) + meta := fmt.Sprintf("%v", p[0]["meta"]) + assert.Contains(meta, "persisted_option") + assert.Equal(1.0, p[0]["error"]) // this should be an error span } func TestSpanSetAttributes(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, payloads, cleanup := mockTracerProvider(t) tr := otel.Tracer("") defer cleanup() - attributes := [][]string{{"k1", "v1_old"}, - {"k2", "v2"}, - {"k1", "v1_new"}, + toBeIgnored := map[string]string{"k1": "v1_old"} + attributes := map[string]string{ + "k2": "v2", + "k1": "v1_new", // maps to 'name' - {"operation.name", "ops"}, + "operation.name": "ops", // maps to 'service' - {"service.name", "srv"}, + "service.name": "srv", // maps to 'resource' - {"resource.name", "rsr"}, + "resource.name": "rsr", // maps to 'type' - {"span.type", "db"}, + "span.type": "db", } _, sp := tr.Start(context.Background(), "test") - for _, tag := range attributes { - sp.SetAttributes(attribute.String(tag[0], tag[1])) + for k, v := range toBeIgnored { + sp.SetAttributes(attribute.String(k, v)) + } + for k, v := range attributes { + sp.SetAttributes(attribute.String(k, v)) } // maps to '_dd1.sr.eausr' sp.SetAttributes(attribute.Int("analytics.event", 1)) sp.End() tracer.Flush() - payload, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(payload, `"k1":"v1_new"`) - assert.Contains(payload, `"k2":"v2"`) - assert.NotContains(payload, "v1_old") + p := traces[0] + meta := fmt.Sprintf("%v", p[0]["meta"]) + for k, v := range toBeIgnored { + assert.NotContains(meta, fmt.Sprintf("%s:%s", k, v)) + } + assert.Contains(meta, fmt.Sprintf("%s:%s", "k1", "v1_new")) + assert.Contains(meta, fmt.Sprintf("%s:%s", "k2", "v2")) // reserved attributes - assert.Contains(payload, `"name":"ops"`) - assert.Contains(payload, `"service":"srv"`) - assert.Contains(payload, `"resource":"rsr"`) - assert.Contains(payload, `"type":"db"`) - assert.Contains(payload, `"_dd1.sr.eausr":1`) + assert.NotContains(meta, fmt.Sprintf("%s:%s", "name", "ops")) + assert.NotContains(meta, fmt.Sprintf("%s:%s", "service", "srv")) + assert.NotContains(meta, fmt.Sprintf("%s:%s", "resource", "rsr")) + assert.NotContains(meta, fmt.Sprintf("%s:%s", "type", "db")) + assert.Equal("ops", p[0]["name"]) + assert.Equal("srv", p[0]["service"]) + assert.Equal("rsr", p[0]["resource"]) + assert.Equal("db", p[0]["type"]) + metrics := fmt.Sprintf("%v", p[0]["metrics"]) + assert.Contains(metrics, fmt.Sprintf("%s:%s", "_dd1.sr.eausr", "1")) } func TestSpanSetAttributesWithRemapping(t *testing.T) { @@ -443,17 +478,16 @@ func TestSpanSetAttributesWithRemapping(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, "graphql.server.request") + p := traces[0] + assert.Equal("graphql.server.request", p[0]["name"]) } func TestTracerStartOptions(t *testing.T) { assert := assert.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, payloads, cleanup := mockTracerProvider(t, tracer.WithEnv("test_env"), tracer.WithService("test_serv")) tr := otel.Tracer("") @@ -462,12 +496,14 @@ func TestTracerStartOptions(t *testing.T) { _, sp := tr.Start(context.Background(), "test") sp.End() tracer.Flush() - payload, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(payload, "\"service\":\"test_serv\"") - assert.Contains(payload, "\"env\":\"test_env\"") + p := traces[0] + assert.Equal("test_serv", p[0]["service"]) + meta := fmt.Sprintf("%v", p[0]["meta"]) + assert.Contains(meta, fmt.Sprintf("%s:%s", "env", "test_env")) } func TestOperationNameRemapping(t *testing.T) { @@ -483,13 +519,15 @@ func TestOperationNameRemapping(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(p, "graphql.server.request") + p := traces[0] + assert.Equal("graphql.server.request", p[0]["name"]) } func TestRemapName(t *testing.T) { + assert := assert.New(t) testCases := []struct { spanKind oteltrace.SpanKind in []attribute.KeyValue @@ -597,10 +635,6 @@ func TestRemapName(t *testing.T) { out: "internal", }, } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _, payloads, cleanup := mockTracerProvider(t, tracer.WithEnv("test_env"), tracer.WithService("test_serv")) tr := otel.Tracer("") defer cleanup() @@ -612,18 +646,18 @@ func TestRemapName(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(t, p, test.out) + p := traces[0] + assert.Equal(test.out, p[0]["name"]) }) } } func TestRemapWithMultipleSetAttributes(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + assert := assert.New(t) _, payloads, cleanup := mockTracerProvider(t, tracer.WithEnv("test_env"), tracer.WithService("test_serv")) tr := otel.Tracer("") @@ -641,13 +675,15 @@ func TestRemapWithMultipleSetAttributes(t *testing.T) { sp.End() tracer.Flush() - p, err := waitForPayload(ctx, payloads) + traces, err := waitForPayload(payloads) if err != nil { t.Fatalf(err.Error()) } - assert.Contains(t, p, `"name":"overriden.name"`) - assert.Contains(t, p, `"resource":"new.name"`) - assert.Contains(t, p, `"service":"new.service.name"`) - assert.Contains(t, p, `"type":"new.span.type"`) - assert.Contains(t, p, `"_dd1.sr.eausr":1`) + p := traces[0] + assert.Equal("overriden.name", p[0]["name"]) + assert.Equal("new.name", p[0]["resource"]) + assert.Equal("new.service.name", p[0]["service"]) + assert.Equal("new.span.type", p[0]["type"]) + metrics := fmt.Sprintf("%v", p[0]["metrics"]) + assert.Contains(metrics, fmt.Sprintf("%s:%s", "_dd1.sr.eausr", "1")) } diff --git a/ddtrace/opentelemetry/tracer_test.go b/ddtrace/opentelemetry/tracer_test.go index e007ddf15a..e0056e143a 100644 --- a/ddtrace/opentelemetry/tracer_test.go +++ b/ddtrace/opentelemetry/tracer_test.go @@ -135,8 +135,6 @@ func TestForceFlush(t *testing.T) { } for _, tc := range testData { t.Run(fmt.Sprintf("Flush success: %t", tc.flushed), func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() tp, payloads, cleanup := mockTracerProvider(t) defer cleanup() @@ -152,10 +150,10 @@ func TestForceFlush(t *testing.T) { _, sp := tr.Start(context.Background(), "test_span") sp.End() tp.forceFlush(tc.timeOut, setFlushStatus, tc.flushFunc) - payload, err := waitForPayload(ctx, payloads) + p, err := waitForPayload(payloads) if tc.flushed { assert.NoError(err) - assert.Contains(payload, "test_span") + assert.Equal("test_span", p[0][0]["resource"]) assert.Equal(OK, flushStatus) } else { assert.Equal(ERROR, flushStatus) @@ -203,6 +201,25 @@ func TestSpanTelemetry(t *testing.T) { telemetryClient.AssertNumberOfCalls(t, "Count", 1) } +func TestConcurrentSetAttributes(_ *testing.T) { + tp := NewTracerProvider() + otel.SetTracerProvider(tp) + tr := otel.Tracer("") + + _, span := tr.Start(context.Background(), "test") + defer span.End() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + i := i + go func(val int) { + defer wg.Done() + span.SetAttributes(attribute.Float64("workerID", float64(i))) + }(i) + } +} + func BenchmarkOTelApiWithNoTags(b *testing.B) { testData := struct { env, srv, op string diff --git a/ddtrace/tracer/textmap.go b/ddtrace/tracer/textmap.go index 3b570fc8b4..2e886561b5 100644 --- a/ddtrace/tracer/textmap.go +++ b/ddtrace/tracer/textmap.go @@ -368,10 +368,11 @@ func (p *propagator) injectTextMap(spanCtx *SpanContext, writer TextMapWriter) e if ctx.origin != "" { writer.Set(originHeader, ctx.origin) } - // propagate OpenTracing baggage - for k, v := range ctx.baggage { + ctx.ForeachBaggageItem(func(k, v string) bool { + // Propagate OpenTracing baggage. writer.Set(p.cfg.BaggagePrefix+k, v) - } + return true + }) if p.cfg.MaxTagsHeaderLen <= 0 { return nil } diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 31e59a7836..4092542349 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -403,7 +403,7 @@ func (t *tracer) worker(tick <-chan time.Time) { t.statsd.Flush() t.stats.flushAndSend(time.Now(), withCurrentBucket) // TODO(x): In reality, the traceWriter.flush() call is not synchronous - // when using the agent traceWriter. However, this functionnality is used + // when using the agent traceWriter. However, this functionality is used // in Lambda so for that purpose this mechanism should suffice. done <- struct{}{} diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index af0d36dd7f..f80f1f8af1 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -942,6 +942,29 @@ func TestTracerBaggageImmutability(t *testing.T) { assert.Equal("changed!", childContext.baggage["key"]) } +func TestTracerInjectConcurrency(t *testing.T) { + tracer, _, _, stop, err := startTestTracer(t) + assert.NoError(t, err) + defer stop() + span, _ := StartSpanFromContext(context.Background(), "main") + defer span.Finish() + + var wg sync.WaitGroup + for i := 0; i < 500; i++ { + wg.Add(1) + i := i + go func(val int) { + defer wg.Done() + span.SetBaggageItem("val", fmt.Sprintf("%d", val)) + + traceContext := map[string]string{} + _ = tracer.Inject(span.Context(), TextMapCarrier(traceContext)) + }(i) + } + + wg.Wait() +} + func TestTracerSpanTags(t *testing.T) { tracer, err := newTracer() defer tracer.Stop() @@ -1652,46 +1675,56 @@ func TestTracerFlush(t *testing.T) { func TestTracerReportsHostname(t *testing.T) { const hostname = "hostname-test" - t.Run("DD_TRACE_REPORT_HOSTNAME/set", func(t *testing.T) { - t.Setenv("DD_TRACE_REPORT_HOSTNAME", "true") - - tracer, _, _, stop, err := startTestTracer(t) - assert.Nil(t, err) - defer stop() - - root := tracer.StartSpan("root") - child := tracer.StartSpan("child", ChildOf(root.Context())) - child.Finish() - root.Finish() - - assert := assert.New(t) + testReportHostnameEnabled := func(t *testing.T, name string, withComputeStats bool) { + t.Run(name, func(t *testing.T) { + t.Setenv("DD_TRACE_REPORT_HOSTNAME", "true") + t.Setenv("DD_TRACE_COMPUTE_STATS", fmt.Sprintf("%t", withComputeStats)) - name, ok := root.meta[keyHostname] - assert.True(ok) - assert.Equal(name, tracer.config.hostname) + tracer, _, _, stop, err := startTestTracer(t) + assert.Nil(t, err) + defer stop() - name, ok = child.meta[keyHostname] - assert.True(ok) - assert.Equal(name, tracer.config.hostname) - }) + root := tracer.StartSpan("root") + child := tracer.StartSpan("child", ChildOf(root.Context())) + child.Finish() + root.Finish() - t.Run("DD_TRACE_REPORT_HOSTNAME/unset", func(t *testing.T) { - tracer, _, _, stop, err := startTestTracer(t) - assert.Nil(t, err) - defer stop() - - root := tracer.StartSpan("root") - child := tracer.StartSpan("child", ChildOf(root.Context())) - child.Finish() - root.Finish() + assert := assert.New(t) - assert := assert.New(t) + name, ok := root.meta[keyHostname] + assert.True(ok) + assert.Equal(name, tracer.config.hostname) - _, ok := root.meta[keyHostname] - assert.False(ok) - _, ok = child.meta[keyHostname] - assert.False(ok) - }) + name, ok = child.meta[keyHostname] + assert.True(ok) + assert.Equal(name, tracer.config.hostname) + }) + } + testReportHostnameEnabled(t, "DD_TRACE_REPORT_HOSTNAME/set,DD_TRACE_COMPUTE_STATS/true", true) + testReportHostnameEnabled(t, "DD_TRACE_REPORT_HOSTNAME/set,DD_TRACE_COMPUTE_STATS/false", false) + + testReportHostnameDisabled := func(t *testing.T, name string, withComputeStats bool) { + t.Run(name, func(t *testing.T) { + t.Setenv("DD_TRACE_COMPUTE_STATS", fmt.Sprintf("%t", withComputeStats)) + tracer, _, _, stop, err := startTestTracer(t) + assert.Nil(t, err) + defer stop() + + root := tracer.StartSpan("root") + child := tracer.StartSpan("child", ChildOf(root.Context())) + child.Finish() + root.Finish() + + assert := assert.New(t) + + _, ok := root.meta[keyHostname] + assert.False(ok) + _, ok = child.meta[keyHostname] + assert.False(ok) + }) + } + testReportHostnameDisabled(t, "DD_TRACE_REPORT_HOSTNAME/unset,DD_TRACE_COMPUTE_STATS/true", true) + testReportHostnameDisabled(t, "DD_TRACE_REPORT_HOSTNAME/unset,DD_TRACE_COMPUTE_STATS/false", false) t.Run("WithHostname", func(t *testing.T) { tracer, _, _, stop, err := startTestTracer(t, WithHostname(hostname)) diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index 017fda9513..477a1ec0d5 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -106,7 +106,8 @@ func (h *agentTraceWriter) flush() { for attempt := 0; attempt <= h.config.sendRetries; attempt++ { size, count = p.size(), p.itemCount() log.Debug("Sending payload: size: %d traces: %d\n", size, count) - rc, err := h.config.transport.send(p) + var rc io.ReadCloser + rc, err = h.config.transport.send(p) if err == nil { log.Debug("sent traces after %d attempts", attempt+1) h.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1) diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go index 09fd230f0c..ad38bbdde4 100644 --- a/profiler/profiler_test.go +++ b/profiler/profiler_test.go @@ -546,6 +546,8 @@ func TestExecutionTraceMisconfiguration(t *testing.T) { } func TestExecutionTraceRandom(t *testing.T) { + t.Skip("flaky test, see: https://github.com/DataDog/dd-trace-go/issues/2529") + collectTraces := func(t *testing.T, profilePeriod, tracePeriod time.Duration, count int) int { t.Setenv("DD_PROFILING_EXECUTION_TRACE_ENABLED", "true") t.Setenv("DD_PROFILING_EXECUTION_TRACE_PERIOD", tracePeriod.String()) diff --git a/v2/contrib/labstack/echo.v4/echotrace.go b/v2/contrib/labstack/echo.v4/echotrace.go index e8d560b722..ff895627a7 100644 --- a/v2/contrib/labstack/echo.v4/echotrace.go +++ b/v2/contrib/labstack/echo.v4/echotrace.go @@ -88,7 +88,7 @@ func Middleware(opts ...Option) echo.MiddlewareFunc { } // serve the request to the next middleware err := next(c) - if err != nil { + if err != nil && !shouldIgnoreError(cfg, err) { // invokes the registered HTTP error handler c.Error(err) @@ -108,12 +108,16 @@ func Middleware(opts ...Option) echo.MiddlewareFunc { } } else if status := c.Response().Status; status > 0 { if cfg.isStatusError(status) { - finishOpts = append(finishOpts, tracer.WithError(fmt.Errorf("%d: %s", status, http.StatusText(status)))) + if statusErr := errorFromStatusCode(status); !shouldIgnoreError(cfg, statusErr) { + finishOpts = append(finishOpts, tracer.WithError(statusErr)) + } } span.SetTag(ext.HTTPCode, strconv.Itoa(status)) } else { if cfg.isStatusError(200) { - finishOpts = append(finishOpts, tracer.WithError(fmt.Errorf("%d: %s", 200, http.StatusText(200)))) + if statusErr := errorFromStatusCode(200); !shouldIgnoreError(cfg, statusErr) { + finishOpts = append(finishOpts, tracer.WithError(statusErr)) + } } span.SetTag(ext.HTTPCode, "200") } @@ -121,3 +125,11 @@ func Middleware(opts ...Option) echo.MiddlewareFunc { } } } + +func errorFromStatusCode(statusCode int) error { + return fmt.Errorf("%d: %s", statusCode, http.StatusText(statusCode)) +} + +func shouldIgnoreError(cfg *config, err error) bool { + return cfg.errCheck != nil && !cfg.errCheck(err) +} diff --git a/v2/contrib/labstack/echo.v4/echotrace_test.go b/v2/contrib/labstack/echo.v4/echotrace_test.go index 5f0dae48ff..2a289a047d 100644 --- a/v2/contrib/labstack/echo.v4/echotrace_test.go +++ b/v2/contrib/labstack/echo.v4/echotrace_test.go @@ -589,6 +589,128 @@ func TestWithHeaderTags(t *testing.T) { }) } +func TestWithErrorCheck(t *testing.T) { + tests := []struct { + name string + err error + opts []Option + wantErr error + }{ + { + name: "ignore-4xx-404-error", + err: &echo.HTTPError{ + Code: http.StatusNotFound, + Message: "not found", + Internal: errors.New("not found"), + }, + opts: []Option{ + WithErrorCheck(func(err error) bool { + var he *echo.HTTPError + if errors.As(err, &he) { + // do not tag 4xx errors + return !(he.Code < 500 && he.Code >= 400) + } + return true + }), + }, + wantErr: nil, // 404 is returned, hence not tagged + }, + { + name: "ignore-4xx-500-error", + err: &echo.HTTPError{ + Code: http.StatusInternalServerError, + Message: "internal error", + Internal: errors.New("internal error"), + }, + opts: []Option{ + WithErrorCheck(func(err error) bool { + var he *echo.HTTPError + if errors.As(err, &he) { + // do not tag 4xx errors + return !(he.Code < 500 && he.Code >= 400) + } + return true + }), + }, + wantErr: &echo.HTTPError{ + Code: http.StatusInternalServerError, + Message: "internal error", + Internal: errors.New("internal error"), + }, // this is 500, tagged + }, + { + name: "ignore-none", + err: errors.New("any error"), + opts: []Option{ + WithErrorCheck(func(err error) bool { + return true + }), + }, + wantErr: errors.New("any error"), + }, + { + name: "ignore-all", + err: errors.New("any error"), + opts: []Option{ + WithErrorCheck(func(err error) bool { + return false + }), + }, + wantErr: nil, + }, + { + // withErrorCheck also runs for the errors created from the WithStatusCheck option. + name: "ignore-errors-from-status-check", + err: &echo.HTTPError{ + Code: http.StatusNotFound, + Message: "internal error", + Internal: errors.New("internal error"), + }, + opts: []Option{ + WithStatusCheck(func(statusCode int) bool { + return statusCode == http.StatusNotFound + }), + WithErrorCheck(func(err error) bool { + return false + }), + }, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + router := echo.New() + router.Use(Middleware(tt.opts...)) + var called, traced bool + + // always return the specified error + router.GET("/err", func(c echo.Context) error { + _, traced = tracer.SpanFromContext(c.Request().Context()) + called = true + return tt.err + }) + r := httptest.NewRequest(http.MethodGet, "/err", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, r) + + assert.True(t, called) + assert.True(t, traced) + spans := mt.FinishedSpans() + require.Len(t, spans, 1) // fail at once if there is no span + + span := spans[0] + if tt.wantErr == nil { + assert.NotContains(t, span.Tags(), ext.Error) + return + } + assert.Equal(t, tt.wantErr, span.Tag(ext.Error)) + }) + } +} + func TestWithCustomTags(t *testing.T) { assert := assert.New(t) mt := mocktracer.Start() diff --git a/v2/contrib/labstack/echo.v4/option.go b/v2/contrib/labstack/echo.v4/option.go index 706bd119ca..90bf1fb260 100644 --- a/v2/contrib/labstack/echo.v4/option.go +++ b/v2/contrib/labstack/echo.v4/option.go @@ -27,6 +27,7 @@ type config struct { isStatusError func(statusCode int) bool translateError func(err error) (*echo.HTTPError, bool) headerTags *internal.LockMap + errCheck func(error) bool tags map[string]interface{} } @@ -138,6 +139,14 @@ func WithHeaderTags(headers []string) OptionFn { } } +// WithErrorCheck sets the func which determines if err would be ignored (if it returns true, the error is not tagged). +// This function also checks the errors created from the WithStatusCheck option. +func WithErrorCheck(errCheck func(error) bool) Option { + return func(cfg *config) { + cfg.errCheck = errCheck + } +} + // WithCustomTag will attach the value to the span tagged by the key. Standard // span tags cannot be replaced. func WithCustomTag(key string, value interface{}) OptionFn {