Skip to content

Commit

Permalink
Fix race condition in side effect function. (#212)
Browse files Browse the repository at this point in the history
* Added SideEffectRaceConditionTest

* Fixed race condition in DynamicTransitionAction
  • Loading branch information
mfateev authored Sep 16, 2020
1 parent b76b22e commit 2feecc1
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class DynamicTransitionAction<State, Data> implements TransitionAction<State, Da

final DynamicCallback<State, Data> callback;
State[] expectedStates;
State state;

DynamicTransitionAction(State[] expectedStates, DynamicCallback<State, Data> callback) {
this.expectedStates = expectedStates;
Expand All @@ -36,7 +35,7 @@ class DynamicTransitionAction<State, Data> implements TransitionAction<State, Da

@Override
public State apply(Data data) {
state = callback.apply(data);
State state = callback.apply(data);
for (State s : expectedStates) {
if (s.equals(state)) {
return state;
Expand All @@ -53,11 +52,6 @@ public List<State> getAllowedStates() {

@Override
public String toString() {
return "DynamicTransitionAction{"
+ ", state="
+ state
+ "expectedStates="
+ Arrays.toString(expectedStates)
+ '}';
return "DynamicTransitionAction{" + "expectedStates=" + Arrays.toString(expectedStates) + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.workflow;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.TestEnvironmentOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SideEffectRaceConditionTest {

private static final String TASK_QUEUE = "test-workflow";

private TestWorkflowEnvironment testEnvironment;
private Worker worker;

@Before
public void setUp() {
TestEnvironmentOptions options = TestEnvironmentOptions.newBuilder().build();
testEnvironment = TestWorkflowEnvironment.newInstance(options);
worker = testEnvironment.newWorker(TASK_QUEUE);
}

@After
public void tearDown() {
testEnvironment.close();
}

@WorkflowInterface
public interface TestWorkflow {
@WorkflowMethod
void execute();
}

public static class TestSideEffectBenchWorkflowImpl implements TestWorkflow {

@Override
public void execute() {
for (int i = 0; i < 100; i++) {
Workflow.sideEffect(long.class, () -> new Random().nextLong());
Workflow.sleep(Duration.ofMillis(100));
Workflow.sideEffect(long.class, () -> new Random().nextLong());
}
}
}

@Test
public void testSideEffectBench() throws ExecutionException, InterruptedException {
worker.registerWorkflowImplementationTypes(TestSideEffectBenchWorkflowImpl.class);
testEnvironment.start();
List<CompletableFuture<Void>> results = new ArrayList<>();
int count = 100;
for (int i = 0; i < count; i++) {
TestWorkflow workflowStub =
testEnvironment
.getWorkflowClient()
.newWorkflowStub(
TestWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
CompletableFuture<Void> result = WorkflowClient.execute(workflowStub::execute);
results.add(result);
}
for (int i = 0; i < count; i++) {
results.get(i).get();
}
}
}

0 comments on commit 2feecc1

Please sign in to comment.