Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1962] Add newCallbackExecutor to interceptors and override for OpenTracing #2366

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,141 @@
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInfo;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenTracingWorkflowOutboundCallsInterceptor
extends WorkflowOutboundCallsInterceptorBase {
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

private class SpanTransferringExecutor implements Executor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this executor and the interceptor for it now since it is not needed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was debating that too. My thought was leaving it would mean any Promise could benefit from the wrapping. Still, my primary concern is supporting propagation to Activity/LocalActivity, so I am happy either way and can remove this. This decision can always be revisited in a later PR if generalizing propagation to any Promise becomes a desired feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can never remove an extension point after we add it. So if it is not needed to solve this problem we shouldn't add it. We can always add it later if the need arises.

private final Executor passthrough;
private final Span capturedSpan;

public SpanTransferringExecutor(Executor passthrough) {
this.passthrough = passthrough;
capturedSpan = tracer.scopeManager().activeSpan();
}

public void execute(Runnable r) {
Span activeSpan = tracer.scopeManager().activeSpan();

if (activeSpan == null && capturedSpan != null) {
// if there is no activeSpan AND we captured a span during construction,
// we should transfer it to the calling context as the new active span
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
passthrough.execute(r);
}
} else {
passthrough.execute(r);
}
}
}

private class PromiseWrapper<R> implements Promise<R> {
private final Span capturedSpan;
private final Promise<R> delegate;

PromiseWrapper(Span capturedSpan, Promise<R> delegate) {
this.capturedSpan = capturedSpan;
this.delegate = delegate;
}

@Override
public boolean isCompleted() {
return delegate.isCompleted();
}

@Override
public R get() {
return delegate.get();
}

@Override
public R cancellableGet() {
return delegate.cancellableGet();
}

@Override
public R get(long timeout, TimeUnit unit) throws TimeoutException {
return delegate.get(timeout, unit);
}

@Override
public R cancellableGet(long timeout, TimeUnit unit) throws TimeoutException {
return delegate.cancellableGet(timeout, unit);
}

@Override
public RuntimeException getFailure() {
return delegate.getFailure();
}

@Override
public <U> Promise<U> thenApply(Functions.Func1<? super R, ? extends U> fn) {
return delegate.thenApply(
(r) -> {
if (capturedSpan != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should copy the logic we had for SpanTransferringExecutor where we ignored the capturedSpan if one is already active

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally left that out because it was doing wrong by inheriting the context created by the SpanTransferringExecutor inside the activity span. If we remove the executor, as you said in the comment below, this will be less of an issue. Still, it got me thinking about the correct behavior here, and it felt like the capturedSpan is always the proper choice. I am not married to this, though. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in insolation, I think it is debatable if the capturedSpan or the span of the caller of thenApply should be used. That being said, the current behaviour and the behaviour of OTEL + Completable futures is more closely mimicked by ignoring the capturedSpan if one is already active.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will re-add the activeSpan checks.

try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
return fn.apply(r);
}
} else {
return fn.apply(r);
}
});
}

@Override
public <U> Promise<U> handle(Functions.Func2<? super R, RuntimeException, ? extends U> fn) {
return delegate.handle(
(r, e) -> {
if (capturedSpan != null) {
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
return fn.apply(r, e);
}
} else {
return fn.apply(r, e);
}
});
}

@Override
public <U> Promise<U> thenCompose(Functions.Func1<? super R, ? extends Promise<U>> fn) {
return delegate.thenCompose(
(R r) -> {
if (capturedSpan != null) {
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
return fn.apply(r);
}
} else {
return fn.apply(r);
}
});
}

@Override
public Promise<R> exceptionally(Functions.Func1<Throwable, ? extends R> fn) {
return delegate.exceptionally(
(Throwable t) -> {
if (capturedSpan != null) {
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
return fn.apply(t);
}
} else {
return fn.apply(t);
}
});
}
}

public OpenTracingWorkflowOutboundCallsInterceptor(
WorkflowOutboundCallsInterceptor next,
OpenTracingOptions options,
Expand All @@ -51,13 +176,16 @@ public OpenTracingWorkflowOutboundCallsInterceptor(
@Override
public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Span capturedSpan = tracer.scopeManager().activeSpan();
Span activityStartSpan =
contextAccessor.writeSpanContextToHeader(
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
return super.executeActivity(input);
ActivityOutput<R> output = super.executeActivity(input);
return new ActivityOutput<>(
output.getActivityId(), new PromiseWrapper<>(capturedSpan, output.getResult()));
} finally {
activityStartSpan.finish();
}
Expand All @@ -69,13 +197,15 @@ public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
@Override
public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Span capturedSpan = tracer.scopeManager().activeSpan();
Span activityStartSpan =
contextAccessor.writeSpanContextToHeader(
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
return super.executeLocalActivity(input);
LocalActivityOutput<R> output = super.executeLocalActivity(input);
return new LocalActivityOutput<>(new PromiseWrapper<>(capturedSpan, output.getResult()));
} finally {
activityStartSpan.finish();
}
Expand All @@ -87,11 +217,14 @@ public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> inp
@Override
public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Span capturedSpan = tracer.scopeManager().activeSpan();
Span childWorkflowStartSpan =
contextAccessor.writeSpanContextToHeader(
() -> createChildWorkflowStartSpanBuilder(input).start(), input.getHeader(), tracer);
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
return super.executeChildWorkflow(input);
ChildWorkflowOutput<R> output = super.executeChildWorkflow(input);
return new ChildWorkflowOutput<>(
new PromiseWrapper<>(capturedSpan, output.getResult()), output.getWorkflowExecution());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output.getWorkflowExecution() is also a Promise that needs to be wrapped

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will fix.

} finally {
childWorkflowStartSpan.finish();
}
Expand All @@ -104,13 +237,16 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
ExecuteNexusOperationInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Span capturedSpan = tracer.scopeManager().activeSpan();
Span nexusOperationExecuteSpan =
contextAccessor.writeSpanContextToHeader(
() -> createStartNexusOperationSpanBuilder(input).start(),
input.getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) {
return super.executeNexusOperation(input);
ExecuteNexusOperationOutput<R> output = super.executeNexusOperation(input);
return new ExecuteNexusOperationOutput<>(
new PromiseWrapper<>(capturedSpan, output.getResult()), output.getOperationExecution());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output.getWorkflowExecution() is a promise that needs to be wrapped

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix

} finally {
nexusOperationExecuteSpan.finish();
}
Expand All @@ -122,6 +258,7 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
@Override
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Span capturedSpan = tracer.scopeManager().activeSpan();
WorkflowInfo workflowInfo = Workflow.getInfo();
Span childWorkflowStartSpan =
contextAccessor.writeSpanContextToHeader(
Expand All @@ -136,7 +273,8 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
return super.signalExternalWorkflow(input);
SignalExternalOutput output = super.signalExternalWorkflow(input);
return new SignalExternalOutput(new PromiseWrapper<>(capturedSpan, output.getResult()));
} finally {
childWorkflowStartSpan.finish();
}
Expand Down Expand Up @@ -178,6 +316,12 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
return super.newChildThread(wrappedRunnable, detached, name);
}

@Override
public Executor newCallbackExecutor() {
Executor passthrough = super.newCallbackExecutor();
return new SpanTransferringExecutor(passthrough);
}

private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) {
WorkflowInfo workflowInfo = Workflow.getInfo();
return spanFactory.createActivityStartSpan(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.opentracing;

import static org.junit.Assert.assertEquals;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.util.ThreadLocalScopeManager;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.workflow.*;
import java.time.Duration;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

public class CallbackContextTest {

private static final MockTracer mockTracer =
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);

private final OpenTracingOptions OT_OPTIONS =
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
.validateAndBuildWithDefaults())
.setWorkerFactoryOptions(
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
.validateAndBuildWithDefaults())
.setWorkflowTypes(WorkflowImpl.class)
.setActivityImplementations(new ActivityImpl())
.build();

@After
public void tearDown() {
mockTracer.reset();
}

@ActivityInterface
public interface TestActivity {
@ActivityMethod
boolean activity();
}

@WorkflowInterface
public interface TestWorkflow {
@WorkflowMethod
String workflow();
}

public static class ActivityImpl implements TestActivity {
@Override
public boolean activity() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}

public static class WorkflowImpl implements TestWorkflow {
private final TestActivity activity =
Workflow.newActivityStub(
TestActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(1))
.validateAndBuildWithDefaults());

@Override
public String workflow() {
String workflowSpanId = mockTracer.activeSpan().context().toSpanId();
return Async.function(activity::activity)
.thenCompose(
(r) -> {
Span activeSpan = mockTracer.activeSpan();
ghaskins marked this conversation as resolved.
Show resolved Hide resolved

if (activeSpan != null) {
String promiseSpanId = activeSpan.context().toSpanId();
if (promiseSpanId.equals(workflowSpanId)) {
return Workflow.newPromise(activeSpan.context().toTraceId());
} else {
return Workflow.newPromise("bad-span");
}
} else {
return Workflow.newPromise("not-found");
}
})
.get();
}
}

@Test
public void testCallbackContext() {
MockSpan span = mockTracer.buildSpan("ClientFunction").start();

WorkflowClient client = testWorkflowRule.getWorkflowClient();
try (Scope scope = mockTracer.scopeManager().activate(span)) {
TestWorkflow workflow =
client.newWorkflowStub(
TestWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.validateBuildWithDefaults());
assertEquals(span.context().toTraceId(), workflow.workflow());
} finally {
span.finish();
}
}
}
Loading