diff --git a/it/server/src/test/java/com/walmartlabs/concord/it/server/CryptoIT.java b/it/server/src/test/java/com/walmartlabs/concord/it/server/CryptoIT.java index 98389eb456..7f76bc688a 100644 --- a/it/server/src/test/java/com/walmartlabs/concord/it/server/CryptoIT.java +++ b/it/server/src/test/java/com/walmartlabs/concord/it/server/CryptoIT.java @@ -325,7 +325,7 @@ public void testDecryptInvalidString() throws Exception { // --- pir = waitForCompletion(processApi, spr.getInstanceId()); - assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus()); + assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus(), "Process logs: " + new String(getLog(pir.getLogFileName()))); } @Test diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultSynchronizationService.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultSynchronizationService.java index e327d93c11..2c1d8950fd 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultSynchronizationService.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultSynchronizationService.java @@ -29,6 +29,8 @@ public class DefaultSynchronizationService implements SynchronizationService { private final List callbacks = new ArrayList<>(); + private boolean stop; + @Override public boolean hasPoint() { synchronized (this) { @@ -43,6 +45,7 @@ public void maintain() { } callbacks.clear(); + stop = false; } @Override @@ -51,4 +54,14 @@ public void point(Runnable callback) { callbacks.add(callback); } } + + @Override + public void stop() { + this.stop = true; + } + + @Override + public boolean hasStop() { + return stop; + } } diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationService.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationService.java index 5975964abf..fea6b9149e 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationService.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationService.java @@ -27,4 +27,8 @@ public interface SynchronizationService { void maintain(); void point(Runnable callback); + + void stop(); + + boolean hasStop(); } diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationServiceListener.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationServiceListener.java index 87559c366d..c8f551abaf 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationServiceListener.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/SynchronizationServiceListener.java @@ -23,6 +23,8 @@ import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.*; +import java.util.Map; + public class SynchronizationServiceListener implements ExecutionListener { private final SynchronizationService delegate; @@ -33,7 +35,7 @@ public SynchronizationServiceListener(SynchronizationService delegate) { @Override public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd) { - if (delegate.hasPoint()) { + if (delegate.hasPoint() || delegate.hasStop()) { state.setStatus(threadId, ThreadStatus.SUSPENDED); return Result.BREAK; } @@ -43,6 +45,13 @@ public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadI @Override public Result afterEval(Runtime runtime, VM vm, State state) { + if (delegate.hasStop()) { + for (Map.Entry e : state.threadStatus().entrySet()) { + state.setStatus(e.getKey(), ThreadStatus.DONE); + } + return Result.BREAK; + } + if (!delegate.hasPoint()) { return Result.BREAK; } diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/ExitCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/ExitCommand.java index ee5722cb40..2739224fde 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/ExitCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/ExitCommand.java @@ -21,6 +21,7 @@ */ import com.walmartlabs.concord.runtime.v2.model.ExitStep; +import com.walmartlabs.concord.runtime.v2.runner.SynchronizationService; import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.State; import com.walmartlabs.concord.svm.ThreadId; @@ -35,6 +36,6 @@ public ExitCommand(ExitStep step) { @Override protected void execute(Runtime runtime, State state, ThreadId threadId) { - state.dropAllFrames(); + runtime.getService(SynchronizationService.class).stop(); } } diff --git a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java index 1880b9b580..666c697db8 100644 --- a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java +++ b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java @@ -1577,6 +1577,43 @@ public void testUuidFunc() throws Exception { assertLog(log, ".*uuid: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.*"); } + @Test + public void testExitFromParallelLoop() throws Exception { + deploy("parallelLoopExit"); + + save(ProcessConfiguration.builder() + .build()); + + byte[] log = run(); + + assertNoLog(log, ".*should not reach here.*"); + + // thread in loop should execute at least one step + assertLog(log, ".*inner start: one.*"); + assertLog(log, ".*inner start: two.*"); + assertLog(log, ".*inner start: three.*"); + } + + @Test + public void testExitFromSerialLoop() throws Exception { + deploy("serialLoopExit"); + + save(ProcessConfiguration.builder() + .build()); + + byte[] log = run(); + + assertNoLog(log, ".*should not reach here.*"); + + assertLog(log, ".*inner start: one.*"); + assertLog(log, ".*inner end: one.*"); + assertLog(log, ".*inner start: two.*"); + + assertNoLog(log, ".*inner end: two.*"); + assertNoLog(log, ".*inner start: three.*"); + assertNoLog(log, ".*inner start: four.*"); + } + private void deploy(String resource) throws URISyntaxException, IOException { Path src = Paths.get(MainTest.class.getResource(resource).toURI()); IOUtils.copy(src, workDir); diff --git a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopExit/concord.yml b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopExit/concord.yml new file mode 100644 index 0000000000..51ca7005f8 --- /dev/null +++ b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopExit/concord.yml @@ -0,0 +1,33 @@ +configuration: + debug: true + runtime: concord-v2 + +flows: + default: + - call: "inner" + loop: + items: + - "one" + - "two" + - "three" + - "four" + mode: parallel + + - log: "should not reach here" + + inner: + - log: "inner start: ${item}" + + - if: ${item == "four"} + then: + - checkpoint: "${item}" + + - if: ${item == "two"} + then: + - exit + + - if: ${item == "three"} + then: + - expr: ${sleep.ms(1000)} + + - log: "inner end: ${item}" diff --git a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/serialLoopExit/concord.yml b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/serialLoopExit/concord.yml new file mode 100644 index 0000000000..06d97138e5 --- /dev/null +++ b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/serialLoopExit/concord.yml @@ -0,0 +1,32 @@ +configuration: + debug: true + runtime: concord-v2 + +flows: + default: + - call: "inner" + loop: + items: + - "one" + - "two" + - "three" + - "four" + + - log: "should not reach here" + + inner: + - log: "inner start: ${item}" + + - if: ${item == "four"} + then: + - checkpoint: "${item}" + + - if: ${item == "two"} + then: + - exit + + - if: ${item == "three"} + then: + - expr: ${sleep.ms(1000)} + + - log: "inner end: ${item}"