Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions api/helpers/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/api/semconv"
)

var tracer = otel.Tracer("durabletask")
Expand All @@ -22,9 +23,9 @@ func StartNewCreateOrchestrationSpan(
ctx context.Context, name string, version string, instanceID string,
) (context.Context, trace.Span) {
attributes := []attribute.KeyValue{
{Key: "durabletask.type", Value: attribute.StringValue("orchestration")},
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
semconv.DaprWorkflowTypeKey.String("orchestration"),
semconv.DaprWorkflowNameKey.String(name),
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
}
return startNewSpan(ctx, "create_orchestration", name, version, attributes, trace.SpanKindClient, time.Now().UTC())
}
Expand All @@ -36,9 +37,9 @@ func StartNewRunOrchestrationSpan(
instanceID := es.OrchestrationInstance.InstanceId
version := es.Version.GetValue()
attributes := []attribute.KeyValue{
{Key: "durabletask.type", Value: attribute.StringValue("orchestration")},
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
semconv.DaprWorkflowTypeKey.String("orchestration"),
semconv.DaprWorkflowNameKey.String(name),
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
}
return startNewSpan(ctx, "orchestration", name, version, attributes, trace.SpanKindServer, startedTime)
}
Expand All @@ -47,20 +48,20 @@ func StartNewActivitySpan(
ctx context.Context, name string, version string, instanceID string, taskID int32,
) (context.Context, trace.Span) {
attributes := []attribute.KeyValue{
{Key: "durabletask.type", Value: attribute.StringValue("activity")},
{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
{Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(taskID))},
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
semconv.DaprWorkflowTypeKey.String("activity"),
semconv.DaprWorkflowNameKey.String(name),
semconv.DaprWorkflowTaskIdKey.Int64(int64(taskID)),
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
}
return startNewSpan(ctx, "activity", name, version, attributes, trace.SpanKindServer, time.Now().UTC())
}

func StartAndEndNewTimerSpan(ctx context.Context, tf *protos.TimerFiredEvent, createdTime time.Time, instanceID string) error {
attributes := []attribute.KeyValue{
{Key: "durabletask.type", Value: attribute.StringValue("timer")},
{Key: "durabletask.fire_at", Value: attribute.StringValue(tf.FireAt.AsTime().Format(time.RFC3339))}, // time.RFC3339 most closely maps to ISO 8601
{Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(tf.TimerId))},
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
semconv.DaprWorkflowTypeKey.String("timer"),
semconv.DaprWorkflowTimerFireAtKey.String(tf.FireAt.AsTime().Format(time.RFC3339)), // time.RFC3339 most closely maps to ISO 8601
semconv.DaprWorkflowTimerIdKey.Int64(int64(tf.TimerId)),
semconv.DaprWorkflowInstanceIdKey.String(instanceID),
}

_, span := startNewSpan(ctx, "timer", "", "", attributes, trace.SpanKindInternal, createdTime)
Expand All @@ -80,10 +81,7 @@ func startNewSpan(
var spanName string
if taskVersion != "" {
spanName = taskType + "||" + taskName + "||" + taskVersion
attributes = append(attributes, attribute.KeyValue{
Key: "durabletask.task.version",
Value: attribute.StringValue(taskVersion),
})
attributes = append(attributes, semconv.DaprWorkflowVersionKey.String(taskVersion))
} else if taskName != "" {
spanName = taskType + "||" + taskName
} else {
Expand Down
63 changes: 63 additions & 0 deletions api/semconv/workflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ import (
type activityProcessor struct {
be Backend
executor ActivityExecutor
logger Logger
}

type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem] {
processor := newActivityProcessor(be, executor)
processor := newActivityProcessor(be, executor, logger)
return NewTaskWorker(processor, logger, opts...)
}

func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor[*ActivityWorkItem] {
func newActivityProcessor(be Backend, executor ActivityExecutor, logger Logger) TaskProcessor[*ActivityWorkItem] {
return &activityProcessor{
be: be,
executor: executor,
logger: logger,
}
}

Expand Down Expand Up @@ -65,7 +67,7 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo
}()
}

// set the parent trace context to be the newly created activity span
// Set the parent trace context to be the current span for downstream propagation
ts.ParentTraceContext = helpers.TraceContextFromSpan(span)

// Execute the activity and get its result
Expand Down
40 changes: 39 additions & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -169,14 +170,21 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta

task := e.GetTaskScheduled()

// Extract trace context from the incoming context to propagate to Java client
// If task.ParentTraceContext is not set, extract it from the context
parentTraceCtx := task.ParentTraceContext
if parentTraceCtx == nil {
parentTraceCtx = executor.extractTraceContextFromCtx(ctx)
}

req := &protos.ActivityRequest{
Name: task.Name,
Version: task.Version,
Input: task.Input,
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
TaskId: e.EventId,
TaskExecutionId: task.TaskExecutionId,
ParentTraceContext: task.ParentTraceContext,
ParentTraceContext: parentTraceCtx,
}
workItem := &protos.WorkItem{
Request: &protos.WorkItem_ActivityRequest{
Expand Down Expand Up @@ -266,6 +274,36 @@ func (g *grpcExecutor) Shutdown(ctx context.Context) error {
return nil
}

// extractTraceContextFromCtx extracts the trace context from the context and returns a TraceContext protobuf message
// This is used to propagate trace context through the ActivityRequest protobuf
func (g *grpcExecutor) extractTraceContextFromCtx(ctx context.Context) *protos.TraceContext {
spanCtx := trace.SpanFromContext(ctx).SpanContext()
if !spanCtx.IsValid() {
g.logger.Debug("No valid span context found in context")
return nil
}

// Build W3C Trace Context traceparent header: version-traceId-spanId-traceFlags
traceparent := fmt.Sprintf("00-%s-%s-%02x",
spanCtx.TraceID().String(),
spanCtx.SpanID().String(),
spanCtx.TraceFlags())

traceCtx := &protos.TraceContext{
TraceParent: traceparent,
}

// Add tracestate if present
if stateStr := spanCtx.TraceState().String(); stateStr != "" {
traceCtx.TraceState = wrapperspb.String(stateStr)
}

g.logger.Infof("Extracted trace context from ctx: traceID=%s spanID=%s flags=%02x",
spanCtx.TraceID(), spanCtx.SpanID(), spanCtx.TraceFlags())

return traceCtx
}

// Hello implements protos.TaskHubSidecarServiceServer
func (grpcExecutor) Hello(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
return empty, nil
Expand Down
Loading