Skip to content

Commit

Permalink
Fix more test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Dec 30, 2024
1 parent 7356e2a commit 97054ec
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ static final class ActivityTaskData {
TestServiceRetryState retryState;
Duration nextBackoffInterval;
String identity;
Timestamp lastAttemptCompleteTime;

ActivityTaskData(
TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
Expand Down Expand Up @@ -2112,6 +2113,7 @@ private static RetryState attemptActivityRetry(
ctx.onCommit(
(historySize) -> {
data.retryState = nextAttempt;
data.lastAttemptCompleteTime = ctx.currentTime();
task.setAttempt(nextAttempt.getAttempt());
task.setCurrentAttemptScheduledTime(ctx.currentTime());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -86,6 +87,17 @@
import org.slf4j.LoggerFactory;

class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
static final Failure FAILED_UPDATE_ON_WF_COMPLETION =
Failure.newBuilder()
.setMessage(
"Workflow Update failed because the Workflow completed before the Update completed.")
.setSource("Server")
.setApplicationFailureInfo(
ApplicationFailureInfo.newBuilder()
.setType("AcceptedUpdateCompletedWorkflow")
.setNonRetryable(true)
.build())
.build();

/**
* If the implementation throws an exception, changes accumulated in the RequestContext will not
Expand Down Expand Up @@ -541,6 +553,28 @@ public void completeWorkflowTask(
|| request.getForceCreateNewWorkflowTask())) {
scheduleWorkflowTask(ctx);
}
if (completed) {
updates.forEach(
(k, updateStateMachine) -> {
if (!(updateStateMachine.getState() == StateMachines.State.COMPLETED
|| updateStateMachine.getState() == StateMachines.State.FAILED)) {
updateStateMachine.action(
Action.COMPLETE,
ctx,
Message.newBuilder()
.setBody(
Any.pack(
Response.newBuilder()
.setOutcome(
Outcome.newBuilder()
.setFailure(FAILED_UPDATE_ON_WF_COMPLETION)
.build())
.build()))
.build(),
workflowTaskCompletedId);
}
});
}
workflowTaskStateMachine.getData().bufferedEvents.clear();
Map<String, ConsistentQuery> queries = data.consistentQueryRequests;
Map<String, WorkflowQueryResult> queryResultsMap = request.getQueryResultsMap();
Expand Down Expand Up @@ -3101,6 +3135,10 @@ private static PendingActivityInfo constructPendingActivityInfo(
builder.setLastWorkerIdentity(activityTaskData.identity);
}

if (activityTaskData.lastAttemptCompleteTime != null) {
builder.setLastAttemptCompleteTime(activityTaskData.lastAttemptCompleteTime);
}

// Some ids are only present in the schedule event...
if (activityTaskData.scheduledEvent != null) {
populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent);
Expand Down Expand Up @@ -3145,12 +3183,8 @@ private static void populatePendingActivityInfoFromScheduledEvent(

private static void populatePendingActivityInfoFromPollResponse(
PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) {
// In golang, we set one but never both of these fields, depending on the activity state
if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) {
builder.setScheduledTime(task.getScheduledTime());
} else {
builder.setLastStartedTime(task.getStartedTime());
}
builder.setScheduledTime(task.getScheduledTime());
builder.setLastStartedTime(task.getStartedTime());
}

private static void populatePendingActivityInfoFromHeartbeatDetails(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void testSuccessfulActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.build();
Expand Down Expand Up @@ -266,8 +267,10 @@ public void testFailedActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.setLastAttemptCompleteTime(actual.getLastAttemptCompleteTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
// it.
.setLastWorkerIdentity(actual.getLastWorkerIdentity())
Expand Down Expand Up @@ -333,6 +336,7 @@ private void testKilledWorkflow(
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.testserver.functional;

import static org.junit.Assume.assumeFalse;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payloads;
Expand Down Expand Up @@ -548,7 +546,6 @@ public void updateAndPollByWorkflowId() {
@Test
public void getCompletedUpdateOfCompletedWorkflow() {
// Assert that we can get and poll a completed update from a completed workflow.
assumeFalse("Skipping as real server has a bug", SDKTestWorkflowRule.useExternalService);

WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
Expand Down Expand Up @@ -593,7 +590,7 @@ public void getCompletedUpdateOfCompletedWorkflow() {

@Test
public void getIncompleteUpdateOfCompletedWorkflow() {
// Assert that we can't get an incomplete update of a completed workflow. Expect a NOT_FOUND
// Assert that the server fails an incomplete update if the workflow is completed.
WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();

Expand All @@ -617,48 +614,47 @@ public void getIncompleteUpdateOfCompletedWorkflow() {
workflowStub.signal();
workflowStub.execute();

StatusRuntimeException exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());

response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());

PollWorkflowExecutionUpdateResponse pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());

pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());
}

private UpdateWorkflowExecutionResponse updateWorkflow(
Expand Down

0 comments on commit 97054ec

Please sign in to comment.