Skip to content

Commit

Permalink
runtime-v2: fix exit from parallel loop (#830)
Browse files Browse the repository at this point in the history
  • Loading branch information
brig authored Nov 12, 2023
1 parent f46c11f commit dc167a5
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void testDecryptInvalidString() throws Exception {
// ---

pir = waitForCompletion(processApi, spr.getInstanceId());
assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus());
assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus(), "Process logs: " + new String(getLog(pir.getLogFileName())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class DefaultSynchronizationService implements SynchronizationService {

private final List<Runnable> callbacks = new ArrayList<>();

private boolean stop;

@Override
public boolean hasPoint() {
synchronized (this) {
Expand All @@ -43,6 +45,7 @@ public void maintain() {
}

callbacks.clear();
stop = false;
}

@Override
Expand All @@ -51,4 +54,14 @@ public void point(Runnable callback) {
callbacks.add(callback);
}
}

@Override
public void stop() {
this.stop = true;
}

@Override
public boolean hasStop() {
return stop;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ public interface SynchronizationService {
void maintain();

void point(Runnable callback);

void stop();

boolean hasStop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

import java.util.Map;

public class SynchronizationServiceListener implements ExecutionListener {

private final SynchronizationService delegate;
Expand All @@ -33,7 +35,7 @@ public SynchronizationServiceListener(SynchronizationService delegate) {

@Override
public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd) {
if (delegate.hasPoint()) {
if (delegate.hasPoint() || delegate.hasStop()) {
state.setStatus(threadId, ThreadStatus.SUSPENDED);
return Result.BREAK;
}
Expand All @@ -43,6 +45,13 @@ public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadI

@Override
public Result afterEval(Runtime runtime, VM vm, State state) {
if (delegate.hasStop()) {
for (Map.Entry<ThreadId, ThreadStatus> e : state.threadStatus().entrySet()) {
state.setStatus(e.getKey(), ThreadStatus.DONE);
}
return Result.BREAK;
}

if (!delegate.hasPoint()) {
return Result.BREAK;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import com.walmartlabs.concord.runtime.v2.model.ExitStep;
import com.walmartlabs.concord.runtime.v2.runner.SynchronizationService;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.State;
import com.walmartlabs.concord.svm.ThreadId;
Expand All @@ -35,6 +36,6 @@ public ExitCommand(ExitStep step) {

@Override
protected void execute(Runtime runtime, State state, ThreadId threadId) {
state.dropAllFrames();
runtime.getService(SynchronizationService.class).stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,43 @@ public void testUuidFunc() throws Exception {
assertLog(log, ".*uuid: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.*");
}

@Test
public void testExitFromParallelLoop() throws Exception {
deploy("parallelLoopExit");

save(ProcessConfiguration.builder()
.build());

byte[] log = run();

assertNoLog(log, ".*should not reach here.*");

// thread in loop should execute at least one step
assertLog(log, ".*inner start: one.*");
assertLog(log, ".*inner start: two.*");
assertLog(log, ".*inner start: three.*");
}

@Test
public void testExitFromSerialLoop() throws Exception {
deploy("serialLoopExit");

save(ProcessConfiguration.builder()
.build());

byte[] log = run();

assertNoLog(log, ".*should not reach here.*");

assertLog(log, ".*inner start: one.*");
assertLog(log, ".*inner end: one.*");
assertLog(log, ".*inner start: two.*");

assertNoLog(log, ".*inner end: two.*");
assertNoLog(log, ".*inner start: three.*");
assertNoLog(log, ".*inner start: four.*");
}

private void deploy(String resource) throws URISyntaxException, IOException {
Path src = Paths.get(MainTest.class.getResource(resource).toURI());
IOUtils.copy(src, workDir);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
configuration:
debug: true
runtime: concord-v2

flows:
default:
- call: "inner"
loop:
items:
- "one"
- "two"
- "three"
- "four"
mode: parallel

- log: "should not reach here"

inner:
- log: "inner start: ${item}"

- if: ${item == "four"}
then:
- checkpoint: "${item}"

- if: ${item == "two"}
then:
- exit

- if: ${item == "three"}
then:
- expr: ${sleep.ms(1000)}

- log: "inner end: ${item}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
configuration:
debug: true
runtime: concord-v2

flows:
default:
- call: "inner"
loop:
items:
- "one"
- "two"
- "three"
- "four"

- log: "should not reach here"

inner:
- log: "inner start: ${item}"

- if: ${item == "four"}
then:
- checkpoint: "${item}"

- if: ${item == "two"}
then:
- exit

- if: ${item == "three"}
then:
- expr: ${sleep.ms(1000)}

- log: "inner end: ${item}"

0 comments on commit dc167a5

Please sign in to comment.