Skip to content

Commit

Permalink
ddtrace/tracer: add support for DD_PROPAGATION_STYLE_INJECT + EXTRACT (
Browse files Browse the repository at this point in the history
…#417)

This permits using B3 headers for injecting and extracting traces when propagated between different programs.
It is backward-compatible with existing behavior, but provides support for either Datadog, B3 or both headers when they are propagated.

The environment variables DD_PROPAGATION_STYLE_INJECT and DD_PROPAGATION_STYLE_EXTRACT support values that are comma separated (current valid values are: datadog and b3). They are case insensitive.
  • Loading branch information
cgilmour authored and gbbr committed Mar 26, 2019
1 parent 29c19af commit b58d56c
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 10 deletions.
171 changes: 161 additions & 10 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package tracer

import (
"net/http"
"os"
"strconv"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)

// HTTPHeadersCarrier wraps an http.Header as a TextMapWriter and TextMapReader, allowing
Expand Down Expand Up @@ -54,6 +56,11 @@ func (c TextMapCarrier) ForeachKey(handler func(key, val string) error) error {
return nil
}

const (
headerPropagationStyleInject = "DD_PROPAGATION_STYLE_INJECT"
headerPropagationStyleExtract = "DD_PROPAGATION_STYLE_EXTRACT"
)

const (
// DefaultBaggageHeaderPrefix specifies the prefix that will be used in
// HTTP headers or text maps to prefix baggage keys.
Expand Down Expand Up @@ -114,21 +121,86 @@ func NewPropagator(cfg *PropagatorConfig) Propagator {
if cfg.PriorityHeader == "" {
cfg.PriorityHeader = DefaultPriorityHeader
}
return &propagator{cfg}
return &chainedPropagator{
injectors: getPropagators(cfg, headerPropagationStyleInject),
extractors: getPropagators(cfg, headerPropagationStyleExtract),
}
}

// propagator implements a propagator which uses TextMap internally.
// It propagates the trace and span IDs, as well as the baggage from the
// context.
type propagator struct{ cfg *PropagatorConfig }
// chainedPropagator implements Propagator and applies a list of injectors and extractors.
// When injecting, all injectors are called to propagate the span context.
// When extracting, it tries each extractor, selecting the first successful one.
type chainedPropagator struct {
injectors []Propagator
extractors []Propagator
}

// getPropagators returns a list of propagators based on the list found in the
// given environment variable. If the list doesn't contain a value or has invalid
// values, the default propagator will be returned.
func getPropagators(cfg *PropagatorConfig, env string) []Propagator {
dd := &propagator{cfg}
ps := os.Getenv(env)
if ps == "" {
return []Propagator{dd}
}
var list []Propagator
for _, v := range strings.Split(ps, ",") {
switch strings.ToLower(v) {
case "datadog":
list = append(list, dd)
case "b3":
list = append(list, &propagatorB3{})
default:
// TODO(cgilmour): consider logging something for invalid/unknown styles.
}
}
if len(list) == 0 {
// return the default
return []Propagator{dd}
}
return list
}

// Inject defines the Propagator to propagate SpanContext data
// out of the current process. The implementation propagates the
// TraceID and the current active SpanID, as well as the Span baggage.
func (p *chainedPropagator) Inject(spanCtx ddtrace.SpanContext, carrier interface{}) error {
for _, v := range p.injectors {
err := v.Inject(spanCtx, carrier)
if err != nil {
return err
}
}
return nil
}

// Extract implements Propagator.
func (p *chainedPropagator) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
for _, v := range p.extractors {
ctx, err := v.Extract(carrier)
if ctx != nil {
// first extractor returns
return ctx, nil
}
if err == ErrSpanContextNotFound {
continue
}
return nil, err
}
return nil, ErrSpanContextNotFound
}

// propagator implements Propagator and injects/extracts span contexts
// using datadog headers. Only TextMap carriers are supported.
type propagator struct {
cfg *PropagatorConfig
}

func (p *propagator) Inject(spanCtx ddtrace.SpanContext, carrier interface{}) error {
switch v := carrier.(type) {
switch c := carrier.(type) {
case TextMapWriter:
return p.injectTextMap(spanCtx, v)
return p.injectTextMap(spanCtx, c)
default:
return ErrInvalidCarrier
}
Expand All @@ -155,11 +227,10 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr
return nil
}

// Extract implements Propagator.
func (p *propagator) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
switch v := carrier.(type) {
switch c := carrier.(type) {
case TextMapReader:
return p.extractTextMap(v)
return p.extractTextMap(c)
default:
return nil, ErrInvalidCarrier
}
Expand Down Expand Up @@ -204,3 +275,83 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
}
return &ctx, nil
}

const (
b3TraceIDHeader = "x-b3-traceid"
b3SpanIDHeader = "x-b3-spanid"
b3SampledHeader = "x-b3-sampled"
)

// propagatorB3 implements Propagator and injects/extracts span contexts
// using B3 headers. Only TextMap carriers are supported.
type propagatorB3 struct{}

func (p *propagatorB3) Inject(spanCtx ddtrace.SpanContext, carrier interface{}) error {
switch c := carrier.(type) {
case TextMapWriter:
return p.injectTextMap(spanCtx, c)
default:
return ErrInvalidCarrier
}
}

func (*propagatorB3) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWriter) error {
ctx, ok := spanCtx.(*spanContext)
if !ok || ctx.traceID == 0 || ctx.spanID == 0 {
return ErrInvalidSpanContext
}
writer.Set(b3TraceIDHeader, strconv.FormatUint(ctx.traceID, 16))
writer.Set(b3SpanIDHeader, strconv.FormatUint(ctx.spanID, 16))
if ctx.hasSamplingPriority() {
if ctx.samplingPriority() >= ext.PriorityAutoKeep {
writer.Set(b3SampledHeader, "1")
} else {
writer.Set(b3SampledHeader, "0")
}
}
return nil
}

func (p *propagatorB3) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
switch c := carrier.(type) {
case TextMapReader:
return p.extractTextMap(c)
default:
return nil, ErrInvalidCarrier
}
}

func (*propagatorB3) extractTextMap(reader TextMapReader) (ddtrace.SpanContext, error) {
var ctx spanContext
err := reader.ForeachKey(func(k, v string) error {
var err error
key := strings.ToLower(k)
switch key {
case b3TraceIDHeader:
ctx.traceID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return ErrSpanContextCorrupted
}
case b3SpanIDHeader:
ctx.spanID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return ErrSpanContextCorrupted
}
case b3SampledHeader:
priority, err := strconv.Atoi(v)
if err != nil {
return ErrSpanContextCorrupted
}
ctx.setSamplingPriority(priority)
default:
}
return nil
})
if err != nil {
return nil, err
}
if ctx.traceID == 0 || ctx.spanID == 0 {
return nil, ErrSpanContextNotFound
}
return &ctx, nil
}
83 changes: 83 additions & 0 deletions ddtrace/tracer/textmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tracer
import (
"errors"
"net/http"
"os"
"strconv"
"testing"

Expand Down Expand Up @@ -195,3 +196,85 @@ func TestTextMapPropagatorInjectExtract(t *testing.T) {
assert.Equal(xctx.priority, ctx.priority)
assert.Equal(xctx.hasPriority, ctx.hasPriority)
}

func TestB3(t *testing.T) {
t.Run("inject", func(t *testing.T) {
os.Setenv("DD_PROPAGATION_STYLE_INJECT", "B3")
defer os.Unsetenv("DD_PROPAGATION_STYLE_INJECT")

tracer := newTracer()
root := tracer.StartSpan("web.request").(*span)
root.SetTag(ext.SamplingPriority, -1)
root.SetBaggageItem("item", "x")
ctx := root.Context().(*spanContext)
headers := TextMapCarrier(map[string]string{})
err := tracer.Inject(ctx, headers)

assert := assert.New(t)
assert.Nil(err)

assert.Equal(headers[b3TraceIDHeader], strconv.FormatUint(root.TraceID, 16))
assert.Equal(headers[b3SpanIDHeader], strconv.FormatUint(root.SpanID, 16))
assert.Equal(headers[b3SampledHeader], "0")
})

t.Run("extract", func(t *testing.T) {
os.Setenv("DD_PROPAGATION_STYLE_EXTRACT", "b3")
defer os.Unsetenv("DD_PROPAGATION_STYLE_EXTRACT")

headers := TextMapCarrier(map[string]string{
b3TraceIDHeader: "1",
b3SpanIDHeader: "1",
})

tracer := newTracer()
assert := assert.New(t)
ctx, err := tracer.Extract(headers)
assert.Nil(err)
sctx, ok := ctx.(*spanContext)
assert.True(ok)

assert.Equal(sctx.traceID, uint64(1))
assert.Equal(sctx.spanID, uint64(1))
})

t.Run("multiple", func(t *testing.T) {
os.Setenv("DD_PROPAGATION_STYLE_EXTRACT", "Datadog,B3")
defer os.Unsetenv("DD_PROPAGATION_STYLE_EXTRACT")

b3Headers := TextMapCarrier(map[string]string{
b3TraceIDHeader: "1",
b3SpanIDHeader: "1",
b3SampledHeader: "1",
})

tracer := newTracer()
assert := assert.New(t)

ctx, err := tracer.Extract(b3Headers)
assert.Nil(err)
sctx, ok := ctx.(*spanContext)
assert.True(ok)

assert.Equal(sctx.traceID, uint64(1))
assert.Equal(sctx.spanID, uint64(1))
assert.True(sctx.hasSamplingPriority())
assert.Equal(sctx.samplingPriority(), 1)

ddHeaders := TextMapCarrier(map[string]string{
DefaultTraceIDHeader: "2",
DefaultParentIDHeader: "2",
DefaultPriorityHeader: "2",
})

ctx, err = tracer.Extract(ddHeaders)
assert.Nil(err)
sctx, ok = ctx.(*spanContext)
assert.True(ok)

assert.Equal(sctx.traceID, uint64(2))
assert.Equal(sctx.spanID, uint64(2))
assert.True(sctx.hasSamplingPriority())
assert.Equal(sctx.samplingPriority(), 2)
})
}

0 comments on commit b58d56c

Please sign in to comment.