diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java index 6384382b5..16f63a343 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java @@ -21,8 +21,7 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; -import io.temporal.activity.ActivityExecutionContext; public interface ActivityExecutionContextFactory { - ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope); + InternalActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java index c44e131e3..00f51f8c6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java @@ -21,7 +21,6 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; -import io.temporal.activity.ActivityExecutionContext; import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.external.ManualActivityCompletionClientFactory; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -61,7 +60,8 @@ public ActivityExecutionContextFactoryImpl( } @Override - public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) { + public InternalActivityExecutionContext createContext( + ActivityInfoInternal info, Scope metricsScope) { return new ActivityExecutionContextImpl( service, namespace, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java index 23dc211a5..f21a8df88 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java @@ -45,7 +45,7 @@ * @see ActivityExecutionContext */ @ThreadSafe -class ActivityExecutionContextImpl implements ActivityExecutionContext { +class ActivityExecutionContextImpl implements InternalActivityExecutionContext { private final Lock lock = new ReentrantLock(); private final ManualActivityCompletionClientFactory manualCompletionClientFactory; private final Functions.Proc completionHandle; @@ -165,4 +165,9 @@ public Scope getMetricsScope() { public ActivityInfo getInfo() { return info; } + + @Override + public Object getLastHeartbeatValue() { + return heartbeatContext.getLastHeartbeatDetails(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java index cbd9db27b..cab8ae592 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java @@ -23,7 +23,6 @@ import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure; import com.uber.m3.tally.Scope; -import io.temporal.activity.ActivityExecutionContext; import io.temporal.activity.ActivityInfo; import io.temporal.activity.DynamicActivity; import io.temporal.api.common.v1.Payload; @@ -76,7 +75,8 @@ public BaseActivityTaskExecutor( @Override public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) { - ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope); + InternalActivityExecutionContext context = + executionContextFactory.createContext(info, metricsScope); ActivityInfo activityInfo = context.getInfo(); ActivitySerializationContext serializationContext = new ActivitySerializationContext( @@ -133,7 +133,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri } return mapToActivityFailure( - ex, info.getActivityId(), metricsScope, local, dataConverterWithActivityContext); + ex, + info.getActivityId(), + context.getLastHeartbeatValue(), + metricsScope, + local, + dataConverterWithActivityContext); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java index 471e39bc8..2575c4af5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java @@ -137,7 +137,12 @@ public Result handle(ActivityTask activityTask, Scope metricsScope, boolean loca + knownTypes); } catch (Exception exception) { return mapToActivityFailure( - exception, pollResponse.getActivityId(), metricsScope, localActivity, dataConverter); + exception, + pollResponse.getActivityId(), + null, + metricsScope, + localActivity, + dataConverter); } } @@ -186,6 +191,7 @@ private void registerActivityImplementation(Object activity) { static ActivityTaskHandler.Result mapToActivityFailure( Throwable exception, String activityId, + @Nullable Object lastHeartbeatDetails, Scope metricsScope, boolean isLocalActivity, DataConverter dataConverter) { @@ -212,6 +218,9 @@ static ActivityTaskHandler.Result mapToActivityFailure( Failure failure = dataConverter.exceptionToFailure(exception); RespondActivityTaskFailedRequest.Builder result = RespondActivityTaskFailedRequest.newBuilder().setFailure(failure); + if (lastHeartbeatDetails != null) { + dataConverter.toPayloads(lastHeartbeatDetails).ifPresent(result::setLastHeartbeatDetails); + } return new ActivityTaskHandler.Result( activityId, null, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java index b0abdb555..940b0b259 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java @@ -35,4 +35,6 @@ interface HeartbeatContext { * @see io.temporal.activity.ActivityExecutionContext#getHeartbeatDetails(Class, Type) */ Optional getHeartbeatDetails(Class detailsClass, Type detailsGenericType); + + Object getLastHeartbeatDetails(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 3c62288f6..8259770fe 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -150,6 +150,19 @@ public Optional getHeartbeatDetails(Class detailsClass, Type detailsGe } } + @Override + public Object getLastHeartbeatDetails() { + lock.lock(); + try { + if (receivedAHeartbeat) { + return this.lastDetails; + } + return null; + } finally { + lock.unlock(); + } + } + private void doHeartBeatLocked(Object details) { long nextHeartbeatDelay; try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java new file mode 100644 index 000000000..6318a8ad2 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.activity; + +import io.temporal.activity.ActivityExecutionContext; + +/** + * Internal context object passed to an Activity implementation, providing more internal details + * than the user facing {@link ActivityExecutionContext}. + */ +public interface InternalActivityExecutionContext extends ActivityExecutionContext { + /** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */ + Object getLastHeartbeatValue(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java index afbbedc6a..3305b635a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java @@ -21,14 +21,14 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; -import io.temporal.activity.ActivityExecutionContext; public class LocalActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory { public LocalActivityExecutionContextFactoryImpl() {} @Override - public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) { + public InternalActivityExecutionContext createContext( + ActivityInfoInternal info, Scope metricsScope) { return new LocalActivityExecutionContextImpl(info, metricsScope); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java index 2baee3616..ad9c2a9d1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java @@ -21,14 +21,13 @@ package io.temporal.internal.activity; import com.uber.m3.tally.Scope; -import io.temporal.activity.ActivityExecutionContext; import io.temporal.activity.ActivityInfo; import io.temporal.activity.ManualActivityCompletionClient; import io.temporal.client.ActivityCompletionException; import java.lang.reflect.Type; import java.util.Optional; -class LocalActivityExecutionContextImpl implements ActivityExecutionContext { +class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext { private final ActivityInfo info; private final Scope metricsScope; @@ -88,4 +87,9 @@ public ManualActivityCompletionClient useLocalManualCompletion() { public Scope getMetricsScope() { return metricsScope; } + + @Override + public Object getLastHeartbeatValue() { + return null; + } } diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityHeartbeatSentOnFailureTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityHeartbeatSentOnFailureTest.java new file mode 100644 index 000000000..5068dfb18 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityHeartbeatSentOnFailureTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.activity; + +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityHeartbeatSentOnFailureTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new HeartBeatingActivityImpl()) + .build(); + + /** Tests that the last Activity#heartbeat value is sent if the activity fails. */ + @Test + public void activityHeartbeatSentOnFailure() { + TestWorkflows.NoArgsWorkflow workflow = + testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class); + workflow.execute(); + } + + public static class TestWorkflowImpl implements TestWorkflows.NoArgsWorkflow { + + private final TestActivities.NoArgsActivity activities = + Workflow.newActivityStub( + TestActivities.NoArgsActivity.class, + SDKTestOptions.newActivityOptions20sScheduleToClose()); + + @Override + public void execute() { + activities.execute(); + } + } + + public static class HeartBeatingActivityImpl implements TestActivities.NoArgsActivity { + @Override + public void execute() { + // If the heartbeat details are "3", then we know that the last heartbeat was sent. + if (Activity.getExecutionContext().getHeartbeatDetails(String.class).orElse("").equals("3")) { + return; + } + // Send 3 heartbeats and then fail, expecting the last heartbeat to be sent + // even though the activity fails and the last two attempts would normally be throttled. + Activity.getExecutionContext().heartbeat("1"); + Activity.getExecutionContext().heartbeat("2"); + Activity.getExecutionContext().heartbeat("3"); + throw new RuntimeException("simulated failure"); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 76fac5f36..0db572a85 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -1956,9 +1956,11 @@ private static State failActivityTask( RequestContext ctx, ActivityTaskData data, Object request, long notUsed) { if (request instanceof RespondActivityTaskFailedRequest) { RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request; + data.heartbeatDetails = req.getLastHeartbeatDetails(); return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity()); } else if (request instanceof RespondActivityTaskFailedByIdRequest) { RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request; + data.heartbeatDetails = req.getLastHeartbeatDetails(); return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity()); } else { throw new IllegalArgumentException("Unknown request: " + request);