Skip to content

Commit

Permalink
ddtrace/tracer: fix a race condition happening with asynchronous flus…
Browse files Browse the repository at this point in the history
…hes (#440)

This change fixes a race condition that would happen on rare occasions
when using the priority sampler and flushing asynchronous child spans of
a trace that would finish later than the root, usually at the same time
when the root was being flushed.
  • Loading branch information
gbbr authored May 17, 2019
1 parent f3fe6e5 commit 746d442
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 9 deletions.
19 changes: 11 additions & 8 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *spanContext) baggageItem(key string) string {
}

// finish marks this span as finished in the trace.
func (c *spanContext) finish() { c.trace.finishedOne() }
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// trace contains shared context information about a trace, such as sampling
// priority, the root reference and a buffer of the spans which are part of the
Expand Down Expand Up @@ -177,9 +177,8 @@ func (t *trace) setSamplingPriorityLocked(p float64) {
return
}
if t.root == nil {
// this trace is part of a context that doesn't belong to a
// trace yet, meaning that the sampling priority is locked
// by a distributed trace.
// this trace is distributed (no local root); modifications
// to the sampling priority are not allowed.
t.locked = true
}
if t.priority == nil {
Expand Down Expand Up @@ -215,7 +214,7 @@ func (t *trace) push(sp *span) {
// finishedOne aknowledges that another span in the trace has finished, and checks
// if the trace is complete, in which case it calls the onFinish function. It uses
// the given priority, if non-nil, to mark the root span.
func (t *trace) finishedOne() {
func (t *trace) finishedOne(s *span) {
t.mu.Lock()
defer t.mu.Unlock()
if t.full {
Expand All @@ -226,14 +225,18 @@ func (t *trace) finishedOne() {
return
}
t.finished++
if s == t.root && t.priority != nil {
// after the root has finished we lock down the priority;
// we won't be able to make changes to a span after finishing
// without causing a race condition.
t.root.Metrics[keySamplingPriority] = *t.priority
t.locked = true
}
if len(t.spans) != t.finished {
return
}
if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have a tracer that can receive completed traces.
if t.priority != nil {
t.root.Metrics[keySamplingPriority] = *t.priority
}
tr.pushTrace(t.spans)
}
t.spans = nil
Expand Down
52 changes: 52 additions & 0 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tracer

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -40,6 +42,56 @@ func TestNewSpanContextPushError(t *testing.T) {
}
}

func TestAsyncSpanRace(t *testing.T) {
// This tests a regression where asynchronously finishing spans would
// modify a flushing root's sampling priority.
_, _, stop := startTestTracer()
defer stop()

for i := 0; i < 100; i++ {
// The test has 100 iterations because it is not easy to reproduce the race.
t.Run("", func(t *testing.T) {
root, ctx := StartSpanFromContext(context.Background(), "root", Tag(ext.SamplingPriority, ext.PriorityUserKeep))
var wg sync.WaitGroup
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-done:
root.Finish()
for i := 0; i < 500; i++ {
for range root.(*span).Metrics {
// this range simulates iterating over the metrics map
// as we do when encoding msgpack upon flushing.
}
}
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-done:
for i := 0; i < 50; i++ {
// to trigger the bug, the child should be created after the root was finished,
// as its being flushed
child, _ := StartSpanFromContext(ctx, "child", Tag(ext.SamplingPriority, ext.PriorityUserKeep))
child.Finish()
}
return
}
}()
// closing will attempt trigger the two goroutines at approximately the same time.
close(done)
wg.Wait()
})
}

// Test passes if no panic occurs while running.
}

func TestSpanTracePushOne(t *testing.T) {
defer setupteardown(2, 5)()

Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var (
// TODO(gbbr): find a more effective way to keep this up to date,
// e.g. via `go generate`
tracerVersion = "v1.10.0"
tracerVersion = "v1.13.1"

// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
Expand Down

0 comments on commit 746d442

Please sign in to comment.