diff --git a/ddtrace/tracer/metrics_test.go b/ddtrace/tracer/metrics_test.go index 4aad5e373c..64548fce86 100644 --- a/ddtrace/tracer/metrics_test.go +++ b/ddtrace/tracer/metrics_test.go @@ -55,12 +55,36 @@ func TestReportHealthMetrics(t *testing.T) { tracer.StartSpan("operation").Finish() flush(1) - tg.Wait(assert, 3, 10*time.Second) + tg.Wait(assert, 4, 10*time.Second) counts := tg.Counts() assert.Equal(int64(1), counts["datadog.tracer.spans_started"]) assert.Equal(int64(1), counts["datadog.tracer.spans_finished"]) assert.Equal(int64(0), counts["datadog.tracer.traces_dropped"]) + assert.Equal(int64(1), counts["datadog.tracer.queue.enqueued.traces"]) +} + +func TestEnqueuedTracesHealthMetric(t *testing.T) { + assert := assert.New(t) + var tg statsdtest.TestStatsdClient + + defer func(old time.Duration) { statsInterval = old }(statsInterval) + statsInterval = time.Nanosecond + + tracer, _, flush, stop := startTestTracer(t, withStatsdClient(&tg)) + defer stop() + + for i := 0; i < 3; i++ { + tracer.StartSpan("operation").Finish() + } + flush(3) + tg.Wait(assert, 1, 10*time.Second) + + counts := tg.Counts() + assert.Equal(int64(3), counts["datadog.tracer.queue.enqueued.traces"]) + w, ok := tracer.traceWriter.(*agentTraceWriter) + assert.True(ok) + assert.Equal(uint32(0), w.tracesQueued) } func TestTracerMetrics(t *testing.T) { diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index 5ceadcb2e6..cff333c8c4 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -14,6 +14,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal" @@ -50,6 +51,8 @@ type agentTraceWriter struct { // statsd is used to send metrics statsd globalinternal.StatsdClient + + tracesQueued uint32 } func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient globalinternal.StatsdClient) *agentTraceWriter { @@ -67,6 +70,7 @@ func (h *agentTraceWriter) add(trace []*span) { h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1) log.Error("Error encoding msgpack: %v", err) } + atomic.AddUint32(&h.tracesQueued, 1) // TODO: This does not differentiate between complete traces and partial chunks if h.payload.size() > payloadSizeLimit { h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1) h.flush() @@ -94,6 +98,7 @@ func (h *agentTraceWriter) flush() { // collection to avoid a memory leak when references to this object // may still be kept by faulty transport implementations or the // standard library. See dd-trace-go#976 + h.statsd.Count("datadog.tracer.queue.enqueued.traces", int64(atomic.SwapUint32(&h.tracesQueued, 0)), nil, 1) p.clear() <-h.climit diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index bcdf529a58..224ae2a1ca 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -386,12 +386,14 @@ func TestTraceWriterFlushRetries(t *testing.T) { } sentCounts := map[string]int64{ - "datadog.tracer.decode_error": 1, - "datadog.tracer.flush_bytes": 185, - "datadog.tracer.flush_traces": 1, + "datadog.tracer.decode_error": 1, + "datadog.tracer.flush_bytes": 185, + "datadog.tracer.flush_traces": 1, + "datadog.tracer.queue.enqueued.traces": 1, } droppedCounts := map[string]int64{ - "datadog.tracer.traces_dropped": 1, + "datadog.tracer.queue.enqueued.traces": 1, + "datadog.tracer.traces_dropped": 1, } ss := []*span{makeSpan(0)}