Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/google.golang.org/grpc: add WithErrorCheck option #2035

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions contrib/google.golang.org/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
}
defer func() { finishWithError(span, err, cs.cfg) }()
defer func() { finishWithError(span, err, cs.method, cs.cfg) }()
}
err = cs.ClientStream.RecvMsg(m)
return err
Expand All @@ -64,7 +64,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
}
defer func() { finishWithError(span, err, cs.cfg) }()
defer func() { finishWithError(span, err, cs.method, cs.cfg) }()
}
err = cs.ClientStream.SendMsg(m)
return err
Expand Down Expand Up @@ -104,7 +104,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
return err
})
if err != nil {
finishWithError(span, err, cfg)
finishWithError(span, err, method, cfg)
return nil, err
}

Expand All @@ -116,7 +116,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {

go func() {
<-stream.Context().Done()
finishWithError(span, stream.Context().Err(), cfg)
finishWithError(span, stream.Context().Err(), method, cfg)
}()
} else {
// if call tracing is disabled, just call streamer, but still return
Expand Down Expand Up @@ -158,7 +158,7 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
func(ctx context.Context, opts []grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
})
finishWithError(span, err, cfg)
finishWithError(span, err, method, cfg)
return err
}
}
Expand Down
7 changes: 5 additions & 2 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func init() {
// cache a constant option: saves one allocation per call
var spanTypeRPC = tracer.SpanType(ext.AppTypeRPC)

type fullMethodNameKey struct{}

func (cfg *config) startSpanOptions(opts ...tracer.StartSpanOption) []tracer.StartSpanOption {
if len(cfg.tags) == 0 && len(cfg.spanOpts) == 0 {
return opts
Expand Down Expand Up @@ -69,16 +71,17 @@ func startSpanFromContext(
if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil {
opts = append(opts, tracer.ChildOf(sctx))
}
ctx = context.WithValue(ctx, fullMethodNameKey{}, method)
return tracer.StartSpanFromContext(ctx, operation, opts...)
}

// finishWithError applies finish option and a tag with gRPC status code, disregarding OK, EOF and Canceled errors.
func finishWithError(span ddtrace.Span, err error, cfg *config) {
func finishWithError(span ddtrace.Span, err error, method string, cfg *config) {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
err = nil
}
errcode := status.Code(err)
if errcode == codes.OK || cfg.nonErrorCodes[errcode] {
if errcode == codes.OK || cfg.nonErrorCodes[errcode] || (cfg.nonErrorFunc != nil && cfg.nonErrorFunc(method, err)) {
err = nil
}
span.SetTag(tagCode, errcode.String())
Expand Down
215 changes: 215 additions & 0 deletions contrib/google.golang.org/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,221 @@ func waitForSpans(mt mocktracer.Tracer, sz int) {
}
}

func TestNonErrorFunc(t *testing.T) {
t.Run("unary", func(t *testing.T) {
for name, tt := range map[string]struct {
nonErrorFunc func(method string, err error) bool
message string
withError bool
wantCode string
wantMessage string
}{
"Invalid_with_no_error": {
message: "invalid",
nonErrorFunc: func(method string, err error) bool {
if err == nil {
return true
}

errCode := status.Code(err)
if errCode == codes.InvalidArgument && method == "/grpc.Fixture/Ping" {
return true
}

return false
},
withError: false,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
"Invalid_with_error": {
message: "invalid",
nonErrorFunc: func(method string, err error) bool {
if err == nil {
return true
}

errCode := status.Code(err)
if errCode == codes.InvalidArgument && method == "/some/endpoint" {
return true
}

return false
},
withError: true,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
"Invalid_with_error_without_nonErrorFunc": {
message: "invalid",
nonErrorFunc: nil,
withError: true,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
} {
t.Run(name, func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

var (
rig *rig
err error
)
if tt.nonErrorFunc == nil {
rig, err = newRig(true)
} else {
rig, err = newRig(true, NonErrorFunc(tt.nonErrorFunc))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var (
rig *rig
err error
)
if tt.nonErrorFunc == nil {
rig, err = newRig(true)
} else {
rig, err = newRig(true, NonErrorFunc(tt.nonErrorFunc))
}
var opts []Option
if tt.nonErrorFunc != nil {
opts = append(opts, NonErrorFunc(tt.nonErrorFunc))
}
rig, err := newRig(true, opts...)

if err != nil {
t.Fatalf("error setting up rig: %s", err)
}

client := rig.client
_, err = client.Ping(context.Background(), &FixtureRequest{Name: tt.message})
assert.Error(t, err)
assert.Equal(t, tt.wantCode, status.Code(err).String())
assert.Equal(t, tt.wantMessage, status.Convert(err).Message())

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)

var serverSpan, clientSpan mocktracer.Span

for _, s := range spans {
// order of traces in buffer is not garanteed
switch s.OperationName() {
case "grpc.server":
serverSpan = s
case "grpc.client":
clientSpan = s
}
}

if tt.withError {
assert.NotNil(t, clientSpan.Tag(ext.Error))
assert.NotNil(t, serverSpan.Tag(ext.Error))
} else {
assert.Nil(t, clientSpan.Tag(ext.Error))
assert.Nil(t, serverSpan.Tag(ext.Error))
}

rig.Close()
mt.Reset()
})
}
})

t.Run("stream", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the start of the mocktracer this into the test case run

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the correct revision of your point?
4fca224

for name, tt := range map[string]struct {
nonErrorFunc func(method string, err error) bool
message string
withError bool
wantCode string
wantMessage string
}{
"Invalid_with_no_error": {
message: "invalid",
nonErrorFunc: func(method string, err error) bool {
if err == nil {
return true
}

errCode := status.Code(err)
if errCode == codes.InvalidArgument && method == "/grpc.Fixture/StreamPing" {
return true
}

return false
},
withError: false,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
"Invalid_with_error": {
message: "invalid",
nonErrorFunc: func(method string, err error) bool {
if err == nil {
return true
}

errCode := status.Code(err)
if errCode == codes.InvalidArgument && method == "/some/endpoint" {
return true
}

return false
},
withError: true,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
"Invalid_with_error_without_nonErrorFunc": {
message: "invalid",
nonErrorFunc: nil,
withError: true,
wantCode: codes.InvalidArgument.String(),
wantMessage: "invalid",
},
} {
t.Run(name, func(t *testing.T) {
var (
rig *rig
err error
)
if tt.nonErrorFunc == nil {
rig, err = newRig(true)
} else {
rig, err = newRig(true, NonErrorFunc(tt.nonErrorFunc))
}
darccio marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}

ctx, done := context.WithCancel(context.Background())
client := rig.client
stream, err := client.StreamPing(ctx)
assert.NoError(t, err)

err = stream.Send(&FixtureRequest{Name: tt.message})
assert.NoError(t, err)

_, err = stream.Recv()
assert.Error(t, err)
assert.Equal(t, tt.wantCode, status.Code(err).String())
assert.Equal(t, tt.wantMessage, status.Convert(err).Message())

assert.NoError(t, stream.CloseSend())
done() // close stream from client side
rig.Close()

waitForSpans(mt, 5)

spans := mt.FinishedSpans()
assert.Len(t, spans, 5)

noError := true
for _, s := range spans {
if s.Tag(ext.Error) != nil {
noError = false
break
}
}

if tt.withError {
assert.False(t, noError)
} else {
assert.True(t, noError)
}
darccio marked this conversation as resolved.
Show resolved Hide resolved

mt.Reset()
})
}
})
}

func TestAnalyticsSettings(t *testing.T) {
assertRate := func(t *testing.T, mt mocktracer.Tracer, rate interface{}, opts ...InterceptorOption) {
rig, err := newRig(true, opts...)
Expand Down
10 changes: 10 additions & 0 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
serviceName func() string
spanName string
nonErrorCodes map[codes.Code]bool
nonErrorFunc func(method string, err error) bool
traceStreamCalls bool
traceStreamMessages bool
noDebugStack bool
Expand Down Expand Up @@ -129,6 +130,15 @@ func NonErrorCodes(cs ...codes.Code) InterceptorOption {
}
}

// NonErrorFunc sets a custom function to determine whether an error should not be considered as an error for tracing purposes.
// This function is evaluated when an error occurs, and if it returns true, the error will not be recorded in the trace.
// f: A function taking the gRPC method and error as arguments, returning a boolean to indicate if the error should be ignored.
func NonErrorFunc(f func(method string, err error) bool) Option {
return func(cfg *config) {
cfg.nonErrorFunc = f
}
}

darccio marked this conversation as resolved.
Show resolved Hide resolved
// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
Expand Down
8 changes: 4 additions & 4 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
defer func() {
withMetadataTags(ss.ctx, ss.cfg, span)
withRequestTags(ss.cfg, m, span)
finishWithError(span, err, ss.cfg)
finishWithError(span, err, ss.method, ss.cfg)
}()
}
err = ss.ServerStream.RecvMsg(m)
Expand All @@ -72,7 +72,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.cfg.startSpanOptions(tracer.Measured())...,
)
span.SetTag(ext.Component, componentName)
defer func() { finishWithError(span, err, ss.cfg) }()
defer func() { finishWithError(span, err, ss.method, ss.cfg) }()
}
err = ss.ServerStream.SendMsg(m)
return err
Expand Down Expand Up @@ -110,7 +110,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
case info.IsClientStream:
span.SetTag(tagMethodKind, methodKindClientStream)
}
defer func() { finishWithError(span, err, cfg) }()
defer func() { finishWithError(span, err, info.FullMethod, cfg) }()
if appsec.Enabled() {
handler = appsecStreamHandlerMiddleware(span, handler)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
handler = appsecUnaryHandlerMiddleware(span, handler)
}
resp, err := handler(ctx, req)
finishWithError(span, err, cfg)
finishWithError(span, err, info.FullMethod, cfg)
return resp, err
}
}
Expand Down
10 changes: 9 additions & 1 deletion contrib/google.golang.org/grpc/stats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
span.SetTag(ext.TargetPort, port)
}
case *stats.End:
finishWithError(span, rs.Error, h.cfg)
val := ctx.Value(fullMethodNameKey{})
if val == nil {
return
}
fullMethod, ok := val.(string)
if !ok {
return
}
finishWithError(span, rs.Error, fullMethod, h.cfg)
darccio marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
11 changes: 10 additions & 1 deletion contrib/google.golang.org/grpc/stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo)
h.cfg.serviceName,
spanOpts...,
)
ctx = context.WithValue(ctx, fullMethodNameKey{}, rti.FullMethodName)
return ctx
}

Expand All @@ -52,8 +53,16 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
if !ok {
return
}
val := ctx.Value(fullMethodNameKey{})
if val == nil {
return
}
fullMethod, ok := val.(string)
if !ok {
return
}
darccio marked this conversation as resolved.
Show resolved Hide resolved
if v, ok := rs.(*stats.End); ok {
finishWithError(span, v.Error, h.cfg)
finishWithError(span, v.Error, fullMethod, h.cfg)
}
}

Expand Down