From 00d0207570ddf0e40538425c4b2cac39951913a9 Mon Sep 17 00:00:00 2001 From: Nicolas Martyanoff Date: Tue, 3 Jan 2017 17:36:17 +0100 Subject: [PATCH 1/2] add support for services - Add (*Tracer).SetServiceInfo(). - The tracer regularly flushes, sending services to the API if they have been modified. --- .../gin-gonic/gintrace/gintrace_test.go | 18 ++- .../contrib/gorilla/muxtrace/muxtrace_test.go | 18 ++- tracer/contrib/tracegrpc/grpc_test.go | 18 ++- tracer/encoder.go | 32 +++-- tracer/encoder_test.go | 4 +- tracer/tracer.go | 121 ++++++++++++++++-- tracer/tracer_test.go | 56 +++++++- tracer/transport.go | 70 ++++++++-- tracer/transport_test.go | 52 +++++++- 9 files changed, 326 insertions(+), 63 deletions(-) diff --git a/tracer/contrib/gin-gonic/gintrace/gintrace_test.go b/tracer/contrib/gin-gonic/gintrace/gintrace_test.go index 91c2ab59d4..07602f982d 100644 --- a/tracer/contrib/gin-gonic/gintrace/gintrace_test.go +++ b/tracer/contrib/gin-gonic/gintrace/gintrace_test.go @@ -65,7 +65,7 @@ func TestTrace200(t *testing.T) { assert.Equal(response.StatusCode, 200) // verify traces look good - assert.Nil(testTracer.Flush()) + assert.Nil(testTracer.FlushTraces()) traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -109,7 +109,7 @@ func TestDisabled(t *testing.T) { assert.Equal(response.StatusCode, 200) // verify traces look good - testTracer.Flush() + testTracer.FlushTraces() spans := testTransport.Traces() assert.Len(spans, 0) } @@ -134,7 +134,7 @@ func TestError(t *testing.T) { assert.Equal(response.StatusCode, 500) // verify the errors and status are correct - testTracer.Flush() + testTracer.FlushTraces() traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -176,7 +176,7 @@ func TestHTML(t *testing.T) { assert.Equal("hello world", w.Body.String()) // verify the errors and status are correct - testTracer.Flush() + testTracer.FlushTraces() traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -220,14 +220,20 @@ func getTestTracer() (*tracer.Tracer, *dummyTransport) { // dummyTransport is a transport that just buffers spans and encoding type dummyTransport struct { - traces [][]*tracer.Span + traces [][]*tracer.Span + services map[string]tracer.Service } -func (t *dummyTransport) Send(traces [][]*tracer.Span) (*http.Response, error) { +func (t *dummyTransport) SendTraces(traces [][]*tracer.Span) (*http.Response, error) { t.traces = append(t.traces, traces...) return nil, nil } +func (t *dummyTransport) SendServices(services map[string]tracer.Service) (*http.Response, error) { + t.services = services + return nil, nil +} + func (t *dummyTransport) Traces() [][]*tracer.Span { traces := t.traces t.traces = nil diff --git a/tracer/contrib/gorilla/muxtrace/muxtrace_test.go b/tracer/contrib/gorilla/muxtrace/muxtrace_test.go index bdf20c289b..d751dc6c8f 100644 --- a/tracer/contrib/gorilla/muxtrace/muxtrace_test.go +++ b/tracer/contrib/gorilla/muxtrace/muxtrace_test.go @@ -33,7 +33,7 @@ func TestMuxTracerDisabled(t *testing.T) { assert.Equal(writer.Body.String(), "disabled!") // assert nothing was traced. - assert.Nil(testTracer.Flush()) + assert.Nil(testTracer.FlushTraces()) traces := testTransport.Traces() assert.Len(traces, 0) } @@ -51,7 +51,7 @@ func TestMuxTracerSubrequest(t *testing.T) { assert.Equal(writer.Body.String(), "200!") // ensure properly traced - assert.Nil(tracer.Flush()) + assert.Nil(tracer.FlushTraces()) traces := transport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -81,7 +81,7 @@ func TestMuxTracer200(t *testing.T) { assert.Equal(writer.Body.String(), "200!") // ensure properly traced - assert.Nil(tracer.Flush()) + assert.Nil(tracer.FlushTraces()) traces := transport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -110,7 +110,7 @@ func TestMuxTracer500(t *testing.T) { assert.Equal(writer.Body.String(), "500!\n") // ensure properly traced - assert.Nil(tracer.Flush()) + assert.Nil(tracer.FlushTraces()) traces := transport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -177,14 +177,20 @@ func getTestTracer(service string) (*tracer.Tracer, *dummyTransport, *MuxTracer) // dummyTransport is a transport that just buffers spans and encoding type dummyTransport struct { - traces [][]*tracer.Span + traces [][]*tracer.Span + services map[string]tracer.Service } -func (t *dummyTransport) Send(traces [][]*tracer.Span) (*http.Response, error) { +func (t *dummyTransport) SendTraces(traces [][]*tracer.Span) (*http.Response, error) { t.traces = append(t.traces, traces...) return nil, nil } +func (t *dummyTransport) SendServices(services map[string]tracer.Service) (*http.Response, error) { + t.services = services + return nil, nil +} + func (t *dummyTransport) Traces() [][]*tracer.Span { traces := t.traces t.traces = nil diff --git a/tracer/contrib/tracegrpc/grpc_test.go b/tracer/contrib/tracegrpc/grpc_test.go index e0ef72893a..d455008916 100644 --- a/tracer/contrib/tracegrpc/grpc_test.go +++ b/tracer/contrib/tracegrpc/grpc_test.go @@ -38,7 +38,7 @@ func TestClient(t *testing.T) { span.Finish() assert.Equal(resp.Message, "passed") - testTracer.Flush() + testTracer.FlushTraces() traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -73,7 +73,7 @@ func TestDisabled(t *testing.T) { resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "disabled"}) assert.Nil(err) assert.Equal(resp.Message, "disabled") - assert.Nil(testTracer.Flush()) + assert.Nil(testTracer.FlushTraces()) traces := testTransport.Traces() assert.Nil(traces) } @@ -93,7 +93,7 @@ func TestChild(t *testing.T) { resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "child"}) assert.Nil(err) assert.Equal(resp.Message, "child") - assert.Nil(testTracer.Flush()) + assert.Nil(testTracer.FlushTraces()) traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -127,7 +127,7 @@ func TestPass(t *testing.T) { resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) assert.Nil(err) assert.Equal(resp.Message, "passed") - assert.Nil(testTracer.Flush()) + assert.Nil(testTracer.FlushTraces()) traces := testTransport.Traces() assert.Len(traces, 1) spans := traces[0] @@ -233,14 +233,20 @@ func getTestTracer() (*tracer.Tracer, *dummyTransport) { // dummyTransport is a transport that just buffers spans and encoding type dummyTransport struct { - traces [][]*tracer.Span + traces [][]*tracer.Span + services map[string]tracer.Service } -func (t *dummyTransport) Send(traces [][]*tracer.Span) (*http.Response, error) { +func (t *dummyTransport) SendTraces(traces [][]*tracer.Span) (*http.Response, error) { t.traces = append(t.traces, traces...) return nil, nil } +func (t *dummyTransport) SendServices(services map[string]tracer.Service) (*http.Response, error) { + t.services = services + return nil, nil +} + func (t *dummyTransport) Traces() [][]*tracer.Span { traces := t.traces t.traces = nil diff --git a/tracer/encoder.go b/tracer/encoder.go index eee6098896..df11193de0 100644 --- a/tracer/encoder.go +++ b/tracer/encoder.go @@ -7,11 +7,11 @@ import ( "github.com/ugorji/go/codec" ) -// Encoder is a generic interface that expects an Encode() method -// for the encoding process, and a Read() method that will be used -// by the http handler +// Encoder is a generic interface that expects encoding methods for traces and +// services, and a Read() method that will be used by the http handler type Encoder interface { - Encode(traces [][]*Span) error + EncodeTraces(traces [][]*Span) error + EncodeServices(services map[string]Service) error Read(p []byte) (int, error) ContentType() string } @@ -36,13 +36,19 @@ func newMsgpackEncoder() *msgpackEncoder { } } -// Encode serializes the given traces list into the internal -// buffer, returning the error if any -func (e *msgpackEncoder) Encode(traces [][]*Span) error { +// EncodeTraces serializes the given trace list into the internal buffer, +// returning the error if any. +func (e *msgpackEncoder) EncodeTraces(traces [][]*Span) error { e.buffer.Reset() return e.encoder.Encode(traces) } +// EncodeServices serializes a service map into the internal buffer. +func (e *msgpackEncoder) EncodeServices(services map[string]Service) error { + e.buffer.Reset() + return e.encoder.Encode(services) +} + // Read values from the internal buffer func (e *msgpackEncoder) Read(p []byte) (int, error) { return e.buffer.Read(p) @@ -72,13 +78,19 @@ func newJSONEncoder() *jsonEncoder { } } -// Encode serializes the given traces list into the internal -// buffer, returning the error if any -func (e *jsonEncoder) Encode(traces [][]*Span) error { +// EncodeTraces serializes the given trace list into the internal buffer, +// returning the error if any. +func (e *jsonEncoder) EncodeTraces(traces [][]*Span) error { e.buffer.Reset() return e.encoder.Encode(traces) } +// EncodeServices serializes a service map into the internal buffer. +func (e *jsonEncoder) EncodeServices(services map[string]Service) error { + e.buffer.Reset() + return e.encoder.Encode(services) +} + // Read values from the internal buffer func (e *jsonEncoder) Read(p []byte) (int, error) { return e.buffer.Read(p) diff --git a/tracer/encoder_test.go b/tracer/encoder_test.go index 646ceabded..14eb93466c 100644 --- a/tracer/encoder_test.go +++ b/tracer/encoder_test.go @@ -40,7 +40,7 @@ func TestJSONEncoding(t *testing.T) { for _, tc := range testCases { payload := getTestTrace(tc.traces, tc.size) encoder := newJSONEncoder() - err := encoder.Encode(payload) + err := encoder.EncodeTraces(payload) assert.Nil(err) // decode to check the right encoding @@ -84,7 +84,7 @@ func TestMsgpackEncoding(t *testing.T) { for _, tc := range testCases { payload := getTestTrace(tc.traces, tc.size) encoder := newMsgpackEncoder() - err := encoder.Encode(payload) + err := encoder.EncodeTraces(payload) assert.Nil(err) // decode to check the right encoding diff --git a/tracer/tracer.go b/tracer/tracer.go index b76444f28d..74d2aaf186 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -2,6 +2,7 @@ package tracer import ( "log" + "sync" "time" "context" @@ -11,6 +12,16 @@ const ( flushInterval = 2 * time.Second ) +type Service struct { + Name string `json:"-"` // the internal of the service (e.g. acme_search, datadog_web) + App string `json:"app"` // the name of the application (e.g. rails, postgres, custom-app) + AppType string `json:"app_type"` // the type of the application (e.g. db, web) +} + +func (s Service) Equal(s2 Service) bool { + return s.Name == s2.Name && s.App == s2.App && s.AppType == s2.AppType +} + // Tracer creates, buffers and submits Spans which are used to time blocks of // compuration. // @@ -23,6 +34,13 @@ type Tracer struct { DebugLoggingEnabled bool enabled bool // defines if the Tracer is enabled or not + + services map[string]Service // name -> service + servicesModified bool + serviceChan chan Service + + exit chan struct{} + exitWG *sync.WaitGroup } // NewTracer creates a new Tracer. Most users should use the package's @@ -39,14 +57,27 @@ func NewTracerTransport(transport Transport) *Tracer { buffer: newSpansBuffer(spanBufferDefaultMaxSize), sampler: newAllSampler(), DebugLoggingEnabled: false, + + services: make(map[string]Service), + serviceChan: make(chan Service, 10), // we don't want to block when a flush is in progress + + exit: make(chan struct{}), + exitWG: &sync.WaitGroup{}, } // start a background worker + t.exitWG.Add(1) go t.worker() return t } +// Stop stops the tracer. +func (t *Tracer) Stop() { + close(t.exit) + t.exitWG.Wait() +} + // SetEnabled will enable or disable the tracer. func (t *Tracer) SetEnabled(enabled bool) { t.enabled = enabled @@ -69,6 +100,16 @@ func (t *Tracer) SetSampleRate(sampleRate float64) { } } +// SetServiceInfo update the application and application type for the given +// service. +func (t *Tracer) SetServiceInfo(name, app, appType string) { + t.serviceChan <- Service{ + Name: name, + App: app, + AppType: appType, + } +} + // NewRootSpan creates a span with no parent. Its ids will be randomly // assigned. func (t *Tracer) NewRootSpan(name, service, resource string) *Span { @@ -116,8 +157,10 @@ func (t *Tracer) record(span *Span) { } } -// Flush will push any currently buffered traces to the server. -func (t *Tracer) Flush() error { +// FlushTraces will push any currently buffered traces to the server. +// XXX Note that it is currently exported because some tests use it. They +// really should not. +func (t *Tracer) FlushTraces() error { spans := t.buffer.Pop() if t.DebugLoggingEnabled { @@ -132,7 +175,7 @@ func (t *Tracer) Flush() error { return nil } - // rebuild the traces list; this operation is done in the Flush() instead + // rebuild the traces list; this operation is done in the FlushTraces() instead // after each record() because this avoids a huge number of initializations // and RW mutex locks, keeping the same performance as before (except for this // little overhead). The overall optimization (and idiomatic code) could be @@ -146,16 +189,76 @@ func (t *Tracer) Flush() error { traces = append(traces, t) } - _, err := t.transport.Send(traces) + _, err := t.transport.SendTraces(traces) return err } -// worker periodically flushes traces to the transport. +func (t *Tracer) flushServices() error { + if !t.enabled || !t.servicesModified { + return nil + } + + if _, err := t.transport.SendServices(t.services); err != nil { + return err + } + + t.servicesModified = false + return nil +} + +func (t *Tracer) flush() { + nbSpans := t.buffer.Len() + if err := t.FlushTraces(); err != nil { + log.Printf("cannot flush traces: %v", err) + log.Printf("lost %d spans", nbSpans) + } + + if err := t.flushServices(); err != nil { + log.Printf("cannot flush services: %v", err) + } +} + +func (t *Tracer) appendService(service Service) { + if s, found := t.services[service.Name]; !found || !s.Equal(service) { + t.services[service.Name] = service + t.servicesModified = true + } +} + +func (t *Tracer) drainServices() { + for { + select { + case service := <-t.serviceChan: + t.appendService(service) + default: + return + } + } +} + +// worker periodically flushes traces and services to the transport. func (t *Tracer) worker() { - for range time.Tick(flushInterval) { - err := t.Flush() - if err != nil { - log.Printf("[WORKER] flush failed, lost spans: %s", err) + defer t.exitWG.Done() + + flushTicker := time.NewTicker(flushInterval) + defer flushTicker.Stop() + + for { + select { + case <-flushTicker.C: + t.flush() + + case service := <-t.serviceChan: + t.appendService(service) + + case <-t.exit: + // serviceChan being buffered, we drain it before the + // last flush to make sure we have all information. It + // is an edge case, but it is important for tests. + t.drainServices() + + t.flush() + return } } } diff --git a/tracer/tracer_test.go b/tracer/tracer_test.go index 7362eff917..bbff289078 100644 --- a/tracer/tracer_test.go +++ b/tracer/tracer_test.go @@ -171,7 +171,7 @@ func TestTracerConcurrent(t *testing.T) { }() wg.Wait() - tracer.Flush() + tracer.FlushTraces() traces := transport.Traces() assert.Len(traces, 3) assert.Len(traces[0], 1) @@ -203,13 +203,49 @@ func TestTracerConcurrentMultipleSpans(t *testing.T) { }() wg.Wait() - tracer.Flush() + tracer.FlushTraces() traces := transport.Traces() assert.Len(traces, 2) assert.Len(traces[0], 2) assert.Len(traces[1], 2) } +func TestTracerServices(t *testing.T) { + assert := assert.New(t) + tracer, transport := getTestTracer() + + tracer.SetServiceInfo("svc1", "a", "b") + tracer.SetServiceInfo("svc2", "c", "d") + tracer.SetServiceInfo("svc1", "e", "f") + + tracer.Stop() + + assert.Equal(2, len(transport.services)) + + svc1 := transport.services["svc1"] + assert.NotNil(svc1) + assert.Equal("svc1", svc1.Name) + assert.Equal("e", svc1.App) + assert.Equal("f", svc1.AppType) + + svc2 := transport.services["svc2"] + assert.NotNil(svc2) + assert.Equal("svc2", svc2.Name) + assert.Equal("c", svc2.App) + assert.Equal("d", svc2.AppType) +} + +func TestTracerServicesDisabled(t *testing.T) { + assert := assert.New(t) + tracer, transport := getTestTracer() + + tracer.SetEnabled(false) + tracer.SetServiceInfo("svc1", "a", "b") + tracer.Stop() + + assert.Equal(0, len(transport.services)) +} + // BenchmarkConcurrentTracing tests the performance of spawning a lot of // goroutines where each one creates a trace with a parent and a child. func BenchmarkConcurrentTracing(b *testing.B) { @@ -249,15 +285,23 @@ func getTestTracer() (*Tracer, *dummyTransport) { // Mock Transport with a real Encoder type dummyTransport struct { - pool *encoderPool - traces [][]*Span + pool *encoderPool + traces [][]*Span + services map[string]Service } -func (t *dummyTransport) Send(traces [][]*Span) (*http.Response, error) { +func (t *dummyTransport) SendTraces(traces [][]*Span) (*http.Response, error) { t.traces = append(t.traces, traces...) encoder := t.pool.Borrow() defer t.pool.Return(encoder) - return nil, encoder.Encode(traces) + return nil, encoder.EncodeTraces(traces) +} + +func (t *dummyTransport) SendServices(services map[string]Service) (*http.Response, error) { + t.services = services + encoder := t.pool.Borrow() + defer t.pool.Return(encoder) + return nil, encoder.EncodeServices(services) } func (t *dummyTransport) Traces() [][]*Span { diff --git a/tracer/transport.go b/tracer/transport.go index 3291a53be7..4ded832ded 100644 --- a/tracer/transport.go +++ b/tracer/transport.go @@ -19,7 +19,8 @@ const ( // Transport is an interface for span submission to the agent. type Transport interface { - Send(spans [][]*Span) (*http.Response, error) + SendTraces(spans [][]*Span) (*http.Response, error) + SendServices(services map[string]Service) (*http.Response, error) SetHeader(key, value string) } @@ -46,8 +47,10 @@ func newDefaultTransport() Transport { } type httpTransport struct { - url string // the delivery URL - legacyURL string // legacy delivery URL + traceURL string // the delivery URL for traces + legacyTraceURL string // the legacy delivery URL for traces + serviceURL string // the delivery URL for services + legacyServiceURL string // the legacy delivery URL for services pool *encoderPool // encoding allocates lot of buffers (which might then be resized) so we use a pool so they can be re-used client *http.Client // the HTTP client used in the POST headers map[string]string // the Transport headers @@ -62,9 +65,11 @@ func newHTTPTransport(hostname, port string) *httpTransport { defaultHeaders["Content-Type"] = contentType return &httpTransport{ - url: fmt.Sprintf("http://%s:%s/v0.3/traces", hostname, port), - legacyURL: fmt.Sprintf("http://%s:%s/v0.2/traces", hostname, port), - pool: pool, + traceURL: fmt.Sprintf("http://%s:%s/v0.3/traces", hostname, port), + legacyTraceURL: fmt.Sprintf("http://%s:%s/v0.2/traces", hostname, port), + serviceURL: fmt.Sprintf("http://%s:%s/v0.3/services", hostname, port), + legacyServiceURL: fmt.Sprintf("http://%s:%s/v0.2/services", hostname, port), + pool: pool, client: &http.Client{ Timeout: defaultHTTPTimeout, }, @@ -73,8 +78,8 @@ func newHTTPTransport(hostname, port string) *httpTransport { } } -func (t *httpTransport) Send(traces [][]*Span) (*http.Response, error) { - if t.url == "" { +func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) { + if t.traceURL == "" { return nil, errors.New("provided an empty URL, giving up") } @@ -83,13 +88,13 @@ func (t *httpTransport) Send(traces [][]*Span) (*http.Response, error) { defer t.pool.Return(encoder) // encode the spans and return the error if any - err := encoder.Encode(traces) + err := encoder.EncodeTraces(traces) if err != nil { return nil, err } // prepare the client and send the payload - req, _ := http.NewRequest("POST", t.url, encoder) + req, _ := http.NewRequest("POST", t.traceURL, encoder) for header, value := range t.headers { req.Header.Set(header, value) } @@ -102,9 +107,47 @@ func (t *httpTransport) Send(traces [][]*Span) (*http.Response, error) { // if we got a 404 we should downgrade the API to a stable version (at most once) if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode { - log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.url, response.StatusCode) + log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.traceURL, response.StatusCode) t.apiDowngrade() - return t.Send(traces) + return t.SendTraces(traces) + } + + response.Body.Close() + return response, err +} + +func (t *httpTransport) SendServices(services map[string]Service) (*http.Response, error) { + if t.serviceURL == "" { + return nil, errors.New("provided an empty URL, giving up") + } + + // Encode the service table + encoder := t.pool.Borrow() + defer t.pool.Return(encoder) + + if err := encoder.EncodeServices(services); err != nil { + return nil, err + } + + // Send it + req, err := http.NewRequest("POST", t.serviceURL, encoder) + if err != nil { + return nil, fmt.Errorf("cannot create http request: %v", err) + } + for header, value := range t.headers { + req.Header.Set(header, value) + } + + response, err := t.client.Do(req) + if err != nil { + return &http.Response{StatusCode: 0}, err + } + + // Downgrade if necessary + if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode { + log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.traceURL, response.StatusCode) + t.apiDowngrade() + return t.SendServices(services) } response.Body.Close() @@ -130,6 +173,7 @@ func (t *httpTransport) changeEncoder(encoderType int) { // executed only once. func (t *httpTransport) apiDowngrade() { t.compatibilityMode = true - t.url = t.legacyURL + t.traceURL = t.legacyTraceURL + t.serviceURL = t.legacyServiceURL t.changeEncoder(legacyEncoder) } diff --git a/tracer/transport_test.go b/tracer/transport_test.go index f1ce67bf1d..7412bce71b 100644 --- a/tracer/transport_test.go +++ b/tracer/transport_test.go @@ -38,6 +38,13 @@ func getTestTrace(traceN, size int) [][]*Span { return traces } +func getTestServices() map[string]Service { + return map[string]Service{ + "svc1": Service{Name: "scv1", App: "a", AppType: "b"}, + "svc2": Service{Name: "scv2", App: "c", AppType: "d"}, + } +} + func TestTracesAgentIntegration(t *testing.T) { assert := assert.New(t) @@ -52,7 +59,7 @@ func TestTracesAgentIntegration(t *testing.T) { for _, tc := range testCases { transport := newHTTPTransport(defaultHostname, defaultPort) - response, err := transport.Send(tc.payload) + response, err := transport.SendTraces(tc.payload) assert.Nil(err) assert.NotNil(response) assert.Equal(200, response.StatusCode) @@ -62,11 +69,11 @@ func TestTracesAgentIntegration(t *testing.T) { func TestAPIDowngrade(t *testing.T) { assert := assert.New(t) transport := newHTTPTransport(defaultHostname, defaultPort) - transport.url = "http://localhost:7777/v0.0/traces" + transport.traceURL = "http://localhost:7777/v0.0/traces" // if we get a 404 we should downgrade the API traces := getTestTrace(2, 2) - response, err := transport.Send(traces) + response, err := transport.SendTraces(traces) assert.Nil(err) assert.NotNil(response) assert.Equal(200, response.StatusCode) @@ -75,11 +82,46 @@ func TestAPIDowngrade(t *testing.T) { func TestEncoderDowngrade(t *testing.T) { assert := assert.New(t) transport := newHTTPTransport(defaultHostname, defaultPort) - transport.url = "http://localhost:7777/v0.2/traces" + transport.traceURL = "http://localhost:7777/v0.2/traces" // if we get a 415 because of a wrong encoder, we should downgrade the encoder traces := getTestTrace(2, 2) - response, err := transport.Send(traces) + response, err := transport.SendTraces(traces) + assert.Nil(err) + assert.NotNil(response) + assert.Equal(200, response.StatusCode) +} + +func TestTransportServices(t *testing.T) { + assert := assert.New(t) + + transport := newHTTPTransport(defaultHostname, defaultPort) + + response, err := transport.SendServices(getTestServices()) + assert.Nil(err) + assert.NotNil(response) + assert.Equal(200, response.StatusCode) +} + +func TestTransportServicesDowngrade_0_0(t *testing.T) { + assert := assert.New(t) + + transport := newHTTPTransport(defaultHostname, defaultPort) + transport.serviceURL = "http://localhost:7777/v0.0/services" + + response, err := transport.SendServices(getTestServices()) + assert.Nil(err) + assert.NotNil(response) + assert.Equal(200, response.StatusCode) +} + +func TestTransportServicesDowngrade_0_2(t *testing.T) { + assert := assert.New(t) + + transport := newHTTPTransport(defaultHostname, defaultPort) + transport.serviceURL = "http://localhost:7777/v0.2/services" + + response, err := transport.SendServices(getTestServices()) assert.Nil(err) assert.NotNil(response) assert.Equal(200, response.StatusCode) From 5eb09be551007c2b38c9026590b4a052a00c75df Mon Sep 17 00:00:00 2001 From: Nicolas Martyanoff Date: Fri, 6 Jan 2017 13:55:35 +0100 Subject: [PATCH 2/2] add service metadata for integrations The API of the grpc integration has to be modified to get access to the service name and tracer. --- tracer/contrib/gin-gonic/gintrace/gintrace.go | 1 + tracer/contrib/gorilla/muxtrace/muxtrace.go | 1 + tracer/contrib/tracegrpc/grpc.go | 7 +++++-- tracer/contrib/tracegrpc/grpc_test.go | 4 ++-- tracer/ext/app_types.go | 9 +++++++++ 5 files changed, 18 insertions(+), 4 deletions(-) create mode 100644 tracer/ext/app_types.go diff --git a/tracer/contrib/gin-gonic/gintrace/gintrace.go b/tracer/contrib/gin-gonic/gintrace/gintrace.go index 8641a68985..9551f89433 100644 --- a/tracer/contrib/gin-gonic/gintrace/gintrace.go +++ b/tracer/contrib/gin-gonic/gintrace/gintrace.go @@ -22,6 +22,7 @@ func Middleware(service string) gin.HandlerFunc { // MiddlewareTracer returns middleware that will trace requests with the given // tracer. func MiddlewareTracer(service string, t *tracer.Tracer) gin.HandlerFunc { + t.SetServiceInfo(service, "gin-gonic", ext.AppTypeWeb) mw := newMiddleware(service, t) return mw.Handle } diff --git a/tracer/contrib/gorilla/muxtrace/muxtrace.go b/tracer/contrib/gorilla/muxtrace/muxtrace.go index d035ddac1f..181723f04b 100644 --- a/tracer/contrib/gorilla/muxtrace/muxtrace.go +++ b/tracer/contrib/gorilla/muxtrace/muxtrace.go @@ -18,6 +18,7 @@ type MuxTracer struct { // NewMuxTracer creates a MuxTracer for the given service and tracer. func NewMuxTracer(service string, t *tracer.Tracer) *MuxTracer { + t.SetServiceInfo(service, "gorilla", ext.AppTypeWeb) return &MuxTracer{ tracer: t, service: service, diff --git a/tracer/contrib/tracegrpc/grpc.go b/tracer/contrib/tracegrpc/grpc.go index a0ea8191f3..be778547c8 100644 --- a/tracer/contrib/tracegrpc/grpc.go +++ b/tracer/contrib/tracegrpc/grpc.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/DataDog/dd-trace-go/tracer" + "github.com/DataDog/dd-trace-go/tracer/ext" context "golang.org/x/net/context" "google.golang.org/grpc" @@ -19,7 +20,8 @@ const ( ) // UnaryServerInterceptor will trace requests to the given grpc server. -func UnaryServerInterceptor(t *tracer.Tracer) grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(service string, t *tracer.Tracer) grpc.UnaryServerInterceptor { + t.SetServiceInfo(service, "grpc-server", ext.AppTypeRPC) return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if !t.Enabled() { return handler(ctx, req) @@ -33,7 +35,8 @@ func UnaryServerInterceptor(t *tracer.Tracer) grpc.UnaryServerInterceptor { } // UnaryClientInterceptor will add tracing to a gprc client. -func UnaryClientInterceptor() grpc.UnaryClientInterceptor { +func UnaryClientInterceptor(service string, t *tracer.Tracer) grpc.UnaryClientInterceptor { + t.SetServiceInfo(service, "grpc-client", ext.AppTypeRPC) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { var child *tracer.Span diff --git a/tracer/contrib/tracegrpc/grpc_test.go b/tracer/contrib/tracegrpc/grpc_test.go index d455008916..309900a9be 100644 --- a/tracer/contrib/tracegrpc/grpc_test.go +++ b/tracer/contrib/tracegrpc/grpc_test.go @@ -189,7 +189,7 @@ func (r *rig) Close() { func newRig(t *tracer.Tracer, traceClient bool) (*rig, error) { - server := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor(t))) + server := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor("foo", t))) RegisterFixtureServer(server, newFixtureServer()) @@ -206,7 +206,7 @@ func newRig(t *tracer.Tracer, traceClient bool) (*rig, error) { } if traceClient { - opts = append(opts, grpc.WithUnaryInterceptor(UnaryClientInterceptor())) + opts = append(opts, grpc.WithUnaryInterceptor(UnaryClientInterceptor("foo", t))) } conn, err := grpc.Dial(li.Addr().String(), opts...) diff --git a/tracer/ext/app_types.go b/tracer/ext/app_types.go new file mode 100644 index 0000000000..401e2f4a07 --- /dev/null +++ b/tracer/ext/app_types.go @@ -0,0 +1,9 @@ +package ext + +// Application types for services. +const ( + AppTypeWeb = "web" + AppTypeDB = "db" + AppTypeCache = "cache" + AppTypeRPC = "rpc" +)