Skip to content

Commit 82d3c93

Browse files
Update Java SDK for Temporal Sever v1.26.2 (#2357)
Lower refreshNexusEndpointsMinWait
1 parent c3e0e77 commit 82d3c93

File tree

12 files changed

+137
-76
lines changed

12 files changed

+137
-76
lines changed

docker/github/dynamicconfig/development.yaml

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ component.nexusoperations.callback.endpoint.template:
2727
component.callbacks.allowedAddresses:
2828
- value:
2929
- Pattern: "localhost:7243"
30-
AllowInsecure: true
30+
AllowInsecure: true
31+
system.refreshNexusEndpointsMinWait:
32+
- value: 1ms

temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,14 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
189189
}
190190
case FAILUREINFO_NOT_SET:
191191
default:
192-
throw new IllegalArgumentException("Failure info not set");
192+
// All unknown types are considered to be retryable ApplicationError.
193+
return ApplicationFailure.newFromValues(
194+
failure.getMessage(),
195+
"",
196+
false,
197+
new EncodedValues(Optional.empty(), dataConverter),
198+
cause,
199+
null);
193200
}
194201
}
195202

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

+2
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ public Throwable wrapFailure(ActivityTask t, Throwable failure) {
334334
failure);
335335
}
336336

337+
// TODO: Suppress warning until the SDK supports deployment
338+
@SuppressWarnings("deprecation")
337339
private void sendReply(
338340
ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
339341
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

+4
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,8 @@ private WorkflowTaskHandler.Result handleTask(
486486
}
487487
}
488488

489+
// TODO: Suppress warning until the SDK supports deployment
490+
@SuppressWarnings("deprecation")
489491
private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
490492
ByteString taskToken,
491493
RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
@@ -514,6 +516,8 @@ private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
514516
grpcRetryOptions);
515517
}
516518

519+
// TODO: Suppress warning until the SDK supports deployment
520+
@SuppressWarnings("deprecation")
517521
private void sendTaskFailed(
518522
ByteString taskToken,
519523
RespondWorkflowTaskFailedRequest.Builder taskFailed,

temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.hamcrest.CoreMatchers;
5757
import org.hamcrest.MatcherAssert;
5858
import org.junit.*;
59-
import org.junit.rules.Timeout;
6059
import org.junit.runner.RunWith;
6160

6261
/**
@@ -70,13 +69,11 @@
7069
*/
7170
@RunWith(JUnitParamsRunner.class)
7271
public class ActivityTimeoutTest {
73-
@Rule
74-
public SDKTestWorkflowRule testWorkflowRule =
75-
SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build();
76-
7772
// TODO This test takes longer than it should to complete because
7873
// of the cached heartbeat that prevents a quick shutdown
79-
public @Rule Timeout timeout = Timeout.seconds(15);
74+
@Rule
75+
public SDKTestWorkflowRule testWorkflowRule =
76+
SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build();
8077

8178
/**
8279
* An activity reaches startToClose timeout once, max retries are set to 1. o

temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java

-11
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,6 @@ public void syncOperationImmediatelyCancelled() {
5858
"operation canceled before it was started", canceledFailure.getOriginalMessage());
5959
}
6060

61-
@Test
62-
public void syncOperationCancelled() {
63-
TestWorkflows.TestWorkflow1 workflowStub =
64-
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
65-
WorkflowFailedException exception =
66-
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
67-
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
68-
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
69-
Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure);
70-
}
71-
7261
public static class TestNexus implements TestWorkflows.TestWorkflow1 {
7362
@Override
7463
public String execute(String input) {

temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.client.WorkflowOptions;
3030
import io.temporal.failure.ApplicationFailure;
3131
import io.temporal.failure.NexusOperationFailure;
32+
import io.temporal.failure.TerminatedFailure;
3233
import io.temporal.nexus.WorkflowClientOperationHandlers;
3334
import io.temporal.testing.internal.SDKTestWorkflowRule;
3435
import io.temporal.workflow.*;
@@ -54,10 +55,20 @@ public void terminateAsyncOperation() {
5455
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
5556
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
5657
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
57-
Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure);
58-
Assert.assertEquals(
59-
"operation terminated",
60-
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
58+
// TODO(https://github.com/temporalio/sdk-java/issues/2358): Test server needs to be fixed to
59+
// return the correct type
60+
Assert.assertTrue(
61+
nexusFailure.getCause() instanceof ApplicationFailure
62+
|| nexusFailure.getCause() instanceof TerminatedFailure);
63+
if (nexusFailure.getCause() instanceof ApplicationFailure) {
64+
Assert.assertEquals(
65+
"operation terminated",
66+
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
67+
} else {
68+
Assert.assertEquals(
69+
"operation terminated",
70+
((TerminatedFailure) nexusFailure.getCause()).getOriginalMessage());
71+
}
6172
}
6273

6374
@Service

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

+2
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ static final class ActivityTaskData {
300300
TestServiceRetryState retryState;
301301
Duration nextBackoffInterval;
302302
String identity;
303+
Timestamp lastAttemptCompleteTime;
303304

304305
ActivityTaskData(
305306
TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
@@ -2112,6 +2113,7 @@ private static RetryState attemptActivityRetry(
21122113
ctx.onCommit(
21132114
(historySize) -> {
21142115
data.retryState = nextAttempt;
2116+
data.lastAttemptCompleteTime = ctx.currentTime();
21152117
task.setAttempt(nextAttempt.getAttempt());
21162118
task.setCurrentAttemptScheduledTime(ctx.currentTime());
21172119
});

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

+40-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import com.google.common.base.Preconditions;
3030
import com.google.common.base.Strings;
31+
import com.google.protobuf.Any;
3132
import com.google.protobuf.ByteString;
3233
import com.google.protobuf.InvalidProtocolBufferException;
3334
import com.google.protobuf.Timestamp;
@@ -86,6 +87,17 @@
8687
import org.slf4j.LoggerFactory;
8788

8889
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
90+
static final Failure FAILED_UPDATE_ON_WF_COMPLETION =
91+
Failure.newBuilder()
92+
.setMessage(
93+
"Workflow Update failed because the Workflow completed before the Update completed.")
94+
.setSource("Server")
95+
.setApplicationFailureInfo(
96+
ApplicationFailureInfo.newBuilder()
97+
.setType("AcceptedUpdateCompletedWorkflow")
98+
.setNonRetryable(true)
99+
.build())
100+
.build();
89101

90102
/**
91103
* If the implementation throws an exception, changes accumulated in the RequestContext will not
@@ -541,6 +553,7 @@ public void completeWorkflowTask(
541553
|| request.getForceCreateNewWorkflowTask())) {
542554
scheduleWorkflowTask(ctx);
543555
}
556+
544557
workflowTaskStateMachine.getData().bufferedEvents.clear();
545558
Map<String, ConsistentQuery> queries = data.consistentQueryRequests;
546559
Map<String, WorkflowQueryResult> queryResultsMap = request.getQueryResultsMap();
@@ -1671,6 +1684,27 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
16711684
return;
16721685
}
16731686

1687+
updates.forEach(
1688+
(k, updateStateMachine) -> {
1689+
if (!(updateStateMachine.getState() == StateMachines.State.COMPLETED
1690+
|| updateStateMachine.getState() == StateMachines.State.FAILED)) {
1691+
updateStateMachine.action(
1692+
Action.COMPLETE,
1693+
ctx,
1694+
Message.newBuilder()
1695+
.setBody(
1696+
Any.pack(
1697+
Response.newBuilder()
1698+
.setOutcome(
1699+
Outcome.newBuilder()
1700+
.setFailure(FAILED_UPDATE_ON_WF_COMPLETION)
1701+
.build())
1702+
.build()))
1703+
.build(),
1704+
completionEvent.get().getEventId());
1705+
}
1706+
});
1707+
16741708
for (Callback cb : startRequest.getCompletionCallbacksList()) {
16751709
if (!cb.hasNexus()) {
16761710
// test server only supports nexus callbacks currently
@@ -3101,6 +3135,10 @@ private static PendingActivityInfo constructPendingActivityInfo(
31013135
builder.setLastWorkerIdentity(activityTaskData.identity);
31023136
}
31033137

3138+
if (activityTaskData.lastAttemptCompleteTime != null) {
3139+
builder.setLastAttemptCompleteTime(activityTaskData.lastAttemptCompleteTime);
3140+
}
3141+
31043142
// Some ids are only present in the schedule event...
31053143
if (activityTaskData.scheduledEvent != null) {
31063144
populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent);
@@ -3145,12 +3183,8 @@ private static void populatePendingActivityInfoFromScheduledEvent(
31453183

31463184
private static void populatePendingActivityInfoFromPollResponse(
31473185
PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) {
3148-
// In golang, we set one but never both of these fields, depending on the activity state
3149-
if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) {
3150-
builder.setScheduledTime(task.getScheduledTime());
3151-
} else {
3152-
builder.setLastStartedTime(task.getStartedTime());
3153-
}
3186+
builder.setScheduledTime(task.getScheduledTime());
3187+
builder.setLastStartedTime(task.getStartedTime());
31543188
}
31553189

31563190
private static void populatePendingActivityInfoFromHeartbeatDetails(

temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowExecutionTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public void testSuccessfulActivity() throws InterruptedException {
175175
.setMaximumAttempts(2)
176176
// times should be present, but we can't know what the expected value is if this test is
177177
// going to run against the real server.
178+
.setScheduledTime(actual.getScheduledTime())
178179
.setLastStartedTime(actual.getLastStartedTime())
179180
.setExpirationTime(actual.getExpirationTime())
180181
.build();
@@ -266,8 +267,10 @@ public void testFailedActivity() throws InterruptedException {
266267
.setMaximumAttempts(2)
267268
// times should be present, but we can't know what the expected value is if this test is
268269
// going to run against the real server.
270+
.setScheduledTime(actual.getScheduledTime())
269271
.setLastStartedTime(actual.getLastStartedTime())
270272
.setExpirationTime(actual.getExpirationTime())
273+
.setLastAttemptCompleteTime(actual.getLastAttemptCompleteTime())
271274
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
272275
// it.
273276
.setLastWorkerIdentity(actual.getLastWorkerIdentity())
@@ -333,6 +336,7 @@ private void testKilledWorkflow(
333336
.setMaximumAttempts(2)
334337
// times should be present, but we can't know what the expected value is if this test is
335338
// going to run against the real server.
339+
.setScheduledTime(actual.getScheduledTime())
336340
.setLastStartedTime(actual.getLastStartedTime())
337341
.setExpirationTime(actual.getExpirationTime())
338342
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know

temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowUpdateTest.java

+55-46
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
package io.temporal.testserver.functional;
2222

23-
import static org.junit.Assume.assumeFalse;
24-
2523
import io.grpc.Status;
2624
import io.grpc.StatusRuntimeException;
2725
import io.temporal.api.common.v1.Payloads;
@@ -548,7 +546,6 @@ public void updateAndPollByWorkflowId() {
548546
@Test
549547
public void getCompletedUpdateOfCompletedWorkflow() {
550548
// Assert that we can get and poll a completed update from a completed workflow.
551-
assumeFalse("Skipping as real server has a bug", SDKTestWorkflowRule.useExternalService);
552549

553550
WorkflowOptions options =
554551
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
@@ -593,7 +590,7 @@ public void getCompletedUpdateOfCompletedWorkflow() {
593590

594591
@Test
595592
public void getIncompleteUpdateOfCompletedWorkflow() {
596-
// Assert that we can't get an incomplete update of a completed workflow. Expect a NOT_FOUND
593+
// Assert that the server fails an incomplete update if the workflow is completed.
597594
WorkflowOptions options =
598595
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
599596

@@ -617,48 +614,60 @@ public void getIncompleteUpdateOfCompletedWorkflow() {
617614
workflowStub.signal();
618615
workflowStub.execute();
619616

620-
StatusRuntimeException exception =
621-
Assert.assertThrows(
622-
StatusRuntimeException.class,
623-
() ->
624-
updateWorkflow(
625-
exec,
626-
"updateId",
627-
UpdateWorkflowExecutionLifecycleStage
628-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
629-
TestWorkflows.UpdateType.BLOCK));
630-
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
631-
exception =
632-
Assert.assertThrows(
633-
StatusRuntimeException.class,
634-
() ->
635-
updateWorkflow(
636-
exec,
637-
"updateId",
638-
UpdateWorkflowExecutionLifecycleStage
639-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
640-
TestWorkflows.UpdateType.BLOCK));
641-
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
642-
exception =
643-
Assert.assertThrows(
644-
StatusRuntimeException.class,
645-
() ->
646-
pollWorkflowUpdate(
647-
exec,
648-
"updateId",
649-
UpdateWorkflowExecutionLifecycleStage
650-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED));
651-
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
652-
exception =
653-
Assert.assertThrows(
654-
StatusRuntimeException.class,
655-
() ->
656-
pollWorkflowUpdate(
657-
exec,
658-
"updateId",
659-
UpdateWorkflowExecutionLifecycleStage
660-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED));
661-
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
617+
response =
618+
updateWorkflow(
619+
exec,
620+
"updateId",
621+
UpdateWorkflowExecutionLifecycleStage
622+
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
623+
TestWorkflows.UpdateType.BLOCK);
624+
Assert.assertEquals(
625+
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
626+
response.getStage());
627+
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());
628+
629+
response =
630+
updateWorkflow(
631+
exec,
632+
"updateId",
633+
UpdateWorkflowExecutionLifecycleStage
634+
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
635+
TestWorkflows.UpdateType.BLOCK);
636+
Assert.assertEquals(
637+
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
638+
response.getStage());
639+
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());
640+
641+
PollWorkflowExecutionUpdateResponse pollResponse =
642+
pollWorkflowUpdate(
643+
exec,
644+
"updateId",
645+
UpdateWorkflowExecutionLifecycleStage
646+
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
647+
Assert.assertEquals(
648+
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
649+
pollResponse.getStage());
650+
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());
651+
652+
pollResponse =
653+
pollWorkflowUpdate(
654+
exec,
655+
"updateId",
656+
UpdateWorkflowExecutionLifecycleStage
657+
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
658+
Assert.assertEquals(
659+
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
660+
pollResponse.getStage());
661+
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());
662+
}
663+
664+
private void assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(Outcome outcome) {
665+
Assert.assertEquals(
666+
"Workflow Update failed because the Workflow completed before the Update completed.",
667+
outcome.getFailure().getMessage());
668+
Assert.assertEquals(
669+
"AcceptedUpdateCompletedWorkflow",
670+
outcome.getFailure().getApplicationFailureInfo().getType());
662671
}
663672

664673
private UpdateWorkflowExecutionResponse updateWorkflow(

0 commit comments

Comments
 (0)