From 56207d9310ec1cb9fbe2255fcb5df112aa81dc96 Mon Sep 17 00:00:00 2001 From: salaboy Date: Fri, 28 Nov 2025 12:44:27 +0000 Subject: [PATCH] adding semantic conv for dapr and trace prop from context Signed-off-by: salaboy --- api/helpers/tracing.go | 34 +++++++++++----------- api/semconv/workflow.go | 63 +++++++++++++++++++++++++++++++++++++++++ backend/activity.go | 8 ++++-- backend/executor.go | 40 +++++++++++++++++++++++++- 4 files changed, 123 insertions(+), 22 deletions(-) create mode 100644 api/semconv/workflow.go diff --git a/api/helpers/tracing.go b/api/helpers/tracing.go index 450f8798..40192e81 100644 --- a/api/helpers/tracing.go +++ b/api/helpers/tracing.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/durabletask-go/api/semconv" ) var tracer = otel.Tracer("durabletask") @@ -22,9 +23,9 @@ func StartNewCreateOrchestrationSpan( ctx context.Context, name string, version string, instanceID string, ) (context.Context, trace.Span) { attributes := []attribute.KeyValue{ - {Key: "durabletask.type", Value: attribute.StringValue("orchestration")}, - {Key: "durabletask.task.name", Value: attribute.StringValue(name)}, - {Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)}, + semconv.DaprWorkflowTypeKey.String("orchestration"), + semconv.DaprWorkflowNameKey.String(name), + semconv.DaprWorkflowInstanceIdKey.String(instanceID), } return startNewSpan(ctx, "create_orchestration", name, version, attributes, trace.SpanKindClient, time.Now().UTC()) } @@ -36,9 +37,9 @@ func StartNewRunOrchestrationSpan( instanceID := es.OrchestrationInstance.InstanceId version := es.Version.GetValue() attributes := []attribute.KeyValue{ - {Key: "durabletask.type", Value: attribute.StringValue("orchestration")}, - {Key: "durabletask.task.name", Value: attribute.StringValue(name)}, - {Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)}, + semconv.DaprWorkflowTypeKey.String("orchestration"), + semconv.DaprWorkflowNameKey.String(name), + semconv.DaprWorkflowInstanceIdKey.String(instanceID), } return startNewSpan(ctx, "orchestration", name, version, attributes, trace.SpanKindServer, startedTime) } @@ -47,20 +48,20 @@ func StartNewActivitySpan( ctx context.Context, name string, version string, instanceID string, taskID int32, ) (context.Context, trace.Span) { attributes := []attribute.KeyValue{ - {Key: "durabletask.type", Value: attribute.StringValue("activity")}, - {Key: "durabletask.task.name", Value: attribute.StringValue(name)}, - {Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(taskID))}, - {Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)}, + semconv.DaprWorkflowTypeKey.String("activity"), + semconv.DaprWorkflowNameKey.String(name), + semconv.DaprWorkflowTaskIdKey.Int64(int64(taskID)), + semconv.DaprWorkflowInstanceIdKey.String(instanceID), } return startNewSpan(ctx, "activity", name, version, attributes, trace.SpanKindServer, time.Now().UTC()) } func StartAndEndNewTimerSpan(ctx context.Context, tf *protos.TimerFiredEvent, createdTime time.Time, instanceID string) error { attributes := []attribute.KeyValue{ - {Key: "durabletask.type", Value: attribute.StringValue("timer")}, - {Key: "durabletask.fire_at", Value: attribute.StringValue(tf.FireAt.AsTime().Format(time.RFC3339))}, // time.RFC3339 most closely maps to ISO 8601 - {Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(tf.TimerId))}, - {Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)}, + semconv.DaprWorkflowTypeKey.String("timer"), + semconv.DaprWorkflowTimerFireAtKey.String(tf.FireAt.AsTime().Format(time.RFC3339)), // time.RFC3339 most closely maps to ISO 8601 + semconv.DaprWorkflowTimerIdKey.Int64(int64(tf.TimerId)), + semconv.DaprWorkflowInstanceIdKey.String(instanceID), } _, span := startNewSpan(ctx, "timer", "", "", attributes, trace.SpanKindInternal, createdTime) @@ -80,10 +81,7 @@ func startNewSpan( var spanName string if taskVersion != "" { spanName = taskType + "||" + taskName + "||" + taskVersion - attributes = append(attributes, attribute.KeyValue{ - Key: "durabletask.task.version", - Value: attribute.StringValue(taskVersion), - }) + attributes = append(attributes, semconv.DaprWorkflowVersionKey.String(taskVersion)) } else if taskName != "" { spanName = taskType + "||" + taskName } else { diff --git a/api/semconv/workflow.go b/api/semconv/workflow.go new file mode 100644 index 00000000..61732cdd --- /dev/null +++ b/api/semconv/workflow.go @@ -0,0 +1,63 @@ +// Copyright 2025 The Dapr Authors +// SPDX-License-Identifier: Apache-2.0 + +// Code generated from semantic convention specification. DO NOT EDIT. + +package semconv + +import "go.opentelemetry.io/otel/attribute" + + +// Semantic conventions for Dapr workflow engine (durabletask). +const ( + // DaprWorkflowTypeKey is the attribute Key conforming to the "dapr.workflow.type" semantic conventions. + // The type of workflow component. + DaprWorkflowTypeKey = attribute.Key("dapr.workflow.type") + // DaprWorkflowNameKey is the attribute Key conforming to the "dapr.workflow.name" semantic conventions. + // The name of the workflow or activity being executed. + DaprWorkflowNameKey = attribute.Key("dapr.workflow.name") + // DaprWorkflowInstanceIdKey is the attribute Key conforming to the "dapr.workflow.instance_id" semantic conventions. + // The unique instance ID of the workflow execution. + DaprWorkflowInstanceIdKey = attribute.Key("dapr.workflow.instance_id") + // DaprWorkflowTaskIdKey is the attribute Key conforming to the "dapr.workflow.task_id" semantic conventions. + // The task ID of the activity or timer within the workflow. + DaprWorkflowTaskIdKey = attribute.Key("dapr.workflow.task_id") + // DaprWorkflowVersionKey is the attribute Key conforming to the "dapr.workflow.version" semantic conventions. + // The version of the workflow or activity. + DaprWorkflowVersionKey = attribute.Key("dapr.workflow.version") + // DaprWorkflowStatusKey is the attribute Key conforming to the "dapr.workflow.status" semantic conventions. + // The runtime status of the workflow. + DaprWorkflowStatusKey = attribute.Key("dapr.workflow.status") + // DaprWorkflowParentInstanceIdKey is the attribute Key conforming to the "dapr.workflow.parent_instance_id" semantic conventions. + // The instance ID of the parent workflow for sub-orchestrations. + DaprWorkflowParentInstanceIdKey = attribute.Key("dapr.workflow.parent_instance_id") + // DaprWorkflowParentNameKey is the attribute Key conforming to the "dapr.workflow.parent_name" semantic conventions. + // The name of the parent workflow for sub-orchestrations. + DaprWorkflowParentNameKey = attribute.Key("dapr.workflow.parent_name") +) + +// Attributes for async workflow operations like external events and timers. +const ( + // DaprWorkflowEventNameKey is the attribute Key conforming to the "dapr.workflow.event.name" semantic conventions. + // The name of the external event raised to the workflow. + DaprWorkflowEventNameKey = attribute.Key("dapr.workflow.event.name") + // DaprWorkflowTimerFireAtKey is the attribute Key conforming to the "dapr.workflow.timer.fire_at" semantic conventions. + // The ISO 8601 timestamp when the timer is scheduled to fire. + DaprWorkflowTimerFireAtKey = attribute.Key("dapr.workflow.timer.fire_at") + // DaprWorkflowTimerIdKey is the attribute Key conforming to the "dapr.workflow.timer.id" semantic conventions. + // The unique identifier of the timer within the workflow. + DaprWorkflowTimerIdKey = attribute.Key("dapr.workflow.timer.id") +) + +// Internal attributes for workflow actor implementation. +const ( + // DaprWorkflowActorTypeKey is the attribute Key conforming to the "dapr.workflow.actor.type" semantic conventions. + // The actor type used internally for workflow execution. + DaprWorkflowActorTypeKey = attribute.Key("dapr.workflow.actor.type") + // DaprWorkflowActorIdKey is the attribute Key conforming to the "dapr.workflow.actor.id" semantic conventions. + // The actor ID which corresponds to the workflow or activity instance. + DaprWorkflowActorIdKey = attribute.Key("dapr.workflow.actor.id") + // DaprWorkflowActorGenerationKey is the attribute Key conforming to the "dapr.workflow.actor.generation" semantic conventions. + // The generation number of the workflow state. + DaprWorkflowActorGenerationKey = attribute.Key("dapr.workflow.actor.generation") +) diff --git a/backend/activity.go b/backend/activity.go index 096c681b..81c90940 100644 --- a/backend/activity.go +++ b/backend/activity.go @@ -15,6 +15,7 @@ import ( type activityProcessor struct { be Backend executor ActivityExecutor + logger Logger } type ActivityExecutor interface { @@ -22,14 +23,15 @@ type ActivityExecutor interface { } func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem] { - processor := newActivityProcessor(be, executor) + processor := newActivityProcessor(be, executor, logger) return NewTaskWorker(processor, logger, opts...) } -func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor[*ActivityWorkItem] { +func newActivityProcessor(be Backend, executor ActivityExecutor, logger Logger) TaskProcessor[*ActivityWorkItem] { return &activityProcessor{ be: be, executor: executor, + logger: logger, } } @@ -65,7 +67,7 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo }() } - // set the parent trace context to be the newly created activity span + // Set the parent trace context to be the current span for downstream propagation ts.ParentTraceContext = helpers.TraceContextFromSpan(span) // Execute the activity and get its result diff --git a/backend/executor.go b/backend/executor.go index b2536488..c309677b 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -169,6 +170,13 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta task := e.GetTaskScheduled() + // Extract trace context from the incoming context to propagate to Java client + // If task.ParentTraceContext is not set, extract it from the context + parentTraceCtx := task.ParentTraceContext + if parentTraceCtx == nil { + parentTraceCtx = executor.extractTraceContextFromCtx(ctx) + } + req := &protos.ActivityRequest{ Name: task.Name, Version: task.Version, @@ -176,7 +184,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)}, TaskId: e.EventId, TaskExecutionId: task.TaskExecutionId, - ParentTraceContext: task.ParentTraceContext, + ParentTraceContext: parentTraceCtx, } workItem := &protos.WorkItem{ Request: &protos.WorkItem_ActivityRequest{ @@ -266,6 +274,36 @@ func (g *grpcExecutor) Shutdown(ctx context.Context) error { return nil } +// extractTraceContextFromCtx extracts the trace context from the context and returns a TraceContext protobuf message +// This is used to propagate trace context through the ActivityRequest protobuf +func (g *grpcExecutor) extractTraceContextFromCtx(ctx context.Context) *protos.TraceContext { + spanCtx := trace.SpanFromContext(ctx).SpanContext() + if !spanCtx.IsValid() { + g.logger.Debug("No valid span context found in context") + return nil + } + + // Build W3C Trace Context traceparent header: version-traceId-spanId-traceFlags + traceparent := fmt.Sprintf("00-%s-%s-%02x", + spanCtx.TraceID().String(), + spanCtx.SpanID().String(), + spanCtx.TraceFlags()) + + traceCtx := &protos.TraceContext{ + TraceParent: traceparent, + } + + // Add tracestate if present + if stateStr := spanCtx.TraceState().String(); stateStr != "" { + traceCtx.TraceState = wrapperspb.String(stateStr) + } + + g.logger.Infof("Extracted trace context from ctx: traceID=%s spanID=%s flags=%02x", + spanCtx.TraceID(), spanCtx.SpanID(), spanCtx.TraceFlags()) + + return traceCtx +} + // Hello implements protos.TaskHubSidecarServiceServer func (grpcExecutor) Hello(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) { return empty, nil