Skip to content

Commit

Permalink
[#1962] Add newCallbackExecutor to interceptors and override for Open…
Browse files Browse the repository at this point in the history
…Tracing

This change adds newCallbackExecutor to the OutboundInterceptor interface,
overriding the OpenTracing behavior. This allows the OpenTracing interceptor
to transfer any active trace/span from the point of creation to the point of
deferred execution, thus preserving the span within the callback.

Without this, spans become disjoint across Promise.thenCompose, due to the
nature of the deferring the execution of the .thenCompose to a callback thread.

Fixes #1962

Signed-off-by: Greg Haskins <[email protected]>
  • Loading branch information
ghaskins committed Jan 13, 2025
1 parent cf06131 commit e714fc3
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,38 @@
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInfo;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.util.concurrent.Executor;

public class OpenTracingWorkflowOutboundCallsInterceptor
extends WorkflowOutboundCallsInterceptorBase {
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

private class SpanTransferringExecutor implements Executor {
private final Executor passthrough;
private final Span capturedSpan;

public SpanTransferringExecutor(Executor passthrough) {
this.passthrough = passthrough;
capturedSpan = tracer.scopeManager().activeSpan();
}

public void execute(Runnable r) {
Span activeSpan = tracer.scopeManager().activeSpan();

if (activeSpan == null && capturedSpan != null) {
// if there is no activeSpan AND we captured a span during construction,
// we should transfer it to the calling context as the new activespan
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
passthrough.execute(r);
}
} else {
passthrough.execute(r);
}
}
}

public OpenTracingWorkflowOutboundCallsInterceptor(
WorkflowOutboundCallsInterceptor next,
OpenTracingOptions options,
Expand Down Expand Up @@ -178,6 +203,12 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
return super.newChildThread(wrappedRunnable, detached, name);
}

@Override
public Executor newCallbackExecutor() {
Executor passthrough = super.newCallbackExecutor();
return new SpanTransferringExecutor(passthrough);
}

private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) {
WorkflowInfo workflowInfo = Workflow.getInfo();
return spanFactory.createActivityStartSpan(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.opentracing;

import static org.junit.Assert.assertEquals;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.util.ThreadLocalScopeManager;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.workflow.*;
import java.time.Duration;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

public class CallbackContextTest {

private static final MockTracer mockTracer =
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);

private final OpenTracingOptions OT_OPTIONS =
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
.validateAndBuildWithDefaults())
.setWorkerFactoryOptions(
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
.validateAndBuildWithDefaults())
.setWorkflowTypes(WorkflowImpl.class)
.setActivityImplementations(new ActivityImpl())
.build();

@After
public void tearDown() {
mockTracer.reset();
}

@ActivityInterface
public interface TestActivity {
@ActivityMethod
boolean activity();
}

@WorkflowInterface
public interface TestWorkflow {
@WorkflowMethod
String workflow();
}

public static class ActivityImpl implements TestActivity {
@Override
public boolean activity() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}

public static class WorkflowImpl implements TestWorkflow {
private final TestActivity activity =
Workflow.newActivityStub(
TestActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(1))
.validateAndBuildWithDefaults());

@Override
public String workflow() {
return Async.function(activity::activity)
.thenCompose(
(r) -> {
Span activeSpan = mockTracer.activeSpan();
return Workflow.newPromise(
activeSpan != null ? activeSpan.context().toTraceId() : "not-found");
})
.get();
}
}

@Test
public void testCallbackContext() {
MockSpan span = mockTracer.buildSpan("ClientFunction").start();

WorkflowClient client = testWorkflowRule.getWorkflowClient();
try (Scope scope = mockTracer.scopeManager().activate(span)) {
TestWorkflow workflow =
client.newWorkflowStub(
TestWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.validateBuildWithDefaults());
assertEquals(span.context().toTraceId(), workflow.workflow());
} finally {
span.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -798,5 +799,17 @@ <R> R mutableSideEffect(
*/
Object newChildThread(Runnable runnable, boolean detached, String name);

/**
* Intercepts the point where a new callback is being prepared for deferment and allows
* interceptors to provide a wrapped execution environment for running the callback at a later
* time.
*
* <p>The executor's execute() function _must_ fully execute the provided Runnable within the
* caller's thread or determinism guarantees could be violated.
*
* @return created Executor
*/
Executor newCallbackExecutor();

long currentTimeMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -184,6 +185,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
}

@Override
public Executor newCallbackExecutor() {
return next.newCallbackExecutor();
}

@Override
public long currentTimeMillis() {
return next.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -40,6 +41,7 @@ class CompletablePromiseImpl<V> implements CompletablePromise<V> {
private final List<Functions.Proc> handlers = new ArrayList<>();
private final DeterministicRunnerImpl runner;
private boolean registeredWithRunner;
private Executor callbackExecutor;

@SuppressWarnings("unchecked")
static Promise<Object> promiseAnyOf(Promise<?>[] promises) {
Expand All @@ -62,7 +64,10 @@ static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
}

CompletablePromiseImpl() {
runner = DeterministicRunnerImpl.currentThreadInternal().getRunner();
WorkflowThread workflowThread = DeterministicRunnerImpl.currentThreadInternal();
runner = workflowThread.getRunner();
callbackExecutor =
workflowThread.getWorkflowContext().getWorkflowOutboundInterceptor().newCallbackExecutor();
}

@Override
Expand Down Expand Up @@ -275,9 +280,14 @@ private <U> Promise<U> then(Functions.Proc1<CompletablePromise<U>> proc) {
* @return true if there were any handlers invoked
*/
private boolean invokeHandlers() {
for (Functions.Proc handler : handlers) {
handler.apply();
}
// execute synchronously to this thread, but under the context established in the constructor
callbackExecutor.execute(
() -> {
for (Functions.Proc handler : handlers) {
handler.apply();
}
});

return !handlers.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -412,6 +413,12 @@ public WorkflowMetadata getWorkflowMetadata() {
return workflowMetadata.build();
}

private class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}

private class ActivityCallback {
private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();

Expand Down Expand Up @@ -1419,6 +1426,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
return runner.newWorkflowThread(runnable, detached, name);
}

@Override
public Executor newCallbackExecutor() {
return new DirectExecutor();
}

@Override
public long currentTimeMillis() {
return replayContext.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Executor newCallbackExecutor() {
throw new UnsupportedOperationException("not implemented");
}

@Override
public long currentTimeMillis() {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -433,6 +434,15 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
}

@Override
public Executor newCallbackExecutor() {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("newCallbackExecutor ");
}

return next.newCallbackExecutor();
}

@Override
public long currentTimeMillis() {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down

0 comments on commit e714fc3

Please sign in to comment.