Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/opentelemetry: add RWMutex to handle concurrent calls to setters #2521

Merged
merged 9 commits into from
Jan 30, 2024
12 changes: 11 additions & 1 deletion ddtrace/opentelemetry/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"strconv"
"strings"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
Expand All @@ -25,7 +26,8 @@ import (
var _ oteltrace.Span = (*span)(nil)

type span struct {
noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations
noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations
mu sync.RWMutex `msg:"-"` // all fields are protected by this RWMutex
DD tracer.Span
finished bool
attributes map[string]interface{}
Expand All @@ -38,10 +40,14 @@ type span struct {
func (s *span) TracerProvider() oteltrace.TracerProvider { return s.oteltracer.provider }

func (s *span) SetName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
s.attributes[ext.SpanName] = strings.ToLower(name)
}

func (s *span) End(options ...oteltrace.SpanEndOption) {
s.mu.Lock()
defer s.mu.Unlock()
if s.finished {
return
}
Expand Down Expand Up @@ -157,6 +163,8 @@ type statusInfo struct {
// value before (OK > Error > Unset), the code will not be changed.
// The code and description are set once when the span is finished.
func (s *span) SetStatus(code otelcodes.Code, description string) {
s.mu.Lock()
defer s.mu.Unlock()
if code >= s.statusInfo.code {
s.statusInfo = statusInfo{code, description}
}
Expand All @@ -175,6 +183,8 @@ func (s *span) SetStatus(code otelcodes.Code, description string) {
// The list of reserved tags might be extended in the future.
// Any other non-reserved tags will be set as provided.
func (s *span) SetAttributes(kv ...attribute.KeyValue) {
s.mu.Lock()
defer s.mu.Unlock()
for _, kv := range kv {
if k, v := toReservedAttributes(string(kv.Key), kv.Value); k != "" {
s.attributes[k] = v
Expand Down
19 changes: 19 additions & 0 deletions ddtrace/opentelemetry/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,25 @@ func TestSpanTelemetry(t *testing.T) {
telemetryClient.AssertNumberOfCalls(t, "Count", 1)
}

func TestConcurrentSetAttributes(_ *testing.T) {
tp := NewTracerProvider()
otel.SetTracerProvider(tp)
tr := otel.Tracer("")

_, span := tr.Start(context.Background(), "test")
defer span.End()

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func(val int) {
defer wg.Done()
span.SetAttributes(attribute.Float64("workerID", float64(i)))
}(i)
}
}

func BenchmarkOTelApiWithNoTags(b *testing.B) {
testData := struct {
env, srv, op string
Expand Down
7 changes: 4 additions & 3 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,11 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr
if ctx.origin != "" {
writer.Set(originHeader, ctx.origin)
}
// propagate OpenTracing baggage
for k, v := range ctx.baggage {
ctx.ForeachBaggageItem(func(k, v string) bool {
// Propagate OpenTracing baggage.
writer.Set(p.cfg.BaggagePrefix+k, v)
}
return true
})
if p.cfg.MaxTagsHeaderLen <= 0 {
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,28 @@ func TestTracerBaggageImmutability(t *testing.T) {
assert.Equal("changed!", childContext.baggage["key"])
}

func TestTracerInjectConcurrency(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
span, _ := StartSpanFromContext(context.Background(), "main")
defer span.Finish()

var wg sync.WaitGroup
for i := 0; i < 500; i++ {
wg.Add(1)
i := i
go func(val int) {
defer wg.Done()
span.SetBaggageItem("val", fmt.Sprintf("%d", val))

traceContext := map[string]string{}
_ = tracer.Inject(span.Context(), TextMapCarrier(traceContext))
}(i)
}

wg.Wait()
}

func TestTracerSpanTags(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
Expand Down