Skip to content

Commit 9a1fcef

Browse files
committed
adding semantic conv for dapr and trace prop from context
1 parent f3afee2 commit 9a1fcef

File tree

4 files changed

+123
-22
lines changed

4 files changed

+123
-22
lines changed

api/helpers/tracing.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/protobuf/types/known/wrapperspb"
1515

1616
"github.com/dapr/durabletask-go/api/protos"
17+
"github.com/dapr/durabletask-go/api/semconv"
1718
)
1819

1920
var tracer = otel.Tracer("durabletask")
@@ -22,9 +23,9 @@ func StartNewCreateOrchestrationSpan(
2223
ctx context.Context, name string, version string, instanceID string,
2324
) (context.Context, trace.Span) {
2425
attributes := []attribute.KeyValue{
25-
{Key: "durabletask.type", Value: attribute.StringValue("orchestration")},
26-
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
27-
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
26+
semconv.DaprWorkflowTypeKey.String("orchestration"),
27+
semconv.DaprWorkflowNameKey.String(name),
28+
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
2829
}
2930
return startNewSpan(ctx, "create_orchestration", name, version, attributes, trace.SpanKindClient, time.Now().UTC())
3031
}
@@ -36,9 +37,9 @@ func StartNewRunOrchestrationSpan(
3637
instanceID := es.OrchestrationInstance.InstanceId
3738
version := es.Version.GetValue()
3839
attributes := []attribute.KeyValue{
39-
{Key: "durabletask.type", Value: attribute.StringValue("orchestration")},
40-
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
41-
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
40+
semconv.DaprWorkflowTypeKey.String("orchestration"),
41+
semconv.DaprWorkflowNameKey.String(name),
42+
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
4243
}
4344
return startNewSpan(ctx, "orchestration", name, version, attributes, trace.SpanKindServer, startedTime)
4445
}
@@ -47,20 +48,20 @@ func StartNewActivitySpan(
4748
ctx context.Context, name string, version string, instanceID string, taskID int32,
4849
) (context.Context, trace.Span) {
4950
attributes := []attribute.KeyValue{
50-
{Key: "durabletask.type", Value: attribute.StringValue("activity")},
51-
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
52-
{Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(taskID))},
53-
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
51+
semconv.DaprWorkflowTypeKey.String("activity"),
52+
semconv.DaprWorkflowNameKey.String(name),
53+
semconv.DaprWorkflowTaskIdKey.Int64(int64(taskID)),
54+
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
5455
}
5556
return startNewSpan(ctx, "activity", name, version, attributes, trace.SpanKindServer, time.Now().UTC())
5657
}
5758

5859
func StartAndEndNewTimerSpan(ctx context.Context, tf *protos.TimerFiredEvent, createdTime time.Time, instanceID string) error {
5960
attributes := []attribute.KeyValue{
60-
{Key: "durabletask.type", Value: attribute.StringValue("timer")},
61-
{Key: "durabletask.fire_at", Value: attribute.StringValue(tf.FireAt.AsTime().Format(time.RFC3339))}, // time.RFC3339 most closely maps to ISO 8601
62-
{Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(tf.TimerId))},
63-
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
61+
semconv.DaprWorkflowTypeKey.String("timer"),
62+
semconv.DaprWorkflowTimerFireAtKey.String(tf.FireAt.AsTime().Format(time.RFC3339)), // time.RFC3339 most closely maps to ISO 8601
63+
semconv.DaprWorkflowTimerIdKey.Int64(int64(tf.TimerId)),
64+
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
6465
}
6566

6667
_, span := startNewSpan(ctx, "timer", "", "", attributes, trace.SpanKindInternal, createdTime)
@@ -80,10 +81,7 @@ func startNewSpan(
8081
var spanName string
8182
if taskVersion != "" {
8283
spanName = taskType + "||" + taskName + "||" + taskVersion
83-
attributes = append(attributes, attribute.KeyValue{
84-
Key: "durabletask.task.version",
85-
Value: attribute.StringValue(taskVersion),
86-
})
84+
attributes = append(attributes, semconv.DaprWorkflowVersionKey.String(taskVersion))
8785
} else if taskName != "" {
8886
spanName = taskType + "||" + taskName
8987
} else {

api/semconv/workflow.go

Lines changed: 63 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/activity.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,23 @@ import (
1515
type activityProcessor struct {
1616
be Backend
1717
executor ActivityExecutor
18+
logger Logger
1819
}
1920

2021
type ActivityExecutor interface {
2122
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
2223
}
2324

2425
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem] {
25-
processor := newActivityProcessor(be, executor)
26+
processor := newActivityProcessor(be, executor, logger)
2627
return NewTaskWorker(processor, logger, opts...)
2728
}
2829

29-
func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor[*ActivityWorkItem] {
30+
func newActivityProcessor(be Backend, executor ActivityExecutor, logger Logger) TaskProcessor[*ActivityWorkItem] {
3031
return &activityProcessor{
3132
be: be,
3233
executor: executor,
34+
logger: logger,
3335
}
3436
}
3537

@@ -65,7 +67,7 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo
6567
}()
6668
}
6769

68-
// set the parent trace context to be the newly created activity span
70+
// Set the parent trace context to be the current span for downstream propagation
6971
ts.ParentTraceContext = helpers.TraceContextFromSpan(span)
7072

7173
// Execute the activity and get its result

backend/executor.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/google/uuid"
13+
"go.opentelemetry.io/otel/trace"
1314
"google.golang.org/grpc"
1415
"google.golang.org/grpc/codes"
1516
"google.golang.org/grpc/metadata"
@@ -169,14 +170,21 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
169170

170171
task := e.GetTaskScheduled()
171172

173+
// Extract trace context from the incoming context to propagate to Java client
174+
// If task.ParentTraceContext is not set, extract it from the context
175+
parentTraceCtx := task.ParentTraceContext
176+
if parentTraceCtx == nil {
177+
parentTraceCtx = executor.extractTraceContextFromCtx(ctx)
178+
}
179+
172180
req := &protos.ActivityRequest{
173181
Name: task.Name,
174182
Version: task.Version,
175183
Input: task.Input,
176184
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
177185
TaskId: e.EventId,
178186
TaskExecutionId: task.TaskExecutionId,
179-
ParentTraceContext: task.ParentTraceContext,
187+
ParentTraceContext: parentTraceCtx,
180188
}
181189
workItem := &protos.WorkItem{
182190
Request: &protos.WorkItem_ActivityRequest{
@@ -266,6 +274,36 @@ func (g *grpcExecutor) Shutdown(ctx context.Context) error {
266274
return nil
267275
}
268276

277+
// extractTraceContextFromCtx extracts the trace context from the context and returns a TraceContext protobuf message
278+
// This is used to propagate trace context through the ActivityRequest protobuf
279+
func (g *grpcExecutor) extractTraceContextFromCtx(ctx context.Context) *protos.TraceContext {
280+
spanCtx := trace.SpanFromContext(ctx).SpanContext()
281+
if !spanCtx.IsValid() {
282+
g.logger.Debug("No valid span context found in context")
283+
return nil
284+
}
285+
286+
// Build W3C Trace Context traceparent header: version-traceId-spanId-traceFlags
287+
traceparent := fmt.Sprintf("00-%s-%s-%02x",
288+
spanCtx.TraceID().String(),
289+
spanCtx.SpanID().String(),
290+
spanCtx.TraceFlags())
291+
292+
traceCtx := &protos.TraceContext{
293+
TraceParent: traceparent,
294+
}
295+
296+
// Add tracestate if present
297+
if stateStr := spanCtx.TraceState().String(); stateStr != "" {
298+
traceCtx.TraceState = wrapperspb.String(stateStr)
299+
}
300+
301+
g.logger.Infof("Extracted trace context from ctx: traceID=%s spanID=%s flags=%02x",
302+
spanCtx.TraceID(), spanCtx.SpanID(), spanCtx.TraceFlags())
303+
304+
return traceCtx
305+
}
306+
269307
// Hello implements protos.TaskHubSidecarServiceServer
270308
func (grpcExecutor) Hello(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
271309
return empty, nil

0 commit comments

Comments
 (0)