Skip to content

Commit

Permalink
ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health…
Browse files Browse the repository at this point in the history
… metric (#3019)
  • Loading branch information
mtoffl01 authored and e-n-0 committed Dec 23, 2024
1 parent 7ea0a8e commit 7118aeb
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
26 changes: 25 additions & 1 deletion ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,36 @@ func TestReportHealthMetrics(t *testing.T) {

tracer.StartSpan("operation").Finish()
flush(1)
tg.Wait(assert, 3, 10*time.Second)
tg.Wait(assert, 4, 10*time.Second)

counts := tg.Counts()
assert.Equal(int64(1), counts["datadog.tracer.spans_started"])
assert.Equal(int64(1), counts["datadog.tracer.spans_finished"])
assert.Equal(int64(0), counts["datadog.tracer.traces_dropped"])
assert.Equal(int64(1), counts["datadog.tracer.queue.enqueued.traces"])
}

// TestEnqueuedTracesHealthMetric starts and finishes three spans, flushes the
// tracer, and then checks that the statsd test client recorded a count of 3
// for "datadog.tracer.queue.enqueued.traces" and that the agent trace
// writer's tracesQueued counter reads zero afterwards.
func TestEnqueuedTracesHealthMetric(t *testing.T) {
	assert := assert.New(t)
	var tg statsdtest.TestStatsdClient

	// Override the package-level statsInterval for this test only; the
	// deferred closure captures the previous value and restores it on exit.
	defer func(old time.Duration) { statsInterval = old }(statsInterval)
	statsInterval = time.Nanosecond

	tracer, _, flush, stop := startTestTracer(t, withStatsdClient(&tg))
	defer stop()

	for i := 0; i < 3; i++ {
		tracer.StartSpan("operation").Finish()
	}
	flush(3)
	tg.Wait(assert, 1, 10*time.Second)

	counts := tg.Counts()
	assert.Equal(int64(3), counts["datadog.tracer.queue.enqueued.traces"])

	w, ok := tracer.traceWriter.(*agentTraceWriter)
	if !assert.True(ok) {
		// A failed type assertion leaves w nil; bail out so the test reports
		// the assertion failure instead of panicking on a nil dereference.
		return
	}
	assert.Equal(uint32(0), w.tracesQueued)
}

func TestTracerMetrics(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions ddtrace/tracer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
Expand Down Expand Up @@ -50,6 +51,8 @@ type agentTraceWriter struct {

// statsd is used to send metrics
statsd globalinternal.StatsdClient

tracesQueued uint32
}

func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient globalinternal.StatsdClient) *agentTraceWriter {
Expand All @@ -67,6 +70,7 @@ func (h *agentTraceWriter) add(trace []*span) {
h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1)
log.Error("Error encoding msgpack: %v", err)
}
atomic.AddUint32(&h.tracesQueued, 1) // TODO: This does not differentiate between complete traces and partial chunks
if h.payload.size() > payloadSizeLimit {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1)
h.flush()
Expand Down Expand Up @@ -94,6 +98,7 @@ func (h *agentTraceWriter) flush() {
// collection to avoid a memory leak when references to this object
// may still be kept by faulty transport implementations or the
// standard library. See dd-trace-go#976
h.statsd.Count("datadog.tracer.queue.enqueued.traces", int64(atomic.SwapUint32(&h.tracesQueued, 0)), nil, 1)
p.clear()

<-h.climit
Expand Down
10 changes: 6 additions & 4 deletions ddtrace/tracer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,14 @@ func TestTraceWriterFlushRetries(t *testing.T) {
}

sentCounts := map[string]int64{
"datadog.tracer.decode_error": 1,
"datadog.tracer.flush_bytes": 185,
"datadog.tracer.flush_traces": 1,
"datadog.tracer.decode_error": 1,
"datadog.tracer.flush_bytes": 185,
"datadog.tracer.flush_traces": 1,
"datadog.tracer.queue.enqueued.traces": 1,
}
droppedCounts := map[string]int64{
"datadog.tracer.traces_dropped": 1,
"datadog.tracer.queue.enqueued.traces": 1,
"datadog.tracer.traces_dropped": 1,
}

ss := []*span{makeSpan(0)}
Expand Down

0 comments on commit 7118aeb

Please sign in to comment.