Skip to content

Commit

Permalink
Test continue as new with local activities (#1922)
Browse files Browse the repository at this point in the history
Test continue as new with local activities
  • Loading branch information
Quinn-With-Two-Ns authored Nov 6, 2023
1 parent 7077b54 commit 5d1bbbe
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ public class LocalActivityInTheLastWorkflowTaskTest {
.build();

@Test
@Parameters({"true", "false"})
public void testLocalActivityInTheLastWorkflowTask(boolean blockOnLA) {
@Parameters({"true, true", "false, true", "true, false", "false, false"})
public void testLocalActivityInTheLastWorkflowTask(boolean blockOnLA, boolean continueAsNew) {
TestWorkflow client = testWorkflowRule.newWorkflowStub(TestWorkflow.class);
assertEquals("done", client.execute(blockOnLA));
assertEquals("done", client.execute(blockOnLA, continueAsNew));
}

@WorkflowInterface
public interface TestWorkflow {
@WorkflowMethod
String execute(boolean blockOnLA);
String execute(boolean blockOnLA, boolean continueAsNew);
}

public static class TestWorkflowImpl implements TestWorkflow {
Expand All @@ -65,13 +65,16 @@ public static class TestWorkflowImpl implements TestWorkflow {
.build());

@Override
public String execute(boolean blockOnLA) {
public String execute(boolean blockOnLA, boolean continueAsNew) {
if (blockOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
promise.get();
}
Async.procedure(activities::sleepActivity, (long) 1000, 0);
if (continueAsNew) {
Workflow.continueAsNew(blockOnLA, false);
}
return "done";
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,27 @@ public class SignalWithLocalActivityInTheLastWorkflowTaskTest {
.build();

@Test
@Parameters({"true", "false"})
public void testSignalWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA) {
@Parameters({"true, true", "false, true", "true, false", "false, false"})
public void testSignalWithLocalActivityInTheLastWorkflowTask(
Boolean waitOnLA, Boolean continueAsNew) {
TestSignaledWorkflow client = testWorkflowRule.newWorkflowStub(TestSignaledWorkflow.class);
WorkflowStub.fromTyped(client)
.signalWithStart("testSignal", new String[] {"signalValue"}, new Boolean[] {waitOnLA});
assertEquals("done", client.execute());
.signalWithStart("signal", new Boolean[] {waitOnLA, continueAsNew}, new Boolean[] {true});
assertEquals("done", client.execute(true));
}

@WorkflowInterface
public interface TestSignaledWorkflow {

@WorkflowMethod
String execute();
String execute(Boolean wait);

@SignalMethod
void signal(boolean waitOnLA);
void signal(boolean waitOnLA, boolean continueAsNew);
}

public static class TestSignalWorkflowImpl implements TestSignaledWorkflow {
boolean finish = false;

private final TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(
Expand All @@ -72,18 +74,25 @@ public static class TestSignalWorkflowImpl implements TestSignaledWorkflow {
.build());

@Override
public String execute() {
public String execute(Boolean wait) {
if (wait) {
Workflow.await(() -> finish);
}
return "done";
}

@Override
public void signal(boolean waitOnLA) {
public void signal(boolean waitOnLA, boolean continueAsNew) {
if (waitOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
Async.procedure(activities::sleepActivity, (long) 10000, 0);
promise.get();
}

if (continueAsNew) {
Workflow.continueAsNew(false);
}
finish = true;
activities.sleepActivity(1000, 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,37 @@ public class UpdateWithLocalActivityInTheLastWorkflowTaskTest {
.build();

@Test
@Parameters({"true", "false"})
public void testUpdateWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA)
throws InterruptedException {
@Parameters({"true, true", "false, true", "true, false", "false, false"})
public void testUpdateWithLocalActivityInTheLastWorkflowTask(
Boolean waitOnLA, Boolean continueAsNew) {
WorkflowWithUpdate client = testWorkflowRule.newWorkflowStub(WorkflowWithUpdate.class);

WorkflowStub.fromTyped(client).start();
WorkflowStub.fromTyped(client).start(true);
Thread asyncUpdate =
new Thread(
() -> {
try {
client.update(waitOnLA);
client.update(waitOnLA, continueAsNew);
} catch (Exception e) {
}
});
asyncUpdate.start();
assertEquals("done", client.execute());
assertEquals("done", client.execute(true));
asyncUpdate.interrupt();
}

@WorkflowInterface
public interface WorkflowWithUpdate {

@WorkflowMethod
String execute();
String execute(Boolean finish);

@UpdateMethod
String update(Boolean waitOnLA);
String update(Boolean waitOnLA, Boolean continueAsNew);
}

public static class WorkflowWithUpdateImpl implements WorkflowWithUpdate {
Boolean finish = false;

boolean finish = false;
private final TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(
TestActivities.VariousTestActivities.class,
Expand All @@ -84,18 +83,24 @@ public static class WorkflowWithUpdateImpl implements WorkflowWithUpdate {
.build());

@Override
public String execute() {
Workflow.await(() -> finish);
public String execute(Boolean wait) {
if (wait) {
Workflow.await(() -> finish);
}
return "done";
}

@Override
public String update(Boolean waitOnLA) {
public String update(Boolean waitOnLA, Boolean continueAsNew) {
if (waitOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
Promise promise = Async.procedure(activities::sleepActivity, (long) 10, 0);
Async.procedure(activities::sleepActivity, (long) 10000, 0);
promise.get();
}

if (continueAsNew) {
Workflow.continueAsNew(false);
}
finish = true;
activities.sleepActivity(1000, 0);
return "update";
Expand Down

0 comments on commit 5d1bbbe

Please sign in to comment.